Connection Tags
Section Contents
Connection Tags#
A Connection Tag identifies similar connections and allows you to easily search and filter connections.
Creating a connection with tags#
To add a tag to a connection at creation time, simply include a tags
argument in the
streamsets.sdk.sch_models.ConnectionBuilder.build()
method:
connection_builder = sch.get_connection_builder()
data_collector = sch.data_collectors.get(url='http://localhost:18630')
# Create a list to store the tags to add to the connection at creation time
tags = ['test/dev', 'test']
# Add the list of tags into the builder via `tags=tags`
connection = connection_builder.build(title='s3 connection dev',
connection_type='AWS_S3',
authoring_data_collector=data_collector,
tags=tags)
connection.connection_definition.configuration['awsConfig.awsAccessKeyId'] = 123
connection.connection_definition.configuration['awsConfig.awsSecretAccessKey'] = 456
sch.add_connection(connection)
# Double check that the tags were successfully added
connection.tags
Output:
[<Tag (tag=test/dev)>,
<Tag (tag=test)>]
Updating tags of an existing connection#
To update the tags of an existing connection via the sdk, retrieve the streamsets.sdk.sch_models.Connection
object you want to update and use its streamsets.sdk.sch_models.Connection.add_tag()
method to pass in the tags
you want to add. Finally, pass the modified connection to the streamsets.sdk.ControlHub.update_connection()
method to push the changes to Control Hub:
connection = sch.connections.get(name='s3 connection dev')
connection.add_tag('prod/dev', 'prod')
sch.update_connection(connection)
connection.tags
Output:
[<Tag (tag=test/dev)>,
<Tag (tag=test)>,
<Tag (tag=prod/dev)>,
<Tag (tag=prod)>]
Removing existing tags for a connection#
Similar to adding tags for a connection, removing tags requires the same steps. Retrieve the streamsets.sdk.sch_models.Connection
object you want to update and use its streamsets.sdk.sch_models.Connection.remove_tag()
method to specify the
the tags you wish to remove from the connection. Finally, pass the modified connection to the streamsets.sdk.ControlHub.update_connection()
method to push the changes to Control Hub:
connection = sch.connections.get(name='s3 connection dev')
connection.remove_tag('test', 'test/dev')
sch.update_connection(connection)
connection.tags
Output:
[<Tag (tag=prod/dev)>,
<Tag (tag=prod)>]]
Retrieve all connection tags#
To retrieve all connection tags that exist in Control Hub for your organization, reference the streamsets.sdk.ControlHub.connection_tags
attribute for your Control Hub instance:
sch.connection_tags
Output:
[<Tag (tag=dev)>, <Tag (tag=prod)>]
Similarly if you wanted to retrieve all connections by a particular parent ID, you could use the parent_id
attribute
to further filter the results:
sch.connection_tags.get_all(parent_id='prod:{}'.format(sch.organization))
Output:
[<Tag (tag=prod/data)>, <Tag (tag=prod/pipeline)>]
Connection audits#
Changes to connections on Control Hub also have an audit trail that can be interacted with via the SDK.
To retrieve connection audits for last 30 days, you can reference the streamsets.sdk.ControlHub.connection_audits
attribute:
sch.connection_audits
Output:
[<ConnectionAudit (user_id=admin@admin,
connection_name=s3 connection prod,
audit_action=UPDATE,
audit_time=1601574060023)>,
<ConnectionAudit (user_id=admin@admin,
connection_name=s3 connection prod,
audit_action=CREATE,
audit_time=1601574050166)>]
To retrieve connection audits for a specific time period, you’ll still reference the streamsets.sdk.ControlHub.connection_audits
attribute but can filter the results further by providing a start_time
and end_time
:
import datetime
current_timestamp = datetime.datetime.now().timestamp() * 1000
sch.connection_audits.get_all(start_time=0, end_time=current_timestamp)
Output:
[<ConnectionAudit (user_id=admin@admin,
connection_name=s3 connection prod,
audit_action=UPDATE,
audit_time=1601574060023)>,
<ConnectionAudit (user_id=admin@admin,
connection_name=s3 connection prod,
audit_action=CREATE,
audit_time=1601574050166)>]
To retrieve connection audits for a specific connection, you will again reference the streamsets.sdk.ControlHub.connection_audits
attribute but will provide a specific streamsets.sdk.sch_models.Connection
object to filter the results by:
connection = sch.connections.get(name='s3 connection invalid')
sch.connection_audits.get_all(connection=connection)
Output:
[<ConnectionAudit (user_id=admin@admin,
connection_name=s3 connection prod,
audit_action=UPDATE,
audit_time=1601574060023)>,
<ConnectionAudit (user_id=admin@admin,
connection_name=s3 connection prod,
audit_action=CREATE,
audit_time=1601574050166)>]
To retrieve connection audits for a different organization, you’ll need to be a system administrator for Control Hub.
You will still reference the streamsets.sdk.ControlHub.connection_audits
attribute, but will specify the
organization='org_name'
for which you’d like to retrieve connection audits for:
sch.connection_audits.get_all(organization='test', start_time=0, end_time=current_timestamp)
Output:
[<ConnectionAudit (user_id=admin@test,
connection_name=s3 connection test,
audit_action=UPDATE,
audit_time=1601574060023)>,
<ConnectionAudit (user_id=admin@test,
connection_name=s3 connection test,
audit_action=CREATE,
audit_time=1601574050166)>]