Skip to content

Commit

Permalink
don't publish to mqtt, just write to disk
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Oct 29, 2024
1 parent e12b287 commit c2c729a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
13 changes: 3 additions & 10 deletions pioreactor/actions/leader/backup_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pioreactor.config import config
from pioreactor.exc import RsyncError
from pioreactor.logging import create_logger
from pioreactor.pubsub import subscribe
from pioreactor.utils import local_intermittent_storage
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import managed_lifecycle
from pioreactor.utils.networking import resolve_to_address
Expand All @@ -18,15 +18,8 @@


def count_writes_occurring(unit: str) -> int:
msg_or_none = subscribe(
f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/mqtt_to_db_streaming/inserts_in_last_60s",
timeout=2,
)
if msg_or_none is not None:
count = int(msg_or_none.payload.decode())
else:
count = 0
return count
with local_intermittent_storage("mqtt_to_db_streaming") as c:
return c.get("local_intermittent_cache", 0)


def backup_database(output_file: str, force: bool = False, backup_to_workers: int = 0) -> None:
Expand Down
10 changes: 4 additions & 6 deletions pioreactor/background_jobs/leader/mqtt_to_db_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pioreactor.config import config
from pioreactor.hardware import PWM_TO_PIN
from pioreactor.pubsub import QOS
from pioreactor.utils import local_intermittent_storage
from pioreactor.utils.sqlite_worker import Sqlite3Worker
from pioreactor.utils.timing import current_utc_datetime
from pioreactor.utils.timing import RepeatedTimer
Expand Down Expand Up @@ -59,11 +60,6 @@ class TopicToCallback(Struct):

class MqttToDBStreamer(LongRunningBackgroundJob):
job_name = "mqtt_to_db_streaming"
published_settings = {
"inserts_in_last_60s": {"datatype": "integer", "settable": False},
}

inserts_in_last_60s = 0
_inserts_in_last_60s = 0

def __init__(
Expand Down Expand Up @@ -92,7 +88,9 @@ def __init__(
self.initialize_callbacks(topics_and_callbacks)

def publish_stats(self) -> None:
self.inserts_in_last_60s = self._inserts_in_last_60s
with local_intermittent_storage(self.job_name) as c:
c["inserts_in_last_60s"] = self._inserts_in_last_60s

self._inserts_in_last_60s = 0

def on_disconnected(self) -> None:
Expand Down

0 comments on commit c2c729a

Please sign in to comment.