Skip to content

Commit

Permalink
feat: improve NetHSM API exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nponsard authored and robin-nitrokey committed Sep 21, 2023
1 parent fd497e0 commit 659b308
Showing 1 changed file with 55 additions and 12 deletions.
67 changes: 55 additions & 12 deletions pynitrokey/nethsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,49 @@ def __init__(


def _handle_api_exception(e, messages={}, roles=[], state=None):
# give priority to custom messages
if e.status in messages:
message = messages[e.status]
raise NetHSMError(message)

if e.status == 401 and roles:
message = "Unauthorized -- invalid username or password"
elif e.status == 403 and roles:
roles = [role.value for role in roles]
message = "Access denied -- this operation requires the role " + " or ".join(
roles
)
elif e.status == 401 and roles:
message = "Unauthorized -- invalid username or password"
elif e.status == 405:
# 405 "Method Not Allowed" mostly happens when the UserID or KeyID contains a character
# - that ends the path of the URL like a question mark '?' :
# /api/v1/keys/?/cert will hit the keys listing endpoint instead of the key/{KeyID}/cert endpoint
# - that doesn't count as a path parameter like a slash '/' :
# /api/v1/keys///cert will be interpreted as /api/v1/keys/cert with cert as the KeyID
message = "The ID you provided contains invalid characters"
elif e.status == 406:
message = "Invalid content type requested"
elif e.status == 412 and state:
message = f"Precondition failed -- this operation can only be used on a NetHSM in the state {state.value}"
elif e.status == 429:
message = (
"Too many requests -- you may have tried the wrong credentials too often"
)
else:
message = f"Unexpected API error {e.status}: {e.reason}"

if e.api_response:
try:
body = json.loads(e.api_response.response.data)
if "message" in body:
body = None
# "custom" requests
if hasattr(e.api_response, "text") and e.api_response.text != "":
body = json.loads(e.api_response.text)
# generated code
elif (
hasattr(e.api_response, "response")
and e.api_response.response.data != ""
):
body = json.loads(e.api_response.response.data)
if body is not None and "message" in body:
message += "\n" + body["message"]
except json.JSONDecodeError:
pass
Expand Down Expand Up @@ -254,10 +279,11 @@ def request(
method, url, params=params, data=data, headers=headers, json=json
)
if not response.ok:
e = ApiException(status=response.status_code, reason=response.reason)
e.body = response.text
e.headers = response.headers
raise e
raise ApiException(
status=response.status_code,
reason=response.reason,
api_response=response,
)
return response

def get_api(self):
Expand Down Expand Up @@ -295,6 +321,8 @@ def unlock(self, passphrase):
e,
state=State.LOCKED,
messages={
# Doc says 400 could happen when the passphrase is invalid?
400: "Access denied -- wrong unlock passphrase",
403: "Access denied -- wrong unlock passphrase",
},
)
Expand Down Expand Up @@ -326,7 +354,7 @@ def provision(self, unlock_passphrase, admin_passphrase, system_time):
e,
state=State.UNPROVISIONED,
messages={
400: "Malformed request data -- e. g. weak passphrase",
400: "Malformed request data -- e. g. weak passphrase or invalid time",
},
)

Expand Down Expand Up @@ -466,8 +494,9 @@ def add_operator_tag(self, user_id, tag):
state=State.OPERATIONAL,
roles=[Role.ADMINISTRATOR],
messages={
404: f"User {user_id} not found",
304: f"Tag is already present for {user_id}",
400: "Invalid tag format or user is not an operator",
404: f"User {user_id} not found",
},
)

Expand Down Expand Up @@ -553,7 +582,14 @@ def get_random_data(self, n):
response = self.get_api().random_post(body=body)
return response.body["random"]
except ApiException as e:
_handle_api_exception(e, state=State.OPERATIONAL, roles=[Role.OPERATOR])
_handle_api_exception(
e,
state=State.OPERATIONAL,
roles=[Role.OPERATOR],
messages={
400: "Invalid length. Must be between 1 and 1024",
},
)

def get_metrics(self):
try:
Expand Down Expand Up @@ -742,6 +778,7 @@ def generate_key(self, type, mechanisms, length, key_id):
roles=[Role.ADMINISTRATOR],
messages={
400: "Bad request -- invalid input data",
409: f"Conflict -- a key with the ID {key_id} already exists",
},
)

Expand Down Expand Up @@ -819,6 +856,8 @@ def get_key_certificate(self, key_id):
roles=[Role.ADMINISTRATOR, Role.OPERATOR],
messages={
404: f"Certificate for key {key_id} not found",
# The API returns a 406 if there is no certificate or if the key does not exist
406: f"Certificate for key {key_id} not found",
},
)

Expand Down Expand Up @@ -852,6 +891,7 @@ def set_key_certificate(self, key_id, cert, mime_type):
400: "Bad Request -- invalid certificate",
404: f"Key {key_id} not found",
409: f"Conflict -- key {key_id} already has a certificate",
415: "Invalid mime type",
},
)

Expand Down Expand Up @@ -1049,7 +1089,7 @@ def set_time(self, time):
state=State.OPERATIONAL,
roles=[Role.ADMINISTRATOR],
messages={
400: "Bad request -- invalid input data",
400: "Bad request -- invalid time format",
},
)

Expand Down Expand Up @@ -1096,6 +1136,9 @@ def backup(self):
e,
state=State.OPERATIONAL,
roles=[Role.BACKUP],
messages={
412: "NetHSM is not Operational or the backup passphrase is not set",
},
)

def restore(self, backup, passphrase, time):
Expand Down

0 comments on commit 659b308

Please sign in to comment.