Kafka

Available when using an authoring Data Collector version 4.0.0 or later.

To create a Kafka connection, one of the following stage libraries must be installed on the selected authoring Data Collector:
  • Apache Kafka, streamsets-datacollector-apache-kafka_<version>-lib
  • Cloudera CDP, streamsets-datacollector-cdp_<version>-lib
Tip: To view the complete list of supported stage libraries, expand the list of libraries next to the Test Connection button when you create or edit a connection.

For a description of the Kafka connection properties, see Kafka Connection Properties.

After you create a Kafka connection, you can use the connection in the following stages and locations:
Engine Stages and Locations

Data Collector 4.0.0 or later

  • Kafka Multitopic Consumer origin
  • Kafka Producer destination
  • Write to Kafka error record handling configured for a pipeline

Transformer 4.0.0 or later

  • Kafka origin
  • Kafka destination

Kafka Connection Properties

When creating a Kafka connection, configure the following property on the Kafka tab:
Broker URI: Comma-separated list of connection strings for the Kafka brokers. Use the following format for each broker: <host>:<port>.

To ensure that a pipeline can connect to Kafka if a specified broker goes down, list as many brokers as possible.

Configure the following property on the Security tab:
Security Option: Authentication and encryption option used to connect to the Kafka brokers:
  • None (Security Protocol=PLAINTEXT) - Uses no authentication or encryption.
  • SSL/TLS Encryption (Security Protocol=SSL)
  • SSL/TLS Encryption and Authentication (Security Protocol=SSL)
  • SASL Authentication (Security Protocol=SASL_PLAINTEXT)
  • SASL Authentication on SSL/TLS (Security Protocol=SASL_SSL)
  • Custom Authentication (Security Protocol=CUSTOM)

Enabling security requires completing several prerequisite tasks on all execution engines that access the connection and requires configuring additional security properties in the connection, as described in Kafka Security.

Note: The Custom Authentication option is supported with Data Collector 5.1.0 or later.

Kafka Security

You can configure a Kafka connection to use one of the following security options to connect securely to Kafka:
  • SSL/TLS Encryption
  • SSL/TLS Encryption and Authentication
  • SASL Authentication
  • SASL Authentication on SSL/TLS
  • Custom Authentication

The SASL authentication options support two SASL mechanisms: PLAIN and GSSAPI (Kerberos).

Enabling security requires completing several prerequisite tasks in addition to configuring security properties in the connection.

Prerequisite Tasks

Before enabling security for a Kafka connection, you must complete several prerequisite tasks.
Important: You must complete the same prerequisites on all execution engines, both Data Collectors and Transformers, that access the connection.

Complete the following prerequisite tasks for the security option that you want to use:

SSL/TLS
Complete the following prerequisite tasks before using SSL/TLS to connect to Kafka:
  • Make sure Kafka is configured for SSL/TLS as described in the Kafka documentation.
  • Store the SSL truststore and keystore files on the execution engine machine.

    For Transformer pipelines, store the files in the same location on the Transformer machine and on each node in the Spark cluster.

SASL with the PLAIN mechanism
Complete the following prerequisite tasks before using SASL with the PLAIN mechanism to connect to Kafka:
  • Make sure Kafka is configured for SASL authentication with the PLAIN mechanism as described in the Kafka documentation.
  • Define the username and password credentials in a JAAS configuration file, as described in Providing PLAIN Credentials.
  • Store the JAAS configuration file on the execution engine machine.

    For Transformer pipelines, store the files in the same location on the Transformer machine and on each node in the Spark cluster. When configuring the pipeline, you also must specify the path to the JAAS configuration file as Extra Spark Configuration properties on the pipeline Cluster tab. For more information, see Enabling SASL Authentication in the Transformer documentation.

SASL with the GSSAPI (Kerberos) mechanism

Complete the following prerequisite tasks before using SASL with the GSSAPI (Kerberos) mechanism to connect to Kafka:

  • Make sure Kafka is configured for SASL authentication with the GSSAPI (Kerberos) mechanism as described in the Kafka documentation.
  • Make sure Kerberos authentication is enabled for the execution engines.

    For Data Collector pipelines, see Kerberos Authentication in the Data Collector documentation. For Transformer pipelines that run on a Hadoop YARN cluster configured for Kerberos, see Kerberos Authentication in the Transformer documentation.

  • Determine how to provide the Kerberos credentials and complete the required tasks as described in Providing Kerberos Credentials.
  • Store the JAAS configuration and Kafka keytab files on the execution engine machine.

    For Transformer pipelines, store the files in the same location on the Transformer machine and on each node in the Spark cluster. When configuring the pipeline, you also must specify the path to the JAAS configuration file as Extra Spark Configuration properties on the pipeline Cluster tab. For more information, see Enabling SASL Authentication in the Transformer documentation.

SASL Authentication Credentials

When using SASL authentication to connect to Kafka, the method that you use to provide credentials depends on whether you use the PLAIN or GSSAPI (Kerberos) SASL mechanism.

Providing PLAIN Credentials

To connect to Kafka using SASL authentication with the PLAIN mechanism, provide the credentials in a Java Authentication and Authorization Service (JAAS) file.

Create a JAAS configuration file on the Data Collector or Transformer machine. You can define only one JAAS file for each execution engine, so every Kafka connection in every pipeline run on that engine uses the same credentials.

Add the following KafkaClient login section to the file:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="<username>" 
    password="<password>";
};
Then modify the Java configuration options used by the deployment to include the option that defines the path to the JAAS configuration file. In Control Hub, edit the deployment. In the Configure Engine section, click Advanced Configuration. Then, click Java Configuration. Add the following option to the Java Options property:
-Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf
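As a sketch, the file creation and the Java option above can be combined in a short script. The JAAS_DIR location, username, and password below are placeholders, not values defined by this documentation:

```shell
# Sketch: create the JAAS configuration file for SASL PLAIN and print the
# Java option that points the engine's JVM at it.
# JAAS_DIR, the username, and the password are placeholders -- substitute
# your own values.
JAAS_DIR=${JAAS_DIR:-$PWD}

cat > "$JAAS_DIR/kafka_client_jaas.conf" <<'EOF'
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="<username>"
    password="<password>";
};
EOF

# Add this line to the Java Options property of the deployment:
echo "-Djava.security.auth.login.config=$JAAS_DIR/kafka_client_jaas.conf"
```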

Providing Kerberos Credentials

To connect to Kafka using SASL authentication with the GSSAPI (Kerberos) mechanism, you must provide the Kerberos credentials to use.

You can provide Kerberos credentials in either of the following ways, and you can use both methods together as needed:

JAAS file
Define Kerberos credentials in a Java Authentication and Authorization Service (JAAS) file when you want to use the same keytab and principal for every Kafka connection in every pipeline that you create. When configured, credentials defined in connection properties override JAAS file credentials.
You might use this method to provide a default keytab and principal. Then, use connection properties to specify different credentials, as needed.
To use this method, create a JAAS configuration file on the Data Collector or Transformer machine.
Add the following KafkaClient login section to the file:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
};
For example:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka/node-01.cluster@EXAMPLE.COM";
};
Then modify the Java configuration options used by the deployment to include the option that defines the path to the JAAS configuration file. In Control Hub, edit the deployment. In the Configure Engine section, click Advanced Configuration. Then, click Java Configuration. Add the following option to the Java Options property:
-Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf
Connection properties
You can define Kerberos credentials in connection properties when the Kafka connection uses a stage library for Kafka 0.11.0.0 or higher. Define Kerberos credentials in connection properties when you want to use different credentials in different Kafka connections.
Important: Configuring Kerberos credentials in connection properties is not supported in Transformer pipelines at this time.
If you also configure a JAAS file to provide Kerberos credentials, the credentials that you enter in connection properties override those in the JAAS file.
To provide Kerberos credentials in connection properties, select the Provide Keytab at Runtime property on the Security tab of the connection. Specify the principal in plain text, and then use one of the following methods to specify the keytab:
  • Enter a Base64-encoded keytab in the Runtime Keytab property.

    Encode the keytab before entering it in the connection property. Be sure to remove unnecessary characters, such as newline characters, before encoding the keytab.

  • Use a credential function to access a Base64-encoded keytab defined in a credential store.

    For more information, see Using a Credential Store.
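A sketch of the encoding step, using GNU coreutils base64; the random file below merely stands in for a real keytab so the commands are self-contained:

```shell
# Stand-in for a real keytab file: any binary file demonstrates the step.
head -c 256 /dev/urandom > kafka_client.keytab

# Encode with wrapping disabled (-w 0, GNU coreutils) so the Base64 value
# contains no embedded newline characters.
base64 -w 0 kafka_client.keytab > kafka_client.keytab.b64

# The entire encoding is a single line; a wrapped encoding would span several.
grep -c '' kafka_client.keytab.b64
```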

Using a Credential Store

You can define Kerberos keytabs in a credential store, then call the appropriate keytab from a Kafka connection.

Defining Kerberos keytabs in a credential store allows you to store multiple keytabs for use by Kafka connections. It also provides flexibility in how you use the keytabs. For example, you might create two separate keytabs, one for connections used in Kafka origins and one for connections used in Kafka destinations. Or, you might provide separate keytabs for every Kafka connection that you define.

Using a credential store makes it easy to update keytabs without having to edit the connections that use them. This can simplify tasks such as recycling keytabs or migrating pipelines to production.

Make sure that Data Collector is configured to use a supported credential store. For a list of supported credential stores and instructions on enabling each credential store, see Credential Stores in the Data Collector documentation.

For an additional layer of security, you can require group access to credential store secrets. For more information, see Group Access to Secrets in the Data Collector documentation.

Enabling SSL/TLS Encryption

When the Kafka cluster uses the Kafka SSL security protocol, enable the Kafka connection to use SSL/TLS encryption.

Before you enable a Kafka connection to use SSL/TLS encryption, make sure that you have performed all necessary prerequisite tasks. Then, perform the following steps to enable the connection to use SSL/TLS encryption to connect to Kafka.

  1. On the Kafka tab of the connection, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  2. On the Security tab, configure the following properties:
    Security Option: Set to SSL/TLS Encryption (Security Protocol=SSL).

    Truststore Type: Type of truststore to use, one of the following:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore File: Path to the truststore file.

    For use in Data Collector pipelines, enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    For use in Transformer pipelines, enter an absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster. For example: /var/private/ssl/kafka.client.truststore.jks

    Important: The file must exist in the same location on all execution engines that access the connection.

    Truststore Password: Password to the truststore file.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.

    Enabled Protocols: Comma-separated list of protocols used to connect to the Kafka brokers. Ensure that at least one of these protocols is enabled in the Kafka brokers.
    Note: Older protocols are not as secure as TLSv1.2.
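For context, a plain Kafka client would express comparable settings with the following client properties. The property names come from the standard Kafka client configuration; the paths and password are placeholders:

```shell
# Equivalent standard Kafka client properties for SSL/TLS encryption.
# Paths and password are placeholders.
cat > client-ssl.properties <<'EOF'
security.protocol=SSL
ssl.truststore.type=JKS
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.enabled.protocols=TLSv1.2
EOF
```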

Enabling SSL/TLS Encryption and Authentication

When the Kafka cluster uses the Kafka SSL security protocol and requires client authentication, enable the Kafka connection to use SSL/TLS encryption and authentication.

Before you enable a Kafka connection to use SSL/TLS encryption and authentication, make sure that you have performed all necessary prerequisite tasks. Then, perform the following steps to enable the connection to use SSL/TLS encryption and authentication to connect to Kafka.

  1. On the Kafka tab of the connection, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  2. On the Security tab, configure the following properties:
    Security Option: Set to SSL/TLS Encryption and Authentication (Security Protocol=SSL).

    Truststore Type: Type of truststore to use, one of the following:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore File: Path to the truststore file.

    For use in Data Collector pipelines, enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    For use in Transformer pipelines, enter an absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster. For example: /var/private/ssl/kafka.client.truststore.jks

    Important: The file must exist in the same location on all execution engines that access the connection.

    Truststore Password: Password to the truststore file.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.

    Keystore Type: Type of keystore to use, one of the following:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore File: Path to the keystore file.

    For use in Data Collector pipelines, enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    For use in Transformer pipelines, enter an absolute path to the keystore file stored in the same location on the Transformer machine and on each node in the cluster. For example: /var/private/ssl/kafka.client.keystore.jks

    Important: The file must exist in the same location on all execution engines that access the connection.

    Keystore Password: Password to the keystore file.
    Key Password: Password for the key in the keystore file.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.

    Enabled Protocols: Comma-separated list of protocols used to connect to the Kafka brokers. Ensure that at least one of these protocols is enabled in the Kafka brokers.
    Note: Older protocols are not as secure as TLSv1.2.
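For context, a plain Kafka client would express comparable settings with the following client properties, adding the keystore entries for client authentication. The property names come from the standard Kafka client configuration; the paths and passwords are placeholders:

```shell
# Equivalent standard Kafka client properties for SSL/TLS encryption with
# client authentication. Paths and passwords are placeholders.
cat > client-ssl-auth.properties <<'EOF'
security.protocol=SSL
ssl.truststore.type=JKS
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.type=JKS
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>
ssl.enabled.protocols=TLSv1.2
EOF
```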

Enabling SASL Authentication

When the Kafka cluster uses the SASL_PLAINTEXT security protocol, enable the Kafka connection to use SASL authentication.

Before you enable a Kafka connection to use SASL authentication, make sure that you have performed all necessary prerequisite tasks.

Note: The following steps provide details on providing Kerberos credentials using a JAAS file or connection properties. For use in Data Collector pipelines, you can use either method or both. Skip the steps that are not relevant to your desired implementation.
  1. To use a Java Authentication and Authorization Service (JAAS) file to provide PLAIN or Kerberos credentials, create a JAAS configuration file.

    The contents of the JAAS configuration file depend on whether you use the PLAIN or GSSAPI (Kerberos) SASL mechanism. For details, see Providing PLAIN Credentials or Providing Kerberos Credentials.

  2. If using the GSSAPI (Kerberos) SASL mechanism and a credential store to call keytabs from connection properties for Data Collector pipelines, add the Base64-encoded keytabs that you want to use to the credential store.
    Note: Be sure to remove unnecessary characters, such as newline characters, before encoding the keytab.

    If you configured Data Collector to require group secrets, for each keytab secret that you define, create a group secret and specify a comma-separated list of groups allowed to access the keytab secret.

    Name the group secret based on the keytab secret name, as follows: <keytab secret name>-groups.

    For more information about requiring group secrets, see Group Access to Secrets in the Data Collector documentation. For details on defining secrets, see your credential store documentation.

  3. On the Security tab of the connection, configure the following properties:
    Security Option: Set to SASL Authentication (Security Protocol=SASL_PLAINTEXT).

    SASL Mechanism: SASL mechanism to use:
    • PLAIN
    • GSSAPI (Kerberos)

    Kerberos Service Name: Kerberos service principal name that the Kafka brokers run as.
    Available when using the GSSAPI (Kerberos) mechanism.

    Provide Keytab at Runtime: Enables providing Kerberos credentials in the connection properties.
    Important: Configuring Kerberos credentials in connection properties is not supported in Transformer pipelines at this time.
    Available when using the GSSAPI (Kerberos) mechanism.

    Runtime Keytab: Kerberos keytab to use for the connection, specified in one of the following ways:
    • Enter a Base64-encoded keytab. Be sure to remove unnecessary characters, such as newline characters, before encoding the keytab.
    • If using a credential store, use the credential:get() or credential:getWithOptions() credential function to retrieve a Base64-encoded keytab.
      Note: The user who starts the pipeline must be in the Data Collector group specified in the credential function. When Data Collector requires a group secret, the user must also be in a group associated with the keytab.

    For more information about using keytabs in a credential store, see Using a Credential Store.
    Available when using the GSSAPI (Kerberos) mechanism.

    Runtime Principal: Kerberos principal to use for the connection, specified in the following format: <principal name>/<host name>@<realm>.
    Available when using the GSSAPI (Kerberos) mechanism.
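For context, a plain Kafka client would express comparable settings with the following client properties. The property names come from the standard Kafka client configuration; the service name "kafka" is a common default, not a value defined by this documentation, so confirm it against your brokers:

```shell
# Equivalent standard Kafka client properties for SASL over plaintext with
# the GSSAPI (Kerberos) mechanism. The service name is an assumed default.
cat > client-sasl.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
EOF
```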

Enabling SASL Authentication on SSL/TLS

When the Kafka cluster uses the SASL_SSL security protocol, enable the Kafka connection to use SASL authentication on SSL/TLS.

Before you enable a Kafka connection to use SASL authentication on SSL/TLS, make sure that you have performed all necessary prerequisite tasks.

Note: The following steps provide details on providing Kerberos credentials using a JAAS file or connection properties. For use in Data Collector pipelines, you can use either method or both. Skip the steps that are not relevant to your desired implementation.
  1. To use a Java Authentication and Authorization Service (JAAS) file to provide PLAIN or Kerberos credentials, create a JAAS configuration file.

    The contents of the JAAS configuration file depend on whether you use the PLAIN or GSSAPI (Kerberos) SASL mechanism. For details, see Providing PLAIN Credentials or Providing Kerberos Credentials.

  2. If using the GSSAPI (Kerberos) SASL mechanism and a credential store to call keytabs from connection properties for Data Collector pipelines, add the Base64-encoded keytabs that you want to use to the credential store.
    Note: Be sure to remove unnecessary characters, such as newline characters, before encoding the keytab.

    If you configured Data Collector to require group secrets, for each keytab secret that you define, create a group secret and specify a comma-separated list of groups allowed to access the keytab secret.

    Name the group secret based on the keytab secret name, as follows: <keytab secret name>-groups.

    For more information about requiring group secrets, see Group Access to Secrets in the Data Collector documentation. For details on defining secrets, see your credential store documentation.

  3. On the Kafka tab of the connection, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  4. On the Security tab of the connection, configure the following properties:
    Security Option: Set to SASL Authentication on SSL/TLS (Security Protocol=SASL_SSL).

    SASL Mechanism: SASL mechanism to use:
    • PLAIN
    • GSSAPI (Kerberos)

    Kerberos Service Name: Kerberos service principal name that the Kafka brokers run as.
    Available when using the GSSAPI (Kerberos) mechanism.

    Provide Keytab at Runtime: Enables providing Kerberos credentials in the connection properties.
    Important: Configuring Kerberos credentials in connection properties is not supported in Transformer pipelines at this time.
    Available when using the GSSAPI (Kerberos) mechanism.

    Runtime Keytab: Kerberos keytab to use for the connection, specified in one of the following ways:
    • Enter a Base64-encoded keytab. Be sure to remove unnecessary characters, such as newline characters, before encoding the keytab.
    • If using a credential store, use the credential:get() or credential:getWithOptions() credential function to retrieve a Base64-encoded keytab.
      Note: The user who starts the pipeline must be in the Data Collector group specified in the credential function. When Data Collector requires a group secret, the user must also be in a group associated with the keytab.

    For more information about using keytabs in a credential store, see Using a Credential Store.
    Available when using the GSSAPI (Kerberos) mechanism.

    Runtime Principal: Kerberos principal to use for the connection, specified in the following format: <principal name>/<host name>@<realm>.
    Available when using the GSSAPI (Kerberos) mechanism.

    Truststore Type: Type of truststore to use, one of the following:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore File: Path to the truststore file.

    For use in Data Collector pipelines, enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    For use in Transformer pipelines, enter an absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster. For example: /var/private/ssl/kafka.client.truststore.jks

    Important: The file must exist in the same location on all execution engines that access the connection.

    Truststore Password: Password to the truststore file.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.

    Enabled Protocols: Comma-separated list of protocols used to connect to the Kafka brokers. Ensure that at least one of these protocols is enabled in the Kafka brokers.
    Note: Older protocols are not as secure as TLSv1.2.

Enabling Custom Authentication

To specify security requirements with custom properties, enable the connection to use custom authentication.

Note: The Custom Authentication option is supported with Data Collector 5.1.0 or later.

With custom authentication, you specify custom properties that contain the information required by the security protocol rather than using the properties in the connection. For example, you can enable custom authentication and then configure custom properties required for the SASL_SSL security protocol rather than enabling SASL Authentication on SSL/TLS.

Before enabling custom authentication, complete any necessary prerequisites for the security methods you are using, as described in the Kafka documentation. For example, if using SSL/TLS to connect to Kafka, you must make sure Kafka is configured for SSL/TLS.

  1. If using SSL/TLS to connect to Kafka, then on the Kafka tab of the connection, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  2. On the Security tab, configure the following properties:
    Security Option: Set to Custom Authentication (Security Protocol=CUSTOM).

    Custom Security Properties: Additional security properties to use. In simple or bulk edit mode, click the Add icon to add properties, then define each security property name and value.

    Use the property names and values that Kafka clients expect. For more information, see the Kafka documentation.

    For example, to configure the SASL_SSL security protocol with the PLAIN mechanism, you configure custom properties that set the protocol and mechanism, supply the JAAS configuration, and enable SSL/TLS:

    Name                      Value
    security.protocol         SASL_SSL
    sasl.mechanism            PLAIN
    sasl.jaas.config          org.apache.kafka.common.security.plain.PlainLoginModule required username="<user name for JAAS>" password="<password>";
    ssl.truststore.location   <full path to the client truststore jks file>
    ssl.truststore.password   <password>