Add Elasticsearch sink

Finally, we want to send the derived insights to Elasticsearch and Kibana for visualization. To this end, we use an Elasticsearch sink that has been extended to sign the requests with IAM credentials so that they are accepted by Amazon Elasticsearch Service.

if (parameter.has("ElasticsearchEndpoint")) {
  String elasticsearchEndpoint = parameter.get("ElasticsearchEndpoint");
  final String region = parameter.get("Region", DEFAULT_REGION_NAME);

  //remove trailling /
  if (elasticsearchEndpoint.endsWith(("/"))) {
    elasticsearchEndpoint = elasticsearchEndpoint.substring(0, elasticsearchEndpoint.length()-1);
  }

  pickupCounts.addSink(AmazonElasticsearchSink.buildElasticsearchSink(
      elasticsearchEndpoint, region, "pickup_count", "_doc"));

  tripDurations.addSink(AmazonElasticsearchSink.buildElasticsearchSink(
      elasticsearchEndpoint, region, "trip_duration", "_doc"));
}