From d55514f3735a9e2314f2e427d130b4357f621cb3 Mon Sep 17 00:00:00 2001 From: thenav56 Date: Fri, 22 May 2026 18:05:30 +0545 Subject: [PATCH 1/2] feat(molnix): use generator with local cache NOTE: This is not the best approach, but it keeps the changes small to avoid unexpected breaking changes. --- api/management/commands/sync_molnix.py | 12 ++-- api/molnix_utils.py | 79 ++++++++++++++++++++++---- 2 files changed, 75 insertions(+), 16 deletions(-) diff --git a/api/management/commands/sync_molnix.py b/api/management/commands/sync_molnix.py index 88c2ed22a..c9ae86916 100644 --- a/api/management/commands/sync_molnix.py +++ b/api/management/commands/sync_molnix.py @@ -257,13 +257,13 @@ def add_tags_to_obj(obj, tags): def sync_deployments(molnix_deployments, molnix_api, countries): - molnix_ids = [d["id"] for d in molnix_deployments] + molnix_ids = [d["id"] for d in molnix_deployments] # XXX: LOOP 1 warnings = [] messages = [] successful_creates = 0 successful_updates = 0 # Ensure there are PersonnelDeployment instances for every unique emergency - events = [get_go_event(d["tags"]) for d in molnix_deployments] + events = [get_go_event(d["tags"]) for d in molnix_deployments] # XXX: LOOP 2 event_ids = [ev.id for ev in events if ev] unique_event_ids = list(set(event_ids)) for event_id in unique_event_ids: @@ -292,10 +292,10 @@ def sync_deployments(molnix_deployments, molnix_api, countries): p.save() # Create Personnel objects - for md in molnix_deployments: # LOOP1 + for md in molnix_deployments: # XXX: LOOP 3 if "position_id" not in md: # changed structure § md2 = molnix_api.get_deployment(md["id"]) - md |= md2["deployment"] + md |= md2["deployment"] # XXX: Potential issue due to mutation? 
if skip_this(md["tags"]): warning = "Deployment id %d skipped due to No-GO" % md["id"] logger.warning(warning) @@ -459,13 +459,13 @@ def sync_deployments(molnix_deployments, molnix_api, countries): def sync_open_positions(molnix_positions, molnix_api, countries): - molnix_ids = [p["id"] for p in molnix_positions] + molnix_ids = [p["id"] for p in molnix_positions] # XXX: Loop 1 warnings = [] messages = [] successful_creates = 0 successful_updates = 0 - for position in molnix_positions: # LOOP2 + for position in molnix_positions: # XXX: LOOP 2 logger.warning("× " + str(position["id"])) if skip_this(position["tags"]): warning = "Position id %d skipped due to No-GO" % position["id"] diff --git a/api/molnix_utils.py b/api/molnix_utils.py index bdb320743..35570f78e 100644 --- a/api/molnix_utils.py +++ b/api/molnix_utils.py @@ -1,7 +1,64 @@ import json +import os +import tempfile +import weakref import requests +from api.logger import logger + + +def _safe_unlink(path): + try: + os.unlink(path) + except OSError: + pass + + +class _CachedPaginated: + """Iterable wrapper around a streaming generator factory. + + The first iteration drains the source generator and tees each record into a + JSONL cache file in the system temp directory. Subsequent iterations replay from + that cache, so the wrapper itself holds only the current record in memory, even when + the caller iterates it more than once (sync_molnix does: id list, tag lookup, main loop).
+ """ + + def __init__(self, source_factory, label=""): + self._source_factory = source_factory + self._label = label + fd, self._cache_path = tempfile.mkstemp(suffix=".jsonl", prefix="molnix_paginated_") + os.close(fd) + self._cached = False + self._iter_count = 0 + weakref.finalize(self, _safe_unlink, self._cache_path) + + def __iter__(self): + self._iter_count += 1 + if self._cached: + logger.warning( + "_CachedPaginated[%s] re-iteration #%d from cache %s", + self._label, + self._iter_count, + self._cache_path, + ) + with open(self._cache_path) as f: + for line in f: + yield json.loads(line) + return + logger.warning( + "_CachedPaginated[%s] first iteration #%d streaming from source -> %s", + self._label, + self._iter_count, + self._cache_path, + ) + with open(self._cache_path, "w") as f: + for item in self._source_factory(): + f.write(json.dumps(item)) + f.write("\n") + yield item + self._cached = True + class MolnixApi: @@ -29,19 +86,15 @@ def call_api(self, path, method="GET", params={}): def call_api_paginated(self, path, response_key=None, params={}): page = 1 - next_page = True - results = [] - while next_page: + while True: params["page"] = page data = self.call_api(path=path, params=params) if response_key: data = data[response_key]["data"] - results += data if len(data) == 0: - next_page = False - else: - page += 1 - return results + return + yield from data + page += 1 def login(self): params = {"username": self.username, "password": self.password} @@ -63,7 +116,10 @@ def get_open_positions(self): def get_not_only_open_positions(self): # return self.call_api_paginated(path="positions", response_key="positions") - return self.call_api_paginated(path="positions", response_key="positions", params={"limit": 999999}) + return _CachedPaginated( + lambda: self.call_api_paginated(path="positions", response_key="positions", params={"limit": 999999}), + label="positions", + ) def get_deployments(self): deployments_filter = { @@ -77,7 +133,10 @@ def 
get_deployments(self): "criterias": "[]", } params = {"filter": json.dumps(deployments_filter)} - return self.call_api_paginated(path="deployments", response_key="deployments", params=params) + return _CachedPaginated( + lambda: self.call_api_paginated(path="deployments", response_key="deployments", params=params), + label="deployments", + ) """ WARNING: If position is not found or generates an error, we return None From 37f1a265febc1054126a1d33949cc24e94dd5af1 Mon Sep 17 00:00:00 2001 From: thenav56 Date: Fri, 22 May 2026 18:16:39 +0545 Subject: [PATCH 2/2] feat(helm): lower the memory requirement for molnix --- deploy/helm/ifrcgo-helm/values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deploy/helm/ifrcgo-helm/values.yaml b/deploy/helm/ifrcgo-helm/values.yaml index aaba31e63..a6db27b07 100644 --- a/deploy/helm/ifrcgo-helm/values.yaml +++ b/deploy/helm/ifrcgo-helm/values.yaml @@ -227,10 +227,10 @@ cronjobs: schedule: '10 */2 * * *' resources: requests: - memory: 5Gi + memory: 1Gi cpu: 0.1 limits: - memory: 6Gi + memory: 2Gi cpu: 4 - command: 'ingest_appeals' schedule: '*/30 * * * *'