Jython Scripting
Supported pipeline types:
|
- Create threads if supporting multithreaded processing
- Create batches
- Create records
- Add the records to a batch
- Process the batch
- Stop when the pipeline stops
The script must handle all necessary processing, such as generating events, sending errors for handling, and stopping when users stop the pipeline or when there is no more data. You can call external Java code from the script.
To handle restarts, the script must maintain an offset to track where the origin stopped and should restart. For the offset, the script requires a key, called an entity, associated with a unique value. For multithreaded processing, the entity must identify the partition of data processed by each thread. The method that processes batches saves an offset value for each entity.
For example, suppose your script processes data about U.S. states, using an API to read
data with a URL of the form ../<state>&page=<number>
. In the
script, each thread reads data from one state until finished with that state. You can
set the entity to the state and the offset to the page number.
You can reset the origin to process all available data.
The origin provides extensive sample code that you can use to develop your script.
When configuring the origin, you enter the script and the inputs required, including the batch size and number of threads, along with any script parameters used in the script.
Scripting Objects
- record
- An object that contains fields and values to
process. Create new
record
objects with thesdc.createRecord(<String record ID>)
method. The methods available to the object depend on the origin configuration. You can configure the record type to be native objects or Data Collector records. - batch
- An object that collects records to process together. Create new
batch
objects with thesdc.createBatch()
method. The object includes the following methods:add(<record>)
- Appends a record to the batch.add(<record[]>)
- Appends a list of records to the batch.addError(<record>,<String message>)
- Appends an error record to the batch. The appended error record contains the associated error message.addEvent(<event record>)
- Appends an event to the batch. Verify that the stage enables event generation before implementing event methods.size()
- Returns the number of records in the batch.process(<String entity>, <String offset>)
- Processes the batch and commits the offset for the named entity.getSourceResponseRecords()
- After processing a batch, retrieves any response records returned by downstream stages.
- log
- An object that writes messages to the log4j log. Use
sdc.log
to access the object configured for the stage. The object includes methods that correspond to the level in the log file:info(<message template>, <arguments>...)
warn(<message template>, <arguments>...)
error(<message template>, <arguments>...)
trace(<message template>, <arguments>...)
The message template can contain positional variables, indicated by curly brackets: { }. When writing messages, the method replaces each variable with the argument in the corresponding position - that is, the method replaces the first { } occurrence with the first argument, and so on.
- sdc
- A wrapper object that accesses the constants, methods, and objects available to the user script.
- The
sdc
object contains the following methods to work around a Jython import issue. For more information about using these methods, see Thread Safety in Jython Scripts.importLock()
- Acquires a system-wide lock. Call this method before importing.importUnlock()
- Releases a system-wide lock. Call this method after importing.
Thread Safety in Jython Scripts
Jython has a known issue with the thread safety of imports. To work around this issue, you can use a system-wide lock available with the Jython Scripting origin to synchronize imports between threads and between pipelines. In your script, acquire the lock, import all needed modules, and then release the lock.
try
statement with a
finally
block:try:
sdc.importLock()
import ...
finally:
sdc.importUnlock()
If a script that fails does not include the finally
block to release the
lock, then no other script can use the lock until you restart Data Collector.
Multithreaded Processing
The Jython Scripting origin can use multiple concurrent threads to process data based on the Number of Threads property.
To enable multithreaded processing, write the
script to create the configured number of threads. Each thread must create a batch and
pass the batch to an available pipeline runner by calling the
batch.process(<String entity>, <String offset>)
method. A pipeline runner is a sourceless
pipeline instance - an instance of the pipeline that includes
all of the processors, executors, and destinations in the pipeline and
handles all pipeline processing after the origin.
Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.
For example, suppose you write the script to use multiple threads to read files in the order of last-modified timestamp, and you configure the origin to use five threads. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners.
The origin assigns a thread to each of the five oldest files. Each thread processes its assigned file, creating batches of data and passing each batch to a pipeline runner.
After a thread completes processing a file, the origin assigns the thread to the next file based on the last-modified timestamp, until all files are processed.
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Accessing Record Details
By default, you use native types from the scripting language to access records in scripts. However, with native types, you cannot easily access all the features of Data Collector records, such as field attributes.
To access records in scripts directly as Data Collector
records, configure the stage to use the Data Collector Java API to process records in
scripts by setting the Record Type advanced property to Data Collector
Records
.
In the script, reference the needed classes from
the Java package com.streamsets.pipeline.api
and then use the
appropriate methods to access records and fields. With the Data Collector
Java API, scripts can access all the features of Data Collector
records.
- Create a String field named
new
and set its value tonew-value
. - Update the existing field named
old
to set the value of theattr
attribute toattr-value
.
from com.streamsets.pipeline.api import Field
...
record.sdcRecord.set('/new', Field.create(Field.Type.STRING, 'new-value'))
record.sdcRecord.get('/old').setAttribute('attr', 'attr-value')
...
Type Handling
- Data type of null values
- You can associate null values with a data type. For example, if the script assigns a null value to an Integer field, the field is returned to the pipeline as an integer with a null value.
- Date fields
- Use the String data
type to create a new field to store a date with a specific format. For
example, the following sample code creates a new String field that
stores the current date using the format
YYYY-MM-dd
:# Define a date object to record the current date import datetime as dt new_date = dt.datetime.utcnow().strftime("%Y-%m-%d") cur_batch = sdc.createBatch() for record in records: try: # Create a string field to store the current date with the specified format record.value["date"] = new_date # Add record to the current batch cur_batch.add(record) except Exception as e: # Send record to error cur_batch.addError(record, str(e)) # Process the current batch cur_batch.process(entity_name, str(offset))
- Data type of modified values
- Values that are not modified by the stage retain their original type.
Event Generation
You can use the Jython Scripting origin to generate event records for an event stream. Enable event generation when you want the stage to generate an event record based on scripting logic.
As with any record, you can pass event records downstream to a destination for event storage or to any executor that can be configured to use the event. For more information about events and the event framework, see Dataflow Triggers Overview.
- On the General tab, select the Produce
Events property.
This enables the event output stream for use.
- Include both of the following methods in the script:
-
sdc.createEvent(<String type>, <Integer version>)
- Creates an event record with the specified event type and version number. You can create a new event type or use an existing event type. Existing event types are documented in other event-generating stages.The event record contains no record fields. Generate record fields as needed.
-
batch.toEvent(<record>)
- Use to append an event record to a batch and pass events to the event output stream.
-
Event Record
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type, specified by the sdc.createEvent method. |
sdc.event.version | Event version, specified by the sdc.createEvent
method. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
Record Header Attributes
The script in the Jython Scripting origin can create custom record header attributes. Pipeline logic can use record header attributes to affect data flow. Therefore, you might create a custom record header attribute for a specific purpose. For more information, see Record Header Attributes.
All records include a set of internal record header attributes that stages update automatically as they process records. Error records also have their own set of internal header attributes. You cannot change values of internal header attributes in scripts.
record.<header name>
- Use to return the value of a header attribute.record.attributes
- Use to return a map of custom record header attributes, or to create or update a specific record header attribute.
Calling External Java Code
You can call external Java code from the Jython Scripting origin. Simply install the external Java library to make it available to the origin. Then, call the external Java code from the Jython script that you develop for the origin.
You install the external Java library into the Jython stage library, streamsets-datacollector-jython_2_7-lib
, which includes the origin. For information about installing additional drivers, see Install External Libraries.
from <package> import <class name>
from org.bouncycastle.jcajce.provider.digest.SHA3 import DigestSHA3
For more information, see the following StreamSets blog post: Calling External Java Code from Script Evaluators.
Configuring a Jython Scripting Origin
Configure a Jython Scripting origin to run a Jython script to create Data Collector records.
-
In the Properties panel, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. Produce Events Generates event records when events occur. Use for event handling. On Record Error Error record handling for the stage: - Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Stop Pipeline - Stops the pipeline.
-
On the Performance tab, configure the following
properties:
Performance Property Description Batch Size Number of records to generate in a single batch. The script accesses this value with the
sdc.batchSize
constant and implements batch processing.Default value is 1000. Data Collector honors values up to the Data Collector maximum batch size. The Data Collector default is 1000.
Number of Threads Number of threads that generate data concurrently in parallel. The script accesses this value with the
sdc.numThreads
constant and implements multithreaded processing. -
On the Script tab, configure the following property:
Script Property Description User Script Script that runs during pipeline execution. Tip: To toggle full-screen editing, press either F11 or Esc, depending on the operating system, when the cursor is in the editor. -
On the Advanced tab, configure the following
properties:
Advanced Property Description Record Type Record type to use during script execution: - Data Collector Records - Select when scripts use Data Collector Java API methods to access records.
- Native Objects - Select when scripts use native types to access records.
Default value is Native Objects.
Parameters in Script Script parameters and their values. The script accesses the values with the
sdc.userParams
dictionary.