Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added etcd locking mechanism #491

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion scripts/globusmonitor/Dockerfile_uploader
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ RUN apt-get -y update \
&& pip install flask-restful \
python-logstash \
globusonline-transfer-api-client \
psycopg2
psycopg2 python-etcd



COPY *.py *.json /home/globusmonitor/

Expand Down
29 changes: 27 additions & 2 deletions scripts/globusmonitor/globus_uploader_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import signal
import psycopg2
import socket
import re
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from io import BlockingIOError
from urllib3.filepost import encode_multipart_formdata
import etcd, traceback

from pyclowder.datasets import download_metadata
from terrautils.metadata import clean_metadata
Expand Down Expand Up @@ -50,6 +52,8 @@
current_task = None


etcd_client = etcd.Client(host='etcd2.terraref', port=4001)

# ----------------------------------------------------------
# SHARED UTILS
# ----------------------------------------------------------
Expand Down Expand Up @@ -208,7 +212,7 @@ def writeTaskToDatabase(task):
recv = task['received']
comp = task['completed']
guser = task['user']
jbody = json.dumps(task['contents'])
jbody = json.dumps(task['contents']).replace("'", "")

# Attempt to insert, update if globus ID already exists
q_insert = "INSERT INTO globus_tasks (globus_id, status, received, completed, globus_user, contents) " \
Expand Down Expand Up @@ -384,6 +388,13 @@ def notifyClowderOfCompletedTask(task):
c_sensor, c_date, c_year, c_month = ds, None, None, None



lockname = re.sub(" |_|-", "", ds)
logger.info("Acquiring lock for task %s" % lockname)
lock = etcd.Lock(etcd_client, lockname)
lock.acquire(blocking=True, lock_ttl=300, timeout=60)
logger.info("Acquired lock")

# Get dataset from clowder, or create & associate with collections
try:
hierarchy_host = clowder_host + ("/" if not clowder_host.endswith("/") else "")
Expand All @@ -396,6 +407,10 @@ def notifyClowderOfCompletedTask(task):
response = "RETRY"
continue

logger.info("Releasing lock")
lock.release()
logger.info("Lock released")

if dsid:
dsFileList = fetchDatasetFileList(dsid, sess)
# Only send files not already present in dataset by path
Expand Down Expand Up @@ -479,7 +494,7 @@ def notifyClowderOfCompletedTask(task):
headers={'Content-Type':header},
data=content)

if fi.status_code in [104, 500, 502, 504]:
if fi.status_code in [500, 502, 504]:
logger.error("[%s] failed to attach files (%s: %s)" % (ds, fi.status_code, fi.text))
updatedTask['contents'][ds]['files'][datasetMDFile]['retry'] = "%s: %s" % (fi.status_code, fi.text)
response = "RETRY"
Expand Down Expand Up @@ -552,6 +567,11 @@ def clowderSubmissionLoop():
logger.error("Connection reset on %s; marking RETRY (%s)" % (globusID, str(e)))
task['status'] = 'RETRY'
writeTaskToDatabase(task)
except etcd.EtcdException as e:
logger.error("Lock error on %s; marking RETRY (%s)" % (globusID, str(e)))
task['status'] = 'RETRY'
traceback.print_exc()
writeTaskToDatabase(task)
except requests.ConnectionError as e:
logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e)))
task['status'] = 'RETRY'
Expand Down Expand Up @@ -589,6 +609,11 @@ def clowderSubmissionLoop():
logger.error("Connection reset on %s; marking RETRY (%s)" % (globusID, str(e)))
task['status'] = 'RETRY'
writeTaskToDatabase(task)
except etcd.EtcdException as e:
logger.error("Lock error on %s; marking RETRY (%s)" % (globusID, str(e)))
task['status'] = 'RETRY'
traceback.print_exc()
writeTaskToDatabase(task)
except requests.ConnectionError as e:
logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e)))
task['status'] = 'RETRY'
Expand Down