Source code for streamsets.sdk.exceptions

# Copyright 2019 StreamSets Inc.

"""Common exceptions."""


[docs]class InternalServerError(Exception): """Internal server error.""" def __init__(self, response): self.status_code = response.status_code self.response = response.json() self.text = self.response['RemoteException']['localizedMessage'] # Propagate message from the REST interface as message of the exception super().__init__(self.text)
[docs]class ActivationError(Exception): """Activation error.""" def __init__(self, reason=None): self.reason = reason def __str__(self): return ('Failed to activate Python SDK ' 'for StreamSets ({}).'.format('reason: {}'.format(self.reason) if self.reason else 'no reason provided'))
[docs]class BadRequestError(Exception): """Bad request error (HTTP 400).""" def __init__(self, response): super().__init__(response.text)
[docs]class ValidationError(Exception): """Validation issues.""" def __init__(self, issues): self.issues = issues
[docs]class JobInactiveError(Exception): """Job status changed into INACTIVE_ERROR.""" def __init__(self, message): self.message = message
[docs]class JobRunnerError(Exception): """JobRunner errors.""" def __init__(self, code, message): super().__init__('{}: {}'.format(code, message)) self.code = code self.message = message
[docs]class DeploymentInactiveError(Exception): """Deployment status changed into INACTIVE_ERROR.""" def __init__(self, message): self.message = message
[docs]class UnsupportedMethodError(Exception): """An unsupported method was called.""" def __init__(self, message): self.message = message
[docs]class TopologyIssuesError(Exception): """Topology has some issues.""" def __init__(self, issues): self.message = issues
[docs]class InvalidCredentialsError(Exception): """Invalid credentials error.""" def __init__(self, message): self.message = message
[docs]class ConnectionError(Exception): """Connection Catalog errors.""" def __init__(self, code, message): super().__init__('{}: {}'.format(code, message)) self.message = message self.code = code
[docs]class MultipleIssuesError(Exception): """Multiple errors were returned.""" def __init__(self, errors): self.errors = errors self.message = 'Multiple issues encountered' super().__init__('{}: {}'.format(self.message, self.errors)) def __getitem__(self, item): return self.errors[item] def __repr__(self): return str('{}: {}'.format(self.message, self.errors))