Dataflow Triggers

Dataflow Triggers Overview

Dataflow triggers are instructions for the event framework to kick off tasks in response to events that occur in the pipeline. For example, you can use dataflow triggers to start a MapReduce job after the pipeline writes a file to HDFS. Or you might use a dataflow trigger to stop a pipeline after the JDBC Query Consumer origin processes all available data.

The event framework consists of the following components:
event generation
The event framework generates pipeline-related events and stage-related events. The framework generates pipeline events only when the pipeline starts and stops. The framework generates stage events when specific stage-related actions take place. The action that generates an event differs from stage to stage and is related to how the stage processes data.
For example, the Hive Metastore destination updates the Hive metastore, so it generates events each time it changes the metastore. In contrast, the Hadoop FS destination writes files to HDFS, so it generates events each time it closes a file.
Events produce event records. Pipeline-related event records are passed immediately to the specified event consumer. Stage-related event records are passed through the pipeline in an event stream.
task execution
To trigger a task, you need an executor. Executor stages perform tasks in Data Collector or external systems. Each time an executor receives an event, it performs the specified task.
For example, the Hive Query executor runs user-defined Hive or Impala queries each time it receives an event, and the MapReduce executor triggers a MapReduce job when it receives events. Within Data Collector, the Pipeline Finisher executor stops a pipeline upon receiving an event, transitioning the pipeline to a Finished state.
Not available in Data Collector Edge pipelines. Executors are not supported in Data Collector Edge pipelines.
event storage
To store event information, pass the event to a destination. The destination writes the event records to the destination system, just like any other data.
For example, you might store event records to keep an audit trail of the files that the pipeline origin reads.

Pipeline Event Generation

The event framework generates pipeline events in Data Collector standalone pipelines at specific points in the pipeline lifecycle. You can configure the pipeline properties to pass each event to an executor or to another pipeline for more complex processing.

Not available in Data Collector Edge pipelines.

The event framework generates the following pipeline-related events:
Pipeline Start
The pipeline start event is generated as the pipeline initializes, immediately after it starts and before individual stages are initialized. This allows an executor to perform a task before the stages initialize.
Most executors wait for confirmation that a task completes. As a result, the pipeline waits for the executor to complete the task before continuing with stage initialization. For example, if you configure the JDBC Query executor to truncate a table before the pipeline begins, the pipeline waits until the task is complete before processing any data.

The MapReduce executor and Spark executor kick off jobs and do not wait for the submitted jobs to complete. When you use one of these executors, the pipeline waits only for successful job submission before continuing with stage initialization.

If the executor fails to process the event, for example if a Hive Query executor fails to execute the specified query or the query fails, then the initialization phase fails and the pipeline does not start. Instead, the pipeline transitions to a failure state.

Pipeline Stop
The pipeline stop event is generated as the pipeline stops, whether manually, programmatically, or due to a failure. The stop event is generated after all stages have completed processing and cleaned up temporary resources, such as removing temporary files. This allows an executor to perform a task after pipeline processing is complete, before the pipeline fully stops.

As with the start event, the behavior of the executor that consumes the stop event determines whether the pipeline waits for the executor task to complete before it fully stops. Also, if processing of the pipeline stop event fails for any reason, the pipeline transitions to a failed state even though the data processing was successful.

Pipeline events differ from stage events as follows:
  • Virtual processing - Unlike stage events, pipeline events are not processed by stages that you configure in the canvas. They are passed to an event consumer that you configure in the pipeline properties.

    The event consumer does not display in the pipeline’s canvas. As a result, pipeline events are also not visualized in data preview or pipeline monitoring.

  • Single-use events - You can configure only one event consumer for each event type within the pipeline properties: one for the Start event and one for the Stop event.

    When necessary, you can pass pipeline events to another pipeline. In the event-consuming pipeline, you can include as many stages as you need for more complex processing.

For a solution that describes a couple of ways to use pipeline events, see Offloading Data from Relational Sources to Hadoop.

Using Pipeline Events

You can configure pipeline events in standalone pipelines. Each pipeline event can be consumed by an executor or by another pipeline.

Pass an event to an executor when the executor can perform all of the tasks that you need. You can configure one executor for each event type.

Pass an event to another pipeline when you need to perform more complex tasks in the consuming pipeline, such as passing the event to multiple executors or to an executor and destination for storage.

Pass to an Executor

You can configure a pipeline to pass each event type to an executor stage. This allows you to trigger a task when the pipeline starts or stops. You configure the behavior for each event type separately, and you can discard any event that you do not want to use.

Note: If the specified executor fails to process the event, for example if a Shell executor fails to execute a script, the pipeline transitions to a failure state.
To pass a pipeline event to the executor, perform the following steps:
  1. In the pipeline properties, select the executor that you want to consume the event.
  2. In the pipeline properties, configure the executor to perform the task.
Example

Say you want to send an email when the pipeline starts. First, you configure the pipeline to use the Email executor for the pipeline start event. Since you don't need the Stop event, you can simply use the default discard option:

Then, also in the pipeline properties, you configure the Email executor. You can configure a condition for the email to be sent. If you omit the condition, the executor sends the email each time it receives an event:
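
For example, here is a minimal sketch of a possible condition, assuming you only want the notification from pipelines whose names contain "prod" (both the str:contains function call and the naming convention are illustrative assumptions):
${str:contains(pipeline:name(), 'prod')}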

Pass to Another Pipeline

Pass pipeline events to another pipeline to perform more complex processing than simply passing the event to a single consumer. The event-consuming pipeline must use an SDC RPC origin, and can then include as many other stages as you require.

Note: When you pass a pipeline event to another pipeline, the event-consuming pipeline does not report processing failures back to the event-generating pipeline automatically. For example, if you pass a pipeline event to a pipeline that has an executor that fails to complete its task, the failure is not reported back to the event-generating pipeline.

To achieve the same behavior as passing the event to an executor, where a processing failure causes the event-generating pipeline to fail, configure the relevant stages to stop the event-consuming pipeline upon error. When an error occurs, the event-consuming pipeline stops and passes the message back to the event-generating pipeline, which then transitions to a failure state.

For example, say you pass a pipeline event to a pipeline that routes the event to two executors. To ensure that the event-generating pipeline fails if either of the executors fails, set the On Record Error property on the General tab of both executors to Stop Pipeline.

This causes the event-consuming pipeline to stop on error, which causes the event-generating pipeline to transition to a failure state.

To pass an event to another pipeline, perform the following steps:
  1. Configure the pipeline to consume the event.
  2. Configure the event-generating pipeline to pass the event to the event-consuming pipeline, including details from the SDC RPC origin.
  3. Start the event-consuming pipeline before you start the event-generating pipeline.
Example

Say you want the Stop event to trigger a shell script that kicks off another process and a JDBC query. To do this, first configure the event-consuming pipeline. Use an SDC RPC origin, and note its SDC RPC properties, because you will use them to configure the event-generating pipeline:

Then you configure the event-generating pipeline to pass the Stop event to your new pipeline. Note that if you don't need to use the Start event, you can simply use the default discard option:

Then you configure the Stop Event - Write to Another Pipeline properties, using the SDC RPC details from your event-consuming pipeline:
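
The essential point is that the SDC RPC connection and RPC ID values must match between the two pipelines. A minimal sketch with placeholder values (property names paraphrased):
Event-consuming pipeline, SDC RPC origin: listening port 20000, RPC ID stop-events
Event-generating pipeline, Stop Event - Write to Another Pipeline: connection <consuming-sdc-host>:20000, RPC ID stop-events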

Stage Event Generation

You can configure certain stages to generate events. Event generation differs from stage to stage, based on the way the stage processes data. For details about event generation for each stage, see "Event Generation" in the stage documentation.

The following table lists event-generating stages and when they can generate events:
Stage Generates events when the stage...
Amazon S3 origin
  • Starts processing an object.
  • Completes processing an object.
  • Completes processing all available objects and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

Azure Blob Storage origin
  • Starts processing an object.
  • Completes processing an object.
  • Completes processing all available objects and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

Azure Data Lake Storage Gen1 origin
  • Starts processing a file.
  • Completes processing a file.
  • Completes processing all available files and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

Azure Data Lake Storage Gen2 origin
  • Starts processing an object.
  • Completes processing an object.
  • Completes processing all available objects and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

Azure Data Lake Storage Gen2 (Legacy) origin
  • Starts processing a file.
  • Completes processing a file.
  • Completes processing all available files and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

Directory origin
  • Starts processing a file.
  • Completes processing a file.
  • Completes processing all available files and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

File Tail origin
  • Starts processing a file.
  • Completes processing a file.

For more information, see Event Generation in the origin documentation.

Google BigQuery origin
  • Successfully completes a query.

For more information, see Event Generation in the origin documentation.

Google Cloud Storage origin
  • Completes processing all available objects and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

Groovy Scripting origin
  • Runs a script that generates events.

For more information, see Event Generation in the origin documentation.

Hadoop FS Standalone origin
  • Starts processing a file.
  • Completes processing a file.
  • Completes processing all available files and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

JavaScript Scripting origin
  • Runs a script that generates events.

For more information, see Event Generation in the origin documentation.

JDBC Multitable Consumer origin
  • Completes processing the data returned by the queries for all tables.

For more information, see Event Generation in the origin documentation.

JDBC Query Consumer origin
  • Completes processing all data returned by a query.
  • Successfully completes a query.
  • Fails to complete a query.

For more information, see Event Generation in the origin documentation.

Jython Scripting origin
  • Runs a script that generates events.

For more information, see Event Generation in the origin documentation.

MapR FS Standalone origin
  • Starts processing a file.
  • Completes processing a file.
  • Completes processing all available files and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

MongoDB origin
  • Completes processing all data returned by a query.

For more information, see Event Generation in the origin documentation.

MongoDB Atlas origin
  • Completes processing all data returned by a query.

For more information, see Event Generation in the origin documentation.

Oracle Bulkload origin
  • Completes processing data in a table.

For more information, see Event Generation in the origin documentation.

Oracle CDC origin
  • The pipeline starts.
  • A LogMiner session starts.
  • Changes occur to monitored tables.

For more information, see Event Generation in the origin documentation.

Oracle CDC Client origin
  • Reads DDL statements in the redo log.

For more information, see Event Generation in the origin documentation.

Salesforce origin
  • Completes processing all data returned by a query.

For more information, see Event Generation in the origin documentation.

Salesforce Bulk API 2.0 origin
  • Completes processing all data returned by a query.

For more information, see Event Generation in the origin documentation.

SAP HANA Query Consumer origin
  • Completes processing all data returned by a query.
  • Successfully completes a query.
  • Fails to complete a query.

For more information, see Event Generation in the origin documentation.

SFTP/FTP/FTPS Client origin
  • Starts processing a file.
  • Completes processing a file.
  • Completes processing all available files and the configured batch wait time has elapsed.

For more information, see Event Generation in the origin documentation.

SQL Server 2019 BDC Multitable Consumer origin
  • Completes processing the data returned by the queries for all tables.

For more information, see Event Generation in the origin documentation.

SQL Server CDC Client origin
  • Completes processing the data in the associated CDC tables.
  • Detects a schema change, when the origin is configured to check for schema changes.

For more information, see Event Generation in the origin documentation.

SQL Server Change Tracking origin
  • Completes processing the data in all specified change tracking tables.

For more information, see Event Generation in the origin documentation.

Teradata Consumer origin
  • Completes processing the data returned by the queries for all tables.

For more information, see Event Generation in the origin documentation.

Web Client origin
  • Finishes processing all available data.

For more information, see Event Generation in the origin documentation.

Groovy Evaluator processor
  • Runs a script that generates events.

For more information, see Event Generation in the processor documentation.

JavaScript Evaluator processor
  • Runs a script that generates events.

For more information, see Event Generation in the processor documentation.

Jython Evaluator processor
  • Runs a script that generates events.

For more information, see Event Generation in the processor documentation.

TensorFlow Evaluator processor
  • Evaluates the entire batch at once.

For more information, see Event Generation in the processor documentation.

Web Client processor
  • Finishes processing all available data.

For more information, see Event Generation in the processor documentation.

Windowing Aggregator processor
  • Performs aggregations, based on the configured window type and time window.

For more information, see Event Generation in the processor documentation.

Amazon S3 destination
  • Completes writing to an object.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Azure Blob Storage destination
  • Completes writing to a blob.

For more information, see Event Generation in the destination documentation.

Azure Data Lake Storage (Legacy) destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Azure Data Lake Storage Gen1 destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Azure Data Lake Storage Gen2 destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Google Cloud Storage destination
  • Completes writing to an object.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Hadoop FS destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Hive Metastore destination
  • Updates the Hive metastore by creating a table, adding columns, or creating a partition.
  • Generates and writes a new Avro schema file.

For more information, see Event Generation in the destination documentation.

Local FS destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

MapR FS destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

SFTP/FTP/FTPS Client destination
  • Closes a file.
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Snowflake File Uploader destination
  • Completes streaming a whole file.

For more information, see Event Generation in the destination documentation.

Web Client destination
  • Finishes processing all available data.

For more information, see Event Generation in the destination documentation.

ADLS Gen1 File Metadata executor
  • Changes file metadata, such as the file name, location, or permissions.
  • Creates an empty file.
  • Removes a file or directory.

For more information, see Event Generation in the executor documentation.

ADLS Gen2 File Metadata executor
  • Changes file metadata, such as the file name, location, or permissions.
  • Creates an empty file.
  • Removes a file or directory.

For more information, see Event Generation in the executor documentation.

Amazon S3 executor
  • Creates a new Amazon S3 object.
  • Copies an object to another location.
  • Adds tags to an existing object.

For more information, see Event Generation in the executor documentation.

Databricks Job Launcher executor
  • Starts a Databricks job.

For more information, see Event Generation in the executor documentation.

Databricks Query executor
  • Determines that the submitted query completed successfully.
  • Determines that the submitted query failed to complete.

For more information, see Event Generation in the executor documentation.

Google BigQuery executor
  • Determines that the submitted query completed successfully.
  • Determines that the submitted query failed to complete.

For more information, see Event Generation in the executor documentation.

Google Cloud Storage executor
  • Creates a new Google Cloud Storage object.
  • Copies an object to another location.
  • Moves an object to another location.
  • Adds metadata to an existing object.

For more information, see Event Generation in the executor documentation.

HDFS File Metadata executor
  • Changes file metadata, such as the file name, location, or permissions.
  • Creates an empty file.
  • Removes a file or directory.

For more information, see Event Generation in the executor documentation.

Hive Query executor
  • Determines that the submitted query completed successfully.
  • Determines that the submitted query failed to complete.

For more information, see Event Generation in the executor documentation.

JDBC Query executor
  • Determines that the submitted query completed successfully.
  • Determines that the submitted query failed to complete.

For more information, see Event Generation in the executor documentation.

MapR FS File Metadata executor
  • Changes file metadata, such as the file name, location, or permissions.
  • Creates an empty file.
  • Removes a file or directory.

For more information, see Event Generation in the executor documentation.

MapReduce executor
  • Starts a MapReduce job.

For more information, see Event Generation in the executor documentation.

Snowflake executor
  • Determines that the submitted query completed successfully.
  • Determines that the submitted query failed to complete.

For more information, see Event Generation in the executor documentation.

Spark executor
  • Starts a Spark application.

For more information, see Event Generation in the executor documentation.

Using Stage Events

You can use stage-related events in any way that suits your needs. When configuring the event stream for stage events, you can add additional stages to the stream. For example, you might use a Stream Selector to route different types of events to different executors. But you cannot merge the event stream with a data stream.
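
For example, a Stream Selector condition along the following lines could route events by type; the event type value shown is a placeholder, because the actual sdc.event.type values are defined by the event-generating stage:
${record:attribute('sdc.event.type') == 'file-closed'}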

There are two general types of event streams that you might create:
  • Task execution streams that route events to an executor to perform a task.

    Not available in Data Collector Edge pipelines. Executors are not supported in Data Collector Edge pipelines.

  • Event storage streams that route events to a destination to store event information.
You can, of course, configure an event stream that performs both tasks by routing event records to both an executor and a destination. You can also configure event streams to route data to multiple executors and destinations, as needed.

Task Execution Streams

A task execution stream routes event records from the event-generating stage to an executor stage. The executor performs a task each time it receives an event record.

In Data Collector Edge pipelines, you cannot use task execution streams. Executors are not supported in Data Collector Edge pipelines.

For example, you have a pipeline that reads from Kafka and writes files to HDFS:

When Hadoop FS closes a file, you would like the file moved to a different directory and the file permissions changed to read-only.

Leaving the rest of the pipeline as is, you can enable event handling in the Hadoop FS destination, connect it to the HDFS File Metadata executor, and configure the HDFS File Metadata executor to move the files and change permissions. The resulting pipeline looks like this:

If you needed to set permissions differently based on the file name or location, you could use a Stream Selector to route the event records accordingly, then use two HDFS File Metadata executors to alter file permissions, as follows:
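
In that case, each Stream Selector condition might check the path carried in the event record. The following is a sketch that assumes the file-closure event record includes a /filepath field and uses an illustrative directory name:
${str:startsWith(record:value('/filepath'), '/weblogs/secure/')}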

Event Storage Streams

An event storage stream routes event records from the event-generating stage to a destination. The destination writes the event record to a destination system.

Event records include information about the event in record header attributes and record fields. You can add processors to the event stream to enrich the event record before writing it to the destination.

For example, you have a pipeline that uses the Directory origin to process weblogs:

Directory generates event records each time it starts and completes reading a file, and each event record includes a field with the path of the file. For auditing purposes, you'd like to write this information to a database table.

Leaving the rest of the pipeline as is, you can enable event handling for the Directory origin and simply connect it to the JDBC Producer as follows:

You also want to know when each event occurred. The Directory event record stores the event creation time in the sdc.event.creation_timestamp record header attribute, so you can use an Expression Evaluator with the following expression to add the creation date and time to the record:
${record:attribute('sdc.event.creation_timestamp')}
And if you have multiple pipelines writing events to the same location, you can use the following expression to include the pipeline name in the event record as well:
${pipeline:name()}
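Putting the two expressions together, the Expression Evaluator might define two new fields; the output field names are illustrative:
/event_created = ${record:attribute('sdc.event.creation_timestamp')}
/pipeline_name = ${pipeline:name()}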

The Expression Evaluator and the final pipeline look like this:

Executors

Executors perform tasks when they receive event records.

Not available in Data Collector Edge pipelines. Executors are not supported in Data Collector Edge pipelines.

You can use the following executor stages for event handling:
ADLS Gen1 File Metadata executor
Changes file metadata, creates an empty file, or removes a file or directory in Azure Data Lake Storage Gen1 upon receiving an event.
When changing file metadata, the executor can rename and move files in addition to specifying the owner and group, and updating permissions and ACLs for files. When creating an empty file, the executor can specify the owner and group, and set permissions and ACLs for the file. When removing files and directories, the executor performs the task recursively.
You can use the executor in any logical way, such as changing permissions after an Azure Data Lake Storage Gen1 destination closes a file.
ADLS Gen2 File Metadata executor
Changes file metadata, creates an empty file, or removes a file or directory in Azure Data Lake Storage Gen2 upon receiving an event.
When changing file metadata, the executor can rename and move files in addition to specifying the owner and group, and updating permissions and ACLs for files. When creating an empty file, the executor can specify the owner and group, and set permissions and ACLs for the file. When removing files and directories, the executor performs the task recursively.
You can use the executor in any logical way, such as moving a file after an Azure Data Lake Storage Gen2 destination closes it.
Amazon S3 executor
Creates new Amazon S3 objects for the specified content, copies objects within a bucket, or adds tags to existing Amazon S3 objects upon receiving an event.
You can use the executor in any logical way, such as writing information from an event record to a new S3 object, or copying or tagging objects after they are written by the Amazon S3 destination.
Databricks Job Launcher executor
Starts a Databricks job for each event.
You can use the executor in any logical way, such as running Databricks jobs after the Hadoop FS, MapR FS, or Amazon S3 destination closes files.
Databricks Query executor
Runs a Spark SQL query on Databricks upon receiving an event.
You can use the executor in any logical way.
Email executor
Sends a custom email to the configured recipients upon receiving an event. You can optionally configure a condition that determines when to send the email.
You can use the executor in any logical way, such as sending an email each time the Azure Data Lake Storage destination completes streaming a whole file.
For a solution that describes how to use the Email executor, see Sending Email During Pipeline Processing.
Google BigQuery executor
Runs one or more SQL queries on Google BigQuery upon receiving an event.
You can use the executor in any logical way, such as running a SQL query each time a Google BigQuery origin successfully completes a query.
Google Cloud Storage executor
Creates new Google Cloud Storage objects for the specified content, copies or moves objects within a project, or adds metadata to existing objects upon receiving an event.
You can use the executor in any logical way, such as moving objects after they are read by the Google Cloud Storage origin or adding metadata to objects after they are written by the Google Cloud Storage destination.
Hive Query executor
Executes user-defined Hive or Impala queries for each event.
You can use the executor in any logical way, such as running Hive or Impala queries after the Hive Metastore destination updates the Hive metastore, or after the Hadoop FS or MapR FS destination closes files.
For example, you might use the Hive Query executor as part of the Drift Synchronization Solution for Hive if you read data with Impala. Impala requires you to run the Invalidate Metadata command when the table structure or data changes.
Instead of trying to time this action manually, you can use the Hive Query executor to submit the command automatically each time the Hive Metastore destination changes the structure of a table and each time the Hadoop FS destination closes a file.

For a solution that describes how to use the Hive Query executor, see Automating Impala Metadata Updates for Drift Synchronization for Hive.

HDFS File Metadata executor
Changes file metadata, creates an empty file, or removes a file or directory in HDFS or a local file system upon receiving an event.
When changing file metadata, the executor can rename and move files in addition to specifying the owner and group, and updating permissions and ACLs for files. When creating an empty file, the executor can specify the owner and group, and set permissions and ACLs for the file. When removing files and directories, the executor performs the task recursively.
You can use the executor in any logical way, such as changing file metadata after receiving file closure events from the Hadoop FS or Local FS destinations.
For a solution that describes how to use the HDFS File Metadata executor, see Managing Output Files.
Pipeline Finisher executor
Stops the pipeline when it receives an event, transitioning the pipeline to a Finished state. Allows the pipeline to complete all expected processing before stopping.
You can use the Pipeline Finisher executor in any logical way, such as stopping a pipeline upon receiving a no-more-data event from the JDBC Query Consumer origin. This enables you to achieve "batch" processing - stopping the pipeline when all available data is processed rather than leaving the pipeline to sit idle indefinitely.
For example, you might use the Pipeline Finisher executor with the JDBC Multitable Consumer to stop the pipeline when it processes all queried data in the specified tables.
For a solution that describes how to use the Pipeline Finisher executor, see Stopping a Pipeline After Processing All Available Data.
JDBC Query executor
Connects to a database using JDBC and runs one or more specified SQL queries.
Use to run SQL queries on a database after an event occurs.
MapR FS File Metadata executor
Changes file metadata, creates an empty file, or removes a file or directory in MapR FS upon receiving an event.
When changing file metadata, the executor can rename and move files in addition to specifying the owner and group, and updating permissions and ACLs for files. When creating an empty file, the executor can specify the owner and group, and set permissions and ACLs for the file. When removing files and directories, the executor performs the task recursively.
You can use the executor in any logical way, such as creating an empty file after the MapR FS destination closes a file.
MapReduce executor
Connects to HDFS or MapR FS and starts a MapReduce job for each event.
You can use the executor in any logical way, such as running MapReduce jobs after the Hadoop FS or MapR FS destination closes files. For example, you might use the MapReduce executor with the Hadoop FS destination to convert Avro files to Parquet when Hadoop FS closes a file.
For a solution that describes how to use the MapReduce executor, see Converting Data to the Parquet Data Format.
SFTP/FTP/FTPS Client executor
Connects to an SFTP, FTP, or FTPS server and moves or removes a file upon receiving an event.
You can use the executor in any logical way, such as moving a file after the SFTP/FTP/FTPS Client origin finishes reading it.
Shell executor
Executes a user-defined shell script for each event.
You can use the executor in any logical way.
Snowflake executor
Loads whole files staged by the Snowflake File Uploader destination to Snowflake tables.
For details on how to use the executor, see Snowflake File Uploader and Executor Pipelines.
Spark executor
Connects to Spark on YARN and starts a Spark application for each event.
You can use the executor in any logical way, such as running Spark applications after the Hadoop FS, MapR FS, or Amazon S3 destination closes files. For example, you might use the Spark executor with the Hadoop FS destination to convert Avro files to Parquet when Hadoop FS closes a file.

Logical Pairings

You can use events in any way that works for your needs. The following tables outline some logical pairings of event generation with executors and destinations.

Pipeline Events

Pipeline Event Type Event Consumer
Pipeline Start
  • Any single executor, except Pipeline Finisher.
  • Another pipeline for additional processing.
Pipeline Stop
  • Any single executor, except Pipeline Finisher.
  • Another pipeline for additional processing.

Origin Events

Event Generating Origin Event Consumer
Amazon S3
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all objects.
  • Email executor to send email when the origin completes processing available objects.
  • Any destination for event storage.
Azure Blob Storage
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all objects.
  • Email executor to send email when the origin completes processing available objects.
  • Any destination for event storage.
Azure Data Lake Storage Gen1
  • Pipeline Finisher executor to stop the pipeline after processing all available files.
  • Email executor to send email each time the origin starts or completes processing a file.
  • Any destination for event storage.
Azure Data Lake Storage Gen2
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all objects.
  • Email executor to send email after the origin completes processing available objects.
  • Any destination for event storage.
Azure Data Lake Storage Gen2 (Legacy)
  • Pipeline Finisher executor to stop the pipeline after processing all available files.
  • Email executor to send email each time the origin starts or completes processing a file.
  • Any destination for event storage.
Directory
  • Email executor to send email each time the origin starts or completes processing a file.
  • Pipeline Finisher executor to stop the pipeline after processing all available files.
  • Any destination for event storage.
File Tail
  • Email executor to send email each time the origin starts or completes processing a file.
  • Any destination for event storage.
Google BigQuery
  • Email executor to send email each time the origin successfully completes a query.
  • Google BigQuery executor to run SQL queries each time the origin successfully completes a query.
  • Any destination for event storage.
Google Cloud Storage
  • Google Cloud Storage executor to perform a task after reading an object.
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all objects.
  • Email executor to send email when the origin completes processing available objects.
  • Any destination for event storage.
Hadoop FS Standalone
  • Email executor to send email each time the origin starts or completes processing a file.
  • Pipeline Finisher executor to stop the pipeline after processing all available files.
  • Any destination for event storage.
JDBC Multitable Consumer
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all tables.
  • Email executor to send email when the origin completes processing all data returned by queries.
  • Any destination for event storage.
JDBC Query Consumer
  • Route the no-more-data event to the Pipeline Finisher executor to stop the pipeline after processing queried data.
  • Email executor to send email each time the origin successfully completes a query, fails to complete a query, or completes processing all available data.
  • Any destination for event storage.
MapR FS Standalone
  • Email executor to send email each time the origin starts or completes processing a file.
  • Pipeline Finisher executor to stop the pipeline after processing all available files.
  • Any destination for event storage.
MongoDB
  • Route the no-more-data event to the Pipeline Finisher executor to stop the pipeline after processing queried data.
  • Email executor to send email when the origin completes processing available objects.
  • Any destination for event storage.
MongoDB Atlas
  • Route the no-more-data event to the Pipeline Finisher executor to stop the pipeline after processing queried data.
  • Email executor to send email when the origin completes processing available data.
  • Any destination for event storage.
Oracle Bulkload
  • Email executor to send email after it completes reading data in a table.
  • Any destination for event storage.
Oracle CDC
  • Email executor to send email for each event.
  • Any destination for event storage.
Oracle CDC Client
  • Email executor to send email each time it reads DDL statements in the redo logs.
  • Any destination for event storage.
Salesforce
  • Pipeline Finisher executor to stop the pipeline after processing queried data.
  • Email executor to send email when the origin completes processing all data returned by a query.
  • Any destination for event storage.
Salesforce Bulk API 2.0
  • Pipeline Finisher executor to stop the pipeline after processing queried data.
  • Email executor to send email when the origin completes processing all data returned by a query.
  • Any destination for event storage.
SAP HANA Query Consumer
  • Route the no-more-data event to the Pipeline Finisher executor to stop the pipeline after processing queried data.
  • Email executor to send email each time the origin successfully completes a query, fails to complete a query, or completes processing all available data.
  • Any destination for event storage.
SFTP/FTP/FTPS Client
  • Email executor to send email each time the origin starts or completes processing a file.
  • Pipeline Finisher executor to stop the pipeline after processing all available files.
  • SFTP/FTP/FTPS Client executor to move or remove a processed file.
  • Any destination for event storage.
SQL Server 2019 BDC Multitable Consumer
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all tables.
  • Email executor to send email when the origin completes processing all data returned by queries.
  • Any destination for event storage.
SQL Server Change Tracking
  • Pipeline Finisher executor to stop the pipeline after processing available data.
Teradata Consumer
  • Pipeline Finisher executor to stop the pipeline after processing queried data from all tables.
  • Email executor to send email when the origin completes processing all data returned by queries.
  • Any destination for event storage.
Web Client
  • Pipeline Finisher executor to stop the pipeline after processing all available data.
  • Email executor to send email when the origin completes processing all data returned by queries.
  • Any destination for event storage.

Processor Events

Event Generating Processor Event Consumer
Windowing Aggregator
  • Email executor for notification when results pass specified thresholds.
  • Any destination for event storage.
Groovy Evaluator
  • Any logical executor.
  • Any destination for event storage.
JavaScript Evaluator
  • Any logical executor.
  • Any destination for event storage.
Jython Evaluator
  • Any logical executor.
  • Any destination for event storage.
TensorFlow Evaluator
  • Any destination for event storage.
Web Client
  • Email executor to send email after processing all available data.
  • Any destination for event storage.

Destination Events

Event Generating Destination Event Consumer
Amazon S3
  • Amazon S3 executor to create or copy objects or to add tags to closed objects.
  • Databricks Job Launcher executor to run a Databricks application after closing an object or whole file.
  • Databricks Delta Lake executor to copy or merge data from Amazon S3 to a Delta Lake table.
  • Spark executor to run a Spark application after closing an object or whole file.
  • Email executor to send email after closing an object or whole file.
  • Any destination for event storage.
Azure Blob Storage
  • Email executor to send email after closing a file or streaming a whole file.
  • Any destination for event storage.
Azure Data Lake Storage (Legacy)
  • ADLS Gen1 File Metadata executor to change file metadata, create an empty file, or remove a file or directory after closing a file.
  • Email executor to send email after closing a file or streaming a whole file.
  • Any destination for event storage.
Azure Data Lake Storage Gen1
  • ADLS Gen1 File Metadata executor to change file metadata, create an empty file, or remove a file or directory after closing a file.
  • Email executor to send email after closing a file or streaming a whole file.
  • Any destination for event storage.
Azure Data Lake Storage Gen2
  • ADLS Gen2 File Metadata executor to change file metadata, create an empty file, or remove a file or directory after closing a file.
  • Databricks Delta Lake executor to copy or merge data from Azure Data Lake Storage Gen2 to a Delta Lake table.
  • Email executor to send email after closing a file or streaming a whole file.
  • Any destination for event storage.
Google Cloud Storage
  • Databricks Job Launcher executor to run a Databricks job after closing an object or whole file.
  • Google Cloud Storage executor to perform a task after writing an object or whole file.
  • Spark executor to run a Spark application after closing an object or whole file.
  • Email executor to send email after closing an object or whole file.
  • Any destination for event storage.
Hadoop FS
  • HDFS File Metadata executor to change file metadata, create an empty file, or remove a file or directory after closing a file.
  • Hive Query executor to run Hive or Impala queries after closing a file.

    Particularly useful when using the Drift Synchronization Solution for Hive with Impala.

  • MapReduce executor to run a MapReduce job after closing a file.
  • Databricks Job Launcher executor to run a Databricks job after closing a file.
  • Spark executor to run a Spark application after closing a file.
  • Email executor to send email after closing a file or streaming a whole file.
  • Any destination for event storage.
Hive Metastore
  • Hive Query executor to run Hive or Impala queries after the destination changes table structures.

    Particularly useful when using the Drift Synchronization Solution for Hive with Impala.

  • HDFS File Metadata executor to change file metadata, create an empty file, or remove a file or directory after writing an Avro schema file.
  • Email executor to send email each time the destination changes the Hive metastore.
  • Any destination for event storage.
Local FS
  • HDFS File Metadata executor to change file metadata, create an empty file, or remove a file or directory after closing a file.
  • Email executor to send email after the destination closes a file or streams a whole file.
  • Any destination for event storage.
MapR FS
  • MapR FS File Metadata executor to change file metadata, create an empty file, or remove a file or directory after closing a file.
  • MapReduce executor to run a MapReduce job after closing a file.
  • Databricks Job Launcher executor to run a Databricks job after closing a file.
  • Spark executor to run a Spark application after closing a file.
  • Email executor to send email each time the destination closes a file or streams a whole file.
  • Any destination for event storage.
SFTP/FTP/FTPS Client
  • Email executor to send email each time the destination closes a file or streams a whole file.
  • SFTP/FTP/FTPS Client executor to move a closed file.
  • Any destination for event storage.
Snowflake File Uploader
  • Snowflake executor to run queries to load data to Snowflake tables each time the destination streams a whole file.
  • Email executor to send email each time the destination streams a whole file.
  • Any destination for event storage.

Executor Events

Event Generating Executor Event Consumer
ADLS Gen1 File Metadata executor
  • Email executor to send email each time the executor changes file metadata.
  • Any destination for event storage.
ADLS Gen2 File Metadata executor
  • Email executor to send email each time the executor changes file metadata.
  • Any destination for event storage.
Amazon S3 executor
  • Email executor to send email each time the executor creates or copies an object or adds tags to an object.
  • Any destination for event storage.
Databricks Job Launcher executor
  • Email executor to send email each time the Databricks Job Launcher executor starts a Databricks job.
  • Any destination for event storage.
Databricks Query executor
  • Email executor to send email when a query succeeds or fails.
  • Any destination for event storage.
Google BigQuery executor
  • Email executor to send email when a query succeeds or fails.
  • Any destination for event storage.
Google Cloud Storage executor
  • Email executor to send email each time the executor creates, copies, or moves an object, or adds metadata to an object.
  • Any destination for event storage.
HDFS File Metadata executor
  • Email executor to send email each time the executor changes file metadata.
  • Any destination for event storage.
Hive Query executor
  • Email executor to send email when a query succeeds or fails.
  • Any destination for event storage.
JDBC Query executor
  • Email executor to send email when a query succeeds or fails.
  • Any destination for event storage.
MapR FS File Metadata executor
  • Email executor to send email each time the executor changes file metadata.
  • Any destination for event storage.
MapReduce executor
  • Email executor to send email each time the executor starts a MapReduce job.
  • Any destination for event storage.
Snowflake executor
  • Email executor to send email when a query succeeds or fails.
  • Any destination for event storage.
Spark executor
  • Email executor to send email each time the Spark executor starts a Spark application.
  • Any destination for event storage.

Event Records

Event records are records that are created when a stage or pipeline event occurs.

Most event records pass general event information in record headers, such as when the event occurred. They also can include event-specific details in record fields, like the name and location of the output file that was closed.

Event records generated by the File Tail origin are the exception - they include all event information in record fields.

Event Record Header Attributes

In addition to the standard record header attributes, most event records include record header attributes for event information such as the event type and when the event occurred.

As with any record header attribute, you can use the Expression Evaluator and the record:attribute function to include record header attribute information as a field in the record. For example, when storing event records, you most likely want to include the time of the event in the event record, using the following expression in an Expression Evaluator:
${record:attribute('sdc.event.creation_timestamp')}

Note that all record header attributes are String values. For more information about working with record header attributes, see Record Header Attributes.

Most events include the following event record header attributes. The exception, File Tail, writes all of the event information to record fields.
sdc.event.type
Event type. Defined by the stage that generates the event.

For information about the event types available for an event-generating stage, see the stage documentation.

sdc.event.version
Integer that indicates the version of the event record type.
sdc.event.creation_timestamp
Epoch timestamp when the stage created the event.
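
For example, the header attributes of a no-more-data event might look like the following; the values shown are illustrative:
sdc.event.type: no-more-data
sdc.event.version: 1
sdc.event.creation_timestamp: 1533256918000
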
Note: Stage-generated event records differ from stage to stage. For a description of stage events, see "Event Record" in the documentation for the event-generating stage. For a description of pipeline events, see Pipeline Event Records.

Viewing Events in Data Preview, Snapshot, and Monitor Mode

When generated, stage events display in data preview, Monitor mode, and snapshot as event records. Once a record leaves the event-generating stage, it is treated like a standard record.

Pipeline events generated by the event framework do not display in data preview or Monitor mode. However, you can enable data preview to generate and process pipeline events.

Viewing Stage Events in Data Preview and Snapshot

In data preview and when reviewing snapshots of data, stage-related event records display in the event-generating stage marked as "event records," and they appear below the batch of standard records.

After leaving the stage, the record displays like any other record.

For example, the Directory origin below generates an event record as it starts reading a file for data preview:

When you select the Local FS destination where the event record is written, you see that the same event record no longer displays the event record label. It is treated like any other record:

Viewing Stage Events in Monitor Mode

In Monitor mode, the event-generating stage provides statistics about stage-related event records. Once the event records leave the stage, Monitor mode treats event records like any other record.

For example, when you run and monitor the pipeline featured above, the Directory origin information displays event records in its statistics:

Notice, in the Record Throughput chart, that you can hover over graphics to get the exact number of records that they represent.

And when you select the Local FS destination where the event record is written, Monitor mode displays statistics for the records written to the destination. At this point, the event records are treated like any other record:

Executing Pipeline Events in Data Preview

You can enable pipeline event execution in data preview to test event processing.

When you enable pipeline event generation while previewing data, the event framework generates the pipeline start event when you start data preview and the stop event when you stop data preview. If you configured the pipeline to pass the events to executors or to event-consuming pipelines, the events are passed and trigger the additional processing.

To enable pipeline event execution during data preview, use the Enable Pipeline Execution data preview property.

Summary

Here are the key points about dataflow triggers and the event framework:
  1. You can use the event framework in any pipeline where the logic suits your needs.
  2. The event framework generates pipeline-related events and stage-related events.
  3. You can use pipeline events in standalone pipelines.
  4. Pipeline events are generated when the pipeline starts and stops. For details, see Pipeline Event Generation.

    Not available in Data Collector Edge pipelines.

  5. You can configure each pipeline event type to pass to a single executor or to another pipeline for more complex processing.

    Not available in Data Collector Edge pipelines.

  6. Stage events are generated based on the processing logic of the stage. For a list of event-generating stages, see Stage Event Generation.
  7. Events generate event records to pass relevant information regarding the event, such as the path to the file that was closed.

    Stage-generated event records differ from stage to stage. For a description of stage events, see "Event Record" in the documentation for the event-generating stage. For a description of pipeline events, see Pipeline Event Records.

  8. In the simplest use case, you can route stage event records to a destination to save event information.
  9. You can route stage event records to an executor stage so it can perform a task upon receiving an event.

    For a list of logical event generation and executor pairings, see Logical Pairings.

    Not available in Data Collector Edge pipelines. Executors are not supported in Data Collector Edge pipelines.

  10. You can add processors to event streams for stage events or to consuming pipelines for pipeline events.

    For example, you might add an Expression Evaluator to add the event generation time to an event record before writing it to a destination. Or, you might use a Stream Selector to route different types of event records to different executors.

  11. When working with stage events, you cannot merge event streams with data streams.
  12. You can use the Dev Data Generator and To Event development stages to generate events for pipeline development and testing. For more information about the development stages, see Development Stages.
  13. In data preview and in Monitor mode, stage-generated event records display separately in the event-generating stage. Afterwards, they are treated like any standard record.
  14. You can configure data preview to generate and execute pipeline events.

For examples of how you might use the event framework, see the case studies earlier in this chapter.