Beam on KDA

While running Apache Beam applications on Kinesis Data Analytics is largely the same as running Beam applications in any other Apache Flink environment, there are a few important aspects that developers need to keep in mind.

Passing parameters into the pipeline

Properties for your Kinesis Data Analytics application can be configured via the AWS console (or the AWS CLI, CloudFormation, Terraform, and so on):

[Image: Overview BEAM Architecture]

These properties can then be accessed from your code by extending FlinkPipelineOptions, like so:

public interface TaxiCountOptions extends FlinkPipelineOptions, AwsOptions {
  Logger LOG = LoggerFactory.getLogger(TaxiCountOptions.class);

  @Description("Name of the Kinesis Data Stream to read from")
  String getInputStreamName();

  void setInputStreamName(String value);

  @Description("S3 bucket name and prefix that contains the historic data")
  String getInputS3Pattern();

  void setInputS3Pattern(String value);

  String getSource();

  void setSource(String value);

  boolean getOutputBoroughs();

  void setOutputBoroughs(boolean value);

  static String[] argsFromKinesisApplicationProperties(String[] args, String applicationPropertiesName) {
    Properties beamProperties = null;

    try {
      Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

      beamProperties = applicationProperties.get(applicationPropertiesName);
    } catch (IOException e) {
      LOG.warn("Unable to retrieve application properties from the Kinesis Analytics runtime", e);

      return new String[0];
    }

    if (beamProperties == null) {
      LOG.warn("Unable to find property group: {}", applicationPropertiesName);

      return new String[0];
    }

    // Turn the property group into Beam-style --key=value pipeline arguments;
    // the caller merges these with the command-line args
    return beamProperties.entrySet().stream()
        .map(entry -> String.format("--%s=%s", entry.getKey(), entry.getValue()))
        .toArray(String[]::new);
  }
}
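Conceptually, this helper just turns the configured property group into the --key=value strings that Beam's PipelineOptionsFactory understands. A self-contained sketch of that conversion (the toBeamArgs name and the example property are illustrative, not from the sample):

```java
import java.util.Properties;

public class BeamArgs {

    // Convert a property group (a java.util.Properties, as returned per group by
    // KinesisAnalyticsRuntime.getApplicationProperties()) into the --key=value
    // strings that Beam's PipelineOptionsFactory expects.
    public static String[] toBeamArgs(Properties properties) {
        return properties.entrySet().stream()
                .map(entry -> String.format("--%s=%s", entry.getKey(), entry.getValue()))
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("InputStreamName", "taxi-trip-events");

        // prints --InputStreamName=taxi-trip-events
        System.out.println(toBeamArgs(props)[0]);
    }
}
```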

You’ll notice the KinesisAnalyticsRuntime class above; to access this class in your code, be sure to add the following dependency to your pom.xml:
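A minimal fragment for that dependency (the version shown is an assumption; check Maven Central for the latest release):

```xml
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-kinesisanalytics-runtime</artifactId>
    <version>1.2.0</version>
</dependency>
```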


Dependency Shading

When running your Beam applications on Kinesis Data Analytics, it’s important to shade (relocate) your dependencies to prevent conflicts with the dependencies already present on the Flink classpath. Here’s a snippet involving Jackson from the taxi consumer sample:
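The shading itself is typically done with the maven-shade-plugin. A sketch of a Jackson relocation (the shaded package prefix here is an assumption, not necessarily what the sample uses):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Move Jackson into a private namespace so the application's
                         version cannot clash with the one on the Flink classpath -->
                    <relocation>
                        <pattern>com.fasterxml.jackson</pattern>
                        <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>
```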


Configuring Credentials

You can configure your Beam IO connectors to obtain credentials from the IAM role that has been configured for the Kinesis Data Analytics application, instead of hard-coding credentials in your code. Here’s a snippet illustrating how to configure the KinesisIO connector:

        input = p
            .apply("Kinesis source", KinesisIO
                .read()
                .withStreamName(options.getInputStreamName())
                // Obtain credentials from the application's IAM role via the default provider chain
                .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(Regions.fromName(options.getAwsRegion()))))
            .apply("Parse Kinesis events", ParDo.of(new EventParser.KinesisParser()));