# Source code for streamsets.sdk.models

# Copyright 2019 StreamSets Inc.

"""Models to be used by multiple StreamSets components."""

# fmt: off
import json
import warnings
from uuid import uuid4

import inflection

# fmt: on

def json_to_python_style(x):
    """Convert a JSON-style attribute name to Python style using ``inflection.underscore``."""
    return inflection.underscore(x)


def python_to_json_style(x):
    """Convert a Python-style attribute name to JSON style using ``inflection.camelize`` (lower first letter)."""
    return inflection.camelize(x, uppercase_first_letter=False)


class Configuration:
    """Abstraction for configurations.

    This class enables easy access to and modification of data stored as a list of dictionaries.

    A Configuration is stored in the form:

    .. code-block:: none

        [{"name" : "<name_1>","value" : "<value_1>"}, {"name" : "<name_2>", "value" : "<value_2>"},...]

    However, the passed in configuration parameter can be a list of Configurations such as:

    .. code-block:: none

        [[{"name" : "<name_1>","value" : "<value_1>"}, {"name" : "<name_2>", "value" : "<value_2>"},...],
         [{"name" : "<name_3>","value" : "<value_3>"}, {"name" : "<name_4>", "value" : "<value_4>"},...],...]

    Args:
        compatibility_map (:obj:`dict`, optional): A dictionary mapping values used for backwards compatibility.
            Each key is a deprecated configuration name mapped to a dict with a ``name`` entry (the replacement
            name) and an optional ``values`` entry (a dict of deprecated value -> replacement value).
        configuration (:obj:`list`, optional): List of configurations (see above for format). ``None`` or an
            empty list produces an empty Configuration.
        property_key (:obj:`str`, optional): The dictionary entry denoting the property key. Default: ``name``
        property_value (:obj:`str`, optional): The dictionary entry denoting the property value.
            Default: ``value``
        update_callable (optional): A callable invoked by ``__setitem__`` with ``body=[<changed entry>]`` plus
            ``update_callable_kwargs``.
        update_callable_kwargs (:obj:`dict`, optional): A dictionary of kwargs to pass (along with a body) to the
            callable.
        id_to_remap (:obj:`dict`, optional): A dictionary mapping configuration IDs to human-readable container keys.
            Example: {'custom_region':'googleCloudConfig.customRegion', ... }
    """

    # Use an uber secret class attribute to specify whether other attributes can be assigned by __setattr__.
    __frozen = False

    def __init__(
        self,
        configuration=None,
        compatibility_map=None,
        property_key='name',
        property_value='value',
        update_callable=None,
        update_callable_kwargs=None,
        id_to_remap=None,
    ):
        if configuration is None:
            configuration = []
        # Ensure the input 'configuration' is properly formatted, handling both single configurations and lists.
        data = [configuration] if configuration and isinstance(configuration[0], dict) else configuration

        # Apply overrides to initial data. This runs on the normalized data so that nested configurations
        # (lists of Configurations) are handled as well as flat ones.
        if compatibility_map:
            self._apply_compatibility_overrides(data, compatibility_map, property_key, property_value)

        self._compatibility_map = compatibility_map or {}
        self.property_key = property_key
        self.property_value = property_value
        self._id_to_remap = id_to_remap or {}
        self._update_callable = update_callable
        self._update_callable_kwargs = update_callable_kwargs or {}
        self._data = data
        self._configuration_index_map = self._create_configuration_index_map()
        self.__frozen = True

    @staticmethod
    def _has_override(value, override_values):
        """Return whether ``value`` has a replacement in ``override_values``.

        Unhashable values (e.g. lists) cannot be dictionary keys, so they never have a replacement.
        """
        try:
            return value in override_values
        except TypeError:
            return False

    @classmethod
    def _apply_compatibility_overrides(cls, data, compatibility_map, property_key, property_value):
        """Rename deprecated entries of ``data`` in place, emitting a DeprecationWarning for each one.

        Args:
            data (:obj:`list`): Normalized list of Configurations (a list of lists of dictionaries).
            compatibility_map (:obj:`dict`): Deprecated name -> ``{'name': ..., 'values': {...}}`` mapping.
            property_key (:obj:`str`): The dictionary entry denoting the property key.
            property_value (:obj:`str`): The dictionary entry denoting the property value.
        """
        for config_list in data:
            # Malformed input is skipped here; _create_configuration_index_map reports it with a TypeError.
            if not isinstance(config_list, list):
                continue
            for configuration_entry in config_list:
                if not isinstance(configuration_entry, dict):
                    continue
                configuration_name = configuration_entry.get(property_key)
                if configuration_name not in compatibility_map:
                    continue
                configuration_value = configuration_entry.get(property_value)
                overrides = compatibility_map[configuration_name]
                override_values = overrides.get('values', {})
                configuration_entry[property_key] = overrides['name']
                if cls._has_override(configuration_value, override_values):
                    configuration_entry[property_value] = override_values[configuration_value]
                    warnings.warn(
                        'Configuration {}={} has been deprecated. Please use {}={} instead.'.format(
                            configuration_name,
                            configuration_value,
                            overrides['name'],
                            override_values[configuration_value],
                        ),
                        DeprecationWarning,
                    )
                else:
                    warnings.warn(
                        'Configuration {} has been deprecated. Please use {} instead.'.format(
                            configuration_name, overrides['name']
                        ),
                        DeprecationWarning,
                    )

    def _create_configuration_index_map(self):
        """Create a mapping {config_item_name: (config_item_index, config_list_index)} for efficient lookups.

        This method facilitates quick identification of the index of a configuration item within the
        Configurations/self._data list. Here,
        1. config_list_index is the index within the Configurations list, i.e. the outer index.
        2. config_item_index is the index within an individual Configuration, i.e. the inner index.

        If our ._data is as follows:
            [[{"name" : "<name_1>","value" : "<value_1>"}, {"name" : "<name_2>", "value" : "<value_2>"}],
             [{"name" : "<name_3>","value" : "<value_3>"}, {"name" : "<name_4>", "value" : "<value_4>"}]]
        Then configuration_index_map would be returned as:
            {<name_1>: (0, 0), <name_2>: (1, 0), <name_3>: (0, 1), <name_4>: (1, 1)}

        Returns:
            A configuration index map (:obj:`dict`).

        Raises:
            TypeError: If the input data structure is not a list of configurations, if a Configuration is not a list
                of dictionaries, or if a Configuration does not contain the specified property_key and
                property_value.
        """
        configuration_index_map = {}
        for config_list_index, config_list in enumerate(self._data):
            if not isinstance(config_list, list):
                raise TypeError('Please pass in a list of configurations')
            for config_item_index, config_item in enumerate(config_list):
                if not isinstance(config_item, dict):
                    raise TypeError('A Configuration must be a list of dictionaries')
                if self.property_key not in config_item or self.property_value not in config_item:
                    raise TypeError(
                        'Configuration {} does not contain property_key:{} or property_value:{}'.format(
                            config_item, self.property_key, self.property_value
                        )
                    )
                configuration_index_map[config_item[self.property_key]] = (config_item_index, config_list_index)
        return configuration_index_map

    def __getattr__(self, key):
        if not self.__frozen:
            # Not fully initialised yet (still inside __init__, or created via __new__ by e.g. copy), so there is
            # no configuration data to look the key up in.
            raise AttributeError(key)
        return self.__getitem__(key)

    def __getitem__(self, key):
        if key in self._id_to_remap:
            key = self._id_to_remap[key]
        if key not in self._configuration_index_map:
            # AttributeError (not KeyError) so that attribute-style access via __getattr__ behaves normally.
            raise AttributeError(key)
        index, configuration_index = self._configuration_index_map[key]
        config = self._data[configuration_index][index]
        return self._convert_value(config)

    def __setattr__(self, key, value):
        self.__setitem__(key, value)

    def __setitem__(self, key, value):
        if not self.__frozen:
            # During __init__, attributes are regular instance attributes.
            super().__setattr__(key, value)
            return

        if key in self._id_to_remap:
            key = self._id_to_remap[key]
        if key in self._compatibility_map:
            overrides = self._compatibility_map[key]
            override_values = overrides.get('values', {})
            if self._has_override(value, override_values):
                warnings.warn(
                    'Deprecation warning: Configuration {}={} is deprecated on this engine version. '
                    'Updating value to {}={}.'.format(key, value, overrides['name'], override_values[value]),
                    DeprecationWarning,
                )
                value = override_values[value]
            else:
                warnings.warn(
                    'Configuration {} has been deprecated. Please use {} instead.'.format(key, overrides['name']),
                    DeprecationWarning,
                )
            # __init__ renamed the stored entry, so the lookup below must use the replacement name.
            key = overrides['name']

        if key not in self._configuration_index_map:
            raise AttributeError(key)
        index, configuration_index = self._configuration_index_map[key]
        config = self._data[configuration_index][index]
        config[self.property_value] = value
        if self._update_callable:
            kwargs = dict(body=[config])
            kwargs.update(self._update_callable_kwargs)
            self._update_callable(**kwargs)

    def __contains__(self, item):
        return item in self._id_to_remap or item in self._configuration_index_map

    def __repr__(self):
        configs = {}
        for configuration in self._data:
            for config in configuration:
                key = config[self.property_key]
                configs[key] = self._convert_value(config)

        # If a key has a remapped key, delete the original key and add the remapped key into configs
        for remapped_key, original_key in self._id_to_remap.items():
            if original_key != remapped_key and original_key in configs:
                configs[remapped_key] = configs[original_key]
                del configs[original_key]

        return '{{{}}}'.format(', '.join("'{}': {}".format(k, v) for k, v in configs.items()))

    def __dir__(self):
        # Stripping out any values that have multiple words in it. Example: 'Organization account type'
        # in sch_models.Organization.configuration
        id_to_remap_cleaned = [key for key in self._id_to_remap.keys() if ' ' not in key]
        return sorted(list(dir(object)) + list(self.__dict__.keys()) + id_to_remap_cleaned)

    def items(self):
        """Gets the configuration's items.

        Returns:
            A new view of the configuration's items ((key, value) pairs), using remapped keys where defined.
        """
        # To keep the behavior in line with a Python dict's, we'll generate one and then use its items method.
        configuration_dict = {}
        for configuration in self._data:
            for config in configuration:
                configuration_dict[config[self.property_key]] = self._convert_value(config)
        for remapped_key, original_key in self._id_to_remap.items():
            # Identity mappings are skipped: assigning and then deleting the same key would drop the entry.
            if original_key != remapped_key and original_key in configuration_dict:
                configuration_dict[remapped_key] = configuration_dict[original_key]
                del configuration_dict[original_key]
        return configuration_dict.items()

    def get(self, key, default=None):
        """Return the value of key or, if not in the configuration, the default value."""
        try:
            return self[key]
        except (KeyError, AttributeError):
            # __getitem__ reports a missing key with AttributeError.
            return default

    def update(self, configs):
        """Update instance with a collection of configurations.

        Args:
            configs (:obj:`dict`): Dictionary of configurations to use.
        """
        for key, value in configs.items():
            self[key] = value

    def _convert_value(self, config):
        """Return the entry's value, converted according to its optional ``type`` field."""
        value = config[self.property_value]
        config_type = config.get('type')
        if config_type == 'boolean':
            # Loaded data holds JSON text such as 'true'; values assigned via __setitem__ may already be Python
            # objects, which json.loads would reject.
            return json.loads(value) if isinstance(value, (str, bytes, bytearray)) else value
        if config_type == 'integer':
            return int(value)
        return value
