# Copyright 2019 StreamSets Inc.
"""Common exceptions."""
# fmt: off
from json.decoder import JSONDecodeError
from requests.exceptions import HTTPError
# fmt: on
class InternalServerError(HTTPError):
    """Internal server error.

    The exception message is taken from the REST response, in order of preference:
    the ``RemoteException.localizedMessage`` field of the JSON body, the stringified
    JSON body, or the raw response text when the body is not usable JSON.

    Args:
        response: The HTTP response; must expose ``status_code``, ``text`` and ``json()``.

    Attributes:
        status_code: Copied from ``response.status_code``.
        text: The message that is propagated as the exception message.
        response: The parsed JSON body, or ``None`` when the body could not be decoded.
    """

    def __init__(self, response):
        self.status_code = response.status_code
        try:
            payload = response.json()
        except ValueError:
            # JSONDecodeError is a ValueError subclass; catching the base class also covers
            # decode errors that do not derive from the stdlib class (e.g. a simplejson backend).
            payload = None
            message = response.text
        else:
            message = None
            if isinstance(payload, dict):
                remote = payload.get('RemoteException')
                if isinstance(remote, dict):
                    message = remote.get('localizedMessage')
                if message is None:
                    # Valid JSON, but the error message is not formatted as expected.
                    message = str(payload)
            else:
                # Valid JSON that is not an object (list, string, null, ...): there is nothing
                # to look up, so report the raw body instead of failing with AttributeError.
                message = response.text
        self.text = message
        # Propagate message from the REST interface as message of the exception
        super().__init__(self.text)
        # The requests base class assigns ``self.response`` from its ``response`` keyword
        # (None here), so the parsed body is stored only after the base initializer has run.
        self.response = payload
class BadRequestError(HTTPError):
    """Bad request error (HTTP 400).

    The text of the response passed in becomes the exception message.
    """

    def __init__(self, response):
        body_text = response.text
        super().__init__(body_text)
class LegacyDeploymentInactiveError(Exception):
    """Legacy deployment status changed into INACTIVE_ERROR.

    Args:
        message: Description of the error; kept on ``self.message``.
    """

    def __init__(self, message):
        # Pass the message to Exception explicitly so that ``args`` and ``str()`` carry it
        # even when the caller supplies ``message`` as a keyword argument.
        super().__init__(message)
        self.message = message
class ValidationError(Exception):
    """Validation issues.

    Args:
        issues: The validation issues being reported; kept on ``self.issues``.
    """

    def __init__(self, issues):
        # Pass the issues to Exception explicitly so that ``args`` and ``str()`` carry them
        # even when the caller supplies ``issues`` as a keyword argument.
        super().__init__(issues)
        self.issues = issues
class JobInactiveError(Exception):
    """Job status changed into INACTIVE_ERROR.

    Args:
        message: Description of the error; kept on ``self.message``.
    """

    def __init__(self, message):
        # Pass the message to Exception explicitly so that ``args`` and ``str()`` carry it
        # even when the caller supplies ``message`` as a keyword argument.
        super().__init__(message)
        self.message = message
class JobRunnerError(Exception):
    """JobRunner errors.

    The exception message is rendered as ``'<code>: <message>'``; the two parts are also
    kept separately on the instance as ``code`` and ``message``.
    """

    def __init__(self, code, message):
        self.code, self.message = code, message
        rendered = '{}: {}'.format(code, message)
        super().__init__(rendered)
class UnsupportedMethodError(Exception):
    """An unsupported method was called.

    Args:
        message: Description of the error; kept on ``self.message``.
    """

    def __init__(self, message):
        # Pass the message to Exception explicitly so that ``args`` and ``str()`` carry it
        # even when the caller supplies ``message`` as a keyword argument.
        super().__init__(message)
        self.message = message
class TopologyIssuesError(Exception):
    """Topology has some issues.

    Args:
        issues: The issues being reported; kept on ``self.message``.
    """

    def __init__(self, issues):
        # Pass the issues to Exception explicitly so that ``args`` and ``str()`` carry them
        # even when the caller supplies ``issues`` as a keyword argument.
        super().__init__(issues)
        self.message = issues
class InvalidCredentialsError(Exception):
    """Invalid credentials error.

    Args:
        message: Description of the error; kept on ``self.message``.
    """

    def __init__(self, message):
        # Pass the message to Exception explicitly so that ``args`` and ``str()`` carry it
        # even when the caller supplies ``message`` as a keyword argument.
        super().__init__(message)
        self.message = message
