Amazon S3

The Amazon S3 origin reads objects stored in Amazon Simple Storage Service, also known as Amazon S3. The objects must be fully written, include data of the same supported format, and use the same schema.

When reading multiple objects in a batch, the origin reads the oldest object first. Upon successfully reading an object, the origin can delete the object, move it to an archive directory, or leave it in the directory.

When the pipeline stops, the origin notes the last-modified timestamp of the last object that it processed and stores it as an offset. When the pipeline starts again, the origin continues processing from the last-saved offset by default. You can reset pipeline offsets to process all available objects.
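The offset behavior described above can be sketched in a few lines of Python. This is an illustration only, not the origin's implementation: the S3Object type and the next_batch function are hypothetical names, and the sketch assumes that an object counts as unprocessed when its last-modified timestamp is strictly later than the stored offset.

```python
from datetime import datetime, timezone
from typing import List, NamedTuple, Optional, Tuple


class S3Object(NamedTuple):
    """Hypothetical stand-in for an object listing entry."""
    key: str
    last_modified: datetime


def next_batch(
    objects: List[S3Object],
    offset: Optional[datetime],
    max_objects: int,
) -> Tuple[List[S3Object], Optional[datetime]]:
    """Return up to max_objects unprocessed objects, oldest first, and the new offset."""
    # An offset of None models a reset: every available object is pending.
    pending = sorted(
        (o for o in objects if offset is None or o.last_modified > offset),
        key=lambda o: o.last_modified,
    )
    batch = pending[:max_objects]
    # The offset is the last-modified timestamp of the last object processed.
    new_offset = batch[-1].last_modified if batch else offset
    return batch, new_offset


def hour(h: int) -> datetime:
    return datetime(2024, 1, 1, h, tzinfo=timezone.utc)


listing = [S3Object("c.json", hour(3)), S3Object("a.json", hour(1)), S3Object("b.json", hour(2))]
first, saved_offset = next_batch(listing, None, 2)          # a.json, b.json
second, saved_offset = next_batch(listing, saved_offset, 2)  # c.json
```

Passing None as the offset models resetting the pipeline offsets, so all available objects become eligible again. How the origin treats objects that share an identical timestamp is not covered by this sketch.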

Before you run a pipeline that uses the Amazon S3 origin, make sure to complete the prerequisite tasks.

When you configure the origin, you specify the authentication method to use. You define the bucket and path to the objects to read. The origin reads objects from the specified location and its first-level subfolders. If the origin reads partition objects grouped by field, you must specify the partition base path to include the fields and field values in the data. You also specify the name pattern for the objects to read. You can optionally configure another name pattern to exclude objects from processing and define post-processing actions for successfully read objects.
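The interaction between the object name pattern and the exclusion pattern can be approximated with Python's fnmatch module, which supports the same asterisk (*) and question mark (?) wildcards. This is a sketch under assumptions: select_objects is a hypothetical helper, matching is assumed to be case-sensitive, and fnmatch also accepts bracket expressions that the origin is not documented to support.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Optional


def select_objects(
    names: Iterable[str],
    name_pattern: str,
    exclusion_pattern: Optional[str] = None,
) -> List[str]:
    """Keep names that match name_pattern and do not match exclusion_pattern."""
    selected = []
    for name in names:
        if not fnmatchcase(name, name_pattern):
            continue  # not included by the name pattern
        if exclusion_pattern and fnmatchcase(name, exclusion_pattern):
            continue  # included, then removed by the exclusion pattern
        selected.append(name)
    return selected


names = ["orders.csv", "orders.log", "a1.csv", "a12.csv"]
without_logs = select_objects(names, "*", "*.log")
single_char = select_objects(names, "a?.csv")
```

In this sketch the exclusion pattern only ever removes names that the name pattern already selected, which mirrors the way the two properties are described.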

You can also use a connection to configure the origin.

You select the data format and configure related properties. When processing delimited or JSON data, you can define a custom schema for reading the data and configure related properties. You can also configure advanced properties such as performance-related properties and proxy server properties.
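A custom schema is applied differently to the two formats: by position for delimited data and by name for JSON data. The following Python sketch illustrates the difference. The function names are hypothetical, the schema is modeled as a list of (field name, type) pairs rather than a real DDL or JSON schema, and filling a missing JSON field with None is an assumption.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

Schema = List[Tuple[str, Callable[[Any], Any]]]


def default_field_names(count: int) -> List[str]:
    """Names used for delimited data without a header row: _c0, _c1, and so on."""
    return [f"_c{i}" for i in range(count)]


def apply_schema_by_position(row: Sequence[str], schema: Schema) -> Dict[str, Any]:
    """Delimited data: the first schema field applies to the first value, and so on."""
    return {name: cast(value) for (name, cast), value in zip(schema, row)}


def apply_schema_by_name(obj: Dict[str, Any], schema: Schema) -> Dict[str, Any]:
    """JSON data: match schema names to data names, then use schema order and types."""
    return {name: cast(obj[name]) if name in obj else None for name, cast in schema}


schema: Schema = [("id", int), ("city", str)]
from_delimited = apply_schema_by_position(["7", "Lima"], schema)
from_json = apply_schema_by_name({"city": "Lima", "id": "7"}, schema)
```

Both calls produce a record with the fields id and city in schema order, even though the JSON input lists city first.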

Note: The user that Transformer uses to access HDFS depends on the pipeline configuration. For more information, see Hadoop YARN.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream stages efficiently. You can also configure the origin to skip tracking offsets.

Prerequisites

Before reading from Amazon S3 with the Amazon S3 origin, complete the following prerequisite tasks:

Verify permissions
The user associated with the authentication credentials in effect must have the following Amazon S3 permissions:
  • READ permission on the bucket
  • s3:ListBucket permission in an AWS Identity and Access Management (IAM) policy
Use a single schema
All objects processed by the Amazon S3 origin must have the same schema.
When objects have different schemas, the resulting behavior depends on the data format and the version of Spark that you use.

For example, the origin might skip processing delimited data with a different schema, but add null values to Parquet data with a different schema.

Perform prerequisite tasks for local pipelines

To connect to Amazon S3, Transformer uses connection information stored in a Hadoop configuration file. Before you run a local pipeline that connects to Amazon S3, complete the prerequisite tasks.
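For the permissions prerequisite, an IAM policy along the following lines might serve as a starting point. The Python sketch builds the policy document as a dictionary and prints it as JSON. The read_policy helper is hypothetical, and the s3:GetObject statement is an assumption: the prerequisite above names only READ permission on the bucket and s3:ListBucket. The bucket name and prefix are illustrative values.

```python
import json
from typing import Any, Dict


def read_policy(bucket: str, prefix: str = "") -> Dict[str, Any]:
    """Build a minimal read-only policy document for one bucket and prefix."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing is granted on the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
            {
                # Assumption: reading object contents needs s3:GetObject on the objects.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/{prefix}*"],
            },
        ],
    }


policy = read_policy("commerce", "sales/")
print(json.dumps(policy, indent=2))
```

Post-processing actions such as archiving or deleting objects would likely need additional permissions that this sketch does not include.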

Authentication Method

You can configure the Amazon S3 origin to authenticate with Amazon Web Services (AWS) using an instance profile or AWS access keys. When accessing a public bucket, you can connect anonymously using no authentication.

For more information about the authentication methods and details on how to configure each method, see Amazon Security.

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. When the pipeline starts processing a new batch, Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.

For an Amazon S3 origin, Spark determines the partitioning based on the data format of the data being read:
Delimited, JSON, text, or XML
When reading text-based data, Spark can split the object into multiple partitions for processing, depending on the underlying file system. Multiline JSON files cannot be split.
Avro, ORC, or Parquet
When reading Avro, ORC, or Parquet data, Spark can split the object into multiple partitions for processing.

Spark uses these partitions while the pipeline processes the batch unless a processor causes Spark to shuffle the data. To change the partitioning in the pipeline, use the Repartition processor.

Data Formats

The Amazon S3 origin generates records based on the specified data format.

The origin can read the following data formats:
Avro
The origin generates a record for every Avro record in the object. Each object must contain the Avro schema. The origin uses the Avro schema to generate records.
When you configure the origin, you must specify the Avro option appropriate for the version of Spark that runs the pipeline: Spark 2.3 or Spark 2.4 or later.
When using Spark 2.4 or later, you can define an Avro schema to use. The schema must be in JSON format. You can also configure the origin to process all objects in the specified locations. By default, the origin only processes objects with the .avro extension.
Delimited
The origin generates a record for each delimited line in the object. You can specify a custom delimiter, quote, and escape character used in the data.
By default, the origin uses the values in the first row for field names and creates records starting with the second row in the object. The origin infers data types from the data by default.
You can clear the Includes Header property to indicate that objects do not contain a header row. When objects do not include a header row, the origin names the first field _c0, the second field _c1, and so on. The origin also infers data types from the data by default. You can rename the fields downstream with a Field Renamer processor, or you can specify a custom schema in the origin.
When you specify a custom schema, the origin uses the field names and data types defined in the schema, applying the first field in the schema to the first field in the record, and so on.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
Objects must use \n as the newline character. The origin skips empty lines.
JSON
By default, the origin generates a record for each line in the object. Each line in the object must contain valid JSON Lines data. For details, see the JSON Lines website.
If the JSON Lines data contains objects that span multiple lines, you must configure the origin to process multiline JSON objects. When processing multiline JSON objects, the origin generates a record for each JSON object, even if it spans multiple lines.
A standard single-line JSON Lines object can be split into partitions and processed in parallel. A multiline JSON object cannot be split, so it must be processed in a single partition, which can slow pipeline performance.
By default, the origin uses the field names, field order, and data types in the data.
When you specify a custom schema, the origin matches the field names in the schema to those in the data, then applies the data types and field order defined in the schema.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
ORC
The origin generates a record for each Optimized Row Columnar (ORC) row in the object.
Parquet
The origin generates a record for every Parquet record in the object. The object must contain the Parquet schema. The origin uses the Parquet schema to generate records.
Text
The origin generates a record for each text line in the object. The object must use \n as the newline character.
The record includes a single field named Value where the origin writes the string data.
XML
The origin generates a record for every row in the object. You specify the root tag used in files and the row tag used to define records.
When reading XML-formatted partition files grouped by field, the origin cannot include the field and value from each group.
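The difference between single-line JSON Lines data and multiline JSON objects can be illustrated with the standard Python json module. This is a sketch of the two reading strategies, not the parser that the origin or Spark uses. The function names are hypothetical, and skipping blank lines in JSON Lines data is an assumption.

```python
import json
from typing import Any, List


def read_json_lines(text: str) -> List[Any]:
    """One record per line. Every non-empty line must be a complete JSON value."""
    return [json.loads(line) for line in text.split("\n") if line.strip()]


def read_multiline_json(text: str) -> List[Any]:
    """One record per JSON object, even when an object spans several lines."""
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        # Skip whitespace between consecutive objects.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        # raw_decode returns the parsed value and the index where it ended.
        record, pos = decoder.raw_decode(text, pos)
        records.append(record)
    return records


single_line = '{"id": 1}\n{"id": 2}\n'
multiline = '{\n  "id": 1\n}\n{\n  "id": 2\n}\n'

records_a = read_json_lines(single_line)
records_b = read_multiline_json(multiline)
```

With single-line data, any newline is a safe place to cut the object, which is why it can be split into partitions. With multiline data, a record boundary can only be found by parsing from the start, so the multiline reader has to walk the whole text sequentially.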

Configuring an Amazon S3 Origin

Configure an Amazon S3 origin to read data in Amazon S3. Before you run the pipeline, make sure to complete the prerequisite tasks.

  1. On the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Stage library to use to connect to Amazon S3:
    • AWS cluster-provided libraries - The cluster where the pipeline runs has Apache Hadoop Amazon Web Services libraries installed, and therefore has all of the necessary libraries to run the pipeline.
    • AWS Transformer-provided libraries for Hadoop 2.7.7 - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

      Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Amazon Web Services libraries required for Hadoop 2.7.7.

    • AWS Transformer-provided libraries for Hadoop 3.2.0 - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

      Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Amazon Web Services libraries required for Hadoop 3.2.0.

    • AWS Transformer-provided libraries for Hadoop 3.3.4 - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

      Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Amazon Web Services libraries required for Hadoop 3.3.4.

    Note: When using additional Amazon stages in the pipeline, ensure that they use the same stage library.
    Load Data Only Once Reads data while processing the first batch of a pipeline run and caches the results for reuse throughout the pipeline run.

    Select this property for lookup origins. When configuring lookup origins, do not limit the batch size. All lookup data should be read in a single batch.

    Cache Data Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

    Available when Load Data Only Once is not enabled. When the origin loads data once, the origin caches data for the entire pipeline run.

    Skip Offset Tracking Skips tracking offsets.

    The origin reads all available data for each batch that the pipeline processes, limited by any batch-size configuration for the origin.

  2. On the Amazon S3 tab, configure the following properties:
    Amazon S3 Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    Authentication Method Authentication method used to connect to Amazon Web Services (AWS):
    • AWS Keys - Authenticates using an AWS access key pair.
    • Instance Profile - Authenticates using an instance profile associated with the Transformer EC2 instance.
    • None - Connects to a public bucket using no authentication.
    Access Key ID AWS access key ID. Required when using AWS keys to authenticate with AWS.
    Secret Access Key AWS secret access key. Required when using AWS keys to authenticate with AWS.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Assume Role Temporarily assumes another role to authenticate with AWS.
    Important: Transformer supports assuming another role when the pipeline meets the stage library and cluster type requirements.
    Role Session Name

    Optional name for the session created by assuming a role. Overrides the default unique identifier for the session.

    Available when assuming another role.

    Role ARN

    Amazon resource name (ARN) of the role to assume, entered in the following format:

    arn:aws:iam::<account_id>:role/<role_name>

    Where <account_id> is the ID of your AWS account and <role_name> is the name of the role to assume. You must create and attach an IAM trust policy to this role that allows the role to be assumed.

    Available when assuming another role.

    Session Timeout

    Maximum number of seconds for each session created by assuming a role. The session is refreshed if the pipeline continues to run for longer than this amount of time.

    Set to a value between 3,600 seconds and 43,200 seconds.

    Available when assuming another role.

    Set Session Tags

    Sets a session tag to record the name of the currently logged-in StreamSets user that starts the pipeline or the job for the pipeline. AWS IAM verifies that the user account set in the session tag can assume the specified role.

    Select only when the IAM trust policy attached to the role to be assumed uses session tags and restricts the session tag values to specific user accounts.

    When cleared, the connection does not set a session tag.

    Available when assuming another role.

    External ID External ID included in an IAM trust policy that allows the specified role to be assumed.

    Available when assuming another role.

    Use Specific Region Specifies the AWS region or endpoint to connect to.

    When cleared, the stage uses the Amazon S3 default global endpoint, s3.amazonaws.com.

    Region AWS region to connect to. Select one of the available regions. To specify an endpoint to connect to, select Other.
    Endpoint Endpoint to connect to when you select Other for the region. Enter the endpoint name.
    Bucket and Path Location of the objects to read. You can use the following wildcards to define the location:
    • Question mark (?) to match a single character.
    • Asterisk (*) to match zero or more characters.

    The origin reads files in the specified bucket and in the first-level subfolders. For example, if you specify s3a://commerce/sales as the bucket and path, the origin reads all files in the /sales and /sales/US subfolders, but does not read files in the /sales/US/east subfolder.

    Use the following format:
    s3a://<bucket name>/<path to objects>/
    Object Name Pattern Pattern that defines the names of the objects to process. You can use the asterisk (*) and question mark (?) wildcards.

    For example, to read all objects in the specified bucket, use an asterisk (*).

    Exclusion Pattern Pattern that defines the names of the objects to exclude from processing. Use to exclude a subset of objects that are included by the Object Name Pattern property.

    You can use the asterisk (*) and question mark (?) wildcards.

    For example, if you set Object Name Pattern to * to read all objects in the bucket, you can set Exclusion Pattern to *.log to exclude log files from processing.

    Partition Base Path Bucket and path to the subfolders of partition objects grouped by fields.

    When reading partition objects grouped by fields, the origin requires this property to add the field and value from the subfolder name to the data set. For more information, see the Spark documentation.

    Enter a URL. For example, if the partition objects are located in the subfolders /warehouse/data/ID=1 and /warehouse/data/ID=2, then enter:

    s3a://bucket-name/warehouse/data/

    Not supported for the XML data format. When reading XML-formatted partition objects grouped by field, the origin cannot include the field and value from each group.

    Max Objects Per Batch Maximum number of objects to include in a batch.
  3. On the Post-Processing tab, optionally configure the following properties:
    Post-Processing Property Description
    Post Processing Action to take after successfully processing an object:
    • None - Keeps the object in place.
    • Archive - Copies or moves the object to another location.
    • Delete - Deletes the object.
    Archive Directory Location to store successfully processed objects. Use the following format:
    s3a://<bucket name>/<path to objects>/
  4. On the Advanced tab, optionally configure the following properties:
    Advanced Property Description
    Additional Configuration

    Additional HDFS properties to pass to an HDFS-compatible file system. Specified properties override those in Hadoop configuration files.

    To add properties, click the Add icon and define the HDFS property name and value. You can use simple or bulk edit mode to configure the properties. Use the property names and values as expected by your version of Hadoop.

    Block Size Block size to use when reading data, in bytes.

    Default is 33554432.

    Buffer Hint

    TCP socket buffer size hint, in bytes.

    Default is 8192.

    Maximum Connections Maximum number of connections to Amazon.

    Default is 1.

    Connection Timeout Seconds to wait for a response before closing the connection.
    Socket Timeout Seconds to wait for a response to a query.
    Retry Count Maximum number of times to retry requests.
    Use Proxy Specifies whether to use a proxy to connect.
    Proxy Host Proxy host.
    Proxy Port Proxy port.
    Proxy User User name for proxy credentials.
    Proxy Password Password for proxy credentials.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Proxy Domain Optional domain name for the proxy server.
    Proxy Workstation Optional workstation for the proxy server.
  5. On the Data Format tab, configure the following properties:
    Data Format Property Description
    Data Format Format of the data. Select one of the following formats:
    • Avro (Spark 2.4 or later) - For Avro data processed by Spark 2.4 or later.
    • Avro (Spark 2.3) - For Avro data processed by Spark 2.3.
    • Delimited
    • JSON
    • ORC
    • Parquet
    • Text
    • XML
    Additional Data Format Configuration Additional data format properties to use. Specify needed data format properties not available on configuration tabs. Properties on the configuration tabs override additional data format properties when there are conflicts.
    For example, for the JSON format you might add the property allowUnquotedFieldNames if the data has unquoted fields. With the property set to True, the origin can read JSON data with the following content:
    {counter: 1}

    To add properties, click the Add icon and define the property name and value. You can use simple or bulk edit mode to configure the properties. Enter the property names and values expected by your version of Hadoop.

  6. For Avro data processed by Spark 2.4 or later, optionally configure the following properties:
    Avro/Spark 2.4 Property Description
    Avro Schema Optional Avro schema to use to process data. The specified Avro schema overrides any schema included in the objects.

    Specify the Avro schema in JSON format.

    Ignore Extension Processes all files in the specified directories.

    When not enabled, the origin only processes objects with the .avro extension.

  7. For delimited data, on the Data Format tab, optionally configure the following properties:
    Delimited Property Description
    Delimiter Character Delimiter character used in the data. Select one of the available options or select Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Quote Character Quote character used in the data.
    Escape Character Escape character used in the data.
    Includes Header Indicates that the data includes a header line. When selected, the origin uses the first line to create field names and begins reading with the second line.
  8. For JSON data, on the Data Format tab, configure the following property:
    JSON Property Description
    Multiline Enables processing multiline JSON Lines data.

    By default, the origin expects a single JSON object on each line of the file. Use this option to process JSON objects that span multiple lines.

  9. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Root Tag Tag used as the root element.

    Default is ROWS, which represents a <ROWS> root element.

    Row Tag Tag used as a record delineator.

    Default is ROW, which represents a <ROW> record delineator element.

  10. To use a custom schema for delimited or JSON data, click the Schema tab and configure the following properties:
    Schema Property Description
    Schema Mode Mode that determines the schema to use when processing data:
    • Infer from Data

      The origin infers the field names and data types from the data.

    • Use Custom Schema - JSON Format

      The origin uses a custom schema defined in the JSON format.

    • Use Custom Schema - DDL Format

      The origin uses a custom schema defined in the DDL format.

    Note that the schema is applied differently depending on the data format.

    Schema Custom schema to use to process the data.

    Enter the schema in DDL or JSON format, depending on the selected schema mode.

    Error Handling Determines how the origin handles parsing errors:
    • Permissive - When the origin encounters a problem parsing any field in the record, it creates a record with the field names defined in the schema, but with null values in every field.
    • Drop Malformed - When the origin encounters a problem parsing any field in the record, it drops the entire record from the pipeline.
    • Fail Fast - When the origin encounters a problem parsing any field in the record, it stops the pipeline.
    Original Data Field Field where the data from the original record is written when the origin cannot parse the record.

    When writing the original record to a field, you must add the field to the custom schema as a String field.

    Available when using permissive error handling.
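The three error handling modes can be compared with a small Python sketch. This is an illustration under assumptions, not the origin's parser: parse_with_schema and ParseFailure are hypothetical names, the custom schema is modeled as a dictionary of field names and Python types, input lines are JSON, and a field missing from the data is treated as null rather than as a parsing error.

```python
import json
from typing import Any, Callable, Dict, Iterable, List, Optional


class ParseFailure(Exception):
    """Stands in for stopping the pipeline in fail-fast mode."""


def parse_with_schema(
    lines: Iterable[str],
    schema: Dict[str, Callable[[Any], Any]],
    mode: str = "permissive",
    original_field: Optional[str] = None,
) -> List[Dict[str, Any]]:
    if mode not in ("permissive", "drop_malformed", "fail_fast"):
        raise ValueError(f"unknown mode: {mode}")
    records = []
    for line in lines:
        try:
            raw = json.loads(line)
            record = {
                name: None if raw.get(name) is None else cast(raw[name])
                for name, cast in schema.items()
            }
        except (ValueError, TypeError, AttributeError) as err:
            if mode == "fail_fast":
                raise ParseFailure(line) from err
            if mode == "drop_malformed":
                continue  # drop the entire record
            # Permissive: keep the schema field names with null in every field.
            record = {name: None for name in schema}
            if original_field is not None:
                # The original data field must be a String field in the schema.
                record[original_field] = line
        records.append(record)
    return records


lines = ['{"id": "1", "name": "a"}', '{"id": "x", "name": "b"}', "not json"]
schema = {"id": int, "name": str, "_raw": str}
permissive = parse_with_schema(lines, schema, "permissive", original_field="_raw")
dropped = parse_with_schema(lines, schema, "drop_malformed")
```

In permissive mode, the two unparseable lines survive as records whose only populated field is _raw, which makes it possible to inspect or route the bad data downstream. In drop-malformed mode, only the first record remains.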