Amazon S3
The Amazon S3 origin reads objects stored in Amazon Simple Storage Service, also known as Amazon S3. The objects must be fully written, include data of the same supported format, and use the same schema.
When reading multiple objects in a batch, the origin reads the oldest object first. Upon successfully reading an object, the origin can delete the object, move it to an archive directory, or leave it in the directory.
When the pipeline stops, the origin notes the last-modified timestamp of the last object that it processed and stores it as an offset. When the pipeline starts again, the origin continues processing from the last-saved offset by default. You can reset pipeline offsets to process all available objects.
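The offset behavior described above can be illustrated with a minimal sketch. It is not the origin's implementation: the `select_unprocessed` helper, the object dictionaries, and the strict "newer than the offset" comparison are all assumptions made for illustration.

```python
from datetime import datetime, timezone

def select_unprocessed(objects, offset=None):
    """Return objects modified after the offset, oldest first.

    `offset` is the last-modified timestamp of the last processed object.
    Passing None models a reset offset, so every object is selected.
    The strict '>' comparison is an assumption, not documented behavior.
    """
    pending = [o for o in objects if offset is None or o["last_modified"] > offset]
    return sorted(pending, key=lambda o: o["last_modified"])

# Hypothetical object listing, deliberately out of order.
objects = [
    {"key": "sales/b.json", "last_modified": datetime(2024, 1, 2, tzinfo=timezone.utc)},
    {"key": "sales/a.json", "last_modified": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"key": "sales/c.json", "last_modified": datetime(2024, 1, 3, tzinfo=timezone.utc)},
]

# First run: the pipeline stops after processing the two oldest objects.
first_run = select_unprocessed(objects)[:2]
saved_offset = first_run[-1]["last_modified"]

# Restart: only objects newer than the saved offset remain.
after_restart = select_unprocessed(objects, saved_offset)

# Reset offset: all available objects are processed again.
after_reset = select_unprocessed(objects, None)

print([o["key"] for o in first_run])
print([o["key"] for o in after_restart])
print([o["key"] for o in after_reset])
```

In this sketch, resetting the offset simply means passing no offset, which mirrors the statement that a reset processes all available objects.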
Before you run a pipeline that uses the Amazon S3 origin, make sure to complete the prerequisite tasks.
When you configure the origin, you specify the authentication method to use. You define the bucket and path to the objects to read. The origin reads objects from the specified directory and its subdirectories. If the origin reads partition objects grouped by field, you must specify the partition base path to include the fields and field values in the data. You also specify the name pattern for the objects to read. You can optionally configure another name pattern to exclude objects from processing and define post-processing actions for successfully read objects.
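As a rough illustration of how a name pattern, an exclusion pattern, and a partition base path interact, consider the following sketch. The helper names and object keys are hypothetical, and applying the patterns to the object name only (not the full path) is an assumption. Python's `fnmatchcase` is used because it treats `*` and `?` the same way the wildcards are described here, although it also accepts `[seq]` ranges that the origin may not support.

```python
from fnmatch import fnmatchcase

def is_selected(key, name_pattern, exclusion_pattern=None):
    """Apply the name pattern, then the exclusion pattern, to the object name."""
    name = key.rsplit("/", 1)[-1]
    if not fnmatchcase(name, name_pattern):
        return False
    return not (exclusion_pattern and fnmatchcase(name, exclusion_pattern))

def partition_fields(key, base_path):
    """Derive field/value pairs from 'field=value' subfolders below base_path."""
    if not key.startswith(base_path):
        return {}
    relative = key[len(base_path):]          # e.g. "ID=1/part-0.json"
    fields = {}
    for segment in relative.split("/")[:-1]:  # skip the object name itself
        if "=" in segment:
            field, value = segment.split("=", 1)
            fields[field] = value
    return fields

# Hypothetical object keys below a partition base path.
keys = [
    "warehouse/data/ID=1/part-0.json",
    "warehouse/data/ID=1/run.log",
    "warehouse/data/ID=2/part-0.json",
]

selected = [k for k in keys if is_selected(k, "*", "*.log")]
partitions = [partition_fields(k, "warehouse/data/") for k in selected]

print(selected)
print(partitions)
```

The sketch shows why the base path matters: without it, there is no reference point from which to interpret `ID=1` and `ID=2` as a field and its values.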
You can also use a connection to configure the origin.
You select the data format and configure related properties. When processing delimited or JSON data, you can define a custom schema for reading the data and configure related properties. You can also configure advanced properties such as performance-related properties and proxy server properties.
You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream stages efficiently. You can also configure the origin to skip tracking offsets.
Prerequisites
Before reading from Amazon S3 with the Amazon S3 origin, complete the following prerequisite tasks:
- Verify permissions
- The user associated with the authentication credentials in effect must have the following Amazon S3 permissions:
- READ permission on the bucket
- s3:ListBucket permission in an AWS Identity and Access Management (IAM) policy
- Use a single schema
- All objects processed by the Amazon S3 origin must have the same schema.
- Perform prerequisite tasks for local pipelines
- To connect to Amazon S3, Transformer uses connection information stored in a Hadoop configuration file. Before you run a local pipeline that connects to Amazon S3, complete the prerequisite tasks.
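For the "Verify permissions" prerequisite, the following sketch builds an IAM policy document that grants only `s3:ListBucket` on a single bucket. The bucket name `commerce` and the `list_bucket_policy` helper are illustrative assumptions, and READ permission on the bucket must still be granted separately. Treat it as a starting point to adapt, not as the required policy.

```python
import json

def list_bucket_policy(bucket_name):
    """Build a minimal IAM policy document allowing s3:ListBucket on one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                # ListBucket applies to the bucket ARN, not to object ARNs.
                "Resource": [f"arn:aws:s3:::{bucket_name}"],
            }
        ],
    }

# "commerce" is a placeholder bucket name.
policy = list_bucket_policy("commerce")
print(json.dumps(policy, indent=2))
```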
Authentication Method
You can configure the Amazon S3 origin to authenticate with Amazon Web Services (AWS) using an instance profile or AWS access keys. When accessing a public bucket, you can connect anonymously using no authentication.
For more information about the authentication methods and details on how to configure each method, see Amazon Security.
Partitioning
Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. When the pipeline starts processing a new batch, Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.
- Delimited, JSON, text, or XML
- When reading text-based data, Spark can split the object into multiple partitions for processing, depending on the underlying file system. Multiline JSON files cannot be split.
- Avro, ORC, or Parquet
- When reading Avro, ORC, or Parquet data, Spark can split the object into multiple partitions for processing.
Spark uses these partitions while the pipeline processes the batch unless a processor causes Spark to shuffle the data. To change the partitioning in the pipeline, use the Repartition processor.
Data Formats
The Amazon S3 origin generates records based on the specified data format.
- Avro
- The origin generates a record for every Avro record in the object. Each object must contain the Avro schema. The origin uses the Avro schema to generate records.
- Delimited
- The origin generates a record for each delimited line in the object. You can specify a custom delimiter, quote, and escape character used in the data.
- JSON
- By default, the origin generates a record for each line in the object. Each line in the object must contain valid JSON Lines data. For details, see the JSON Lines website.
- ORC
- The origin generates a record for each Optimized Row Columnar (ORC) row in the object.
- Parquet
- The origin generates a record for every Parquet record in the object. The object must contain the Parquet schema. The origin uses the Parquet schema to generate records.
- Text
- The origin generates a record for each text line in the object. The object must use \n as the newline character.
- XML
- The origin generates a record for every row in the object. You specify the root tag used in files and the row tag used to define records.
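To make the default JSON behavior concrete, the sketch below turns JSON Lines text into one record per line. The `records_from_json_lines` helper and the sample data are hypothetical, and the origin itself relies on Spark rather than on code like this. Skipping blank lines is an assumption.

```python
import json

def records_from_json_lines(text):
    """Return one record (dict) per non-empty line of JSON Lines text."""
    records = []
    for line in text.split("\n"):
        if line.strip():  # assumption: blank lines are ignored
            records.append(json.loads(line))
    return records

# Two lines of valid JSON Lines data produce two records.
sample = '{"id": 1, "city": "Oslo"}\n{"id": 2, "city": "Lima"}\n'
records = records_from_json_lines(sample)
print(records)
```

A JSON object that spans several lines would fail in this line-by-line sketch, which is the situation the Multiline property is meant to address.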
Configuring an Amazon S3 Origin
Configure an Amazon S3 origin to read data in Amazon S3. Before you run the pipeline, make sure to complete the prerequisite tasks.
- On the Properties panel, on the General tab, configure the following properties:
  - Name: Stage name.
  - Description: Optional description.
  - Stage Library: Stage library to use to connect to Amazon S3:
    - AWS cluster-provided libraries - The cluster where the pipeline runs has Apache Hadoop Amazon Web Services libraries installed, and therefore has all of the necessary libraries to run the pipeline.
    - AWS Transformer-provided libraries - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.
      Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Amazon Web Services libraries for Hadoop.
      Select the appropriate version for your cluster.
    Note: When using additional Amazon stages in the pipeline, ensure that they use the same stage library.
  - Load Data Only Once: Reads data while processing the first batch of a pipeline run and caches the results for reuse throughout the pipeline run. Select this property for lookup origins. When configuring lookup origins, do not limit the batch size. All lookup data should be read in a single batch.
  - Cache Data: Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages. Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.
    Available when Load Data Only Once is not enabled. When the origin loads data once, the origin caches data for the entire pipeline run.
  - Skip Offset Tracking: Skips tracking offsets. The origin reads all available data for each batch that the pipeline processes, limited by any batch-size configuration for the origin.
- On the Amazon S3 tab, configure the following properties:
  - Connection: Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
  - Authentication Method: Authentication method used to connect to Amazon Web Services (AWS):
    - AWS Keys - Authenticates using an AWS access key pair.
    - Instance Profile - Authenticates using an instance profile associated with the Transformer EC2 instance.
    - None - Connects to a public bucket using no authentication.
  - Access Key ID: AWS access key ID. Required when using AWS keys to authenticate with AWS.
  - Secret Access Key: AWS secret access key. Required when using AWS keys to authenticate with AWS.
  - Assume Role: Temporarily assumes another role to authenticate with AWS.
    Important: Transformer supports assuming another role when the pipeline meets the stage library and cluster type requirements.
  - Role Session Name: Optional name for the session created by assuming a role. Overrides the default unique identifier for the session.
    Available when assuming another role.
  - Role ARN: Amazon resource name (ARN) of the role to assume, entered in the following format:
    arn:aws:iam::<account_id>:role/<role_name>
    Where <account_id> is the ID of your AWS account and <role_name> is the name of the role to assume. You must create and attach an IAM trust policy to this role that allows the role to be assumed.
    Available when assuming another role.
  - Session Timeout: Maximum number of seconds for each session created by assuming a role. The session is refreshed if the pipeline continues to run for longer than this amount of time.
    Set to a value between 3,600 seconds and 43,200 seconds.
    Available when assuming another role.
  - Set Session Tags: Sets a session tag to record the name of the currently logged in StreamSets user that starts the pipeline or the job for the pipeline. AWS IAM verifies that the user account set in the session tag can assume the specified role.
    Select only when the IAM trust policy attached to the role to be assumed uses session tags and restricts the session tag values to specific user accounts.
    When cleared, the connection does not set a session tag.
    Available when assuming another role.
  - External ID: External ID included in an IAM trust policy that allows the specified role to be assumed. Available when assuming another role.
  - Use Specific Region: Specify the AWS region or endpoint to connect to. When cleared, the stage uses the Amazon S3 default global endpoint, s3.amazonaws.com.
  - Region: AWS region to connect to. Select one of the available regions. To specify an endpoint to connect to, select Other.
  - Endpoint: Endpoint to connect to when you select Other for the region. Enter the endpoint name.
  - Bucket and Path: Location of the objects to read. You can use the following wildcards to define the location:
    - Question mark (?) to match a single character.
    - Asterisk (*) to match zero or more characters.
    The origin reads files in the specified bucket and in the first-level subfolders. For example, if you specify s3a://commerce/sales as the bucket and path, the origin reads all files in the /sales and /sales/US subfolders, but does not read files in the /sales/US/east subfolder.
    Use the following format: s3a://<bucket name>/<path to objects>/
  - Object Name Pattern: Pattern that defines the names of the objects to process. You can use the asterisk (*) and question mark (?) wildcards. For example, to read all objects in the specified bucket, use an asterisk (*).
  - Exclusion Pattern: Pattern that defines the names of the objects to exclude from processing. Use to exclude a subset of objects that are included by the Object Name Pattern property. You can use the asterisk (*) and question mark (?) wildcards.
    For example, if you set Object Name Pattern to * to read all objects in the bucket, you can set Exclusion Pattern to *.log to exclude log files from processing.
  - Partition Base Path: Bucket and path to the subfolders of partition objects grouped by fields. When reading partition objects grouped by fields, the origin requires this property to add the field and value from the subfolder name to the data set. For more information, see the Spark documentation.
    Enter a URL. For example, if the partition objects are located in the subfolders /warehouse/data/ID=1 and /warehouse/data/ID=2, then enter:
    s3a://bucket-name/warehouse/data/
    Not supported for the XML data format. When reading XML-formatted partition objects grouped by field, the origin cannot include the field and value from each group.
  - Max Objects Per Batch: Maximum number of objects to include in a batch.
- On the Post-Processing tab, optionally configure the following properties:
  - Post Processing: Action to take after successfully processing an object:
    - None - Keeps the object in place.
    - Archive - Copies or moves the object to another location.
    - Delete - Deletes the object.
  - Archive Directory: Location to store successfully processed objects. Use the following format: s3a://<bucket name>/<path to objects>/
- On the Advanced tab, optionally configure the following properties:
  - Additional Configuration: Additional HDFS properties to pass to an HDFS-compatible file system. Specified properties override those in Hadoop configuration files.
    To add properties, click the Add icon and define the HDFS property name and value. You can use simple or bulk edit mode to configure the properties. Use the property names and values as expected by your version of Hadoop.
  - Block Size: Block size to use when reading data, in bytes. Default is 33554432.
  - Buffer Hint: TCP socket buffer size hint, in bytes. Default is 8192.
  - Maximum Connections: Maximum number of connections to Amazon. Default is 1.
  - Connection Timeout: Seconds to wait for a response before closing the connection.
  - Socket Timeout: Seconds to wait for a response to a query.
  - Retry Count: Maximum number of times to retry requests.
  - Use Proxy: Specifies whether to use a proxy to connect.
  - Proxy Host: Proxy host.
  - Proxy Port: Proxy port.
  - Proxy User: User name for proxy credentials.
  - Proxy Password: Password for proxy credentials.
  - Proxy Domain: Optional domain name for the proxy server.
  - Proxy Workstation: Optional workstation for the proxy server.
- On the Data Format tab, configure the following properties:
  - Data Format: Format of the data. Select one of the following formats:
    - Avro
    - Delimited
    - JSON
    - ORC
    - Parquet
    - Text
    - XML
  - Additional Data Format Configuration: Additional data format properties to use. Specify needed data format properties not available on configuration tabs. Properties on the configuration tabs override additional data format properties when there are conflicts.
    For example, for the JSON format you might add the property allowUnquotedFieldNames if the data has unquoted fields. With the property set to True, the origin can read JSON data with the following content: {counter: 1}
    To add properties, click the Add icon and define the property name and value. You can use simple or bulk edit mode to configure the properties. Enter the property names and values expected by your version of Hadoop.
- For Avro data, optionally configure the following properties:
  - Avro Schema: Optional Avro schema to use to process data. The specified Avro schema overrides any schema included in the objects. Specify the Avro schema in JSON format.
  - Ignore Extension: Processes all files in the specified directories. When not enabled, the origin only processes objects with the .avro extension.
- For delimited data, on the Data Format tab, optionally configure the following properties:
  - Delimiter Character: Delimiter character used in the data. Select one of the available options or select Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.
  - Quote Character: Quote character used in the data.
  - Escape Character: Escape character used in the data.
  - Includes Header: Indicates that the data includes a header line. When selected, the origin uses the first line to create field names and begins reading with the second line.
- For JSON data, on the Data Format tab, configure the following property:
  - Multiline: Enables processing multiline JSON Lines data. By default, the origin expects a single JSON object on each line of the file. Use this option to process JSON objects that span multiple lines.
- For XML data, on the Data Format tab, configure the following properties:
  - Root Tag: Tag used as the root element. Default is ROWS, which represents a <ROWS> root element.
  - Row Tag: Tag used as a record delineator. Default is ROW, which represents a <ROW> record delineator element.
- To use a custom schema for delimited or JSON data, click the Schema tab and configure the following properties:
  - Schema Mode: Mode that determines the schema to use when processing data:
    - Infer from Data - The origin infers the field names and data types from the data.
    - Use Custom Schema - JSON Format - The origin uses a custom schema defined in the JSON format.
    - Use Custom Schema - DDL Format - The origin uses a custom schema defined in the DDL format.
    Note that the schema is applied differently depending on the data format of the data.
  - Schema: Custom schema to use to process the data. Enter the schema in DDL or JSON format, depending on the selected schema mode.
  - Error Handling: Determines how the origin handles parsing errors:
    - Permissive - When the origin encounters a problem parsing any field in the record, it creates a record with the field names defined in the schema, but with null values in every field.
    - Drop Malformed - When the origin encounters a problem parsing any field in the record, it drops the entire record from the pipeline.
    - Fail Fast - When the origin encounters a problem parsing any field in the record, it stops the pipeline.
  - Original Data Field: Field where the data from the original record is written when the origin cannot parse the record. When writing the original record to a field, you must add the field to the custom schema as a String field.
    Available when using permissive error handling.
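The three error handling modes for custom schemas can be compared with a small simulation. This is a sketch of the described behavior only: the `SCHEMA` mapping, the mode strings, the `original` field name, and the helper functions are all hypothetical, and the origin delegates actual parsing to Spark.

```python
import json

# Hypothetical custom schema: field name -> expected Python type.
SCHEMA = {"id": int, "name": str}

def parse_line(line, schema):
    """Parse one JSON line against the schema; raise ValueError on any problem."""
    raw = json.loads(line)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(raw, dict):
        raise ValueError("expected a JSON object")
    record = {}
    for field, expected_type in schema.items():
        value = raw.get(field)
        if value is not None and not isinstance(value, expected_type):
            raise ValueError(f"field {field!r} has an unexpected type")
        record[field] = value
    return record

def read_records(lines, schema, mode="permissive", original_field=None):
    """Apply one of the three error handling modes to a list of lines."""
    if mode not in ("permissive", "drop_malformed", "fail_fast"):
        raise KeyError(f"unknown mode: {mode}")
    records = []
    for line in lines:
        try:
            records.append(parse_line(line, schema))
        except ValueError:
            if mode == "fail_fast":
                raise      # stop processing, as the pipeline would stop
            if mode == "drop_malformed":
                continue   # drop the entire record
            # Permissive: keep the schema fields, all set to null.
            record = {field: None for field in schema}
            if original_field:
                record[original_field] = line  # keep the unparsed data
            records.append(record)
    return records

lines = ['{"id": 1, "name": "a"}', '{"id": "oops", "name": "b"}', "not json"]

permissive = read_records(lines, SCHEMA, "permissive", original_field="original")
dropped = read_records(lines, SCHEMA, "drop_malformed")
print(permissive)
print(dropped)
```

In the permissive case, the `original` field plays the role of the Original Data Field property, which is why that property requires a String field in the custom schema.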