Skip to content

Commit

Permalink
WIP: Add protos and updated protos for mitm_mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Grennith committed Oct 22, 2023
1 parent 831384a commit f63c256
Show file tree
Hide file tree
Showing 38 changed files with 106,076 additions and 538 deletions.
6 changes: 4 additions & 2 deletions mapadroid/data_handler/StandaloneMitmMapperAndStatsHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ async def stats_collect_seen_type(self, encounter_ids: List[int], type_of_detect
async def get_last_possibly_moved(self, worker: str) -> int:
return await self.__mitm_data_handler.get_last_possibly_moved(worker)

async def update_latest(self, worker: str, key: str, value: Union[list, dict], timestamp_received_raw: float = None,
async def update_latest(self, worker: str, key: str, value: Union[List, Dict, bytes],
timestamp_received_raw: float = None,
timestamp_received_receiver: float = None, location: Location = None) -> None:
loop = asyncio.get_running_loop()
loop.run_in_executor(None, self.__mitm_data_handler.update_latest, worker, key, value, timestamp_received_raw,
loop.run_in_executor(None, self.__mitm_data_handler.update_latest, worker, key, value,
timestamp_received_raw,
timestamp_received_receiver, location)

async def request_latest(self, worker: str, key: str,
Expand Down
4 changes: 3 additions & 1 deletion mapadroid/data_handler/grpc/MitmMapperClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def get_last_possibly_moved(self, worker: str) -> int:
# TODO: Return time.time() to continue scans or throw a custom exception that needs to be handled?
return 0

async def update_latest(self, worker: str, key: str, value: Union[List, Dict],
async def update_latest(self, worker: str, key: str, value: Union[List, Dict, bytes],
timestamp_received_raw: float = None,
timestamp_received_receiver: float = None, location: Location = None) -> None:
request: mitm_mapper_pb2.LatestMitmDataEntryUpdateRequest = mitm_mapper_pb2.LatestMitmDataEntryUpdateRequest()
Expand All @@ -86,6 +86,8 @@ async def update_latest(self, worker: str, key: str, value: Union[List, Dict],
request.data.some_list.extend(value)
elif isinstance(value, dict):
request.data.some_dictionary.update(value)
elif isinstance(value, bytes):
request.data.raw_message = value
else:
raise ValueError("Cannot handle data")
try:
Expand Down
2 changes: 1 addition & 1 deletion mapadroid/data_handler/mitm_data/AbstractMitmMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def get_last_possibly_moved(self, worker: str) -> int:
pass

@abstractmethod
async def update_latest(self, worker: str, key: str, value: Union[List, Dict],
async def update_latest(self, worker: str, key: str, value: Union[List, Dict, bytes],
timestamp_received_raw: float = None,
timestamp_received_receiver: float = None, location: Location = None) -> None:
pass
Expand Down
2 changes: 1 addition & 1 deletion mapadroid/data_handler/mitm_data/MitmMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self):
async def get_last_possibly_moved(self, worker: str) -> int:
return await self._mitm_data_handler.get_last_possibly_moved(worker)

async def update_latest(self, worker: str, key: str, value: Union[List, Dict],
async def update_latest(self, worker: str, key: str, value: Union[List, Dict, bytes],
timestamp_received_raw: float = None,
timestamp_received_receiver: float = None, location: Location = None) -> None:
loop = asyncio.get_running_loop()
Expand Down
19 changes: 13 additions & 6 deletions mapadroid/data_handler/mitm_data/RedisMitmMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from mapadroid.db.DbWrapper import DbWrapper
from mapadroid.utils.ProtoIdentifier import ProtoIdentifier
from mapadroid.utils.collections import Location
import mapadroid.mitm_receiver.protos.Rpc_pb2 as pogoprotos


class RedisMitmMapper(AbstractMitmMapper):
Expand Down Expand Up @@ -46,7 +47,7 @@ async def get_last_possibly_moved(self, worker: str) -> int:
last_moved: Optional[int] = await self.__cache.get(RedisMitmMapper.LAST_POSSIBLY_MOVED_KEY.format(worker))
return int(last_moved) if last_moved else 0

async def update_latest(self, worker: str, key: str, value: Union[List, Dict],
async def update_latest(self, worker: str, key: str, value: Union[List, Dict, bytes],
timestamp_received_raw: float = None,
timestamp_received_receiver: float = None, location: Location = None) -> None:
if timestamp_received_raw is None:
Expand All @@ -71,12 +72,18 @@ async def update_latest(self, worker: str, key: str, value: Union[List, Dict],
await self.__parse_gmo_for_location(worker, value, timestamp_received_raw, location)
await self.__cache.set(RedisMitmMapper.IS_INJECTED_KEY.format(worker), 1)

async def __parse_gmo_for_location(self, worker: str, gmo_payload: Dict, timestamp: int,
async def __parse_gmo_for_location(self, worker: str, gmo_payload: Union[Dict, bytes], timestamp: int,
location: Optional[Location]):
cells = gmo_payload.get("cells", None)
if not cells:
return
cell_ids: List[int] = [cell['id'] for cell in cells]
if isinstance(gmo_payload, dict):
cells = gmo_payload.get("cells", None)
if not cells:
return
cell_ids: List[int] = [cell['id'] for cell in cells]
else:
# Raw proto...
gmo_proto: pogoprotos.GetMapObjectsOutProto = pogoprotos.GetMapObjectsOutProto.ParseFromString(
gmo_payload)
cell_ids: List[int] = [cell.s2_cell_id for cell in gmo_proto.map_cell]
last_cell_ids_raw: Optional[str] = await self.__cache.get(RedisMitmMapper.LAST_CELL_IDS_KEY.format(worker))
last_cell_ids: List[int] = []
if last_cell_ids_raw:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

class LatestMitmDataEntry:
def __init__(self, location: Optional[Location], timestamp_received: Optional[int],
timestamp_of_data_retrieval: Optional[int], data: Union[List, Dict]):
timestamp_of_data_retrieval: Optional[int], data: Union[List, Dict, bytes]):
self.location: Optional[Location] = location
# The time MAD received the data from a device/worker
self.timestamp_received: Optional[int] = timestamp_received
# The time that the device/worker received the data
self.timestamp_of_data_retrieval: Optional[int] = timestamp_of_data_retrieval
self.data: Union[List, Dict] = data
self.data: Union[List, Dict, bytes] = data

@staticmethod
async def from_json(json_data: Union[bytes, str]) -> Optional[LatestMitmDataEntry]:
Expand Down
Loading

0 comments on commit f63c256

Please sign in to comment.