Amazon EMR

You can run Transformer pipelines using Spark deployed on an Amazon EMR cluster. Transformer supports several EMR versions. For a complete list, see Cluster Compatibility Matrix.

To run a pipeline on an EMR cluster, you set the cluster manager type to EMR in the pipeline properties, then configure the cluster properties.

Important: The EMR cluster must be able to access Transformer to send the status, metrics, and offsets for running pipelines. Grant the cluster access to the Transformer URL, as described in Granting the Spark Cluster Access to Transformer.

When you configure a pipeline to run on an EMR cluster, you can specify an existing Spark cluster to use or you can have Transformer provision a cluster to run the pipeline. When provisioning a cluster, you can optionally define bootstrap actions, enable logging, make the cluster visible to all users, and have Transformer terminate the cluster after the pipeline stops.

Provisioning a cluster that terminates after the pipeline stops is a cost-effective method of running a Transformer pipeline. Running multiple pipelines on a single existing cluster can also reduce costs.

You define an S3 staging URI and a staging directory within the cluster to store the Transformer libraries and resources needed to run the pipeline.

You can configure the pipeline to use an instance profile or AWS access keys to authenticate with the EMR cluster. When you start the pipeline, Transformer uses the specified instance profile or AWS access key to launch the Spark application. You can also configure server-side encryption for data stored in the staging directory.

You can configure the maximum number of retry attempts for failed requests and related properties. You can also use a connection to configure the pipeline.

Note: Pipelines that run on Kerberos-enabled EMR clusters cannot include stages for the Hadoop ecosystem.

[Image: part of the Cluster tab of a pipeline configured to run on an EMR cluster]

Transformer Installation Location

When you use EMR as a cluster manager, Transformer must be installed in a location that allows submitting Spark jobs to the cluster.

StreamSets recommends installing Transformer on an Amazon EC2 instance.

Authentication Method

You can specify the authentication method that Transformer uses to connect to an EMR cluster. When you start the pipeline, Transformer uses the specified instance profile or AWS access key to launch the Spark application.

Transformer can authenticate in the following ways:
Instance profile
When Transformer runs on an Amazon EC2 instance that has an associated instance profile, Transformer uses the instance profile credentials to automatically authenticate with AWS.
For more information about associating an instance profile with an EC2 instance, see the Amazon EC2 documentation.
AWS access keys
When Transformer does not run on an Amazon EC2 instance or when the EC2 instance does not have an instance profile, you can authenticate using an AWS access key pair. When using an AWS access key pair, you specify the access key ID and secret access key to use.
Tip: To secure sensitive information, you can use credential stores or runtime resources.
None
You can choose to connect anonymously, without authentication.

You can also temporarily assume a specified role to connect to the Amazon EMR cluster. For more information, see Assume Another Role.

Server-Side Encryption

You can configure an EMR cluster to use Amazon Web Services server-side encryption (SSE). When configured for server-side encryption, Transformer passes required server-side encryption configuration values to Amazon S3. Amazon S3 uses the values to encrypt files loaded to the Amazon S3 staging directory.

When you enable server-side encryption for the cluster, you select one of the following ways that Amazon S3 manages the encryption keys:
Amazon S3-Managed Encryption Keys (SSE-S3)
When you use server-side encryption with Amazon S3-managed keys, Amazon S3 manages the encryption keys for you.
AWS KMS-Managed Encryption Keys (SSE-KMS)
When you use server-side encryption with AWS Key Management Service (KMS), you specify the Amazon resource name (ARN) of the AWS KMS master encryption key that you want to use.

For more information about using server-side encryption to protect data in Amazon S3, see the Amazon S3 documentation.

KMS Requirements

When you use server-side encryption with AWS Key Management Service (KMS), you must ensure that the appropriate permissions are granted.

The nodes that run in the EMR cluster must have the following general permission:
  • kms:ListKeys

The nodes must also have the following permission for the KMS master encryption key that you use:

  • kms:GetPublicKey
  • kms:Decrypt
  • kms:DescribeKey
  • kms:Encrypt
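
As a rough illustration, the permissions above could be expressed as an IAM policy document attached to the role that the cluster nodes use. The following Python sketch builds such a document. The function name and the example key ARN are hypothetical, and the two-statement layout is an assumption, not a template provided by StreamSets.

```python
import json


def build_kms_policy(key_arn: str) -> dict:
    """Build an IAM policy document granting the KMS permissions listed above.

    key_arn: ARN of the KMS key used for SSE-KMS (hypothetical input).
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # General permission that is not tied to a single key.
                "Effect": "Allow",
                "Action": ["kms:ListKeys"],
                "Resource": "*",
            },
            {
                # Permissions scoped to the key that encrypts the staging files.
                "Effect": "Allow",
                "Action": [
                    "kms:GetPublicKey",
                    "kms:Decrypt",
                    "kms:DescribeKey",
                    "kms:Encrypt",
                ],
                "Resource": key_arn,
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder ARN, for illustration only.
    example_arn = "arn:aws:kms:us-west-2:111122223333:key/EXAMPLE-KEY-ID"
    print(json.dumps(build_kms_policy(example_arn), indent=2))
```

Scoping the second statement to one key ARN keeps the grant as narrow as the requirement describes.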

Kerberos Stage Limitations

By default, Transformer can communicate securely with an EMR cluster that uses Kerberos authentication.

However, pipelines that run on a Kerberos-enabled EMR cluster cannot include stages in the Hadoop ecosystem. This includes the following stages:
  • File stages
  • Hive stages
  • Kafka stages
  • Kudu stages

Force Stopping an EMR Job

When necessary, you can force jobs to stop. When forcing a job to stop, Transformer often stops processes before they complete, which can lead to unexpected results.

Important: Jobs can take a long time to stop gracefully, depending on the processing logic. Use this option only after waiting an appropriate amount of time for the job to come to a graceful stop.

When you force stop an EMR job, Transformer first tries to stop the Spark job through the YARN service in the cluster. If the YARN service is reachable, Transformer stops the job through YARN. If the YARN service is not reachable, Transformer sends a new step to the EMR cluster with the stop command.

As a result, if the YARN service is not reachable, Transformer can only force stop an EMR job when all of the following are true:

  • The job runs on EMR 5.28 or later with support for step concurrency.

  • The Step Concurrency property in the pipeline is set to 2 or higher.

  • A step becomes available.

If the YARN service is not reachable and any of the three requirements above is not met, Transformer cannot force stop the job.
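
The conditions above can be summarized as a small decision function. This is a sketch of the documented rules only, not Transformer code. The function name, the parameter names, and the representation of the EMR version as a tuple of integers are all hypothetical.

```python
def can_force_stop(yarn_reachable: bool,
                   emr_version: tuple,
                   step_concurrency: int,
                   step_available: bool) -> bool:
    """Return True if a force stop can succeed under the rules described above."""
    # A reachable YARN service is enough: the job is stopped through YARN.
    if yarn_reachable:
        return True
    # Otherwise the stop command must be sent as a new EMR step,
    # which requires all three conditions to hold.
    return bool(
        emr_version >= (5, 28)      # EMR 5.28 or later (step concurrency support)
        and step_concurrency >= 2   # Step Concurrency property set to 2 or higher
        and step_available          # a step slot is free for the stop command
    )
```

For example, `can_force_stop(False, (5, 27), 2, True)` is false because the EMR version is too old to accept the stop command as a concurrent step.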

Existing Cluster

You can configure a pipeline to run on an existing EMR cluster.

To run a pipeline on an existing EMR cluster, on the Cluster tab, clear the Provision a New Cluster property, then specify the cluster to use. You can specify a cluster by cluster ID or by cluster name and optional tags.

When an EMR cluster runs a Transformer pipeline, Transformer libraries are stored in the S3 staging URI and directory so they can be reused.

Tip: When feasible, running multiple pipelines on a single existing cluster can be a cost-reducing measure.

For best practices for configuring a cluster, see the Amazon EMR documentation.

Specifying a Cluster

When you configure pipeline properties for an existing EMR cluster, by default, you specify the cluster to use by cluster ID. If you prefer, you can specify the cluster using the cluster name and optional tags.

The cluster name property is case-sensitive. If your EMR account has access to more than one cluster with the same name, define tags to differentiate between the clusters.

Transformer searches for clusters that match the specified name and tags and that are not in Terminal state. If it finds more than one matching cluster, Transformer randomly selects one to use.

Tip: If you have multiple clusters of the same name with the same tags, to ensure that Transformer runs a pipeline on a particular cluster, specify the cluster by cluster ID.
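
The name-and-tags lookup described above can be pictured with the following Python sketch. It is illustrative only: the dictionary layout for a cluster, the function names, and the set of states treated as terminal are assumptions, and the sketch does not call the EMR API.

```python
import random

# States treated as terminal in this sketch (an assumption).
TERMINAL_STATES = {"TERMINATED", "TERMINATED_WITH_ERRORS"}


def matching_clusters(clusters, name, tags=None):
    """Return the clusters whose name and tags match and that are not terminal."""
    tags = tags or {}
    return [
        c for c in clusters
        if c["name"] == name                      # case-sensitive comparison
        and c["state"] not in TERMINAL_STATES
        and all(c.get("tags", {}).get(k) == v for k, v in tags.items())
    ]


def pick_cluster(clusters, name, tags=None):
    """Pick one matching cluster at random, or None if nothing matches."""
    candidates = matching_clusters(clusters, name, tags)
    return random.choice(candidates) if candidates else None
```

Because the final choice is random when several clusters match, adding a distinguishing tag or using the cluster ID is the only way to make the selection deterministic.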

To specify the cluster by ID, on the Cluster tab, enter the cluster ID in the Cluster ID property.

To specify the cluster by name and tags, on the Cluster tab, enable the Cluster by Name and Tags property. Then, specify a case-sensitive cluster name in the Cluster Name property.

To add optional tags, in the Cluster Tags property, click Add, then define a tag name and value for each tag that you want to define.

Provisioned Cluster

You can configure a pipeline to run on a provisioned cluster. When provisioning a cluster, Transformer creates a new EMR Spark cluster upon the initial run of a pipeline. You can optionally have Transformer terminate the cluster after the pipeline stops.

Tip: Provisioning a cluster that terminates after the pipeline stops is a cost-effective method of running a Transformer pipeline.

To provision a cluster for the pipeline, select the Provision a New Cluster property on the Cluster tab of the pipeline properties. Then, define the cluster configuration properties.

When provisioning a cluster, you specify cluster details such as the EMR version, the instance types to create, and the ID of the subnet to create the cluster in. You can define bootstrap actions to execute before processing data. You also indicate whether to terminate the cluster after the pipeline stops.

You can define the number of EC2 instances that the cluster uses to process data. The minimum is 2. To improve performance, you might increase that number based on the number of partitions that the pipeline uses. You can also configure the pipeline to save log data to a different location to avoid losing that data when the cluster terminates.

Optionally, you can specify an SSH key to access the cluster nodes for monitoring or troubleshooting. Make sure that the security group assigned to the EMR cluster allows access to the EC2 nodes. Alternatively, you can use a bootstrap action to set up SSH on the provisioned cluster instead of specifying the SSH key.

For a full list of provisioning properties, see Configuring a Pipeline.

For best practices for configuring a cluster, see the Amazon EMR documentation.

Bootstrap Actions

When you provision an EMR cluster, you can define bootstrap actions to execute before processing data. You might use bootstrap actions to perform tasks such as installing a driver on the cluster or creating directories and setting permissions on them.

To use bootstrap actions, you can define the scripts to use in the cluster configuration properties or you can specify bootstrap action scripts stored on Amazon S3.

When you specify more than one bootstrap action, place them in the order that you want them to run. If a bootstrap action fails to execute, Transformer cancels the cluster provisioning and stops the job.

For more information about EMR bootstrap actions, see the EMR documentation.

Example

You can use a bootstrap action to set up SSH on the provisioned cluster.

First, you generate an SSH key pair. Then, you use the public key in place of <PUBLIC_SSH_KEY> in the following script, which you enter in the Bootstrap Action Scripts cluster configuration property:
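#!/bin/bash

# Write the public key to a temporary file.
cat > /tmp/authorized_keys << EOF
<PUBLIC_SSH_KEY>
EOF
# Install the key for ec2-user. This replaces any existing authorized_keys file.
sudo sh -c 'mv /tmp/authorized_keys /home/ec2-user/.ssh/authorized_keys'
# Make ec2-user the owner and restrict access to the owner only.
sudo sh -c 'chown ec2-user:ec2-user /home/ec2-user/.ssh/authorized_keys'
sudo sh -c 'chmod 600 /home/ec2-user/.ssh/authorized_keys'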

Alternatively, you can save the script on S3 and specify the script location in the Bootstrap Actions cluster configuration property.

S3 Staging URI and Directory

To run pipelines on an EMR cluster, Transformer must store files on Amazon S3.

Transformer stores libraries in the following location: <S3 staging URI>/<staging directory>.

The location must exist before you start the pipeline. You define the location using the following pipeline configuration properties on the Cluster tab:
  • Staging Directory
  • S3 Staging URI

When a pipeline runs on an existing cluster, you might configure pipelines to use the same S3 staging URI and staging directory. This allows EMR to reuse the common files stored in that location. Pipelines that run on different clusters can also use the same staging locations as long as the pipelines are started by the same Transformer instance. Pipelines started by different Transformer instances must use different staging locations.

When a pipeline runs on a provisioned cluster, using the same location is a best practice, but not required.

Note: If you have multiple instances of Transformer that are configured to use different library versions, you must specify a different S3 staging URI or staging directory to avoid using the same staging location. For example, suppose you have two 5.6.0 instances of Transformer, each using a different Oracle JDBC driver. To allow each Transformer instance to use its own driver version, specify different staging locations for those pipelines.

Transformer stores the following files in the specified location:
Files that can be reused across pipelines
Transformer stores files that can be reused across pipelines, including Transformer libraries and external resources such as JDBC drivers, in the following location:
<S3 staging URI>/<staging directory>/<Transformer version>
For example, say you configure s3://mybucket as the S3 staging URI with the default /streamsets staging directory for a Transformer 5.6.0 pipeline. Then, Transformer stores the reusable files in the following location:

s3://mybucket/streamsets/5.6.0

Files specific to each pipeline
Transformer stores files specific to each pipeline, such as the pipeline JSON file and resource files used by the pipeline, in the following directory:
<S3 staging URI>/<staging directory>/staging/<pipelineId>/<runId>
For example, say you configure s3://mybucket as the S3 staging URI with the default /streamsets staging directory to run a pipeline named KafkaToJDBC. Transformer stores pipeline-specific files in a location like the following:

s3://mybucket/streamsets/staging/KafkaToJDBC03a0d2cc-f622-4a68-b161-7f2d9a4f3052/run1557350076328
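
To make the two layouts concrete, the following Python sketch assembles both locations from their parts. The function names are hypothetical and the code only mirrors the path patterns shown above. It does not access Amazon S3.

```python
def _base(staging_uri: str, staging_dir: str) -> str:
    """Join the S3 staging URI and the staging directory with a single slash."""
    return staging_uri.rstrip("/") + "/" + staging_dir.strip("/")


def reusable_files_location(staging_uri: str, staging_dir: str,
                            transformer_version: str) -> str:
    """Location of files that can be reused across pipelines."""
    return f"{_base(staging_uri, staging_dir)}/{transformer_version}"


def pipeline_files_location(staging_uri: str, staging_dir: str,
                            pipeline_id: str, run_id: str) -> str:
    """Location of files specific to one run of one pipeline."""
    return f"{_base(staging_uri, staging_dir)}/staging/{pipeline_id}/{run_id}"


if __name__ == "__main__":
    print(reusable_files_location("s3://mybucket", "/streamsets", "5.6.0"))
    print(pipeline_files_location(
        "s3://mybucket", "/streamsets",
        "KafkaToJDBC03a0d2cc-f622-4a68-b161-7f2d9a4f3052", "run1557350076328"))
```

With the values from the examples above, the two calls reproduce the example locations `s3://mybucket/streamsets/5.6.0` and `s3://mybucket/streamsets/staging/KafkaToJDBC03a0d2cc-f622-4a68-b161-7f2d9a4f3052/run1557350076328`.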

Monitoring EMR Pipelines

When Transformer cannot access monitoring information for an EMR cluster, Transformer tries to retrieve that information again based on the following Transformer configuration properties:
  • transformer.emr.monitoring.max.retry - Defines how many times Transformer retries retrieving monitoring information. Default is 3.
  • transformer.emr.monitoring.retry.base.backoff - Defines the minimum number of seconds to wait between retries. Default is 15.
  • transformer.emr.monitoring.retry.max.backoff - Defines the maximum number of seconds to wait between retries. Default is 300, which is 5 minutes.

If Transformer cannot access monitoring information within the specified retries, it assumes that the Spark job for the pipeline is not running and stops the pipeline.

When needed, you can configure these properties in the Transformer configuration properties of the deployment.
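
To get a feel for how the three retry properties interact, the following Python sketch computes a possible sequence of wait times. The documentation states only the minimum and maximum wait, so the doubling between retries is an assumption made for illustration, and the function name is hypothetical. Transformer may space retries differently.

```python
def retry_waits(max_retry: int = 3,
                base_backoff: int = 15,
                max_backoff: int = 300) -> list:
    """Return the assumed wait, in seconds, before each retry.

    Assumption: the wait starts at base_backoff, doubles after each retry,
    and never exceeds max_backoff.
    """
    return [min(base_backoff * 2 ** attempt, max_backoff)
            for attempt in range(max_retry)]


if __name__ == "__main__":
    print(retry_waits())             # waits for the default property values
    print(retry_waits(max_retry=6))  # more retries, capped at max_backoff
```

Under this assumed schedule, the default values yield waits of 15, 30, and 60 seconds before the pipeline is stopped, so raising the retry count or the base backoff gives a cluster more time to become reachable again.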

Note: These properties also define how Transformer tries to retrieve monitoring information for EMR Serverless pipelines.