Hive

The Hive destination writes files of a specified file format to a Hive table. Hive is a data warehouse system that runs on top of Hadoop Distributed File System (HDFS) and stores table data as files on HDFS.

By default, the destination writes to Hive using connection information stored in Hive configuration files on the Transformer machine. You can alternatively specify the location of an external Hive metastore where the configuration information is stored.

The destination can write to a new or existing Hive table. If the table doesn't exist, the destination creates the table. The destination can create a managed internal table or an external table. If the table exists, the destination can either append data to the table, overwrite all existing data, or overwrite related partitions in the table.
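
The three ways of writing to an existing table can be illustrated with a small in-memory model. This is only a sketch, not the destination's implementation: the table is assumed to be a dictionary that maps a partition value to its rows, and the function name `write_rows` and the short mode names are hypothetical.

```python
from collections import defaultdict

# Hypothetical short names for the three write behaviors described above.
MODES = ("append", "overwrite_partitions", "overwrite_table")


def write_rows(table, new_rows, partition_column, mode="append"):
    """Return a new table (partition value -> list of rows) after a write."""
    if mode not in MODES:
        raise ValueError(f"unknown write mode: {mode}")

    # Group the incoming rows by their partition value.
    incoming = defaultdict(list)
    for row in new_rows:
        incoming[row[partition_column]].append(row)

    if mode == "overwrite_table":
        # All existing data is replaced by the new data.
        return dict(incoming)

    # Copy the existing table so the input is not modified.
    result = {key: list(rows) for key, rows in table.items()}
    for key, rows in incoming.items():
        if mode == "append":
            result.setdefault(key, []).extend(rows)
        else:
            # Only partitions that receive new data are replaced.
            result[key] = list(rows)
    return result
```

In this model, writing one February row in the related-partitions mode replaces the February rows and leaves the January rows untouched, while the complete-overwrite mode leaves only the new February row.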

When you configure the Hive destination, you specify the schema and table to write to. You configure the file format of the data and the write mode to use. You can also configure table columns to partition the data by.

You can enable data drift handling, which allows the destination to automatically compensate for new or missing data in pipeline records. When needed, you can specify URIs for an external Hive metastore where configuration information is stored. You can also specify the type of table to create if the table does not exist.

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel.

When the pipeline starts processing a new batch, Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline. Spark uses these partitions for the rest of the pipeline processing, unless a processor causes Spark to shuffle the data.

When writing files to Hive, you can configure the Hive destination to partition the data by columns.

Partitioning by columns is particularly useful when the destination writes to a Hive table used by data scientists who run analytic queries on the data. To increase the analytics performance, partition the data by the columns that the data scientists frequently include in the WHERE or GROUP BY clauses in their queries. For example, if the data scientists plan to run queries where the date column is a certain value, sending all records with the same date value to the same output file and subdirectory improves the efficiency of those queries.

The destination handles the configured partition columns differently, based on whether the destination writes to a new table or an existing table:
Writes to a new table

When the Hive destination writes to a new table and partition columns are not defined in stage properties, the destination uses the same number of partitions that Spark uses to process the upstream pipeline stages. The destination randomly redistributes the data to balance the data across the partitions, and then writes one output file for each partition to the specified table path. For example, if Spark splits the pipeline data into 20 partitions, the destination writes 20 output files to the specified table path.

When the destination writes to a new table and partition columns are defined in stage properties, the destination redistributes the data by the specified column, placing records with the same value for the specified column in the same partition. The destination creates a single file for each partition, writing each file to a subfolder within the table path.

For example, let's say that you configure the Hive destination to write to the table path /hive/orders and to partition the data by the month column. The destination creates 12 partitions, placing all records where month is january in one partition, all records where month is february in the next partition, and so on. The destination creates the following subdirectories and writes a single file to each subdirectory:
/hive/orders/january
/hive/orders/february
...
/hive/orders/december
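
The grouping in this example can be sketched in plain Python. The sketch mirrors the subdirectory names shown above. The actual directory and file names are determined by Hive, and the function name `plan_partitioned_output` and the sample records are hypothetical.

```python
from collections import defaultdict
import posixpath


def plan_partitioned_output(records, table_path, partition_column):
    """Map each output subdirectory to the records that would land in it."""
    layout = defaultdict(list)
    for record in records:
        # Records with the same column value share one subdirectory.
        subdirectory = posixpath.join(table_path, str(record[partition_column]))
        layout[subdirectory].append(record)
    return dict(layout)


# Illustrative sample data, not taken from a real table.
orders = [
    {"order_id": 1, "month": "january"},
    {"order_id": 2, "month": "february"},
    {"order_id": 3, "month": "january"},
]
layout = plan_partitioned_output(orders, "/hive/orders", "month")
for path in sorted(layout):
    print(path, len(layout[path]))
```

Each key in `layout` corresponds to one subdirectory, and the list under it corresponds to the contents of the single file written there.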
Writes to an existing table
When the Hive destination writes to an existing table and partition columns are not defined in stage properties, the destination automatically uses the same partitioning as the existing table.
When the destination writes to an existing table with partition columns defined in stage properties, the destination writes based on how the Write Mode stage property is defined:
  • When set to Append rows to existing table or Overwrite related partitions in existing table, the destination uses existing partition columns in the table, ignoring any partition columns defined in stage properties.
  • When set to Overwrite complete existing table, the destination deletes all data in the table and writes to the table using partition columns defined in stage properties.
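
The rules for new and existing tables can be summarized as a small decision function. This is a sketch of the logic as described above, not the destination's code. The function name and the short write mode names are hypothetical.

```python
# Hypothetical short names for the three write modes.
WRITE_MODES = ("append", "overwrite_partitions", "overwrite_table")


def effective_partition_columns(table_exists, write_mode, stage_columns, table_columns):
    """Return the partition columns that the write would actually use."""
    if write_mode not in WRITE_MODES:
        raise ValueError(f"unknown write mode: {write_mode}")
    if not table_exists:
        # New table: use the stage properties (an empty list means
        # no partitioning by column).
        return list(stage_columns)
    if not stage_columns:
        # Existing table, nothing configured: keep the table's partitioning.
        return list(table_columns)
    if write_mode == "overwrite_table":
        # The table data is replaced, so the stage properties apply.
        return list(stage_columns)
    # Append or overwrite related partitions: stage properties are ignored.
    return list(table_columns)
```

For example, `effective_partition_columns(True, "append", ["month"], ["region"])` returns `["region"]`, because the partition columns of the existing table take precedence in that mode.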

File Formats

The Hive destination can write files of the following formats to a Hive table:
  • Avro
  • ORC
  • Parquet
  • Record Columnar
  • Sequence
  • Text

Hive determines the exact format and file name of the output files. For more information, see the Hive documentation.

Data Drift Handling

You can enable the Hive destination to automatically compensate for data drift. Data drift occurs when the structure of data changes, such as the appearance of a new field or the absence of expected data.

When handling data drift, the destination updates the table schema as needed, allowing the destination to write records to the table even when the record includes unexpected data. If you prefer to retain the existing strict table schema, do not enable data drift handling. When you do not enable data drift handling, records that do not match the Hive table schema cause the pipeline to stop.

When you enable data drift handling, the destination performs the following tasks:
Create new columns for new fields
For any new field in pipeline data, the destination adds a nullable column to the table schema and populates the column with nulls for all existing rows. A field is considered new if no column of the same name exists in the table.
For example, if a record includes a productId field that does not exist in the table schema, the destination creates a new productId column.
Add null values for missing data
For any table columns that do not have matching fields in pipeline data, the destination adds null values for those columns.
For example, if a record does not include a productName field expected by the table schema, the destination writes the record to the table with a null value in the productName column.
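
Both tasks can be sketched with an in-memory table. This is an illustrative model only: the schema is assumed to be a list of column names, rows are dictionaries, `None` stands in for null, and the function name `write_with_drift` is hypothetical.

```python
def write_with_drift(schema, rows, record, enable_data_drift=True):
    """Return (new_schema, new_rows) after writing one record."""
    if not enable_data_drift and set(record) != set(schema):
        # Models the pipeline stopping on a record that does not match.
        raise ValueError("record does not match the table schema")

    # Create new columns for fields that have no column of the same name.
    new_columns = [name for name in record if name not in schema]
    new_schema = list(schema) + new_columns

    # Existing rows get nulls in the new columns.
    new_rows = [{col: row.get(col) for col in new_schema} for row in rows]

    # The new record gets nulls for columns it has no field for.
    new_rows.append({col: record.get(col) for col in new_schema})
    return new_schema, new_rows


# Illustrative data using the field names from the examples above.
schema, rows = write_with_drift(
    ["productName", "price"],
    [{"productName": "widget", "price": 5}],
    {"productId": 7, "price": 9},
)
```

After this call, `schema` includes `productId`, the existing row has a null `productId`, and the new row has a null `productName`.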

To enable data drift handling, select the Enable Data Drift property on the Hive tab.

Column Order

When you enable data drift handling, the Hive destination orders table columns as follows:
  • New columns are added to the right of existing columns.
  • Partition columns appear to the right of all other columns.

The resulting column order is as follows:

<existing columns> <new columns created by drift handling> <partition columns>
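
The ordering rule can be expressed as a short function. This is a sketch under the assumption that columns are plain name lists. The function name `ordered_columns` and the sample column names are hypothetical.

```python
def ordered_columns(existing_columns, partition_columns, record_fields):
    """Return column names in the order described above."""
    # Existing non-partition columns keep their position.
    regular = [c for c in existing_columns if c not in partition_columns]
    # New columns are fields with no matching column of any kind.
    known = set(existing_columns) | set(partition_columns)
    new = [f for f in record_fields if f not in known]
    # Partition columns always come last.
    return regular + new + list(partition_columns)


print(ordered_columns(["id", "name", "month"], ["month"], ["id", "productId", "month"]))
# prints ['id', 'name', 'productId', 'month']
```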

Additional Hive Configuration Properties

When needed, you can pass additional Hive configuration properties to Hive as additional Spark configuration properties. Simply define Extra Spark Configuration properties on the Cluster tab of the pipeline properties panel.

Hive configuration properties defined in the pipeline override those defined in Hive configuration files.
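
The precedence rule amounts to a dictionary merge in which pipeline values win. The sketch below only models that precedence. The property values are illustrative, the function name is hypothetical, and the exact form of the property names expected in Extra Spark Configuration is not specified here.

```python
def effective_hive_config(file_properties, pipeline_properties):
    """Merge properties so that pipeline-level values take precedence."""
    merged = dict(file_properties)
    merged.update(pipeline_properties)
    return merged


# Illustrative values only.
from_files = {
    "hive.exec.dynamic.partition.mode": "strict",
    "hive.exec.max.dynamic.partitions": "1000",
}
from_pipeline = {"hive.exec.dynamic.partition.mode": "nonstrict"}

config = effective_hive_config(from_files, from_pipeline)
```

Here `config` keeps the file value for `hive.exec.max.dynamic.partitions` and takes the pipeline value for `hive.exec.dynamic.partition.mode`.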

Configuring a Hive Destination

Use the Hive destination to write files to a Hive table.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property: Description
    Name: Stage name.
    Description: Optional description.
  2. On the Hive tab, configure the following properties:
    Hive Property: Description
    Schema: Schema where the Hive table is located.
    Table: Hive table to write to.

    If the table does not exist, the destination creates the table. By default, the destination creates a managed internal table. You can specify the type of table to create with the Table Type property on the Advanced tab.

    Hive Metastore URIs: Optional comma-separated list of URIs for an external Hive metastore that contains the Hive connection information to use.
    Use the following URI format:
    thrift://<metastore-host>:<metastore-port>
    When not defined, the destination uses connection information defined in the hive-site.xml and core-site.xml Hive configuration files on the Transformer machine.
    Partition Columns: List of table columns to partition the data by.

    When writing to a new table or when used with the Overwrite complete existing table write mode, the destination partitions the data so that records with the same value for the specified columns are in the same partition.

    When using other write modes, this property is ignored.

    Click the Add icon to specify another column to partition by.

    Enable Data Drift: Automatically compensates for new or missing data in pipeline records. Adds new columns to the Hive table schema and populates missing columns with null values.
    Write Mode: Mode to use when writing to an existing Hive table:
    • Append data to existing table - Appends new data to the existing table.
    • Overwrite related partitions in existing table - Replaces all data in a partition before writing new data to the partition.

      To use this option, you must add an extra Spark configuration property named spark.sql.sources.partitionOverwriteMode, and set it to dynamic. Define extra Spark configuration properties on the Cluster tab of the pipeline properties panel.

    • Overwrite complete existing table - Replaces all data in the existing table with new data.

    By default, the destination appends data to existing tables.

    File Format: Format of the data. Select one of the following options:
    • Avro
    • ORC
    • Parquet
    • Record Columnar
    • Sequence
    • Text
  3. On the Advanced tab, optionally configure the following properties:
    Advanced Property: Description
    Table Type: Type of table to create if the table does not exist:
    • Managed (Internal) - Creates a managed table if the specified table does not exist.
    • External - Creates an external table if the specified table does not exist.

    For more information about managed and external tables, see the Hive documentation.

    External Table Location: Directory path where the external table is created.
    Additional Configuration: Additional HDFS configuration properties to use. Specified properties override those in Hadoop configuration files.

    To add properties, click the Add icon and define the HDFS property name and value. You can use simple or bulk edit mode to configure the properties. Use the property names and values as expected by your version of Hadoop.

    Tip: You can specify additional Hive configuration properties in the pipeline properties.
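
A value for the Hive Metastore URIs property can be checked before it is entered in the stage. The sketch below assumes the comma-separated thrift URI format with a host and a port described in step 2. The function name `parse_metastore_uris` and the host names are hypothetical. Port 9083 is the usual Hive metastore default.

```python
from urllib.parse import urlsplit


def parse_metastore_uris(value):
    """Split a comma-separated URI list into (host, port) pairs."""
    endpoints = []
    for item in value.split(","):
        uri = item.strip()
        parts = urlsplit(uri)
        # Each entry needs the thrift scheme, a host, and an explicit port.
        if parts.scheme != "thrift" or not parts.hostname or parts.port is None:
            raise ValueError(f"expected thrift://<host>:<port>, got {uri!r}")
        endpoints.append((parts.hostname, parts.port))
    return endpoints


# Hypothetical host names.
endpoints = parse_metastore_uris(
    "thrift://metastore-1.example.com:9083, thrift://metastore-2.example.com:9083"
)
```

An entry without the `thrift` scheme or without a port raises `ValueError`, which surfaces formatting mistakes before the pipeline runs.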