Skip to content

Commit

Permalink
feat: Include problematic path and value in validation error (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
cetteup committed Aug 28, 2024
1 parent abcce6a commit be2d994
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 149 deletions.
74 changes: 52 additions & 22 deletions aspxstats/bf2/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional, Union, Callable
from typing import Dict, Optional, Union, Callable, Tuple, Any

from .schemas import GETLEADERBOARD_RESPONSE_SCHEMA, SEARCHFORPLAYERS_RESPONSE_SCHEMA, \
GETPLAYERINFO_GENERAL_STATS_RESPONSE_SCHEMA, GETPLAYERINFO_MAP_STATS_RESPONSE_SCHEMA, GETRANKINFO_RESPONSE_SCHEMA, \
Expand Down Expand Up @@ -63,14 +63,18 @@ def validate_and_parse_searchforplayers_response(self, raw_data: str) -> dict:
ParseTarget('results', as_list=True)
])

valid_data = self.is_valid_searchforplayers_response_data(parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_searchforplayers_response_data(parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid searchforplayers response data')
raise InvalidResponseError(
f'{self.provider} returned invalid searchforplayers response data',
invalid_path,
invalid_attribute
)

return self.parse_searchforplayers_response_values(parsed, self.cleaners)

@staticmethod
def is_valid_searchforplayers_response_data(parsed: dict) -> bool:
def is_valid_searchforplayers_response_data(parsed: dict) -> Tuple[bool, Optional[str], Optional[Any]]:
return is_valid_dict(parsed, SEARCHFORPLAYERS_RESPONSE_SCHEMA)

@staticmethod
Expand Down Expand Up @@ -132,14 +136,18 @@ def validate_and_parse_getleaderboard_response(self, raw_data: str) -> dict:
ParseTarget('entries', as_list=True)
])

