Add Elasticsearch sink

Finally, we send the derived insights to Elasticsearch so that they can be visualized in Kibana. To this end, we use an Elasticsearch sink that has been extended to sign requests with IAM credentials, so that Amazon Elasticsearch Service accepts them.

// only add the sinks if an Elasticsearch endpoint has been configured
if (parameter.has("ElasticsearchEndpoint")) {
  String elasticsearchEndpoint = parameter.get("ElasticsearchEndpoint");
  final String region = parameter.get("Region", DEFAULT_REGION_NAME);

  // remove trailing /
  if (elasticsearchEndpoint.endsWith("/")) {
    elasticsearchEndpoint = elasticsearchEndpoint.substring(0, elasticsearchEndpoint.length() - 1);
  }

  // send the pickup counts to the "pickup_count" index
  pickupCounts.addSink(AmazonElasticsearchSink.buildElasticsearchSink(
      elasticsearchEndpoint, region, "pickup_count", "pickup_count"));

  // send the trip durations to the "trip_duration" index
  tripDurations.addSink(AmazonElasticsearchSink.buildElasticsearchSink(
      elasticsearchEndpoint, region, "trip_duration", "trip_duration"));
}
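
The endpoint clean-up in the snippet above can also be pulled out into a small helper, which makes it easy to test on its own. The following is a minimal sketch, not part of the original application: the class name `EndpointNormalizer` and the method `stripTrailingSlash` are hypothetical. Unlike the inline check above, which removes a single trailing slash, this variant removes any number of them.

```java
// Hypothetical helper for illustration; not part of the original application code.
final class EndpointNormalizer {

    private EndpointNormalizer() {
        // utility class, not meant to be instantiated
    }

    /**
     * Returns the endpoint without trailing '/' characters.
     * A null input is returned unchanged.
     */
    static String stripTrailingSlash(String endpoint) {
        if (endpoint == null) {
            return null;
        }
        int end = endpoint.length();
        // walk backwards over every trailing slash
        while (end > 0 && endpoint.charAt(end - 1) == '/') {
            end--;
        }
        return endpoint.substring(0, end);
    }
}
```

With such a helper, the inline `if` block could be replaced by `elasticsearchEndpoint = EndpointNormalizer.stripTrailingSlash(elasticsearchEndpoint);` before the sinks are built.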