Skip to content

Commit

Permalink
[Celery] Using multiple queues
Browse files Browse the repository at this point in the history
Using multiple queues, tasks of different types can be served in parallel.
  • Loading branch information
cronosnull committed Jun 4, 2020
1 parent 74e63fa commit 38c656a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
2 changes: 1 addition & 1 deletion celery_spider_cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def main_driver(args):
# for logging purposes.
# The propagate=False will prevent it from raising
# an exception if any of the schedds' queries failed.
groups = res.collect(propagate=False)
groups = res.get(propagate=False)
logging.debug(groups)
print(time.time() - start_time)

Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
- CMS_ES_CONF_FILE=/run/secrets/es-conf
- REQUESTS_CA_BUNDLE=/etc/pki/ca-trust/extracted/openssl/ca-bundle.trust.crt
- LOGLEVEL=DEBUG
entrypoint: ['celery', "-A", "htcondor_es.celery.celery", "worker"]
entrypoint: ['celery', "-A", "htcondor_es.celery.celery", "worker","-Q","celery,es_post,convert"]
secrets: &secrets
- amq-username
- amq-password
Expand All @@ -49,7 +49,7 @@ services:
restart: 'no'
environment: *env
# entrypoint: ["python","/cms-htcondor-es/tests/celery_test.py"]
entrypoint: ["python","/cms-htcondor-es/celery_spider_cms.py","--feed_es"]
entrypoint: ["python","/cms-htcondor-es/celery_spider_cms.py","--feed_es","--query_queue_batch_size","5000"]
depends_on:
- spider-worker

Expand Down
12 changes: 9 additions & 3 deletions src/htcondor_es/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,20 @@ def query_schedd(
keep_full_queue_data=keep_full_queue_data,
feed_es=feed_es,
es_index=es_index_template,
metadata={"spider_source": f"condor_{query_type}"}
metadata={"spider_source": f"condor_{query_type}"},
)
if query_type == "history":
getRedisConnection().set(schedd_ad["name"], hist_time)
return (schedd_ad["name"], n_tasks)


@app.task(ignore_result=True)
@app.task(
ignore_result=True,
queue="convert",
acks_late=True,
max_retries=3,
autoretry_for=(OSError,),
)
def process_docs(
docs,
reduce_data=True,
Expand Down Expand Up @@ -169,7 +175,7 @@ def process_docs(
return post_ads(converted_docs) if converted_docs else []


@app.task(ignore_result=True)
@app.task(ignore_result=True, queue="es_post")
def post_ads_es(es_docs, es_index, metadata=None):
"""
Send the messages to ES.
Expand Down

0 comments on commit 38c656a

Please sign in to comment.