Source code for streamsets.sdk.exceptions

# Copyright 2019 StreamSets Inc.

"""Common exceptions."""

# fmt: off
from json.decoder import JSONDecodeError

from requests.exceptions import HTTPError

# fmt: on


class InternalServerError(HTTPError):
    """Internal server error.

    Args:
        response: HTTP response object exposing ``status_code``, ``text`` and
            ``json()``.

    Attributes:
        status_code: Status code copied from ``response``.
        response: Parsed JSON body of ``response``, or ``None`` when the body
            is not valid JSON.
        text: Error message used as the exception message. It is the
            ``RemoteException.localizedMessage`` entry of the JSON body when
            present, the stringified JSON body when that entry is missing,
            and the raw response text when the body is not JSON.
    """

    def __init__(self, response):
        self.status_code = response.status_code
        try:
            payload = response.json()
        except ValueError:
            # json.decoder.JSONDecodeError is a ValueError subclass; catching
            # the parent also covers decoders that raise their own
            # ValueError-derived error type.
            payload = None
            message = response.text
        else:
            message = None
            # Only dig into the payload when it has the expected mapping
            # shape; a list/str/null body must not raise from within the
            # exception constructor.
            if isinstance(payload, dict):
                remote_exception = payload.get('RemoteException')
                if isinstance(remote_exception, dict):
                    message = remote_exception.get('localizedMessage')
            if message is None:
                # The body was valid JSON, but the error message is not
                # formatted as expected: fall back to the whole payload.
                message = str(payload)
        self.text = message
        # Propagate message from the REST interface as message of the exception
        super().__init__(self.text)
        # The requests base initializer assigns ``self.response`` from its
        # ``response`` keyword (None here), so the parsed payload has to be
        # stored after calling it or it would be discarded.
        self.response = payload
class BadRequestError(HTTPError):
    """Bad request error (HTTP 400).

    Args:
        response: HTTP response object; its ``text`` attribute becomes the
            exception message.
    """

    def __init__(self, response):
        body_text = response.text
        super().__init__(body_text)
class LegacyDeploymentInactiveError(Exception):
    """Legacy deployment status changed into INACTIVE_ERROR.

    Args:
        message: Description of the error.

    Attributes:
        message: The value passed to the constructor.
    """

    def __init__(self, message):
        # Explicit base initialization; ``args`` ends up as ``(message,)``.
        super().__init__(message)
        self.message = message
class ValidationError(Exception):
    """Validation issues.

    Args:
        issues: The validation issues to attach to the exception.

    Attributes:
        issues: The value passed to the constructor.
    """

    def __init__(self, issues):
        # Explicit base initialization; ``args`` ends up as ``(issues,)``.
        super().__init__(issues)
        self.issues = issues
class JobInactiveError(Exception):
    """Job status changed into INACTIVE_ERROR.

    Args:
        message: Description of the error.

    Attributes:
        message: The value passed to the constructor.
    """

    def __init__(self, message):
        # Explicit base initialization; ``args`` ends up as ``(message,)``.
        super().__init__(message)
        self.message = message
class JobRunnerError(Exception):
    """JobRunner errors.

    Args:
        code: Error code.
        message: Error message.

    Attributes:
        code: The ``code`` passed to the constructor.
        message: The ``message`` passed to the constructor.
    """

    def __init__(self, code, message):
        self.code = code
        self.message = message
        # The exception text combines both parts as "<code>: <message>".
        rendered = '{}: {}'.format(code, message)
        super().__init__(rendered)
class UnsupportedMethodError(Exception):
    """An unsupported method was called.

    Args:
        message: Description of the error.

    Attributes:
        message: The value passed to the constructor.
    """

    def __init__(self, message):
        # Explicit base initialization; ``args`` ends up as ``(message,)``.
        super().__init__(message)
        self.message = message
class TopologyIssuesError(Exception):
    """Topology has some issues.

    Args:
        issues: The topology issues to attach to the exception.

    Attributes:
        message: The ``issues`` value passed to the constructor.
    """

    def __init__(self, issues):
        # Explicit base initialization; ``args`` ends up as ``(issues,)``.
        super().__init__(issues)
        self.message = issues
class InvalidCredentialsError(Exception):
    """Invalid credentials error.

    Args:
        message: Description of the error.

    Attributes:
        message: The value passed to the constructor.
    """

    def __init__(self, message):
        # Explicit base initialization; ``args`` ends up as ``(message,)``.
        super().__init__(message)
        self.message = message
class ConnectionError(Exception):
    """Connection Catalog errors.

    Note:
        This class has the same name as the builtin ``ConnectionError`` but
        derives directly from ``Exception``, so it is a distinct type.

    Args:
        code: Error code.
        message: Error message.

    Attributes:
        code: The ``code`` passed to the constructor.
        message: The ``message`` passed to the constructor.
    """

    def __init__(self, code, message):
        self.code = code
        self.message = message
        # The exception text combines both parts as "<code>: <message>".
        rendered = '{}: {}'.format(code, message)
        super().__init__(rendered)
