Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions api/management/commands/sync_molnix.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,13 @@ def add_tags_to_obj(obj, tags):


def sync_deployments(molnix_deployments, molnix_api, countries):
molnix_ids = [d["id"] for d in molnix_deployments]
molnix_ids = [d["id"] for d in molnix_deployments] # XXX: LOOP 1
warnings = []
messages = []
successful_creates = 0
successful_updates = 0
# Ensure there are PersonnelDeployment instances for every unique emergency
events = [get_go_event(d["tags"]) for d in molnix_deployments]
events = [get_go_event(d["tags"]) for d in molnix_deployments] # XXX: LOOP 2
event_ids = [ev.id for ev in events if ev]
unique_event_ids = list(set(event_ids))
for event_id in unique_event_ids:
Expand Down Expand Up @@ -292,10 +292,10 @@ def sync_deployments(molnix_deployments, molnix_api, countries):
p.save()

# Create Personnel objects
for md in molnix_deployments: # LOOP1
for md in molnix_deployments: # XXX: LOOP 3
if "position_id" not in md: # changed structure §
md2 = molnix_api.get_deployment(md["id"])
md |= md2["deployment"]
md |= md2["deployment"] # XXX: Potential issue due to mutation?
if skip_this(md["tags"]):
warning = "Deployment id %d skipped due to No-GO" % md["id"]
logger.warning(warning)
Expand Down Expand Up @@ -459,13 +459,13 @@ def sync_deployments(molnix_deployments, molnix_api, countries):


def sync_open_positions(molnix_positions, molnix_api, countries):
molnix_ids = [p["id"] for p in molnix_positions]
molnix_ids = [p["id"] for p in molnix_positions] # XXX: Loop 1
warnings = []
messages = []
successful_creates = 0
successful_updates = 0

for position in molnix_positions: # LOOP2
for position in molnix_positions: # XXX: LOOP 2
logger.warning("× " + str(position["id"]))
if skip_this(position["tags"]):
warning = "Position id %d skipped due to No-GO" % position["id"]
Expand Down
79 changes: 69 additions & 10 deletions api/molnix_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,64 @@
import json
import os
import tempfile
import weakref

import requests

from api.logger import logger


def _safe_unlink(path):
try:
os.unlink(path)
except OSError:
pass


class _CachedPaginated:
"""Iterable wrapper around a streaming generator factory.

The first iteration drains the source generator and tees each record into a
JSONL cache file under /tmp/. Subsequent iterations replay from that cache.
Memory stays bounded to one record at a time, even when the caller iterates
the same result more than once (as sync_molnix does for tags + main loop).
"""

def __init__(self, source_factory, label=""):
self._source_factory = source_factory
self._label = label
fd, self._cache_path = tempfile.mkstemp(suffix=".jsonl", prefix="molnix_paginated_")
os.close(fd)
self._cached = False
self._iter_count = 0
weakref.finalize(self, _safe_unlink, self._cache_path)

def __iter__(self):
self._iter_count += 1
if self._cached:
logger.warning(
"_CachedPaginated[%s] re-iteration #%d from cache %s",
self._label,
self._iter_count,
self._cache_path,
)
with open(self._cache_path) as f:
for line in f:
yield json.loads(line)
return
logger.warning(
"_CachedPaginated[%s] first iteration #%d streaming from source -> %s",
self._label,
self._iter_count,
self._cache_path,
)
with open(self._cache_path, "w") as f:
for item in self._source_factory():
f.write(json.dumps(item))
f.write("\n")
yield item
self._cached = True


class MolnixApi:

Expand Down Expand Up @@ -29,19 +86,15 @@ def call_api(self, path, method="GET", params={}):

def call_api_paginated(self, path, response_key=None, params={}):
page = 1
next_page = True
results = []
while next_page:
while True:
params["page"] = page
data = self.call_api(path=path, params=params)
if response_key:
data = data[response_key]["data"]
results += data
if len(data) == 0:
next_page = False
else:
page += 1
return results
return
yield from data
page += 1

def login(self):
params = {"username": self.username, "password": self.password}
Expand All @@ -63,7 +116,10 @@ def get_open_positions(self):

def get_not_only_open_positions(self):
# return self.call_api_paginated(path="positions", response_key="positions")
return self.call_api_paginated(path="positions", response_key="positions", params={"limit": 999999})
return _CachedPaginated(
lambda: self.call_api_paginated(path="positions", response_key="positions", params={"limit": 999999}),
label="positions",
)

def get_deployments(self):
deployments_filter = {
Expand All @@ -77,7 +133,10 @@ def get_deployments(self):
"criterias": "[]",
}
params = {"filter": json.dumps(deployments_filter)}
return self.call_api_paginated(path="deployments", response_key="deployments", params=params)
return _CachedPaginated(
lambda: self.call_api_paginated(path="deployments", response_key="deployments", params=params),
label="deployments",
)

"""
WARNING: If position is not found or generates an error, we return None
Expand Down
4 changes: 2 additions & 2 deletions deploy/helm/ifrcgo-helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ cronjobs:
schedule: '10 */2 * * *'
resources:
requests:
memory: 5Gi
memory: 1Gi
cpu: 0.1
limits:
memory: 6Gi
memory: 2Gi
cpu: 4
- command: 'ingest_appeals'
schedule: '*/30 * * * *'
Expand Down
Loading