To localize places in New York that are currently requesting a high number of taxi trips and to generate average trip durations to the airports, we use time windows. Every time window will span an hour according to event time and we can then consider all events within that one hour window to generate the statistics we are interested in for this particular hour.
To identify popular pickup locations, we’ll generate a geo hash with a reduced precision. Therefore, events in a similar geographic location will have the same geo hash. We then group together all events with the same geo hash with
keyBy and apply a time window of one hour (remember that this is according to event time, not actual processing time) to the resulting stream. In this way, all events with the same geo hash will be collected in the same window and we can easily determine the overall count for that time window. Finally, we actually count the number of events and emit a new
PickupCount event that contains the resulting count for each time window and geo hash.
112DataStream<PickupCount> pickupCounts = trips 113 .map(new TripToGeoHash()) 114 .keyBy("geoHash") 115 .timeWindow(Time.hours(1)) 116 .apply(new CountByGeoHash()); 117 118pickupCounts.print();
To compute the average trip durations, we’ll transform the incoming trip events and only emit
TripDuration events for trips that arrive at one of the airports. We then
keyBy the pickup location and the destination, so that all trips that started at the same location and ended at the same airport are grouped together. Subsequently, we apply a time window of one hour and aggregate the duration of all
TripDuration events in that time window that arrive to the same airport into an
122DataStream<AverageTripDuration> tripDurations = trips 123 .flatMap(new TripToTripDuration()) 124 .keyBy("pickupGeoHash", "airportCode") 125 .timeWindow(Time.hours(1)) 126 .apply(new TripDurationToAverageTripDuration());