Skip to content

Commit

Permalink
send logs separately. add unpin for unsent datalog
Browse files Browse the repository at this point in the history
  • Loading branch information
tubleronchik committed Aug 21, 2024
1 parent c6204f9 commit da47f80
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
40 changes: 33 additions & 7 deletions custom_components/robonomics_report_service/ipfs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import typing as tp
import os
import json

from homeassistant.core import HomeAssistant
from pinatapy import PinataPy
Expand All @@ -18,6 +20,12 @@ async def pin_to_pinata(self, dirname: str) -> tp.Optional[str]:
if pinata is not None:
ipfs_hash = await self.hass.async_add_executor_job(self._pin_to_pinata, dirname, pinata)
return ipfs_hash

async def unpin_from_pinata(self, ipfs_hashes_dict: str) -> None:
    """Unpin previously pinned report files from Pinata.

    :param ipfs_hashes_dict: JSON-encoded mapping of file name -> IPFS hash,
        as produced by ``_pin_to_pinata`` and serialized for the datalog.
    """
    pinata = await self._get_pinata_with_creds()
    if pinata is not None:
        # Parse only after we know credentials exist; avoids useless work
        # (and a possible JSONDecodeError) when Pinata is not configured.
        hashes = json.loads(ipfs_hashes_dict)
        await self.hass.async_add_executor_job(self._unpin_from_pinata, hashes, pinata)

async def _get_pinata_with_creds(self) -> tp.Optional[PinataPy]:
storage_data = await async_load_from_store(self.hass, STORAGE_PINATA_CREDS)
Expand All @@ -27,13 +35,31 @@ async def _get_pinata_with_creds(self) -> tp.Optional[PinataPy]:
def _pin_to_pinata(self, dirname: str, pinata: PinataPy) -> tp.Optional[tp.Dict[str, str]]:
    """Pin every regular file in *dirname* to Pinata, one request per file.

    :param dirname: temporary directory with the report files.
    :param pinata: authenticated Pinata client.
    :return: mapping of file name -> IPFS hash for the files that were
        pinned successfully, or None if nothing was pinned.
    """
    res = None  # kept in outer scope so the except-clause can log the last response
    try:
        dict_with_hashes: tp.Dict[str, str] = {}
        _LOGGER.debug(f"tmp dir: {dirname}")
        file_names = [f for f in os.listdir(dirname) if os.path.isfile(os.path.join(dirname, f))]
        _LOGGER.debug(f"file names: {file_names}")
        for file in file_names:
            # os.path.join instead of f"{dirname}/{file}" for portability.
            path_to_file = os.path.join(dirname, file)
            res = pinata.pin_file_to_ipfs(path_to_file, save_absolute_paths=False)
            ipfs_hash: tp.Optional[str] = res.get("IpfsHash")
            if ipfs_hash:
                _LOGGER.debug(f"Added file {file} to Pinata. Hash is: {ipfs_hash}")
                dict_with_hashes[file] = ipfs_hash
            else:
                # One failed file does not abort the loop; the rest may still pin.
                _LOGGER.error(f"Can't pin to pinata with response: {res}")
        _LOGGER.debug(f"Dict with hashes: {dict_with_hashes}")
        if dict_with_hashes:
            return dict_with_hashes
        return None
    except Exception as e:
        _LOGGER.error(f"Exception in pinata pin: {e}, pinata response: {res}")
        return None

def _unpin_from_pinata(self, ipfs_hashes_dict: tp.Dict, pinata: PinataPy) -> None:
    """Remove every pin listed in *ipfs_hashes_dict* from Pinata.

    :param ipfs_hashes_dict: mapping of file name -> IPFS hash to unpin.
    :param pinata: authenticated Pinata client.
    """
    _LOGGER.debug(f"Start removing pins: {ipfs_hashes_dict}")
    for _file_name, pinned_hash in ipfs_hashes_dict.items():
        response = pinata.remove_pin_from_ipfs(pinned_hash)
        _LOGGER.debug(f"Remove response for pin {pinned_hash}: {response}")

