Azure IoT/Event Hub Consumer

Supported pipeline types:
  • Data Collector

The Azure IoT/Event Hub Consumer origin reads data from Microsoft Azure Event Hub. The origin can use multiple threads to enable parallel processing of data from a single Azure event hub. For information about supported versions, see Supported Systems and Versions.

Before you use the Azure IoT/Event Hub Consumer origin, make sure you have the required Microsoft Azure storage account and container.

When you configure the Azure IoT/Event Hub Consumer, you specify the Microsoft Azure namespace and event hub names. You also define the shared access policy name and connection string key. You specify the consumer group to use and an event processor prefix that the origin uses when communicating with Azure Event Hub.

You also configure the storage account details, such as the storage account name and key, and specify the number of threads to use during processing.

Storage Account and Container Prerequisite

Before you use the Azure IoT/Event Hub Consumer origin, you need a Microsoft Azure storage account and at least one container.

The origin stores offsets in a storage account container, so to ensure the integrity of offset information, you must use a different container for each pipeline that includes an Azure IoT/Event Hub Consumer origin.

For example, say you use the Azure IoT/Event Hub Consumer as the origin for an IoT pipeline and a Transactions pipeline. To keep the offset data for these pipelines separate, you need to use two different storage account containers. They can be in the same storage account or in different storage accounts. When you configure the origins, you specify the storage account and container to use.

To create a new container for the pipeline:
  1. Log in to the Microsoft Azure portal: https://portal.azure.com
  2. In the Navigation panel, click Storage Accounts.
  3. Select the storage account to use.

    If you need to create a storage account, click the Add icon. Enter a name for the storage account, and enter or select a resource group name. You can use the defaults for all other properties.

  4. In the storage account view, click + Container to create a container.
  5. Enter a container name, and click OK.
    Tip: Use a name that can be easily identified as the container for the pipeline that you want to use it in.

If these steps are no longer accurate, see the Microsoft Azure Event Hub documentation.
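
If you prefer to script container creation rather than use the portal, the following sketch shows one way to do it with the azure-storage-blob Python package. The account name, key, and container name are placeholders, and the Azure CLI or any other Azure SDK works equally well.

  from azure.storage.blob import BlobServiceClient
  from azure.core.exceptions import ResourceExistsError

  ACCOUNT_NAME = "mystorageaccount"            # placeholder storage account name
  ACCOUNT_KEY = "<storage-account-key>"        # one of the account access keys
  CONTAINER_NAME = "sdc-iot-pipeline-offsets"  # a dedicated container per pipeline

  service = BlobServiceClient(
      account_url=f"https://{ACCOUNT_NAME}.blob.core.windows.net",
      credential=ACCOUNT_KEY,
  )

  try:
      service.create_container(CONTAINER_NAME)
      print("Created container:", CONTAINER_NAME)
  except ResourceExistsError:
      print("Container already exists:", CONTAINER_NAME)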

Resetting the Origin in Event Hub

You cannot use Data Collector to reset the origin for Azure IoT/Event Hub Consumer pipelines because the offset is stored in Microsoft Azure, in the storage account container that the pipeline uses.

To reset the origin in Microsoft Azure Event Hub:
  1. In the Microsoft Azure portal, navigate to the storage account.
  2. To delete the offset information stored for the pipeline, delete the container that the pipeline uses.

    This can take some time. Allow the portal to complete the removal of the container before continuing.

  3. To enable the pipeline to store new offset information when you restart the pipeline, create a new container with the same name. Or, use a different name and update the Container Name property in the pipeline.
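
If you reset the origin regularly, you can script the removal and recreation of the container. The following sketch uses the azure-storage-blob Python package with placeholder names; because Azure removes containers asynchronously, the sketch retries creation until the old container is fully deleted.

  import time

  from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
  from azure.storage.blob import BlobServiceClient

  service = BlobServiceClient(
      account_url="https://<storage-account-name>.blob.core.windows.net",
      credential="<storage-account-key>",
  )
  container = "<container-name>"    # the container that the pipeline uses

  try:
      service.delete_container(container)    # removes the stored offset information
  except ResourceNotFoundError:
      pass                                   # nothing to delete

  # Azure removes containers asynchronously; retry creation until the removal completes.
  for _ in range(30):
      try:
          service.create_container(container)
          break
      except ResourceExistsError:
          time.sleep(10)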

Multithreaded Processing

The Azure IoT/Event Hub Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline.

The Azure IoT/Event Hub Consumer origin uses multiple concurrent threads to read from an event hub based on the Max Threads property. When you start the pipeline, the origin creates the number of threads specified in the Max Threads property. Each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.

For example, say you set the Max Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.

Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independently of other batches processed by other pipeline runners, so batches may be written in a different order than they were read.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
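
The following sketch is an illustration only, not Data Collector code: five workers process batches in parallel, showing why batches can complete, and be written, in a different order than they were read.

  import random
  import time
  from concurrent.futures import ThreadPoolExecutor, as_completed

  def process_batch(batch_number):
      time.sleep(random.uniform(0.1, 0.5))    # simulated per-batch processing time
      return batch_number

  with ThreadPoolExecutor(max_workers=5) as pool:           # analogous to Max Threads = 5
      futures = [pool.submit(process_batch, n) for n in range(10)]
      for done in as_completed(futures):                    # batches finish out of order
          print("wrote batch", done.result())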

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Data Formats

The Azure IoT/Event Hub Consumer origin reads data from Microsoft Azure Event Hub based on the data format that you select. You can use the following data formats:
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
SDC Record
Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
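
To generate test data for the origin, you can publish events in one of these formats to the event hub. The following sketch uses the azure-eventhub Python package to send individual JSON objects, which the origin can read with the JSON data format and the Multiple JSON objects content type. The connection string, event hub name, and sample readings are placeholders.

  import json

  from azure.eventhub import EventData, EventHubProducerClient

  producer = EventHubProducerClient.from_connection_string(
      conn_str="Endpoint=sb://<namespace>.servicebus.windows.net/;"
               "SharedAccessKeyName=<shared-access-policy>;SharedAccessKey=<key>",
      eventhub_name="<event-hub-name>",
  )

  with producer:
      batch = producer.create_batch()
      for reading in [{"device": "sensor-1", "temp": 21.5},
                      {"device": "sensor-2", "temp": 19.8}]:
          batch.add(EventData(json.dumps(reading)))         # one JSON object per event
      producer.send_batch(batch)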

Configuring an Azure IoT/Event Hub Consumer

Configure an Azure IoT/Event Hub Consumer origin to read data from Microsoft Azure Event Hub. Be sure to complete the necessary prerequisites before you configure the origin.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Event Hub tab, configure the following properties:
    Event Hub Property - Description
    Namespace Name - The name of the namespace that contains the event hub that you want to use.
    Event Hub Name - The event hub name.
    Shared Access Policy Name - The policy name associated with the namespace.

    To retrieve the policy name, when logged into the Azure portal, navigate to your namespace and event hub, and then click Shared Access Policies for a list of policies.

    When appropriate, you can use the default shared access key policy, RootManageSharedAccessKey.

    Connection String Key - One of the connection string keys associated with the specified shared access policy.

    To retrieve a connection string key, after accessing the list of shared access policies, click the policy name, and then copy the Connection String - Primary Key value.

    The value typically begins with "Endpoint". A sketch that verifies these connection values outside of Data Collector appears after the configuration steps.

    Consumer Group - Consumer group to use. Enter a consumer group associated with the specified event hub.

    You can use the default consumer group, $Default.

    To view a list of available consumer groups, when viewing the event hub in the Azure portal, click Consumer Groups.

    Event Processor Prefix - A prefix to identify the pipeline. Use a different prefix for each pipeline that includes the origin.

    Used to communicate with Azure Event Hub.

    Storage Account Name - Name of the storage account to use.

    For information about creating a storage account, see Storage Account and Container Prerequisite.

    Storage Account Key - One of the keys for the storage account.

    To retrieve the storage account key, when viewing the storage account details in the Azure portal, click Access Keys. Then copy one of the default key values.

    Container Name - The name of the container to use for the pipeline.

    For information about creating a container, see Storage Account and Container Prerequisite.

    Max Threads - Number of threads the origin generates and uses for multithreaded processing.
  3. On the Data Format tab, configure the following property:
    Data Format Property - Description
    Data Format - Format of data to read. Use one of the following options:
    • Binary
    • JSON
    • SDC Record
    • Text
  4. For binary data, on the Data Format tab, configure the following properties:
    Binary Property - Description
    Compression Format - The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Data Size (bytes) - Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  5. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property - Description
    JSON Content - Type of JSON content. Use one of the following options:
    • JSON array of objects
    • Multiple JSON objects
    Compression Format - The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Object Length (chars) - Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset - Character encoding of the files to be processed.
    Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  6. For text data, on the Data Format tab, configure the following properties:
    Text Property - Description
    Compression Format - The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Line Length - Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter - Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter - One or more characters to use to define records.
    Include Custom Delimiter - Includes delimiter characters in the record.
    Charset - Character encoding of the files to be processed.
    Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
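
Before configuring the origin, you can verify the connection values outside of Data Collector. The following sketch uses the azure-eventhub and azure-storage-blob Python packages to confirm that the connection string, event hub name, consumer group, storage account, and container are reachable. Every value shown is a placeholder; substitute the values from your Azure portal.

  from azure.eventhub import EventHubConsumerClient
  from azure.storage.blob import BlobServiceClient

  EVENT_HUB_CONN_STR = ("Endpoint=sb://<namespace>.servicebus.windows.net/;"
                        "SharedAccessKeyName=<shared-access-policy>;"
                        "SharedAccessKey=<connection-string-key>")

  consumer = EventHubConsumerClient.from_connection_string(
      EVENT_HUB_CONN_STR,
      consumer_group="$Default",                 # or your own consumer group
      eventhub_name="<event-hub-name>",
  )
  with consumer:
      print("Partition IDs:", consumer.get_partition_ids())   # fails fast on bad values

  storage = BlobServiceClient(
      account_url="https://<storage-account-name>.blob.core.windows.net",
      credential="<storage-account-key>",
  )
  print("Container exists:",
        storage.get_container_client("<container-name>").exists())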