Interacting with Pipeline Snapshots
Capturing a Snapshot
To capture a snapshot of an existing pipeline, connect to the Data Collector instance, retrieve the pipeline, and capture the snapshot:
from streamsets.sdk import DataCollector
# Connect to a DataCollector instance and get the pipeline you wish to capture a snapshot of
sdc = DataCollector("https://localhost:18630")
pipeline = sdc.pipelines.get(id='testpipel1c35d2ff-5e49-4f99-ba56-00051fd7845f')
# Capture the snapshot
snapshot = sdc.capture_snapshot(pipeline).snapshot
snapshot
Output:
<Snapshot (name=New Snapshot, time_stamp=1616718217886, batch_number=1)>
Reading from a Snapshot
Once a snapshot is captured, it’s possible to inspect what data was captured.
Let’s assume the above pipeline consisted of a Dev Data Generator origin writing to a Trash destination, with the origin generating datetime data.
To read output values from a stage, you would execute the following:
# Select the stage you're interested in reading the output values for. In this case, the Origin stage
stage = pipeline.stages[0]
# Read the field attribute from output[0], which is the first record captured for the stage specified
field = snapshot[stage].output[0].field
This returns the record's fields as a dictionary:
{'random_value': 2018-11-05 04:31:05.953000}
To check a specific field’s value from the output record you’ve just retrieved, you can index it directly by providing the name of the field:
field['random_value']
Output:
datetime.datetime(2018, 11, 5, 4, 31, 5, 953000)
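The same lookup can be repeated for every record a stage produced. The sketch below is a minimal illustration, assuming that the stage's output is an iterable sequence of records (the text above only shows indexing with output[0]). The helper name field_values is hypothetical and not part of the SDK, and the records are stand-in objects so that the example runs without a Data Collector instance.

```python
from datetime import datetime
from types import SimpleNamespace


def field_values(records, field_name):
    """Collect the value of one named field from each record."""
    return [record.field[field_name] for record in records]


# Stand-in records for illustration only. Each mimics the `.field`
# dictionary shown above; the second value is made up for the example.
stand_in_records = [
    SimpleNamespace(field={'random_value': datetime(2018, 11, 5, 4, 31, 5, 953000)}),
    SimpleNamespace(field={'random_value': datetime(2018, 11, 6, 9, 15, 0)}),
]

values = field_values(stand_in_records, 'random_value')
print(values)
```

With a live snapshot, the call would presumably look like field_values(snapshot[stage].output, 'random_value'), under the iteration assumption stated above.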
Note that the field value is coerced into the appropriate Python type, but the underlying raw value and its type are stored alongside it and remain accessible:
field['random_value'].raw_value
field['random_value'].type
Output:
# field['random_value'].raw_value
1541392265953
# field['random_value'].type
'DATETIME'
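The raw value above looks like a count of milliseconds since the Unix epoch. The following minimal sketch assumes that interpretation, and assumes UTC, to show how the raw value relates to the coerced one. The helper name epoch_millis_to_datetime is hypothetical and not part of the SDK; only the standard library is used.

```python
from datetime import datetime, timedelta, timezone

# Reference point for the conversion: the Unix epoch in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_datetime(raw_value: int) -> datetime:
    """Convert milliseconds since the Unix epoch to an aware UTC datetime.

    Assumption: the raw DATETIME value is expressed in epoch milliseconds.
    """
    # timedelta keeps the arithmetic in integers, avoiding float rounding.
    return EPOCH + timedelta(milliseconds=raw_value)


converted = epoch_millis_to_datetime(1541392265953)
print(converted.isoformat())
```

Under these assumptions, the raw value 1541392265953 converts to 2018-11-05 04:31:05.953 UTC, which agrees with the coerced value shown earlier.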
Retrieving an Existing Snapshot
If you previously captured a snapshot for the same pipeline, either via the Data Collector UI or the Python SDK, you can
retrieve it. The following selects a snapshot by one of its unique attributes, such as id, name, or time_stamp:
snapshot = sdc.get_snapshots(pipeline).get(name='Earlier Snapshot')
snapshot
snapshot.id
Output:
# snapshot
<Snapshot (name=Earlier Snapshot, time_stamp=1616716207032, batch_number=1)>
# snapshot.id
'snapshot1616718217454'
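When the name is not known in advance, one option is to pick the most recent snapshot by its time_stamp. The sketch below is a minimal illustration, assuming that the collection returned by get_snapshots() can be iterated and that each snapshot exposes time_stamp as an attribute (it appears in the representation above). The helper name latest_snapshot is hypothetical, and stand-in objects replace real snapshots so that the example runs on its own.

```python
from types import SimpleNamespace


def latest_snapshot(snapshots):
    """Return the snapshot with the largest time_stamp, or None if there are none."""
    return max(snapshots, key=lambda snapshot: snapshot.time_stamp, default=None)


# Stand-in objects for illustration only. With a live instance you would
# pass sdc.get_snapshots(pipeline) instead (assumed to be iterable).
stand_in_snapshots = [
    SimpleNamespace(name='New Snapshot', time_stamp=1616718217886),
    SimpleNamespace(name='Earlier Snapshot', time_stamp=1616716207032),
]

newest = latest_snapshot(stand_in_snapshots)
print(newest.name)
```

Returning None for an empty collection lets the caller decide whether to capture a fresh snapshot or stop.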
Deleting a Snapshot
If you have captured a snapshot for a pipeline and no longer need it, you can delete it with the
streamsets.sdk.DataCollector.delete_snapshot() method. The method expects a streamsets.sdk.sdc_models.Snapshot instance
as its argument, such as one retrieved via the streamsets.sdk.DataCollector.get_snapshots() method used previously:
# Check the list of current snapshots for the pipeline
sdc.get_snapshots(pipeline)
# Get the snapshot object that corresponds to the snapshot you wish to delete
snapshot = sdc.get_snapshots(pipeline).get(name='New Snapshot')
snapshot
sdc.delete_snapshot(snapshot)
# The snapshot has been deleted
sdc.get_snapshots(pipeline)
Output:
# sdc.get_snapshots(pipeline)
[<Snapshot (name=New Snapshot, time_stamp=1617379142414, batch_number=1)>,
<Snapshot (name=Earlier Snapshot, time_stamp=1617379338310, batch_number=1)>]
# snapshot
<Snapshot (name=New Snapshot, time_stamp=1617379142414, batch_number=1)>
# sdc.delete_snapshot(snapshot)
<streamsets.sdk.sdc_api.Command object at 0x7f02c29eac88>
# sdc.get_snapshots(pipeline)
[<Snapshot (name=Earlier Snapshot, time_stamp=1617379338310, batch_number=1)>]
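Capturing and deleting can be paired so that a snapshot used only for a quick inspection never lingers. The sketch below is one possible pattern, not something the SDK provides: temporary_snapshot is a hypothetical helper that relies only on the capture_snapshot() and delete_snapshot() calls shown above. FakeDataCollector is a stand-in written for this example so that the code runs without a live instance.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def temporary_snapshot(sdc, pipeline):
    """Capture a snapshot, hand it to the caller, and delete it on exit."""
    snapshot = sdc.capture_snapshot(pipeline).snapshot
    try:
        yield snapshot
    finally:
        # Runs even if the caller's block raises, so the snapshot is removed.
        sdc.delete_snapshot(snapshot)


class FakeDataCollector:
    """Stand-in for illustration only; mimics the two calls used above."""

    def __init__(self):
        self.snapshots = []

    def capture_snapshot(self, pipeline):
        snapshot = SimpleNamespace(name='New Snapshot')
        self.snapshots.append(snapshot)
        # The real call returns a command object exposing `.snapshot`.
        return SimpleNamespace(snapshot=snapshot)

    def delete_snapshot(self, snapshot):
        self.snapshots.remove(snapshot)


fake_sdc = FakeDataCollector()
with temporary_snapshot(fake_sdc, pipeline=None) as snap:
    count_inside = len(fake_sdc.snapshots)
count_after = len(fake_sdc.snapshots)
print(count_inside, count_after)
```

With a real connection, the same helper would be called as temporary_snapshot(sdc, pipeline), and the snapshot would be inspected inside the with block.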