Subscriptions


A subscription listens for Control Hub events and then completes an action when those events occur. The SDK lets you work with subscriptions: you can create, update, and delete them, and read the events and action configured for a subscription.

Creating a Subscription

You can create a new streamsets.sdk.sch_models.Subscription instance within Control Hub via the streamsets.sdk.sch_models.SubscriptionBuilder class. Use the streamsets.sdk.ControlHub.get_subscription_builder() method to instantiate the builder object:

subscription_builder = sch.get_subscription_builder()

subscription_builder.add_event(event_type='Pipeline Committed')
subscription_builder.set_email_action(recipients=['fake@fake.com'],
                                      subject='{{PIPELINE_NAME}} pipeline was committed',
                                      body=('{{PIPELINE_COMMITTER}} committed the {{PIPELINE_NAME}} pipeline '
                                            'on {{PIPELINE_COMMIT_TIME}}.'))
subscription = subscription_builder.build(name='Sample Subscription')
sch.add_subscription(subscription)
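
If you create several similar subscriptions, the steps above can be wrapped in a small helper. The following is only a sketch: the function name create_email_subscription and its parameters are hypothetical, and it relies on nothing beyond the builder calls already shown. It assumes sch is an already-authenticated streamsets.sdk.ControlHub instance.

```python
def create_email_subscription(sch, name, event_type, recipients, subject, body):
    """Build an email subscription for one event type and add it to Control Hub.

    Hypothetical helper: `sch` is assumed to be a connected
    streamsets.sdk.ControlHub instance.
    """
    builder = sch.get_subscription_builder()
    builder.add_event(event_type=event_type)
    builder.set_email_action(recipients=recipients, subject=subject, body=body)
    subscription = builder.build(name=name)
    sch.add_subscription(subscription)
    return subscription
```

A call such as create_email_subscription(sch, 'Sample Subscription', 'Pipeline Committed', ['fake@fake.com'], subject, body) would then reproduce the example above.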

Retrieving a Subscription

To retrieve the existing subscriptions from Control Hub, reference the subscriptions attribute of your ControlHub instance:

sch.subscriptions

Output:

[<Subscription (id=9c513e23-0a0c-433f-91a3-646ecb481f48:admin, name=Error Message)>,
 <Subscription (id=f0240043-5d32-4645-b15b-1a37e7b78520:admin, name=Job Error)>,
 <Subscription (id=4accdcbc-9c74-4b5e-a224-1838c0149c33:admin, name=Pipeline Finish)>,
 <Subscription (id=fef279f1-4b3b-4e83-b99a-4250aa280361:admin, name=Pipeline/Job Name Alert)>,
 <Subscription (id=fe1f5b87-86bb-430c-bc14-9f2769546512:admin, name=SDC Down Alert)>]

You can also filter the results by attributes like name or id to retrieve a specific subscription:

sch.subscriptions.get(name='SDC Down Alert')

Output:

<Subscription (id=fe1f5b87-86bb-430c-bc14-9f2769546512:admin, name=SDC Down Alert)>
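
When a subscription may or may not exist, it can be convenient to get None back rather than an exception. The sketch below assumes that get() raises ValueError when nothing matches the filter; that behavior is an assumption here and should be checked against your SDK version. The helper name find_subscription is hypothetical.

```python
def find_subscription(sch, **filters):
    """Return the matching subscription, or None if there is no match.

    Assumption: sch.subscriptions.get() raises ValueError when no
    subscription matches the given filters.
    """
    try:
        return sch.subscriptions.get(**filters)
    except ValueError:
        return None
```

For example, find_subscription(sch, name='SDC Down Alert') would return the subscription shown above, and None for a name that does not exist.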

Getting the events from a Subscription object

Once you’ve retrieved the streamsets.sdk.sch_models.Subscription object you’re interested in, you can inspect the events defined for that subscription:

subscription.events

Output:

[<SubscriptionEvent (event_type=Job Status Change, filter=)>,
 <SubscriptionEvent (event_type=Data SLA Triggered, filter=)>,
 <SubscriptionEvent (event_type=Pipeline Committed, filter=)>,
 <SubscriptionEvent (event_type=Pipeline Status Change, filter=)>,
 <SubscriptionEvent (event_type=Report Generated, filter=)>,
 <SubscriptionEvent (event_type=Data Collector not Responding, filter=)>]

You can also filter the events by the event_type:

event = subscription.events.get(event_type='Pipeline Committed')
event

Output:

<SubscriptionEvent (event_type=Pipeline Committed, filter=)>
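
The same event information can be used across all subscriptions, for example to find every subscription that reacts to a given event type. This is a sketch only: the helper name subscriptions_with_event is hypothetical, and it assumes each event object exposes the event_type attribute shown in the output above.

```python
def subscriptions_with_event(sch, event_type):
    """Return the subscriptions that include an event of the given type.

    Assumption: each item in subscription.events has an `event_type` attribute.
    """
    return [subscription for subscription in sch.subscriptions
            if any(event.event_type == event_type for event in subscription.events)]
```

Calling subscriptions_with_event(sch, 'Pipeline Committed') would return a plain Python list of the matching subscription objects.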

Getting the action from a Subscription object

Similar to retrieving the events from a subscription, you can also view the action set for a specific subscription by referencing the streamsets.sdk.sch_models.Subscription.action attribute:

action = subscription.action
action

Output:

<SubscriptionAction (event_type=EMAIL)>

Updating an existing Subscription

Updating an existing subscription is similar to creating a new one. Import the existing subscription into a streamsets.sdk.sch_models.SubscriptionBuilder instance, modify it as needed, and call the streamsets.sdk.sch_models.SubscriptionBuilder.build() method to construct the updated subscription instance. Then pass that instance to Control Hub via the streamsets.sdk.ControlHub.update_subscription() method:

subscription = sch.subscriptions.get(name='Sample Subscription')
# Import Subscription into builder
subscription_builder = sch.get_subscription_builder()
subscription_builder.import_subscription(subscription)
# Remove existing event
subscription_builder.remove_event(event_type='Pipeline Committed')
# Add a new Job Status Change event, filtered to a single job
# (`job` is assumed to be a Job object retrieved earlier)
subscription_builder.add_event(event_type='Job Status Change', filter="${{JOB_ID=='{}'}}".format(job.job_id))
# Change action to Webhook action
subscription_builder.set_webhook_action(uri='https://google.com')
# Build the subscription
subscription = subscription_builder.build(name='Sample Subscription updated')
# Update the Subscription on Control Hub instance
sch.update_subscription(subscription)
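
The filter argument in the example above relies on str.format escaping: doubled braces produce literal braces, while the single {} pair is replaced by the job ID. A small standalone sketch, using a made-up job ID in place of job.job_id:

```python
# Hypothetical job ID, standing in for job.job_id in the example above.
job_id = '1234:admin'

# '{{' and '}}' become literal braces; '{}' is replaced by job_id.
event_filter = "${{JOB_ID=='{}'}}".format(job_id)
print(event_filter)  # ${JOB_ID=='1234:admin'}
```

The resulting string is what gets passed to add_event() as the filter expression.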

Deleting an existing Subscription

To delete an existing subscription, retrieve the streamsets.sdk.sch_models.Subscription object from Control Hub and pass it to the streamsets.sdk.ControlHub.delete_subscription() method:

subscription = sch.subscriptions.get(name='Sample Subscription updated')
sch.delete_subscription(subscription)

Acknowledging a subscription error

Errors generated by subscriptions can also be acknowledged directly from the SDK. Retrieve the subscription with the error from Control Hub and pass the object to the streamsets.sdk.ControlHub.acknowledge_event_subscription_error() method:

subscription = sch.subscriptions.get(name='Sample Subscription')
# Check the current error message for this subscription, if any
subscription.error_message

sch.acknowledge_event_subscription_error(subscription)
subscription.error_message

Output:

# subscription.error_message
'Failed to trigger email action for event fbee1816-6c72-40ec-a432-e19b5ccac891:admin due to: Issues:
[APP_ISSUES_01 - Exception: com.streamsets.datacollector.email.EmailException: javax.mail.SendFailedException:
Invalid Addresses;\n  nested exception is:\n\tcom.sun.mail.smtp.SMTPAddressFailedException: 553 5.1.2
The recipient address <fake@fake.com> is not a valid RFC-5321 address. x203sm9391603pgx.61 - gsmtp\n]'

# sch.acknowledge_event_subscription_error(subscription)
<sdk.sch_api.Command at 0x111c50eb8>

# subscription.error_message
None
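
To clear errors in bulk, the same two calls can be applied in a loop. The following sketch uses a hypothetical helper name, acknowledge_all_subscription_errors, and assumes that error_message is None (or empty) for subscriptions without an error, as in the output above, and that each subscription exposes a name attribute.

```python
def acknowledge_all_subscription_errors(sch):
    """Acknowledge the error on every subscription that currently has one.

    Returns the names of the subscriptions that were acknowledged.
    Assumption: subscriptions without an error have a falsy error_message.
    """
    acknowledged = []
    for subscription in sch.subscriptions:
        if subscription.error_message:
            sch.acknowledge_event_subscription_error(subscription)
            acknowledged.append(subscription.name)
    return acknowledged
```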

Retrieving subscription audits

Subscription events are also audited, allowing you to review changes and updates made to subscriptions directly in the SDK. To retrieve subscription audit events from Control Hub, reference the streamsets.sdk.ControlHub.subscription_audits attribute of your streamsets.sdk.ControlHub object:

sch.subscription_audits

Output:

[<SubscriptionAudit (subscription_name='pipeline',
                     event_name='PIPELINE_COMMITTED',
                     external_action_type='WEBHOOKV1',
                     created_time=1607548034094)>,
 <SubscriptionAudit (subscription_name='new pipeline',
                     event_name='PIPELINE_COMMITTED',
                     external_action_type='WEBHOOKV1',
                     created_time=1607548034094)>]
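
The created_time values above look like Unix epoch timestamps in milliseconds. Under that assumption (the unit is not stated here), they can be converted to readable datetimes with the standard library. The helper name audit_time_to_datetime is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def audit_time_to_datetime(created_time_ms):
    """Convert an epoch timestamp in milliseconds to an aware UTC datetime.

    Assumption: created_time is expressed in milliseconds since 1970-01-01 UTC.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=created_time_ms)

print(audit_time_to_datetime(1607548034094).isoformat())
# 2020-12-09T21:07:14.094000+00:00
```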