Connections#
A connection defines the information required to connect to an external system. Rather than providing these details repeatedly in every pipeline that accesses the same system, a connection can be created to store those details centrally and reused across multiple pipelines.
Creating a new connection#
To create a new connection and add it to Control Hub, use the streamsets.sdk.sch_models.ConnectionBuilder class. Use the streamsets.sdk.ControlHub.get_connection_builder() method to instantiate the builder object, and then call its build() method to create the connection.
The resulting streamsets.sdk.sch_models.Connection object can then be passed to the streamsets.sdk.ControlHub.add_connection() method to be published in Control Hub:
# Instantiate the builder and pick the Data Collector used for authoring
connection_builder = sch.get_connection_builder()
data_collector = sch.data_collectors.get(url='http://localhost:18630')
# Build the connection, then set its type-specific configuration
connection = connection_builder.build(title='s3 connection dev',
                                      connection_type='STREAMSETS_AWS_S3',
                                      authoring_data_collector=data_collector)
connection.connection_definition.configuration['awsConfig.awsAccessKeyId'] = 123
connection.connection_definition.configuration['awsConfig.awsSecretAccessKey'] = 456
# Publish the connection to Control Hub
sch.add_connection(connection)
Please refer to the Control Hub documentation for the available connection types and various configurations.
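The example above hard-codes placeholder credentials. As a minimal sketch, the same steps can be wrapped in a helper that reads the keys from the environment instead. The helper name create_s3_connection, its env parameter, and the variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are assumptions made for illustration. The connection type string is taken from the listing output shown later on this page, and sch is assumed to be an authenticated ControlHub instance:

```python
import os


def create_s3_connection(sch, title, sdc_url, env=None):
    """Build an S3 connection from environment credentials and add it to Control Hub.

    Hypothetical helper: it only uses the SDK calls shown in the example above.
    """
    env = os.environ if env is None else env
    # Fail early with a clear message if a credential is missing.
    required = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise RuntimeError('Missing environment variables: ' + ', '.join(missing))

    data_collector = sch.data_collectors.get(url=sdc_url)
    connection = sch.get_connection_builder().build(title=title,
                                                    connection_type='STREAMSETS_AWS_S3',
                                                    authoring_data_collector=data_collector)
    connection.connection_definition.configuration['awsConfig.awsAccessKeyId'] = env['AWS_ACCESS_KEY_ID']
    connection.connection_definition.configuration['awsConfig.awsSecretAccessKey'] = env['AWS_SECRET_ACCESS_KEY']
    sch.add_connection(connection)
    return connection
```

A call such as create_s3_connection(sch, 's3 connection dev', 'http://localhost:18630') would then keep the secrets out of the script itself.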
Retrieving connections#
To retrieve all existing connections from Control Hub, reference streamsets.sdk.ControlHub.connections. You can further filter the available connections on attributes like name, connection_type and id:
sch.connections
# Retrieve all connections with JDBC type
sch.connections.get_all(connection_type='STREAMSETS_JDBC')
Output:
# sch.connections
[<Connection (id=25dc5a92-a01c-4fef-979c-824000053396:admin, name=s3 connection dev, connection_type=STREAMSETS_AWS_S3)>]
# sch.connections.get_all(connection_type='STREAMSETS_JDBC')
[<Connection (id=6b8d75e2-7595-48ac-a6d2-cade5cafcc42:admin, name=Test, connection_type=STREAMSETS_JDBC)>,
<Connection (id=3afe2ce6-db7d-4c05-a6b0-1ab2b667ea67:admin, name=test mysql connection, connection_type=STREAMSETS_JDBC)>]
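When an organization has many connections, a grouped overview can be easier to read than the flat list. The following is a small sketch that groups connection names by type. The helper name connection_names_by_type is hypothetical, and it relies only on the name and connection_type attributes mentioned above:

```python
from collections import defaultdict


def connection_names_by_type(connections):
    """Group connection names by their connection_type (hypothetical helper)."""
    grouped = defaultdict(list)
    for connection in connections:
        grouped[connection.connection_type].append(connection.name)
    # Sort the names so the result does not depend on retrieval order.
    return {conn_type: sorted(names) for conn_type, names in grouped.items()}
```

It could be called as connection_names_by_type(sch.connections), assuming sch is an authenticated ControlHub instance.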
Updating a connection#
To update a connection, first retrieve the streamsets.sdk.sch_models.Connection object you wish to update. You can make modifications to attributes like the name or various connection-specific configurations.
Once you’ve made the desired changes, use the streamsets.sdk.ControlHub.update_connection() method to push the updated connection to Control Hub:
connection = sch.connections.get(name='s3 connection dev')
connection.connection_definition.configuration['awsConfig.awsAccessKeyId'] = 234
connection.connection_definition.configuration['awsConfig.awsSecretAccessKey'] = 567
connection.name = 's3 connection prod'
sch.update_connection(connection)
Deleting a connection#
To delete a connection, first retrieve the streamsets.sdk.sch_models.Connection object you wish to delete, and then pass it to the streamsets.sdk.ControlHub.delete_connection() method:
connection = sch.connections.get(name='s3 connection prod')
sch.delete_connection(connection)
Using a connection inside a pipeline#
After building a connection and adding it to Control Hub, the connection is available for use within a pipeline. The steps are almost identical to building a normal pipeline: retrieve a streamsets.sdk.sch_models.DataCollector instance to use as an Authoring Data Collector (if desired), retrieve the streamsets.sdk.sch_models.Connection instance to add to the pipeline, and instantiate a streamsets.sdk.sch_models.PipelineBuilder object to build the pipeline with. Then add and configure some stages for the pipeline, set the relevant stage to use the connection, and finally build and publish the pipeline:
# Retrieve a Data Collector instance to use as the Authoring Data Collector (optional)
data_collector = sch.data_collectors.get(url='http://localhost:18630')
# Retrieve the connection to be used inside the pipeline
connection = sch.connections.get(name='s3 connection prod')
# Create the pipeline builder instance as you would with any other pipeline
pipeline_builder = sch.get_pipeline_builder(data_collector)
# Add stages to the pipeline, set some attributes in the stages, and then connect the stages together
dev_raw_data_source = pipeline_builder.add_stage('Dev Raw Data Source')
dev_raw_data_source.stop_after_first_batch = True
amazon_s3_destination = pipeline_builder.add_stage('Amazon S3', type='destination')
amazon_s3_destination.set_attributes(bucket='bucket-name', data_format='JSON')
dev_raw_data_source >> amazon_s3_destination
# Set the destination stage (AWS S3 in this example) to use the connection retrieved earlier
amazon_s3_destination.use_connection(connection)
# Build and publish the pipeline
pipeline = pipeline_builder.build('Dev to S3')
sch.publish_pipeline(pipeline)
Verifying a connection#
Once a connection is built and added to Control Hub, you can test it to verify that it connects successfully. To verify a connection, retrieve the streamsets.sdk.sch_models.Connection instance you want to validate and then use the streamsets.sdk.ControlHub.verify_connection() method to return the results of the verification.
If any issues arise during the connection’s verification, you can inspect streamsets.sdk.sch_models.ConnectionVerificationResult.issue_count and streamsets.sdk.sch_models.ConnectionVerificationResult.issue_message to identify the issue:
# Retrieve the connection to be verified
connection = sch.connections.get(name='s3 connection prod')
# Run the verification, and then check the results (successful case)
verification_result = sch.verify_connection(connection)
verification_result
# Retrieve a second connection that is expected to fail verification
connection = sch.connections.get(name='s3 connection invalid')
# Run the verification, and then check the results (failure case)
verification_result = sch.verify_connection(connection)
verification_result
verification_result.issue_count
verification_result.issue_message
Output:
# verification_result (successful case)
<ConnectionVerificationResult (status=VALID)>
# verification_result (failure case)
<ConnectionVerificationResult (status=INVALID)>
# verification_result.issue_count
1
# verification_result.issue_message
'S3_SPOOLDIR_20 - Cannot connect to Amazon S3, reason : com.amazonaws.services.s3.model.AmazonS3Exception: The request signature we calculated does not match the signature you provided. Check your key and signing method.'
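In a script, it can be convenient to turn a failed verification into an exception so that later steps do not run against a broken connection. The sketch below shows one way to do that. The exception class ConnectionVerificationError and the helper verify_or_raise are hypothetical names, and the truthiness check on issue_count is an assumption chosen so that both 0 and None are treated as "no issues":

```python
class ConnectionVerificationError(Exception):
    """Raised when Control Hub reports issues for a connection (hypothetical)."""


def verify_or_raise(sch, connection):
    """Verify a connection and raise if the result reports any issues."""
    result = sch.verify_connection(connection)
    # Treat a missing or zero issue count as success.
    if result.issue_count:
        raise ConnectionVerificationError(
            '{} issue(s) verifying {!r}: {}'.format(result.issue_count,
                                                    connection.name,
                                                    result.issue_message))
    return result
```

The helper returns the verification result unchanged on success, so callers can still inspect it.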
Get pipelines using a connection#
To retrieve all pipelines using a specific connection, first retrieve the streamsets.sdk.sch_models.Connection object you’d like to check against and then reference the streamsets.sdk.sch_models.Connection.pipeline_commits attribute to determine which pipelines are currently using the connection:
# Get the connection to check the usage of
connection = sch.connections.get(name='s3 connection prod')
# Check the pipeline commits that currently use this connection
connection.pipeline_commits
# Introspect a little further and identify the pipeline details for the first (and only) pipeline commit above
connection.pipeline_commits[0].pipeline
Output:
# connection.pipeline_commits
[<PipelineCommit (commit_id=db1e3b87-1499-44ef-93b8-e4e045318c48:admin, version=1, commit_message=None)>]
# connection.pipeline_commits[0].pipeline
<Pipeline (pipeline_id=5462626e-0243-48dd-8c07-c6787a813e37:admin,
commit_id=db1e3b87-1499-44ef-93b8-e4e045318c48:admin, name=s3, version=1)>
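One practical use of pipeline_commits is to guard deletion, so that a connection is only removed when no pipeline references it. The following is a minimal sketch of that idea. The helper name delete_connection_if_unused is hypothetical, and it assumes each pipeline commit exposes a pipeline with a name attribute, as the output above suggests:

```python
def delete_connection_if_unused(sch, connection):
    """Delete the connection only if no pipeline commit uses it.

    Returns the names of the pipelines still using the connection; an empty
    list means the connection was deleted. Hypothetical helper.
    """
    in_use_by = [commit.pipeline.name for commit in connection.pipeline_commits]
    if not in_use_by:
        sch.delete_connection(connection)
    return in_use_by
```

Returning the pipeline names, rather than a boolean, lets the caller report exactly what is blocking the deletion.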
Retrieving ACL permissions#
Connections can also have ACL permissions set on them to restrict access. To retrieve existing ACL permissions set on a given connection, reference the streamsets.sdk.sch_models.Connection.acl.permissions attribute for the connection object you wish to check:
connection = sch.connections.get(name='s3 connection prod')
# Check which ACL resource is set for this connection
connection.acl
# Check the specific permissions set by this connection's ACL resource
connection.acl.permissions
Output:
# connection.acl
<ACL (resource_id=cadd8eaa-85f4-48d0-a1a0-ff77a63584cc:admin, resource_type=CONNECTION)>
# connection.acl.permissions
[<Permission (resource_id=cadd8eaa-85f4-48d0-a1a0-ff77a63584cc:admin, subject_type=USER, subject_id=admin@admin)>]
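For auditing, it may help to flatten a connection's permissions into a plain dictionary. The sketch below is one way to do this. The helper name summarize_permissions is hypothetical, and it assumes each permission exposes subject_type, subject_id and actions attributes (the first two appear in the output above; actions is used in the permission-update example on this page):

```python
def summarize_permissions(connection):
    """Map (subject_type, subject_id) to a sorted list of allowed actions.

    Hypothetical helper built on connection.acl.permissions.
    """
    return {
        (permission.subject_type, permission.subject_id): sorted(permission.actions)
        for permission in connection.acl.permissions
    }
```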
Adding new permissions#
Adding new permissions to a connection via the SDK is similar to permission management in other SDK classes. Retrieve the streamsets.sdk.sch_models.Connection object you wish to modify permissions for, store the connection's current ACL, generate a new permission using the streamsets.sdk.sch_models.ACLPermissionBuilder.build() method, and then add it to the connection's ACL using the streamsets.sdk.sch_models.ACL.add_permission() method:
# Get the connection object to be modified
connection = sch.connections.get(name='s3 connection prod')
# Store the ACL permissions of the connection object
acl = connection.acl
# Generate a new list that stores the specific permissions to be added later
actions = ['READ', 'WRITE']
# Build the new permissions by supplying a subject_id (username), subject_type (user or group) and the
# permissions to be set (the list created above)
permission = acl.permission_builder.build(subject_id='testuser@testorg', subject_type='USER', actions=actions)
# Add the permissions to the ACL object stored for the connection earlier
acl.add_permission(permission)
# Double check that the permissions were correctly added
connection = sch.connections.get(name='s3 connection prod')
connection.acl.permissions.get(subject_id='testuser@testorg')
Output:
<Permission (resource_id=cadd8eaa-85f4-48d0-a1a0-ff77a63584cc:admin, subject_type=USER, subject_id=testuser@testorg)>
Updating existing permissions#
Updating existing permissions for a connection is a similar exercise to creating new permissions. Rather than building a brand new permission set, you simply update an existing one in-place. Retrieve the streamsets.sdk.sch_models.Connection object you wish to modify permissions for, retrieve the specific permissions you wish to update, and then modify the permissions as needed:
# Get the connection object to be modified
connection = sch.connections.get(name='s3 connection prod')
# Get the specific permissions to be modified
permission = connection.acl.permissions.get(subject_id='testuser@testorg')
# Create a list of new permissions to be set
updated_actions = ['READ']
# Set the actions of the permission, retrieved earlier, to be the list of new permissions
permission.actions = updated_actions
# Double check that the permissions were correctly updated
connection = sch.connections.get(name='s3 connection prod')
connection.acl.permissions.get(subject_id=permission.subject_id).actions
Output:
['READ']
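The add and update flows can be combined into a single "grant or update" step. The following is a minimal sketch under the assumption that iterating over acl.permissions and comparing subject_id is an acceptable way to find an existing entry. The helper name grant_or_update_actions is hypothetical:

```python
def grant_or_update_actions(connection, subject_id, actions, subject_type='USER'):
    """Set the actions for a subject, creating the permission if it is missing.

    Hypothetical helper combining the add and update examples above.
    """
    acl = connection.acl
    for permission in acl.permissions:
        if permission.subject_id == subject_id:
            # Existing entry: update it in-place, as in the update example.
            permission.actions = list(actions)
            return permission
    # No entry yet: build a new permission and add it to the ACL.
    permission = acl.permission_builder.build(subject_id=subject_id,
                                              subject_type=subject_type,
                                              actions=list(actions))
    acl.add_permission(permission)
    return permission
```

For example, grant_or_update_actions(connection, 'testuser@testorg', ['READ']) would leave that user with read-only access whether or not an entry already existed.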
Removing existing permissions#
To remove existing permissions on a connection object, retrieve the permission from the streamsets.sdk.sch_models.Connection object’s ACL and pass it to the streamsets.sdk.sch_models.ACL.remove_permission() method:
# Get the connection object to be modified
connection = sch.connections.get(name='s3 connection prod')
# Retrieve the specific permission to be removed
permission = connection.acl.permissions.get(subject_id='testuser@testorg')
# Remove the permission
connection.acl.remove_permission(permission)
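When a subject might not have a permission on the connection at all, a defensive variant can look for matching entries first and remove only what it finds. This is a sketch of that approach. The helper name revoke_subject is hypothetical, and it assumes acl.permissions can be iterated like a list:

```python
def revoke_subject(connection, subject_id):
    """Remove every permission held by subject_id and return how many were removed.

    Hypothetical helper built on ACL.remove_permission().
    """
    acl = connection.acl
    # Collect the matches first so removal does not disturb the iteration.
    matches = [permission for permission in acl.permissions
               if permission.subject_id == subject_id]
    for permission in matches:
        acl.remove_permission(permission)
    return len(matches)
```

A return value of 0 indicates that the subject had no permission on the connection, which avoids relying on how a failed lookup is reported.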