class MultipleIssuesError(Exception):
    """Multiple errors were returned.

    Args:
        errors: Collection of errors; indexing the exception is delegated
            to it.

    Attributes:
        errors: The value passed to the constructor.
        message: Fixed summary prefix, ``'Multiple issues encountered'``.
    """

    def __init__(self, errors):
        self.errors = errors
        self.message = 'Multiple issues encountered'
        super().__init__(self._summary())

    def __getitem__(self, item):
        """Return ``self.errors[item]``."""
        return self.errors[item]

    def __repr__(self):
        """Return the same "<message>: <errors>" text used as exception message."""
        return self._summary()

    def _summary(self):
        """Render the current message and errors as "<message>: <errors>"."""
        return '{}: {}'.format(self.message, self.errors)
class UnprocessableEntityError(HTTPError):
    """Unprocessable Entity Error (HTTP 422).

    Args:
        response: HTTP response object exposing ``json()``, ``text`` and
            ``status_code``.

    Attributes:
        response: Parsed JSON body of ``response``, or ``None`` when the body
            is not valid JSON.
        message: Message of the first entry in the body's ``errors`` list;
            falls back to the raw response text when the body does not have
            that shape.
        code: The body's ``status`` entry; falls back to the response status
            code when the body does not provide one.
    """

    def __init__(self, response):
        try:
            payload = response.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses. A body that is
            # not JSON must not raise from inside the exception constructor
            # and hide the HTTP error being reported.
            payload = None

        message = None
        code = None
        if isinstance(payload, dict):
            code = payload.get('status')
            # The error response from ASTER's API puts the error in a list,
            # so we index into the list to get the message
            errors = payload.get('errors')
            if isinstance(errors, list) and errors and isinstance(errors[0], dict):
                message = errors[0].get('message')

        # Fall back to what the raw response offers when the body does not
        # have the expected shape.
        if code is None:
            code = getattr(response, 'status_code', None)
        if message is None:
            message = response.text

        self.message = message
        self.code = code
        # Propagate the message from the API interface
        super().__init__('{}: {}'.format(self.code, self.message))
        # The requests base initializer assigns ``self.response`` from its
        # ``response`` keyword (None here), so the parsed payload has to be
        # stored after calling it or it would be discarded.
        self.response = payload
class InvalidVersionError(ValueError):
    """The Version number could not be parsed.

    Args:
        input: The value that could not be parsed as a version.

    Attributes:
        input: The value passed to the constructor.
    """

    def __init__(self, input):
        self.input = input
        # ``!r`` embeds the repr of the rejected value in the message.
        description = "{!r} is not a valid version.".format(input)
        super().__init__(description)
class StatusError(Exception):
    """Parent class for pipeline status errors.

    Args:
        response: Mapping-like object; its ``'message'`` entry is read with
            ``get``.

    Attributes:
        response: The value passed to the constructor.
        message: The ``'message'`` entry of ``response`` when it is truthy,
            otherwise ``'Unknown status message'``.
    """

    def __init__(self, response):
        self.response = response
        status_message = response.get('message')
        if not status_message:
            # No usable message: keep a placeholder on ``message`` and hand
            # the whole response to the base class as the exception argument.
            self.message = 'Unknown status message'
            super().__init__(response)
            return
        self.message = status_message
        super().__init__(status_message)
class StartError(StatusError):
    """Pipeline start error.

    Adds nothing of its own; construction and attributes are inherited from
    ``StatusError``.
    """
class StartingError(StatusError):
    """Pipeline starting error.

    Adds nothing of its own; construction and attributes are inherited from
    ``StatusError``.
    """
class RunError(StatusError):
    """Pipeline run error.

    Adds nothing of its own; construction and attributes are inherited from
    ``StatusError``.
    """
class RunningError(StatusError):
    """Pipeline running error.

    Adds nothing of its own; construction and attributes are inherited from
    ``StatusError``.
    """
class ConnectError(StatusError):
    """Pipeline connect error.

    Adds nothing of its own; construction and attributes are inherited from
    ``StatusError``.
    """
class InvalidError(StatusError):
    """Invalid stage config error.

    Adds nothing of its own; construction and attributes are inherited from
    ``StatusError``.
    """
class ServiceDefinitionNotFound(Exception):
    """ServiceDefinition errors.

    Args:
        message: Description of the error.

    Attributes:
        message: The value passed to the constructor, unmodified.
    """

    def __init__(self, message):
        self.message = message
        # The base class receives the formatted (string) form of the message.
        rendered = '{}'.format(message)
        super().__init__(rendered)