File

The File origin reads data from files in Hadoop Distributed File System (HDFS) or a local file system. Every file must be fully written, include data of the same supported format, and use the same schema.

The File origin reads from HDFS using connection information stored in a Hadoop configuration file.

When reading multiple files in a batch, the origin reads the oldest file first. Upon successfully reading a file, the origin can delete the file, move it to an archive directory, or leave it in the directory.

When the pipeline stops, the origin notes the last-modified timestamp of the last file that it processed and stores it as an offset. When the pipeline starts again, the origin continues processing from the last-saved offset by default. When needed, you can reset pipeline offsets to process all available files.
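
The ordering and offset behavior described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the origin's implementation: the function name `select_batch`, the `(path, last_modified)` tuples, the sample timestamps, and the strict "newer than the offset" comparison are all hypothetical.

```python
from typing import List, Optional, Tuple

FileInfo = Tuple[str, int]  # (path, last-modified timestamp in epoch seconds)


def select_batch(files: List[FileInfo],
                 offset: Optional[int]) -> Tuple[List[str], Optional[int]]:
    """Return the paths to read, oldest first, and the offset to store afterwards.

    Assumption: a file is new when its timestamp is greater than the stored
    offset. An offset of None models a reset, so every file is eligible.
    """
    pending = [f for f in files if offset is None or f[1] > offset]
    pending.sort(key=lambda f: f[1])  # oldest file first
    new_offset = pending[-1][1] if pending else offset
    return [path for path, _ in pending], new_offset


files = [("b.csv", 200), ("a.csv", 100), ("c.csv", 300)]

# Restart with a stored offset of 100: only the newer files are read.
paths, offset = select_batch(files, offset=100)
print(paths, offset)

# After an offset reset, all available files are read again.
all_paths, _ = select_batch(files, offset=None)
print(all_paths)
```

With a stored offset of 100, the sketch selects b.csv and then c.csv and stores 300 as the new offset. After a reset, it selects all three files.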

When you configure the File origin, you specify the directory path to use and a name pattern for the files to read. The origin reads the files with matching names in the specified directory and its first-level subdirectories. If the origin reads partition files grouped by field, you must specify the partition base path to include the fields and field values in the data. You can also configure a file name pattern for a subset of files to exclude from processing. You also specify how to process successfully read files. When needed, you can define a maximum number of files to read in a batch.

You select the data format and configure related properties. When processing delimited or JSON data, you can define a custom schema for reading the data and configure related properties.

You can also specify HDFS configuration properties for an HDFS-compatible system. Any specified properties override those defined in the Hadoop configuration file.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream stages efficiently. You can also configure the origin to skip tracking offsets.
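
To illustrate why the partition base path matters for partition files grouped by field, the following Python sketch derives field names and values from the subdirectory names between a base path and a file. It is a simplified model rather than the origin's or Spark's implementation. The function name `partition_fields` and the file name part-0000.parquet are hypothetical, and values are kept as plain strings.

```python
from pathlib import PurePosixPath
from typing import Dict


def partition_fields(file_path: str, base_path: str) -> Dict[str, str]:
    """Collect field=value pairs from the subdirectories between base_path and the file."""
    relative = PurePosixPath(file_path).relative_to(PurePosixPath(base_path))
    fields = {}
    for part in relative.parts[:-1]:  # skip the file name itself
        if "=" in part:
            name, value = part.split("=", 1)
            fields[name] = value
    return fields


# With the base path set to the root of the partition directories,
# the ID field and its value can be recovered from the subdirectory name.
from_root = partition_fields("/warehouse/data/ID=1/part-0000.parquet", "/warehouse/data")
print(from_root)

# If the base path points inside a partition directory, nothing is left to recover.
from_inside = partition_fields("/warehouse/data/ID=1/part-0000.parquet", "/warehouse/data/ID=1")
print(from_inside)
```

In this model, the first call yields the ID field with the value 1, and the second call yields no fields.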

Schema Requirement

All files processed by the File origin must have the same schema.

When files have different schemas, the resulting behavior depends on the data format and the version of Spark that you use. For example, the origin might skip processing delimited files with a different schema, but add null values to records read from Parquet files with a different schema.

Directory Path

When you configure the File origin, you specify the directory path to use. You can use the following wildcards to define a set of directories to read from:
  • Question mark (?) to match a single character.
  • Asterisk (*) to match zero or more characters.

The origin reads all files in the specified directory and in the first-level subdirectories. For example, if you specify file:///sales as the directory path, the origin reads all files in the /sales and /sales/US directories, but does not read files in the /sales/US/east directory.

In each batch, the origin reads any files added to the directory path since the last batch completed.
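
As a rough illustration of which files fall in scope, the following Python sketch combines the directory depth rule with a file name pattern and an exclusion pattern. It is a sketch under stated assumptions: the function `files_in_scope` and the sample paths are hypothetical, wildcards in the directory path itself are not modeled, and Python's `fnmatchcase` also accepts bracket expressions that the origin may not support.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import Iterable, List


def files_in_scope(paths: Iterable[str], directory: str,
                   name_pattern: str = "*",
                   exclusion_pattern: str = "") -> List[str]:
    """Keep files in `directory` or its first-level subdirectories whose names
    match `name_pattern` and do not match `exclusion_pattern`."""
    base = PurePosixPath(directory)
    selected = []
    for raw in paths:
        path = PurePosixPath(raw)
        try:
            relative = path.relative_to(base)
        except ValueError:
            continue  # not under the directory at all
        if len(relative.parts) > 2:
            continue  # deeper than a first-level subdirectory
        if not fnmatchcase(path.name, name_pattern):
            continue
        if exclusion_pattern and fnmatchcase(path.name, exclusion_pattern):
            continue
        selected.append(raw)
    return selected


candidates = [
    "/sales/q1.csv",
    "/sales/US/q1.csv",
    "/sales/US/east/q1.csv",
    "/sales/US/debug.log",
    "/other/q1.csv",
]
selected = files_in_scope(candidates, "/sales", "*", "*.log")
print(selected)
```

Here only /sales/q1.csv and /sales/US/q1.csv are selected: the file under /sales/US/east is too deep, the log file is excluded, and /other is outside the directory path.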

The format of the directory path depends on the file system that you want to read from:

HDFS
To read files in HDFS, use the following format for the directory path:
hdfs://<authority>/<path>
For example, to read from the /user/hadoop/files directory on HDFS, enter the following path:
hdfs://nameservice/user/hadoop/files
Local file system
To read files in a local file system, use the following format for the directory path:
file:///<directory>
For example, to read from the /Users/transformer/source directory on the local file system, enter the following path:
file:///Users/transformer/source
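
The two path formats follow the usual scheme://authority/path layout. The following Python sketch uses the standard library only to show how the example paths break down, including why a local path has three slashes: the authority is empty. The helper name `split_directory_path` is hypothetical.

```python
from urllib.parse import urlsplit
from typing import Tuple


def split_directory_path(directory_path: str) -> Tuple[str, str, str]:
    """Return the (scheme, authority, path) components of a directory path."""
    parts = urlsplit(directory_path)
    return parts.scheme, parts.netloc, parts.path


hdfs_parts = split_directory_path("hdfs://nameservice/user/hadoop/files")
local_parts = split_directory_path("file:///Users/transformer/source")
print(hdfs_parts)
print(local_parts)
```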

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. When the pipeline starts processing a new batch, Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.

For a File origin, Spark determines partitioning based on the data format of the files being read:
Delimited, JSON, text, or XML
When reading text-based files from a local file system, Spark creates one partition for each file being read.
When reading text-based files from HDFS, Spark can split the files into multiple partitions for processing, depending on the underlying file system. Multiline JSON files cannot be split, so they are processed in a single partition.
Avro, ORC, or Parquet
When reading Avro, ORC, or Parquet files, Spark can split the file into multiple partitions for processing.

Spark uses these partitions while the pipeline processes the batch unless a processor causes Spark to shuffle the data. To change the partitioning in the pipeline, use the Repartition processor.

Data Formats

The File origin generates records based on the specified data format.

The origin can read the following data formats:
Avro
The origin generates a record for every Avro record in an Avro container file. Each file must contain the Avro schema. The origin uses the Avro schema to generate records.
You can define an Avro schema to use. The schema must be in JSON format. You can also configure the origin to process all files in the specified locations. By default, the origin only processes files with the .avro extension.
Delimited
The origin generates a record for each line in a delimited file. You can specify a custom delimiter, quote, and escape character used in the data.
By default, the origin uses the values in the first row for field names and creates records starting with the second row in the file. The origin infers data types from the data by default.
You can clear the Includes Header property to indicate that files do not contain a header row. When files do not include a header row, the origin names the first field _c0, the second field _c1, and so on. The origin also infers data types from the data by default. You can rename the fields downstream with a Field Renamer processor, or you can specify a custom schema in the origin.
When you specify a custom schema, the origin uses the field names and data types defined in the schema, applying the first field in the schema to the first field in the record, and so on.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
Files must use \n as the newline character. The origin skips empty lines.
JSON
By default, the origin generates a record for each line in a JSON Lines file. Each line in the file should contain a valid JSON object. For details, see the JSON Lines website.
If the JSON Lines file contains objects that span multiple lines, you must configure the origin to process multiline JSON objects. When processing multiline JSON objects, the origin generates a record for each JSON object, even if it spans multiple lines.
A standard, single-line JSON Lines file can be split into partitions and processed in parallel. A multiline JSON file cannot be split, so it must be processed in a single partition, which can slow pipeline performance.
By default, the origin uses the field names, field order, and data types in the data.
When you specify a custom schema, the origin matches the field names in the schema to those in the data, then applies the data types and field order defined in the schema.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
ORC
The origin generates a record for each row in an Optimized Row Columnar (ORC) file.
Parquet
The origin generates a record for every Parquet record in the file. The file must contain the Parquet schema. The origin uses the Parquet schema to generate records.
Text
The origin generates a record for each line in a text file. The file must use \n as the newline character.
The generated record consists of a single String field named Value that contains the data.
XML
The origin generates a record for every row defined in an XML file. You specify the root tag used in files and the row tag used to define records.
When reading XML-formatted partition files grouped by field, the origin cannot include the field and value from each group.
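
As an illustration of the delimited behavior described above, the following Python sketch shows how field names differ with and without a header row. It is a simplified model, not the origin's parser: the function `read_delimited` and the sample data are hypothetical, values stay as strings because no data type inference is attempted, and quoted values that contain newlines are not handled.

```python
import csv
from typing import Dict, List


def read_delimited(text: str, includes_header: bool = True,
                   delimiter: str = ",") -> List[Dict[str, str]]:
    """Turn delimited text into records, naming fields _c0, _c1, and so on
    when the data has no header row."""
    lines = [line for line in text.split("\n") if line.strip()]  # skip empty lines
    rows = list(csv.reader(lines, delimiter=delimiter))
    if not rows:
        return []
    if includes_header:
        names, data = rows[0], rows[1:]
    else:
        names, data = [f"_c{i}" for i in range(len(rows[0]))], rows
    return [dict(zip(names, row)) for row in data]


# Header row supplies the field names; the empty line is skipped.
with_header = read_delimited("id,name\n1,ada\n\n2,grace\n")
print(with_header)

# No header row: fields are named by position.
without_header = read_delimited("1,ada\n2,grace\n", includes_header=False)
print(without_header)
```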

Configuring a File Origin

Configure a File origin to read data from files in HDFS or the local file system.
Note: All processed files must share the same schema.
  1. On the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Load Data Only Once Reads data while processing the first batch of a pipeline run and caches the results for reuse throughout the pipeline run.

    Select this property for lookup origins. When configuring lookup origins, do not limit the batch size. All lookup data should be read in a single batch.

    Cache Data Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

    Available when Load Data Only Once is not enabled. When the origin loads data once, the origin caches data for the entire pipeline run.

    Skip Offset Tracking Skips tracking offsets.

    The origin reads all available data for each batch that the pipeline processes, limited by any batch-size configuration for the origin.

  2. On the File tab, configure the following properties:
    File Property Description
    Directory Path Path to the directory that stores the source files. You can use the following wildcards to define the location:
    • Question mark (?) to match a single character.
    • Asterisk (*) to match zero or more characters.

    The origin reads files in the specified directory and in the first-level subdirectories.

    To read from HDFS, use the following format:

    hdfs://<authority>/<path>

    To read from a local file system, use the following format:

    file:///<directory>
    File Name Pattern Pattern that defines the file names to process. You can use the asterisk (*) and question mark (?) wildcards.

    Default is *, which processes all files.

    Exclusion Pattern Pattern that defines the file names to exclude from processing. Use to exclude a subset of files that are included by the File Name Pattern property.

    You can use the asterisk (*) and question mark (?) wildcards.

    For example, if you set File Name Pattern to * to process all files in the directory, you can set Exclusion Pattern to *.log to exclude log files from processing.

    Partition Base Path Path to the root directory of partition files grouped by fields.

    When reading partition files grouped by fields, the origin requires this property to add the field and value from the subdirectory name to the data set. For more information, see the Spark documentation.

    Enter an absolute path. For example, if the partition files are located in the subdirectories /warehouse/data/ID=1 and /warehouse/data/ID=2, then enter:

    /warehouse/data

    Not supported for the XML data format. When reading XML-formatted partition files grouped by field, the origin cannot include the field and value from each group.

    Max Files Per Batch Maximum number of files to read in a batch.

    This property can be useful in streaming pipelines that need to process an initial backlog of files.

    Default is 0 for no limit.

    Post Processing Action to take on successfully read files:
    • Delete - Remove the file from the directory.
    • Archive - Move the file to an archive directory.
    • None - Leave the file in the directory where found.
    Archive Directory Directory to store successfully read files when the origin archives files.
    Additional Configuration Additional HDFS configuration properties to use. Specified properties override those in Hadoop configuration files.

    To add properties, click the Add icon and define the HDFS property name and value. You can use simple or bulk edit mode to configure the properties. Use the property names and values as expected by your version of Hadoop.

  3. On the Data Format tab, configure the following properties:
    Data Format Property Description
    Data Format Format of the data. Select one of the following formats:
    • Avro
    • Delimited
    • JSON
    • ORC
    • Parquet
    • Text
    • XML
    Additional Data Format Configuration Additional data format properties to use. Specify needed data format properties not available on configuration tabs. Properties on the configuration tabs override additional data format properties when there are conflicts.
    For example, for the JSON format you might add the property allowUnquotedFieldNames if the data has unquoted fields. With the property set to True, the origin can read JSON data with the following content:
    {counter: 1}

    To add properties, click the Add icon and define the property name and value. You can use simple or bulk edit mode to configure the properties. Enter the property names and values expected by your version of Spark.

  4. For Avro data, optionally configure the following properties:
    Avro Property Description
    Avro Schema Optional Avro schema to use to process data. The specified Avro schema overrides any schema included in the files.

    Specify the Avro schema in JSON format.

    Ignore Extension Processes all files in the specified directories.

    When not enabled, the origin only processes files with the .avro extension.

  5. For delimited data, on the Data Format tab, optionally configure the following properties:
    Delimited Property Description
    Delimiter Character Delimiter character used in the data. Select one of the available options or select Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Quote Character Quote character used in the data.
    Escape Character Escape character used in the data.
    Includes Header Indicates that the data includes a header line. When selected, the origin uses the first line to create field names and begins reading with the second line.
  6. For JSON data, on the Data Format tab, configure the following property:
    JSON Property Description
    Multiline Enables processing multiline JSON Lines data.

    By default, the origin expects a single JSON object on each line of the file. Use this option to process JSON objects that span multiple lines.

  7. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Root Tag Tag used as the root element.

    Default is ROWS, which represents a <ROWS> root element.

    Row Tag Tag used as a record delineator.

    Default is ROW, which represents a <ROW> record delineator element.

  8. To use a custom schema for delimited or JSON data, click the Schema tab and configure the following properties:
    Schema Property Description
    Schema Mode Mode that determines the schema to use when processing data:
    • Infer from Data

      The origin infers the field names and data types from the data.

    • Use Custom Schema - JSON Format

      The origin uses a custom schema defined in the JSON format.

    • Use Custom Schema - DDL Format

      The origin uses a custom schema defined in the DDL format.

    Note that the schema is applied differently depending on the data format.

    Schema Custom schema to use to process the data.

    Enter the schema in DDL or JSON format, depending on the selected schema mode.

    Error Handling Determines how the origin handles parsing errors:
    • Permissive - When the origin encounters a problem parsing any field in the record, it creates a record with the field names defined in the schema, but with null values in every field.
    • Drop Malformed - When the origin encounters a problem parsing any field in the record, it drops the entire record from the pipeline.
    • Fail Fast - When the origin encounters a problem parsing any field in the record, it stops the pipeline.
    Original Data Field Field where the data from the original record is written when the origin cannot parse the record.

    When writing the original record to a field, you must add the field to the custom schema as a String field.

    Available when using permissive error handling.
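
The three error handling modes can be compared with a small Python sketch. It models the behavior described above under stated assumptions and is not the origin's implementation: the schema, the mode strings, the field name `_original`, and the comma-separated input are all hypothetical. In this sketch only malformed records carry the original data field.

```python
from typing import Callable, Dict, List

# Hypothetical custom schema: field name -> parser for that field's data type.
SCHEMA: Dict[str, Callable[[str], object]] = {"id": int, "amount": float}
ORIGINAL_DATA_FIELD = "_original"  # hypothetical name for the Original Data Field
MODES = {"permissive", "drop_malformed", "fail_fast"}


def parse_line(line: str) -> Dict[str, object]:
    """Parse one comma-separated line against SCHEMA, raising ValueError on problems."""
    values = line.split(",")
    if len(values) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} fields, got {len(values)}")
    return {name: parse(value) for (name, parse), value in zip(SCHEMA.items(), values)}


def read_records(lines: List[str], mode: str) -> List[Dict[str, object]]:
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    records = []
    for line in lines:
        try:
            record = parse_line(line)
        except ValueError:
            if mode == "fail_fast":
                raise  # stop processing, as the pipeline would stop
            if mode == "drop_malformed":
                continue  # drop the entire record
            # Permissive: keep the schema's field names with null values,
            # and write the unparsed data to the original data field.
            record = {name: None for name in SCHEMA}
            record[ORIGINAL_DATA_FIELD] = line
        records.append(record)
    return records


lines = ["1,9.5", "oops,3.0", "2,4.25"]
permissive = read_records(lines, "permissive")
dropped = read_records(lines, "drop_malformed")
print(permissive)
print(dropped)
```

Calling `read_records(lines, "fail_fast")` on the same input raises an error at the second line, which corresponds to the pipeline stopping.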