Source code for streamsets.testframework.environments.influxdb
# Copyright 2017 StreamSets Inc.
"""Environment abstractions to interact with InfluxDB."""
from urllib.parse import urlparse
from influxdb import InfluxDBClient
from influxdb.line_protocol import quote_ident
DEFAULT_USERNAME = 'sdc'
DEFAULT_PASSWORD = 'sdc'
DEFAULT_PORT = 8086
[docs]class InfluxDBInstance:
"""Class that encapsulates InfluxDB client instance.
Args:
uri (:obj:`str`): InfluxDB URI in scheme://[username:password]@host:port[/[database]] format
where ``scheme`` is one of ``influxdb`` or ``https+influxdb`` or ``udp+influxdb``
(e.g. ``influxdb://sdcuser:sdcpass@myinfluxdb.cluster:8086/testdb``).
"""
def __init__(self, uri):
self.uri = uri
parts = urlparse(self.uri)
self.hostname = parts.hostname
self.username = parts.username or DEFAULT_USERNAME
self.password = parts.password or DEFAULT_PASSWORD
self.port = parts.port or DEFAULT_PORT
self.database = parts.path.strip('/')
self.sdc_stage_libs = ['streamsets-datacollector-influxdb_0_9-lib']
self.sdc_stage_configurations = {
'com_streamsets_pipeline_stage_destination_influxdb_InfluxDTarget': {
# 'conf.connection.*' configurations are picked up by SDC versions where the stage uses
# the connection, and ignored by older ones. Conversely, for 'conf.password|url|username'.
'conf.connection.password': self.password,
'conf.connection.url': f'http://{self.hostname}:{self.port}',
'conf.connection.username': self.username,
'conf.dbName': self.database,
'conf.password': self.password,
'conf.url': f'http://{self.hostname}:{self.port}',
'conf.username': self.username,
}
}
@property
def client(self):
"""Get a InfluxDB client.
Returns:
(:obj:`influxdb.InfluxDBClient`)
"""
return InfluxDBClient.from_DSN(self.uri)
# Note: Current InfluxDB client version 4.1.1 does not support drop measurement.
[docs] def drop_measurement(self, measurement):
"""Drop a measurement from InfluxDB.
Args:
measurement (:obj:`str`)
Returns:
results (:obj:`influxdb.resultset.ResultSet`)
"""
self.client.query('DROP MEASUREMENT {}'.format(quote_ident(measurement)))