valid_data = self.is_valid_getleaderboard_response_data(parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_getleaderboard_response_data(parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid getleaderboard response data')
raise InvalidResponseError(
f'{self.provider} returned invalid getleaderboard response data',
invalid_path,
invalid_attribute
)

return self.parse_getleaderboard_response_values(parsed, self.cleaners)

@staticmethod
def is_valid_getleaderboard_response_data(parsed: dict) -> bool:
def is_valid_getleaderboard_response_data(parsed: dict) -> Tuple[bool, Optional[str], Optional[Any]]:
# TODO: Add per-leaderboard validation with respective attributes
return is_valid_dict(parsed, GETLEADERBOARD_RESPONSE_SCHEMA)

Expand Down Expand Up @@ -192,9 +200,13 @@ def validate_and_parse_getplayerinfo_response(self, key_set: PlayerinfoKeySet, r

parsed = self.fix_getplayerinfo_zero_values(parsed)

valid_data = self.is_valid_getplayerinfo_response_data(key_set, parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_getplayerinfo_response_data(key_set, parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid getplayerinfo response data')
raise InvalidResponseError(
f'{self.provider} returned invalid getplayerinfo response data',
invalid_path,
invalid_attribute
)

return self.parse_getplayerinfo_response_values(key_set, parsed, self.cleaners)

Expand Down Expand Up @@ -232,7 +244,9 @@ def fix_getplayerinfo_zero_values(parsed: dict) -> dict:
return parsed

@staticmethod
def is_valid_getplayerinfo_response_data(key_set: PlayerinfoKeySet, parsed: dict) -> bool:
def is_valid_getplayerinfo_response_data(
key_set: PlayerinfoKeySet, parsed: dict
) -> Tuple[bool, Optional[str], Optional[Any]]:
if key_set is PlayerinfoKeySet.GENERAL_STATS:
return is_valid_dict(parsed, GETPLAYERINFO_GENERAL_STATS_RESPONSE_SCHEMA)
else:
Expand Down Expand Up @@ -276,14 +290,18 @@ def validate_and_parse_getrankinfo_response(self, raw_data: str) -> dict:
ParseTarget('data')
])

valid_data = self.is_valid_getrankinfo_response_data(parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_getrankinfo_response_data(parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid getrankinfo response data')
raise InvalidResponseError(
f'{self.provider} returned invalid getrankinfo response data',
invalid_path,
invalid_attribute
)

return self.parse_getrankinfo_response_values(parsed, self.cleaners)

@staticmethod
def is_valid_getrankinfo_response_data(parsed: dict) -> bool:
def is_valid_getrankinfo_response_data(parsed: dict) -> Tuple[bool, Optional[str], Optional[Any]]:
return is_valid_dict(parsed, GETRANKINFO_RESPONSE_SCHEMA)

@staticmethod
Expand Down Expand Up @@ -314,14 +332,18 @@ def validate_and_parse_getawardsinfo_response(self, raw_data: str) -> dict:
ParseTarget('data', as_list=True)
])

valid_data = self.is_valid_getawardsinfo_response_data(parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_getawardsinfo_response_data(parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid getawardsinfo response data')
raise InvalidResponseError(
f'{self.provider} returned invalid getawardsinfo response data',
invalid_path,
invalid_attribute
)

return self.parse_getawardsinfo_response_values(parsed, self.cleaners)

@staticmethod
def is_valid_getawardsinfo_response_data(parsed: dict) -> bool:
def is_valid_getawardsinfo_response_data(parsed: dict) -> Tuple[bool, Optional[str], Optional[Any]]:
return is_valid_dict(parsed, GETAWARDSINFO_RESPONSE_SCHEMA)

@staticmethod
Expand Down Expand Up @@ -353,14 +375,18 @@ def validate_and_parse_getunlocksinfo_response(self, raw_data: str) -> dict:
ParseTarget('data', as_list=True)
])

valid_data = self.is_valid_getunlocksinfo_response_data(parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_getunlocksinfo_response_data(parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid getunlocksinfo response data')
raise InvalidResponseError(
f'{self.provider} returned invalid getunlocksinfo response data',
invalid_path,
invalid_attribute
)

return self.parse_getunlocksinfo_response_values(parsed, self.cleaners)

@staticmethod
def is_valid_getunlocksinfo_response_data(parsed: dict) -> bool:
def is_valid_getunlocksinfo_response_data(parsed: dict) -> Tuple[bool, Optional[str], Optional[Any]]:
return is_valid_dict(parsed, GETUNLOCKSINFO_RESPONSE_SCHEMA)

@staticmethod
Expand All @@ -386,14 +412,18 @@ def validate_and_parse_getbackendinfo_response(self, raw_data: str) -> dict:
ParseTarget('unlocks', as_list=True)
])

valid_data = self.is_valid_getbackendinfo_response_data(parsed)
valid_data, invalid_path, invalid_attribute = self.is_valid_getbackendinfo_response_data(parsed)
if not valid_data:
raise InvalidResponseError(f'{self.provider} returned invalid getbackendinfo response data')
raise InvalidResponseError(
f'{self.provider} returned invalid getbackendinfo response data',
invalid_path,
invalid_attribute
)

return self.parse_getbackendinfo_response_values(parsed, self.cleaners)

@staticmethod
def is_valid_getbackendinfo_response_data(parsed: dict) -> bool:
def is_valid_getbackendinfo_response_data(parsed: dict) -> Tuple[bool, Optional[str], Optional[Any]]:
return is_valid_dict(parsed, GETBACKENDINFO_RESPONSE_SCHEMA)

@staticmethod
Expand Down
17 changes: 16 additions & 1 deletion aspxstats/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Optional, Any


class Error(Exception):
pass

Expand All @@ -11,7 +14,19 @@ class TimeoutError(Error):


class InvalidResponseError(Error):
pass
path: Optional[str]
attribute: Optional[Any]

def __init__(self, message: str, path: Optional[str] = None, attribute: Optional[Any] = None):
super().__init__(message)
self.path = path
self.attribute = attribute

def __str__(self) -> str:
if self.path is not None and self.attribute is not None:
return f'{super().__str__()}: {self.path}={self.attribute}'

return super().__str__()


class NotFoundError(Error):
Expand Down
51 changes: 37 additions & 14 deletions aspxstats/validation.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,46 @@
from typing import Dict, Union
from typing import Dict, Union, Tuple, Optional, Any

from .schema import AttributeSchema


def is_valid_dict(data: dict, schema: Dict[str, Union[dict, AttributeSchema]]) -> bool:
return all(is_valid_attribute(data.get(key), attribute_schema) for (key, attribute_schema) in schema.items())
def is_valid_dict(
data: dict,
schema: Dict[str, Union[dict, AttributeSchema]],
root: str = ''
) -> Tuple[bool, Optional[str], Optional[Any]]:
for key, attribute_schema in schema.items():
valid, path, attribute = is_valid_attribute(join(root, key), data.get(key), attribute_schema)
if not valid:
return False, path, attribute
return True, None, None


def is_valid_attribute(attribute: Union[str, dict, list], schema: Dict[str, Union[dict, AttributeSchema]]) -> bool:
def is_valid_attribute(
path: str,
attribute: Union[str, dict, list],
schema: Union[AttributeSchema, Dict[str, AttributeSchema]]
) -> Tuple[bool, Optional[str], Optional[Any]]:
if isinstance(schema, AttributeSchema):
if isinstance(attribute, str) and schema.type == str and schema.is_numeric:
return is_numeric(attribute)
return is_numeric(attribute), path, attribute
if isinstance(attribute, str) and schema.type == str and schema.is_booly:
return is_booly(attribute)
return is_booly(attribute), path, attribute
if isinstance(attribute, str) and schema.type == str and schema.is_floaty:
return is_floaty(attribute)
return is_floaty(attribute), path, attribute
if isinstance(attribute, str) and schema.type == str and schema.is_ratio:
return is_ratio(attribute)
return is_ratio(attribute), path, attribute
if isinstance(attribute, list) and schema.type == list:
return all(is_valid_attribute(child, schema.children) for child in attribute)
else:
return isinstance(attribute, schema.type)
for index, child in enumerate(attribute):
child_valid, child_path, child_attribute = is_valid_attribute(join(path, index), child, schema.children)
if not child_valid:
return False, child_path, child_attribute
return True, None, None

return isinstance(attribute, schema.type), path, attribute
elif isinstance(attribute, dict) and isinstance(schema, dict):
return is_valid_dict(attribute, schema)
else:
return False
return is_valid_dict(attribute, schema, root=path)

return False, path, attribute


def is_numeric(value: str) -> bool:
Expand Down Expand Up @@ -67,3 +83,10 @@ def is_ratio(value: str) -> bool:
"""
elements = value.split(':', 1)
return len(elements) == 2 and all(is_numeric(elem) for elem in elements) or value == '0'


def join(path: str, key: Union[str, int]) -> str:
if path == '':
return key

return path + '.' + str(key)
Loading

0 comments on commit be2d994

Please sign in to comment.