2 changes: 1 addition & 1 deletion custom_components/robonomics_report_service/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "Robonomics Report Service",
"config_flow": true,
"codeowners": ["@pinoutcloud"],
"version": "0.1.1",
"version": "0.2.0",
"dependencies": ["persistent_notification", "http", "frontend"],
"requirements": ["robonomics-interface~=1.6.0", "pinatapy-vourhey==0.1.9", "tenacity==8.2.2", "py-ws-libp2p-proxy==0.2.0"],
"documentation": "https://wiki.robonomics.network/",
Expand Down
6 changes: 3 additions & 3 deletions custom_components/robonomics_report_service/report_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ def register(self) -> None:
async def send_problem_report(self, call: ServiceCall) -> None:
    """Handle the 'report a problem' service call.

    Collects the report data into a temp dir, pins each file to Pinata,
    and records the resulting name->hash mapping as a Robonomics datalog.

    :param call: Home Assistant service call with the report fields.
    """
    _LOGGER.debug(f"send problem service: {call.data.get('description')}")
    tempdir = await self._create_temp_dir_with_report_data(call)
    data_to_send = await self.ipfs.pin_to_pinata(tempdir)
    # The temp dir is no longer needed once the files are pinned.
    self._remove_tempdir(tempdir)
    if data_to_send is not None:
        # Datalog payload is the JSON-serialized mapping of file name -> IPFS hash.
        await self.robonomics.send_datalog(json.dumps(data_to_send))

async def _create_temp_dir_with_report_data(self, call: ServiceCall) -> str:
if call.data.get("attach_logs"):
Expand Down
27 changes: 16 additions & 11 deletions custom_components/robonomics_report_service/robonomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from collections import deque

from .const import ROBONOMICS_WSS, OWNER_ADDRESS
from .ipfs import IPFS

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,8 +56,8 @@ async def wait_for_rws(self) -> None:
while self.subscriber is not None:
await asyncio.sleep(1)

async def send_datalog(self, data_to_send: str) -> None:
    """Queue *data_to_send* for recording as a Robonomics datalog.

    :param data_to_send: JSON string with the pinned files' IPFS hashes.
    """
    await self._handle_datalog_request(data_to_send)

def _retry_decorator(func: tp.Callable):
def wrapper(self, *args, **kwargs):
Expand All @@ -77,35 +78,39 @@ def wrapper(self, *args, **kwargs):
raise e
else:
_LOGGER.warning(f"Datalog sending exception: {e}")
return None
return False
except Exception as e:
_LOGGER.warning(f"Datalog sending exeption: {e}")
return None
return False

return wrapper

async def _handle_datalog_request(self, data_to_send: str) -> None:
    """Enqueue the payload and start the queue worker if it is idle.

    :param data_to_send: JSON string to be recorded as a datalog.
    """
    self._datalog_queue.append(data_to_send)
    _LOGGER.debug(f"New datalog request, queue length: {len(self._datalog_queue)}")
    # Only one drain task runs at a time; it re-arms itself while the
    # queue is non-empty (see _async_send_datalog_from_queue).
    if not self._datalogs_are_sending:
        await self._async_send_datalog_from_queue()

async def _async_send_datalog_from_queue(self) -> None:
    """Send the next queued datalog; unpin its files if sending failed.

    Re-schedules itself until the queue is empty, then clears the
    ``_datalogs_are_sending`` flag so new requests can restart the drain.
    """
    self._datalogs_are_sending = True
    data_to_send = self._datalog_queue.popleft()
    res = await self.hass.async_add_executor_job(self._send_datalog, data_to_send)
    if not res:
        # Sending ultimately failed (retries exhausted): unpin the files
        # so unsent reports don't accumulate in Pinata.
        await IPFS(self.hass).unpin_from_pinata(data_to_send)
    if len(self._datalog_queue) > 0:
        asyncio.ensure_future(self._async_send_datalog_from_queue())
    else:
        self._datalogs_are_sending = False

@_retry_decorator
def _send_datalog(self, data_to_send: str) -> bool:
    """Record *data_to_send* as a Robonomics datalog (blocking; run in executor).

    :param data_to_send: payload string for the datalog record.
    :return: True on success; the retry decorator returns False after
        retries are exhausted, which triggers unpinning by the caller.
    """
    _LOGGER.debug(f"Start creating datalog with data: {data_to_send}")
    datalog = Datalog(
        self.sender_account, rws_sub_owner=OWNER_ADDRESS
    )
    receipt = datalog.record(data_to_send)
    _LOGGER.debug(f"Datalog created with hash: {receipt}, {len(self._datalog_queue)} datalogs left in the queue")
    return True

def _check_sender_in_rws(self) -> bool:
rws = RWS(self.sender_account)
Expand Down

0 comments on commit da47f80

Please sign in to comment.