Source code for streamsets.testframework.environments.influxdb

# Copyright 2017 StreamSets Inc.

"""Environment abstractions to interact with InfluxDB."""

from urllib.parse import urlparse

from influxdb import InfluxDBClient
from influxdb.line_protocol import quote_ident

# Fallback credentials used by InfluxDBInstance when the URI carries no username/password.
DEFAULT_USERNAME = 'sdc'
DEFAULT_PASSWORD = 'sdc'

# Fallback port used by InfluxDBInstance when the URI does not specify one.
DEFAULT_PORT = 8086


class InfluxDBInstance:
    """Class that encapsulates InfluxDB client instance.

    Args:
        uri (:obj:`str`): InfluxDB URI in scheme://[username:password]@host:port[/[database]] format
            where ``scheme`` is one of ``influxdb`` or ``https+influxdb`` or ``udp+influxdb``
            (e.g. ``influxdb://sdcuser:sdcpass@myinfluxdb.cluster:8086/testdb``).

    Attributes:
        uri (:obj:`str`): The URI as passed in.
        hostname (:obj:`str`): Host parsed from the URI.
        username (:obj:`str`): User parsed from the URI, or ``DEFAULT_USERNAME`` if absent.
        password (:obj:`str`): Password parsed from the URI, or ``DEFAULT_PASSWORD`` if absent.
        port (:obj:`int`): Port parsed from the URI, or ``DEFAULT_PORT`` if absent.
        database (:obj:`str`): URI path with surrounding slashes removed (may be empty).
        sdc_stage_libs (:obj:`list`): SDC stage library names associated with this environment.
        sdc_stage_configurations (:obj:`dict`): Stage configuration values keyed by stage name.
    """

    def __init__(self, uri):
        self.uri = uri
        parts = urlparse(self.uri)

        self.hostname = parts.hostname
        self.username = parts.username or DEFAULT_USERNAME
        self.password = parts.password or DEFAULT_PASSWORD
        self.port = parts.port or DEFAULT_PORT
        self.database = parts.path.strip('/')

        # NOTE(review): the URL handed to SDC always uses plain ``http``, even when the URI
        # scheme is ``https+influxdb`` -- confirm whether that is intended.
        url = f'http://{self.hostname}:{self.port}'

        self.sdc_stage_libs = ['streamsets-datacollector-influxdb_0_9-lib']
        self.sdc_stage_configurations = {
            'com_streamsets_pipeline_stage_destination_influxdb_InfluxDTarget': {
                # 'conf.connection.*' configurations are picked up by SDC versions where the stage uses
                # the connection, and ignored by older ones. Conversely, for 'conf.password|url|username'.
                'conf.connection.password': self.password,
                'conf.connection.url': url,
                'conf.connection.username': self.username,
                'conf.dbName': self.database,
                'conf.password': self.password,
                'conf.url': url,
                'conf.username': self.username,
            }
        }

    @property
    def client(self):
        """Get an InfluxDB client.

        A new client is built from ``self.uri`` on every access.

        Returns:
            (:obj:`influxdb.InfluxDBClient`)
        """
        return InfluxDBClient.from_DSN(self.uri)

    # Note: Current InfluxDB client version 4.1.1 does not support drop measurement.
    def drop_measurement(self, measurement):
        """Drop a measurement from InfluxDB.

        Args:
            measurement (:obj:`str`): Name of the measurement; it is quoted as an identifier
                before being placed into the query.

        Returns:
            results (:obj:`influxdb.resultset.ResultSet`)
        """
        # Return the query result so the method matches its documented contract
        # (previously the result was discarded and ``None`` was returned).
        return self.client.query(f'DROP MEASUREMENT {quote_ident(measurement)}')