class ConnectionError(Exception):
    """Connection Catalog errors.

    The exception message is rendered as ``'<code>: <message>'``; the two parts are also
    kept separately on the instance as ``code`` and ``message``.

    NOTE: this class shares its name with Python's built-in ``ConnectionError`` and hides
    the built-in in any namespace it is imported into.
    """

    def __init__(self, code, message):
        rendered = '{}: {}'.format(code, message)
        super().__init__(rendered)
        self.code, self.message = code, message
class MultipleIssuesError(Exception):
    """Multiple errors were returned.

    Wraps a collection of errors under a fixed headline message. Individual entries can
    be read by indexing the exception itself.
    """

    def __init__(self, errors):
        self.message = 'Multiple issues encountered'
        self.errors = errors
        super().__init__(self._render())

    def __getitem__(self, item):
        """Return ``self.errors[item]``."""
        return self.errors[item]

    def __repr__(self):
        """Return the ``'<message>: <errors>'`` text built from the current attributes."""
        return self._render()

    def _render(self):
        # Single place that formats the headline together with the wrapped errors.
        return '{}: {}'.format(self.message, self.errors)
class UnprocessableEntityError(HTTPError):
    """Unprocessable Entity Error (HTTP 422).

    The exception message is rendered as ``'<code>: <message>'``.

    Args:
        response: The HTTP response; must expose ``json()``, ``text`` and ``status_code``.

    Attributes:
        response: The parsed JSON body, or ``None`` when the body could not be decoded.
        message: The first entry's ``message`` from the body's ``errors`` list, or the raw
            response text when the body does not have that shape.
        code: The body's ``status`` field, or ``response.status_code`` when it is absent.
    """

    def __init__(self, response):
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON; fall back to the raw response below instead of letting the
            # decode error replace the HTTP error being reported.
            payload = None

        # The error response from ASTER's API puts the error in a list, so we index into the list to get the message
        try:
            message = payload['errors'][0]['message']
        except (KeyError, IndexError, TypeError):
            message = response.text

        try:
            code = payload['status']
        except (KeyError, TypeError):
            code = response.status_code

        # Propagate the message from the API interface
        super().__init__('{}: {}'.format(code, message))
        # The requests base class assigns ``self.response`` from its ``response`` keyword
        # (None here), so the parsed body is stored only after the base initializer has run.
        self.response = payload
        self.message = message
        self.code = code
class InvalidVersionError(ValueError):
    """The Version number could not be parsed.

    The rejected value is kept on ``self.input`` and its ``repr`` is embedded in the
    exception message.
    """

    def __init__(self, input):
        # ``input`` hides the builtin inside this method, but it is the parameter name
        # callers see, so it is left as is.
        self.input = input
        description = "{!r} is not a valid version.".format(input)
        super().__init__(description)
class StatusError(Exception):
    """Parent class for pipeline status errors.

    ``response`` is kept on ``self.response`` and its ``'message'`` entry is looked up with
    ``.get``. A truthy entry becomes both ``self.message`` and the exception message.
    Otherwise ``self.message`` is a fixed placeholder and the whole ``response`` is handed
    to ``Exception`` as its argument.
    """

    def __init__(self, response):
        self.response = response
        status_message = response.get('message')
        if not status_message:
            self.message = 'Unknown status message'
            super().__init__(response)
            return
        self.message = status_message
        super().__init__(status_message)
class StartError(StatusError):
    """Pipeline start error.

    Adds no behavior to StatusError; it exists as a distinct exception type.
    """
class StartingError(StatusError):
    """Pipeline starting error.

    Adds no behavior to StatusError; it exists as a distinct exception type.
    """
class RunError(StatusError):
    """Pipeline run error.

    Adds no behavior to StatusError; it exists as a distinct exception type.
    """
class RunningError(StatusError):
    """Pipeline running error.

    Adds no behavior to StatusError; it exists as a distinct exception type.
    """
class ConnectError(StatusError):
    """Pipeline connect error.

    Adds no behavior to StatusError; it exists as a distinct exception type.
    """
class InvalidError(StatusError):
    """Invalid stage config error.

    Adds no behavior to StatusError; it exists as a distinct exception type.
    """
class ServiceDefinitionNotFound(Exception):
    """ServiceDefinition errors.

    The exception message is ``message`` rendered through ``str.format``; the value
    originally passed in is kept unchanged on ``self.message``.
    """

    def __init__(self, message):
        self.message = message
        rendered = '{}'.format(message)
        super().__init__(rendered)
class EnginelessError(Exception):
    """Error publishing a pipeline that was designed without an engine.

    Args:
        message: Description of the error; kept on ``self.message``.
    """

    def __init__(self, message):
        # Pass the message to Exception explicitly so that ``args`` and ``str()`` carry it
        # even when the caller supplies ``message`` as a keyword argument.
        super().__init__(message)
        self.message = message