ADLS Gen2

The ADLS Gen2 origin reads data from Microsoft Azure Data Lake Storage Gen2. Every file must be fully written, contain data in the same supported format, and use the same schema.

Note: When this stage is included in a pipeline that runs on an Azure HDInsight cluster, use an Azure HDInsight cluster version 4.0 or later.

When reading multiple files in a batch, the origin reads the oldest file first. Upon successfully reading a file, the origin can delete the file, move it to an archive directory, or leave it in the directory.

When the pipeline stops, the origin notes the last-modified timestamp of the last file that it processed and stores it as an offset. When the pipeline starts again, the origin continues processing from the last-saved offset by default. When needed, you can reset pipeline offsets to process all available files.

Before you use the origin, you must perform some prerequisite tasks.

When you configure the ADLS Gen2 origin, you specify the Azure authentication method to use and related properties. Or, you can have the origin use Azure authentication information configured in the cluster where the pipeline runs.

You configure the directory path to use and a name pattern for the files to read. The origin reads the files with matching names in the specified directory and its subdirectories. If the origin reads partition files grouped by field, you must specify the partition base path to include the fields and field values in the data. You can also configure a file name pattern for a subset of files to exclude from processing. You specify the data format of the data, related data format properties, and how to process successfully read files. When needed, you can define a maximum number of files to read in a batch.

You select the data format of the data and configure related properties. When processing delimited or JSON data, you can define a custom schema for reading the data and configure related properties.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream batches efficiently. You can also configure the origin to skip tracking offsets.

Prerequisites

Complete the following prerequisites, as needed, before you configure the ADLS Gen2 origin. Before using the origin in a local pipeline, complete these additional prerequisites.
  1. If necessary, create a new Azure Active Directory application for StreamSets Transformer.

    For information about creating a new application, see the Azure documentation.

  2. Ensure that the Azure Active Directory Transformer application has the appropriate access control to perform the necessary tasks.

    To read from Azure, the Transformer application requires Read and Execute permissions. If also writing to Azure, the application requires Write permission as well.

    For information about configuring Gen2 access control, see the Azure documentation.

  3. Install the Azure Blob File System driver on the cluster where the pipeline runs.

    Most recent cluster versions include the Azure Blob File System driver, azure-datalake-store.jar. However, older versions might require installing it. For more information about Azure Data Lake Storage Gen2 support for Hadoop, see the Azure documentation.

  4. Retrieve Azure Data Lake Storage Gen2 authentication information from the Azure portal for configuring the origin.

    You can skip this step if you want to use Azure authentication information configured in the cluster where the pipeline runs.

  5. Before using the stage in a local pipeline, ensure that Hadoop-related tasks are complete.

Retrieve Authentication Information

The ADLS Gen2 origin provides several ways to authenticate connections to Azure. Depending on the authentication method that you use, the origin requires different authentication details.

If the cluster where the pipeline runs has the necessary Azure authentication information configured, then that information is used by default. However, data preview is not available when using Azure authentication information configured in the cluster.

You can also specify Azure authentication information in stage properties. Any authentication information specified in stage properties takes precedence over the authentication information configured in the cluster.

The authentication information that the origin uses depends on the selected authentication method:
OAuth
When connecting using OAuth authentication, the origin requires the following information:
  • Application ID - Application ID for the Azure Active Directory Transformer application. Also known as the client ID.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

  • Application Key - Authentication key for the Azure Active Directory Transformer application. Also known as the client key.

    For information on accessing the application key from the Azure portal, see the Azure documentation.

  • OAuth Token Endpoint - OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for Transformer. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.
To run a pipeline locally, you must use this authentication method. You can also use this authentication method for pipelines that run on a cluster.
Managed Service Identity
When connecting using Managed Service Identity authentication, the origin requires the following information:
  • Application ID - Application ID for the Azure Active Directory Transformer application. Also known as the client ID.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

  • Tenant ID - Tenant ID for the Azure Active Directory Transformer application. Also known as the directory ID.

    For information on accessing the tenant ID from the Azure portal, see the Azure documentation.

You can use this authentication method for pipelines that run on a cluster.
Shared Key
When connecting using Shared Key authentication, the origin requires the following information:
  • Account Shared Key - Shared access key that Azure generated for the storage account.

    For more information on accessing the shared access key from the Azure portal, see the Azure documentation.

You can use this authentication method for pipelines that run on a cluster.

Schema Requirement

All files processed by the ADLS Gen2 origin must have the same schema.

When files have different schemas, the resulting behavior depends on the data format and the version of Spark that you use. For example, the origin might skip processing delimited files with a different schema, but add null values to Parquet files with a different schema.

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. When the pipeline starts processing a new batch, Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.

For an ADLS Gen2 origin, Spark determines the partitioning based on the data format of the files being read:
Delimited, JSON, text, or XML
When reading text-based files, Spark can split the files into multiple partitions for processing, depending on the underlying file system. Multiline JSON files cannot be split, so they are processed in a single partition.
Avro, ORC, or Parquet
When reading Avro, ORC, or Parquet files, Spark can split the files into multiple partitions for processing.

Spark uses these partitions while the pipeline processes the batch unless a processor causes Spark to shuffle the data. To change the partitioning in the pipeline, use the Repartition processor.

Data Formats

The ADLS Gen2 origin generates records based on the specified data format.

The origin can read the following data formats:

Avro
The origin generates a record for every Avro record in an Avro container file. Each file must contain the Avro schema. The origin uses the Avro schema to generate records.
When you configure the origin, you must select the Avro option that matches the version of Spark that runs the pipeline: Spark 2.3, or Spark 2.4 or later.
When using Spark 2.4 or later, you can define an Avro schema to use. The schema must be in JSON format. You can also configure the origin to process all files in the specified locations. By default, the origin only processes files with the .avro extension.
Delimited
The origin generates a record for each line in a delimited file. You can specify a custom delimiter, quote, and escape character used in the data.
By default, the origin uses the values in the first row for field names and creates records starting with the second row of the file. The origin infers data types from the data by default.
You can clear the Includes Header property to indicate that files do not contain a header row. When files do not include a header row, the origin names the first field _c0, the second field _c1, and so on, as shown in the example below. You can rename the fields downstream with a Field Renamer processor, or you can specify a custom schema in the origin.
When you specify a custom schema, the origin uses the field names and data types defined in the schema, applying the first field in the schema to the first field in the record, and so on.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
Files must use \n as the newline character. The origin skips empty lines.
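For example, consider a delimited file with the following content, where the field names and values are illustrative only:

  id,name
  1,widget
  2,gadget

With Includes Header selected, the origin creates records with the fields id and name, starting with the row 1,widget. With Includes Header cleared, the origin treats every line as data and names the fields _c0 and _c1, so the first record contains the values id and name.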
JSON
By default, the origin generates a record for each line in a JSON Lines file. Each line in the file should contain a valid JSON object. For details, see the JSON Lines website.
If the JSON Lines file contains objects that span multiple lines, you must configure the origin to process multiline JSON objects. When processing multiline JSON objects, the origin generates a record for each JSON object, even if it spans multiple lines.
A standard, single-line JSON Lines file can be split into partitions and processed in parallel. A multiline JSON file cannot be split, so must be processed in a single partition, which can slow pipeline performance.
By default, the origin uses the field names, field order, and data types in the data.
When you specify a custom schema, the origin matches the field names in the schema to those in the data, then applies the data types and field order defined in the schema.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
ORC
The origin generates a record for each row in an Optimized Row Columnar (ORC) file.
Parquet
The origin generates a record for every Parquet record in the file. The file must contain the Parquet schema. The origin uses the Parquet schema to generate records.
Text
The origin generates a record for each line in a text file. The file must use \n as the newline character.
The generated record consists of a single String field named Value that contains the data.
XML
The origin generates a record for every row defined in an XML file. You specify the root tag used in files and the row tag used to define records.
When reading XML-formatted partition files grouped by field, the origin cannot include the field and value from each group.

Configuring an ADLS Gen2 Origin

Configure an ADLS Gen2 origin to read files in Azure Data Lake Storage Gen2. Before you use the origin in a pipeline, complete the required prerequisites. Before using the origin in a local pipeline, complete these additional prerequisites.

Note: All processed files must share the same schema.
  1. On the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Stage Library - Stage library to use:
    • ADLS cluster-provided libraries - The cluster where the pipeline runs has Apache Hadoop Azure Data Lake libraries installed, and therefore has all of the necessary libraries to run the pipeline.

      Use when you want to use the Apache Hadoop Azure Data Lake library installed on the cluster. When the pipeline runs on an Apache Spark for HDInsight cluster, you must use this stage library.

    • ADLS Transformer-provided libraries for Hadoop 3.2.0 - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

      Use when running the pipeline locally or when the cluster where the pipeline runs does not include Apache Hadoop Azure Data Lake libraries version 3.2.0.

    Note: When using additional ADLS stages in the pipeline, ensure that they use the same stage library.
    Load Data Only Once - Reads data while processing the first batch of a pipeline run and caches the results for reuse throughout the pipeline run.

    Select this property for lookup origins. When configuring lookup origins, do not limit the batch size. All lookup data should be read in a single batch.

    Cache Data - Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

    Available when Load Data Only Once is not enabled. When the origin loads data once, the origin caches data for the entire pipeline run.

    Skip Offset Tracking - Skips tracking offsets.

    The origin reads all available data for each batch that the pipeline processes, limited by any batch-size configuration for the origin.

  2. On the ADLS tab, configure the following properties:
    ADLS Property - Description
    Storage Account - Azure Data Lake Storage Gen2 storage account name.
    File System - Name of the file system to use.
    Authentication Method - Authentication method to use to connect to Azure:
    • OAuth - Use to run pipelines locally. Can also be used to run pipelines on a cluster.
    • Managed Service Identity - Can be used to run pipelines on a cluster.
    • Shared Key - Can be used to run pipelines on a cluster.
    Application ID - Application ID for the Azure Active Directory Transformer application. Also known as the client ID.

    When not specified, the stage uses the equivalent Azure authentication information configured in the cluster where the pipeline runs.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

    Available when using the OAuth or Managed Service Identity authentication methods.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Application Key - Authentication key for the Azure Active Directory Transformer application. Also known as the client key.

    When not specified, the stage uses the equivalent Azure authentication information configured in the cluster where the pipeline runs.

    For information on accessing the application key from the Azure portal, see the Azure documentation.

    Available when using the OAuth authentication method.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    OAuth Token Endpoint - OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for Transformer. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.

    When not specified, the stage uses the equivalent Azure authentication information configured in the cluster where the pipeline runs.

    Available when using the OAuth authentication method.

    Tenant ID - Tenant ID for the Azure Active Directory Transformer application. Also known as the directory ID.

    When not specified, the stage uses the equivalent Azure authentication information configured in the cluster where the pipeline runs.

    For information on accessing the tenant ID from the Azure portal, see the Azure documentation.

    Available when using the Managed Service Identity authentication method.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Account Shared Key - Shared access key that Azure generated for the storage account.

    When not specified, the stage uses the equivalent Azure authentication information configured in the cluster where the pipeline runs.

    For more information on accessing the shared access key from the Azure portal, see the Azure documentation.

    Available when using the Shared Key authentication method.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Directory Path - Path to the directory that stores the source files. You can use the following wildcards to define the location:
    • Question mark (?) to match a single character.
    • Asterisk (*) to match zero or more characters.

    The origin reads files in the specified directory and in the first-level subdirectories. For example, if you specify /sales/ as the directory path, the origin reads all files in the /sales and /sales/US directories, but does not read files in the /sales/US/east directory. For a sample directory layout and matching property values, see the example at the end of this step.

    Use the following format:

    /<path to files>/

    File Name Pattern - Pattern that defines the file names to process. You can use the asterisk (*) and question mark (?) wildcards.

    Default is *, which processes all files.

    Exclusion Pattern - Pattern that defines the file names to exclude from processing. Use to exclude a subset of files that are included by the File Name Pattern property.

    You can use the asterisk (*) and question mark (?) wildcards.

    For example, if you set File Name Pattern to * to process all files in the directory, you can set Exclusion Pattern to *.log to exclude log files from processing.

    Partition Base Path - Path to the root directory of partition files grouped by fields.

    When reading partition files grouped by fields, the origin requires this property to add the field and value from the subdirectory name to the data set. For more information, see the Spark documentation.

    Enter a URL. For example, if the partition files are located in the subdirectories /warehouse/data/ID=1 and /warehouse/data/ID=2, then enter:

    abfss://filesystem-id@deltalake-store-account-fqdn/warehouse/data/

    Not supported for the XML data format. When reading XML-formatted partition files grouped by field, the origin cannot include the field and value from each group.

    Max Files Per Batch - Maximum number of files to read in a batch.

    This property can be useful in streaming pipelines that need to process an initial backlog of files.

    Default is 0 for no limit.

    Post Processing - Action to take on successfully read files:
    • Delete - Remove the file from the directory.
    • Archive - Move the file to an archive directory.
    • None - Leave the file in the directory where found.
    Archive Directory - Directory to store successfully read files when the origin archives files.
    Additional Configuration - Additional HDFS properties to pass to an HDFS-compatible file system. Specified properties override those in Hadoop configuration files.

    To add properties, click the Add icon and define the HDFS property name and value. You can use simple or bulk edit mode to configure the properties. Use the property names and values as expected by your version of Hadoop.
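
    For example, assume the file system contains the following files, where the directory and file names are illustrative only:

      /sales/2023/US/orders-001.csv
      /sales/2023/US/orders-002.csv
      /sales/2023/US/loader.log
      /sales/2023/EU/orders-003.csv
      /sales/2023/EU/archive/orders-000.csv

    With Directory Path set to /sales/2023/, File Name Pattern set to *, and Exclusion Pattern set to *.log, the origin reads the three orders files in the first-level subdirectories and skips loader.log. It does not read orders-000.csv, because that file is in a second-level subdirectory.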

  3. On the Data Format tab, configure the following properties:
    Data Format Property - Description
    Data Format - Format of the data. Select one of the following formats:
    • Avro (Spark 2.4 or later) - For Avro data processed by Spark 2.4 or later.
    • Avro (Spark 2.3) - For Avro data processed by Spark 2.3.
    • Delimited
    • JSON
    • ORC
    • Parquet
    • Text
    • XML
    Additional Data Format Configuration - Additional data format properties to use. Specify needed data format properties not available on configuration tabs. Properties on the configuration tabs override additional data format properties when there are conflicts.
    For example, for the JSON format you might add the property allowUnquotedFieldNames if the data has unquoted fields. With the property set to True, the origin can read JSON data with the following content:
    {counter: 1}

    To add properties, click the Add icon and define the property name and value. You can use simple or bulk edit mode to configure the properties. Enter the property names and values expected by Spark.

  4. For Avro data processed by Spark 2.4 or later, optionally configure the following properties:
    Avro/Spark 2.4 Property - Description
    Avro Schema - Optional Avro schema to use to process data. The specified Avro schema overrides any schema included in the files.

    Specify the Avro schema in JSON format. For a sample schema, see the example after this step.

    Ignore Extension - Processes all files in the specified directories.

    When not enabled, the origin only processes files with the .avro extension.
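
    The following is a minimal sketch of an Avro schema in JSON format, where the record and field names are illustrative only:

      {
        "type": "record",
        "name": "transaction",
        "fields": [
          {"name": "id", "type": "int"},
          {"name": "amount", "type": "double"}
        ]
      }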

  5. For delimited data, on the Data Format tab, optionally configure the following properties:
    Delimited Property - Description
    Delimiter Character - Delimiter character used in the data. Select one of the available options or select Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter. For an example of how the delimiter and quote characters are applied, see the sketch after this step.

    Quote Character - Quote character used in the data.
    Escape Character - Escape character used in the data.
    Includes Header - Indicates that the data includes a header line. When selected, the origin uses the first line to create field names and begins reading with the second line.
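
    As an illustration, assume that the delimiter character is a comma and the quote character is a double quotation mark. The origin would parse the following line, where the values are illustrative only, into three fields:

      1,"Smith, John",NY

    The second field contains Smith, John because the comma inside the quoted value is treated as data rather than as a delimiter.
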
  6. For JSON data, on the Data Format tab, configure the following property:
    JSON Property - Description
    Multiline - Enables processing multiline JSON data.

    By default, the origin expects a single JSON object on each line of the file. Use this option to process JSON objects that span multiple lines, as shown in the example after this step.
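
    As an illustration, with the default setting the origin expects one JSON object per line, as in the following sketch, where the field names and values are illustrative only:

      {"id": 1, "name": "widget"}
      {"id": 2, "name": "gadget"}

    With Multiline enabled, the origin can instead process objects that span multiple lines:

      {
        "id": 1,
        "name": "widget"
      }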

  7. For XML data, on the Data Format tab, configure the following properties:
    XML Property - Description
    Root Tag - Tag used as the root element.

    Default is ROWS, which represents a <ROWS> root element.

    Row Tag - Tag used as a record delineator.

    Default is ROW, which represents a <ROW> record delineator element. For a sample file that uses the default tags, see the example after this step.
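
    As an illustration, with the default Root Tag of ROWS and Row Tag of ROW, the origin generates one record for each <ROW> element in a file such as the following, where the field element names and values are illustrative only:

      <ROWS>
        <ROW>
          <id>1</id>
          <name>widget</name>
        </ROW>
        <ROW>
          <id>2</id>
          <name>gadget</name>
        </ROW>
      </ROWS>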

  8. To use a custom schema for delimited or JSON data, click the Schema tab and configure the following properties:
    Schema Property - Description
    Schema Mode - Mode that determines the schema to use when processing data:
    • Infer from Data

      The origin infers the field names and data types from the data.

    • Use Custom Schema - JSON Format

      The origin uses a custom schema defined in the JSON format.

    • Use Custom Schema - DDL Format

      The origin uses a custom schema defined in the DDL format.

    Note that the schema is applied differently depending on the data format of the data.

    Schema - Custom schema to use to process the data.

    Enter the schema in DDL or JSON format, depending on the selected schema mode. For sample schemas in both formats, see the sketches after this step.

    Error Handling - Determines how the origin handles parsing errors:
    • Permissive - When the origin encounters a problem parsing any field in the record, it creates a record with the field names defined in the schema, but with null values in every field.
    • Drop Malformed - When the origin encounters a problem parsing any field in the record, it drops the entire record from the pipeline.
    • Fail Fast - When the origin encounters a problem parsing any field in the record, it stops the pipeline.
    Original Data Field - Field where the data from the original record is written when the origin cannot parse the record.

    When writing the original record to a field, you must add the field to the custom schema as a String field.

    Available when using permissive error handling.
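
    As an illustration, the following sketches define the same custom schema for records with an integer id field and a string name field. The field names are illustrative only, and the JSON form assumes the Spark StructType JSON representation.

    Custom schema in DDL format:

      id INT, name STRING

    Custom schema in JSON format:

      {
        "type": "struct",
        "fields": [
          {"name": "id", "type": "integer", "nullable": true, "metadata": {}},
          {"name": "name", "type": "string", "nullable": true, "metadata": {}}
        ]
      }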