class BaseModel: """Base class for StreamSets Accounts models that essentially just wrap a dictionary. Args: data (:obj:`dict`): The underlying JSON representation of the model. attributes_to_ignore (:obj:`list`, optional): A list of string attributes to mask from being handled by this class' __setattr__ method. Default: ``None``. attributes_to_remap (:obj:`dict`, optional): A dictionary of attributes to remap with the desired attributes as keys and the corresponding property name in the JSON representation as values. Default: ``None``. repr_metadata (:obj:`list`, optional): A list of attributes to use in the model's __repr__ string. Default: ``None``. """ def __new__(cls, *args, **kwargs): instance = super(BaseModel, cls).__new__(cls) super(BaseModel, instance).__setattr__('_data_internal', {}) super(BaseModel, instance).__setattr__('_attributes_to_ignore', []) super(BaseModel, instance).__setattr__('_attributes_to_remap', {}) super(BaseModel, instance).__setattr__('_repr_metadata', []) return instance def __init__(self, data, attributes_to_ignore=None, attributes_to_remap=None, repr_metadata=None): # _data_internal is introduced to help inherited classes that need to load _data when _data is accessed # eg. 
Pipeline super().__setattr__('_data_internal', data) super().__setattr__('_attributes_to_ignore', attributes_to_ignore or []) super().__setattr__('_attributes_to_remap', attributes_to_remap or {}) super().__setattr__('_repr_metadata', repr_metadata or []) # By default these properties don't do anything by can be overrided by inherited classes to load something @property def _data_internal(self): return self.__dict__['_data'] if '_data' in self.__dict__ else None @_data_internal.setter def _data_internal(self, data): self.__dict__['_data'] = data @property def _data(self): return self._data_internal @_data.setter def _data(self, data): self._data_internal = data def __getattr__(self, name): name_ = python_to_json_style(name) if name in self._attributes_to_remap: remapped_name = self._attributes_to_remap[name] return self._data_internal[remapped_name] elif ( name_ in self._data_internal and name not in self._attributes_to_ignore and name not in self._attributes_to_remap.values() ): return self._data_internal[name_] raise AttributeError('Could not find attribute {}.'.format(name_)) def __setattr__(self, name, value): name_ = python_to_json_style(name) if name in self._attributes_to_remap: remapped_name = self._attributes_to_remap[name] self._data_internal[remapped_name] = value elif ( name_ in self._data_internal and name not in self._attributes_to_ignore and name not in self._attributes_to_remap.values() ): self._data_internal[name_] = value else: super().__setattr__(name, value) def __dir__(self): return sorted( list(dir(object)) + list(self.__dict__.keys()) + list( json_to_python_style(key) for key in self._data_internal.keys() if key not in (list(self._attributes_to_remap.values()) + self._attributes_to_ignore) ) + list(self._attributes_to_remap.keys()) ) def __eq__(self, other): return self._data_internal == other._data_internal def __repr__(self): return '<{} ({})>'.format( self.__class__.__name__, ', '.join('{}={}'.format(key, getattr(self, key)) for key in 
self._repr_metadata) ) class ModelCollection: """Base class wrapper with Abstractions. Args: streamsets_entity: An instance of underlysing StreamSets entity e.g. :py:class:`streamsets.sdk.next.Next` or :py:class:`streamsets.sdk.accounts.Accounts`. """ def __init__(self, streamsets_entity): self._streamsets_entity = streamsets_entity self._id_attr = 'id' def _get_all_results_from_api(self, **kwargs): """Used to get multiple (all) results from api. Args: Optional arguments to be passed to filter the results. Returns: A :obj:`collections.namedtuple`: of results (:py:class:`streamsets.sdk.utils.SeekableList`): a SeekableList of inherited instances of :py:class:`streamsets.sdk.models.BaseModel` and kwargs (:obj:`dict`): a dict of local variables not used in this function. """ pass def __iter__(self): """Enables the list enumeration or iteration.""" for item in self._get_all_results_from_api().results: yield item def __getitem__(self, i): """Enables the user to fetch items by index. Args: i (:obj:`int`): Index of the item. Returns: An inherited instance of :py:class:`streamsets.sdk.models.BaseModel`. """ return self._get_all_results_from_api().results[i] def __len__(self): """Provides length (count) of items. Returns: A :py:obj:`int` object. """ return len(self._get_all_results_from_api().results) def __contains__(self, item_given): """Checks if given item is in the list of items by comparing the ids. Returns: A :py:obj:`boolean` object. """ return self.contains(**{self._id_attr: getattr(item_given, self._id_attr)}) def get(self, **kwargs): """ Args: **kwargs: Optional arguments to be passed to filter the results offline. Returns: An inherited instance of :py:class:`streamsets.sdk.models.BaseModel`. """ result, new_kwargs = self._get_all_results_from_api(**kwargs) return result.get(**new_kwargs) def get_all(self, **kwargs): """ Args: **kwargs: Optional other arguments to be passed to filter the results offline. 
Returns: A :py:obj:`streamsets.sdk.utils.SeekableList` of inherited instances of :py:class:`streamsets.sdk.models.BaseModel`. """ result, new_kwargs = self._get_all_results_from_api(**kwargs) return result.get_all(**new_kwargs) def __repr__(self): return str(self._get_all_results_from_api().results) def contains(self, **kwargs): """ Args: **kwargs: Optional arguments to be passed to filter the results offline. Returns: A :py:obj:`boolean` object. """ try: self.get(**kwargs) except ValueError: return False return True class _StageWithPredicates: """Pipeline Stage extension to include a predicate configuration and handle the output lanes. This class is expected to inherit from either SDC or ST Stage class. Args: variable_output_drive (:obj:`str`, optional): Configuration name that drives the output lanes available. Default: ``None``. """ def __init__(self, variable_output_drive=None, *args, **kwargs): self.variable_output_drive = variable_output_drive super().__init__(*args, **kwargs) if not self.predicates: self.predicates = ['default'] def _prepare_predicates(self, predicates): """Validate a configuration dictionary and add an output lane if necessary. Args: predicates (:obj:`list`): A list of predicates in predicate/outputLane form or as predicates. Returns: A corresponding :obj:`list` of dictionaries containing the predicate and outputLane for each entry. 
""" formatted_predicates = [] for predicate in predicates: if not isinstance(predicate, dict): predicate = {'predicate': str(predicate)} output_lane = predicate.get( 'outputLane', '{}OutputLane{}'.format(self._data['instanceName'], str(uuid4())).replace('-', '_') ) if not predicate.get('predicate', None): raise ValueError('Output Lane drives should have a predicate key.') if not predicate.get('outputLane', None): output_lane_config = {'outputLane': output_lane} # We make sure that the predicate is in the form {'outputLane': 'value', 'predicate': 'value'} to be # consistent with the UI predicate = {**output_lane_config, **predicate} formatted_predicates.append(predicate) return formatted_predicates def _set_config(self, config_property, value): # Check whether the configuration asked for is driving the output lanes. if self.variable_output_drive == config_property.config_name: if isinstance(value, list): self.predicates = value else: raise ValueError('Configuration must be a list of predicates or a list of dictionaries.') else: super()._set_config(config_property, value) def _disconnect_and_remove_predicate(self, i): deleted_predicate = self.predicates.pop(i) output_lane = deleted_predicate['outputLane'] output_lane_index = self.output_lanes.index(output_lane) self.disconnect_output_lanes(output_lane_index=output_lane_index) self._data['outputLanes'].remove(output_lane) self.output_streams -= 1 @property def predicates(self): """ Get the predicate list for this stage. Predicates define the output lanes number and behaviour. Set it using either a list of predicates or a list of the full dictionary specifying the output lane. Example: stage.predicates = ['>0'] stage.predicates = ['>0', 'default'] stage.predicates = [{'predicate':'>0', 'outputLane': 'lane1'}] Returns: A :obj:`list` of the predicates. 
""" return self.configuration[self.variable_output_drive] @predicates.setter def predicates(self, predicates): predicates = self._prepare_predicates(predicates) # Create default condition if not present if not [predicate for predicate in predicates if predicate['predicate'] == 'default']: default_predicate = self._prepare_predicates(['default']) predicates.extend(default_predicate) if not predicates[-1]['predicate'] == 'default': raise ValueError('The default predicate must be placed at the end of the list.') self.disconnect_output_lanes(all_stages=True) self.output_streams = 0 self._data['outputLanes'] = [] self._output_lane_idx = 0 # Create output lanes in parent stage for each predicate for predicate in predicates: self._data['outputLanes'].append(predicate['outputLane']) self.output_streams += 1 self.configuration[self.variable_output_drive] = predicates def add_predicates(self, predicates): """Add a predicate. Example: stage.add_predicates(['>0']) stage.add_predicates([{'predicate':'>0', 'outputLane':'lane1'}]) stage.add_predicates(['>0' ,'=0']) Args: predicates (:obj:`list`): The list of predicates to add. """ if not isinstance(predicates, list): raise ValueError('Predicates should be a list.') formatted_predicates = self._prepare_predicates(predicates) for new_predicate in formatted_predicates: self._data['outputLanes'].insert(0, new_predicate['outputLane']) self.predicates.insert(0, new_predicate) self.output_streams += 1 def remove_predicate(self, predicate): """Remove a predicate. Example: stage.remove_predicate(stage.predicates[0]) stage.remove_predicate({'predicate':'>0', 'outputLane':'lane1'}) Args: predicate (:obj:`dict`): The predicate to delete as a dictionary including the outputLane. 
""" clean_predicate = next(iter(self._prepare_predicates([predicate])), {}) if not clean_predicate: raise ValueError("Need to specify a predicate") if clean_predicate.get('predicate') == 'default': raise ValueError("Can't delete the default predicate.") for i, found_predicate in enumerate(self.predicates): if found_predicate == clean_predicate: self._disconnect_and_remove_predicate(i) return raise ValueError("Can't find target predicate in the predicates list.")