The Hadoop FS destination writes data to the Hadoop Distributed File System (HDFS). You can write the data to HDFS as flat files or Hadoop sequence files. You can also use the whole file data format to write whole files to HDFS.
When you configure a Hadoop FS destination, you can define a directory template and time basis to determine the output directories that the destination creates and the files where records are written.
As part of the Drift Synchronization Solution for Hive, you can alternatively use record header attributes to perform record-based writes. You can write records to the specified directory, use the defined Avro schema, and roll files based on record header attributes. For more information, see Record Header Attributes for Record-Based Writes.
You can define a file prefix and suffix, the data time zone, and properties that define when the destination closes a file. You can specify the amount of time that a record can be written to its associated directory and what happens to late records.
You can configure the Hadoop FS destination to write to Azure HDInsight.
The destination can generate events for an event stream. For more information about the event framework, see Dataflow Triggers Overview.
When necessary, you can enable Kerberos authentication and specify a Hadoop user. You can also use Hadoop configuration files and add other Hadoop configuration properties as needed.
You can use Gzip, Bzip2, Snappy, LZ4, and other compression formats to write output files.
By default, the Hadoop FS destination uses directory templates to create output and late record directories. Hadoop FS writes records to the directories based on the configured time basis.
You can alternatively write records to directories based on the targetDirectory record header attribute. Using the targetDirectory attribute disables the ability to define directory templates.
When you define a directory template, you can use a mix of constants, field values, and datetime variables. You can use the every function to create new directories at regular intervals based on hours, minutes, or seconds, starting on the hour. You can also use the record:valueOrDefault function to use field values or a default in the directory template.
For example, the following directory template creates output directories based on the value of the State field and the time basis, with hour precision:
/outputfiles/${record:valueOrDefault("/State", "unknown")}/${YY()}-${MM()}-${DD()}-${hh()}
Datetime variables alone create time-based directories. For example, either of the following templates creates daily directories based on the year, month, and day:
${YYYY()}-${MM()}-${DD()}
${YY()}_${MM()}_${DD()}
The every function creates directories at regular intervals, starting on the hour. For example, the following directory template creates a new directory every five minutes:
/HDFS_output/${YYYY()}-${MM()}-${DD()}-${hh()}-${every(5,mm())}
The record:valueOrDefault function uses the value of a field, or the specified default value when the field does not exist or is null:
${record:valueOrDefault(<field path>, <default value>)}
For example, the following directory template creates directories based on the Product field, and uses Misc when a record does not include a Product field:
/${record:valueOrDefault("/Product", "Misc")}/${YY()}-${MM()}-${DD()}
This template might create directories such as the following:
/Shirts/2015-07-31
/Misc/2015-07-31
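As an illustration, the every function template above creates a new directory every five minutes, starting on the hour. For records processed shortly after 02:00 on 2015-07-31 (a hypothetical date and time), the template might resolve to directories such as the following:
/HDFS_output/2015-07-31-02-00
/HDFS_output/2015-07-31-02-05
/HDFS_output/2015-07-31-02-10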
When using directory templates, the time basis helps determine when directories are created. It also determines the directory Hadoop FS uses when writing a record, and whether a record is late.
When using the targetDirectory record header attribute to write records, the time basis determines only whether a record is late.
You can use the following times as the time basis:
When you use a record time as the time basis, you can define a time limit for records to be written to their associated output file. When the destination creates a new output file in a new directory, the previous output file is kept open for the specified late record time limit. When records that belong in that file arrive within the time limit, the destination writes the records to the open output file. When the late record time limit is reached, the output file is closed and any record that arrives past this limit is considered late.
You can send late records to a late records file or to the stage for error handling. When you send records to a late records file, you define a late records directory template.
For example, assume that you use the following directory template with the time of the record as the time basis and a late record time limit of one hour:
/tmp/out/${YYYY()}-${MM()}-${DD()}-${hh()}
The first records that arrive have a datetime between the hours of 02:00 and 02:59, and so are written to an output file in the 02 directory. When records with a datetime between the hours of 03:00 and 03:59 arrive, the destination creates a new file in an 03 directory. The destination keeps the file in the 02 directory open for another hour.
If a record with a datetime between the hours of 02:00 and 02:59 arrives before the hour time limit, the destination writes the record to the open file in the 02 directory. After one hour, the destination closes the output file in the 02 directory. Any records with a datetime between the hours of 02:00 and 02:59 that arrive after the one hour time limit are considered late. The late records are sent to the stage for error handling.
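The sequence can be summarized as follows, assuming a date of 2015-07-31 for illustration:
records with 02:00-02:59 datetimes -> written to an open file in /tmp/out/2015-07-31-02
first record with a 03:00-03:59 datetime -> new file created in /tmp/out/2015-07-31-03; the file in the 02 directory stays open for one more hour
02:00-02:59 records arriving within that hour -> written to the open file in the 02 directory
02:00-02:59 records arriving after that hour -> considered late and sent to the stage for error handling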
You can configure the maximum time that an open output file can remain idle. After no records are written to an output file for the specified amount of time, the Hadoop FS destination closes the file.
You might want to configure an idle timeout when output files remain open and idle for too long, thus delaying another system from processing the files.
For example, when a record with a datetime of 03:00 arrives, the destination creates a new file in a new 03 directory. The previous file in the 02 directory is kept open for the late record time limit, which is an hour by default. However, when records arrive in chronological order, no records that belong in the 02 directory arrive after the 03 directory is created.
In these situations, configure an idle timeout so that other systems can process the files sooner, instead of waiting for the configured maximum records, maximum file size, or late record conditions to occur.
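For instance, assuming that the Idle Timeout property accepts a time in seconds or an expression that uses time constants (an assumption about the property format, not stated above), a 15-minute timeout might be expressed as:
${15 * MINUTES}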
The Hadoop FS destination supports recovery after an unexpected stop of the pipeline by renaming temporary files when the pipeline restarts.
While writing to an output file, the destination creates a temporary file named as follows:
_tmp_<prefix>_<runnerId>
where <prefix> is the file prefix defined for the destination and <runnerId> is the ID of the pipeline runner performing the pipeline processing. For example, when the destination prefix is defined as sdc and the destination runs from a single-threaded pipeline, the temporary file is named _tmp_sdc_0.
When the destination closes the output file, it renames the temporary file based on the prefix and a unique identifier, for example:
<prefix>_e7ce67c5-013d-47a7-9496-8c882ddb28a0
However, when the pipeline stops unexpectedly, the temporary files remain. When the pipeline restarts, the destination scans all subdirectories of the defined directory template to rename any temporary files that match the defined prefix for the destination. After the destination renames the temporary files, it continues writing to new output files.
However, the destination cannot recover temporary files in either of the following situations:
- The directory template includes record-based functions, as in the following example:
/tmp/out/${YY()}-${MM()}-${DD()}/${sdc:hostname()}/${record:value('/a')}/${record:value('/b')}
- Output directories are defined in the targetDirectory record header attribute instead of a directory template.
In either of these situations, you must manually rename the temporary files.
File recovery can slow down the pipeline as it restarts. If needed, you can configure the destination to skip file recovery.
If you define the Avro schema in the stage or in a record header attribute, you can optionally configure the destination to register the Avro schema with Confluent Schema Registry.
You can use the HDP stage libraries to write to Azure Blob storage using the WASB protocol. This enables the Hadoop FS destination to write directly to Azure HDInsight.
To write to an Azure HDInsight cluster, Data Collector can be installed on a node in the HDInsight cluster or entirely outside of the cluster.
You can use simple or bulk edit mode to add configuration properties.
Add a Hadoop FS configuration property with the following name, where <storage account name> is the name of the Azure storage account:
fs.azure.account.key.<storage account name>.blob.core.windows.net
For example, if the storage account name is "sdchd", enter the following property name:
fs.azure.account.key.sdchd.blob.core.windows.net

To point the destination to the storage location, use a URI in the following format:
wasb[s]://<container name>@<storage account name>.blob.core.windows.net/<path to files>
In the URI, <container name> is the Azure container name and <storage account name> is the same Azure storage account name used in the Hadoop FS configuration property. Both names are displayed in the Azure portal. For example:
wasbs://sdc-hd@sdchd.blob.core.windows.net/files
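Putting these pieces together for the example "sdchd" account and "sdc-hd" container, the HDInsight-related settings might look like the following sketch; the property value is assumed to be the storage account access key from the Azure portal, which is not confirmed above:
# Additional Hadoop FS configuration property (name=value); value assumed
# to be the storage account access key
fs.azure.account.key.sdchd.blob.core.windows.net=<storage account access key>
# Write location for the destination
wasbs://sdc-hd@sdchd.blob.core.windows.net/files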
The Hadoop FS destination can generate events that you can use in an event stream. When you enable event generation, the destination generates event records each time the destination closes a file or completes streaming a whole file.
For examples, see the following case studies:
- Case Study: Output File Management
- Case Study: Impala Metadata Updates for DDS for Hive
- Case Study: Parquet Conversion
- Case Study: Sending Email
- Case Study: Event Storage
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Hadoop FS event records include the following event-related record header attributes. Record header attributes are stored as String values:
| Record Header Attribute | Description |
|---|---|
| sdc.event.type | Event type. The destination generates one event type when it closes an output file and another when it completes streaming a whole file. |
| sdc.event.version | An integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
File closure event records have the following fields:
| Field | Description |
|---|---|
| filepath | Absolute path to the closed file. |
| filename | File name of the closed file. |
| length | Size of the closed file in bytes. |
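For reference, the body of a file closure event record with these fields might look like the following; the path, name, and size values are illustrative only:
{
  "filepath": "/tmp/out/2015-07-31-02/sdc_e7ce67c5-013d-47a7-9496-8c882ddb28a0",
  "filename": "sdc_e7ce67c5-013d-47a7-9496-8c882ddb28a0",
  "length": 1048576
}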
Whole file event records have the following fields:
| Field | Description |
|---|---|
| sourceFileInfo | A map of attributes about the original whole file that was processed. The attributes included depend on the information provided by the origin system. |
| targetFileInfo | A map of attributes about the whole file written to the destination. |
| checksum | Checksum generated for the written file. Included only when you configure the destination to include checksums in the event record. |
| checksumAlgorithm | Algorithm used to generate the checksum. Included only when you configure the destination to include checksums in the event record. |
You can use Kerberos authentication to connect to HDFS. When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to HDFS. By default, Data Collector uses the user account that started it to connect.
The Kerberos principal and keytab are defined in the Data Collector configuration file, $SDC_CONF/sdc.properties. To use Kerberos authentication, configure all Kerberos properties in the Data Collector configuration file, and then enable Kerberos in the Hadoop FS destination.
For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
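The following sketch shows how the Kerberos properties might appear in $SDC_CONF/sdc.properties; the property names and example values are assumptions based on the Data Collector Kerberos documentation, not definitive settings:
# Enable Kerberos for Data Collector (assumed property names, example values)
kerberos.client.enabled=true
kerberos.client.principal=sdc/_HOST@EXAMPLE.COM
kerberos.client.keytab=sdc.keytab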
Data Collector can either use the currently logged in Data Collector user or a user configured in the destination to write to HDFS.
A Data Collector configuration property can be set that requires using the currently logged in Data Collector user. When this property is not set, you can specify a user in the destination. For more information about Hadoop impersonation and the Data Collector property, see Hadoop Impersonation Mode.
Note that the destination uses a different user account to connect to HDFS. By default, Data Collector uses the user account that started it to connect to external systems. When using Kerberos, Data Collector uses the Kerberos principal.
For more information, see the Hadoop documentation.
Configure a Hadoop FS destination to write data to HDFS.