Skip to content

Commit

Permalink
[Celery] change the create affiliation to be a task
Browse files Browse the repository at this point in the history
(black applied)
  • Loading branch information
cronosnull committed May 28, 2020
1 parent 5f29a19 commit f0073b6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 40 deletions.
43 changes: 24 additions & 19 deletions celery_spider_cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import time
import traceback
from celery import group
from htcondor_es.celery.tasks import query_schedd
from htcondor_es.celery.tasks import query_schedd, create_affiliation_dir
from htcondor_es.celery.celery import app
from htcondor_es.utils import get_schedds, get_schedds_from_file

Expand All @@ -33,30 +33,35 @@ def main_driver(args):
schedd_ads = get_schedds(args, collectors=args.collectors)
_types = []
if not args.skip_history:
_types.append('history')
_types.append("history")
if not args.skip_queues:
_types.append('queue')
res = group(
query_schedd.s(
sched,
dry_run=args.dry_run,
start_time=start_time,
keep_full_queue_data=args.keep_full_queue_data,
bunch=args.amq_bunch_size,
query_type=_type,
es_index_template=args.es_index_template,
feed_es=args.feed_es,
_types.append("queue")
res = (
create_affiliation_dir.si()
| group(
query_schedd.si(
sched,
dry_run=args.dry_run,
start_time=start_time,
keep_full_queue_data=args.keep_full_queue_data,
chunk_size=args.query_queue_batch_size,
bunch=args.amq_bunch_size,
query_type=_type,
es_index_template=args.es_index_template,
feed_es=args.feed_es,
)
for _type in _types
for sched in schedd_ads
)
for _type in _types
for sched in schedd_ads
).apply_async(serializer="pickle")
# Use the get to wait for results
# We could also chain it to a chord to process the responses
    # for logging purposes.
# The propagate false will prevent it to raise
# an exception if any of the schedds query failed.
# The propagate false will prevent it to raise
# an exception if any of the schedds query failed.
groups = res.get(propagate=False)
print([g for g in groups.collect()])

print([g for g in groups])


def main():
Expand Down Expand Up @@ -128,7 +133,7 @@ def main():

parser.add_argument(
"--query_queue_batch_size",
default=50,
default=500,
type=int,
dest="query_queue_batch_size",
help=(
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
htcondor==8.7.9
elasticsearch>=7.0.0,<8.0.0
CMSMonitoring==0.3.3
CMSMonitoring==0.4.2
#git+https://github.com/cronosnull/CMSMonitoring.git@python3#egg=CMSMonitoring
requests==2.22.0
celery[redis]
34 changes: 14 additions & 20 deletions src/htcondor_es/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ def post_ads_es(es_docs, es_index, metadata=None):
traceback.print_exc()


@app.task()
def create_affiliation_dir(days=1):
    """Celery task: (re)create the affiliation directory file.

    Builds the AffiliationManager data file at the path given by the
    ``AFFILIATION_DIR_LOCATION`` environment variable, falling back to the
    manager's default path, recreating it when the existing file is older
    than ``days`` days.

    Failures are logged and swallowed on purpose: downstream schedd-query
    tasks can still run against an older affiliation file.
    """
    try:
        output_file = os.getenv(
            "AFFILIATION_DIR_LOCATION",
            AffiliationManager._AffiliationManager__DEFAULT_DIR_PATH,
        )
        AffiliationManager(recreate_older_days=days, dir_file=output_file)
    except AffiliationManagerException as ex:
        # Best-effort: log and continue rather than failing the workflow.
        logging.warning("Error creating the AffiliationManager %s", str(ex))
        traceback.print_exc()


# ---Utils---
def grouper(iterable, n, fillvalue=None):
"""Collect data into fixed-length chunks or blocks
Expand Down Expand Up @@ -266,23 +280,3 @@ def getRedisConnection():
os.getenv("SPIDER_CHECKPOINT", "redis://localhost/1")
)
return __REDIS_CONN


# ---- Signals----
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    """Prepare a worker process before it starts consuming tasks.

    Ensures the affiliation directory file exists at the path given by the
    ``AFFILIATION_DIR_LOCATION`` environment variable (falling back to the
    AffiliationManager default path), recreating it when it is older than
    60 days.

    Errors are logged but not raised, so a worker still starts even when
    the affiliation data cannot be refreshed.
    """
    try:
        output_file = os.getenv(
            "AFFILIATION_DIR_LOCATION",
            AffiliationManager._AffiliationManager__DEFAULT_DIR_PATH,
        )
        AffiliationManager(recreate_older_days=60, dir_file=output_file)
    except AffiliationManagerException as ex:
        # Best-effort: the worker can run with a stale affiliation file.
        logging.warning("Error creating the AffiliationManager %s", str(ex))
        traceback.print_exc()

0 comments on commit f0073b6

Please sign in to comment.