Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue997 - Eliminate retry file race condition and make AM work more like a sarracenia flow #1005

Merged
merged 6 commits into from
Mar 28, 2024
Merged
100 changes: 66 additions & 34 deletions sarracenia/flowcb/gather/am.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def __init__(self, options):
self.minnum = 00000
self.maxnum = 99999
self.remoteHost = None
self.timeout = 0.1

# Initialise socket
## Create a TCP socket
Expand All @@ -119,23 +120,45 @@ def __WaitForRemoteConnection__(self):
# Bind socket to specified host and listen
self.s.bind((self.host, self.port))
self.s.listen(1)
# Set timeout higher so that exponential backoff isn't triggered on startup.
self.s.settimeout(self.timeout*100)
logger.info("Socket listening on host %s and port %d.", self.host, self.port)
logger.info("Trying to accept connection.")
except socket.error as e:
logger.error(f"Bind failed. Retrying. Error message: {e.args}")
time.sleep(5)

child_inst = 2
child_inst = 1
n = 1

while True:

try:
# Accept the connection from socket
logger.info("Trying to accept connection.")

try:
conn, self.remoteHost = self.s.accept()

# Write out file descriptor of the connected socket so that the child can pick it up.
n = 1
child_inst += 1
conn_filename = sarracenia.config.get_pid_filename(
None, self.o.component, self.o.config, child_inst)
conn_filename = conn_filename.replace('pid','conn')
conn_fd = open(conn_filename, 'w')
conn_fd.write(str(conn.fileno()))
conn_fd.close()

os.set_inheritable(conn.fileno(), True)
time.sleep(1)

except TimeoutError:
n = n * 2
if n > 64: n = 64
logger.info(f"No new connections. Waiting {n} seconds.")
time.sleep(n)
continue

except Exception as e:
logger.error(f"Stopping accept. Exiting. Error message: {e}")
sys.exit(0)
Expand All @@ -148,20 +171,24 @@ def __WaitForRemoteConnection__(self):

# Instance forks
## Instance 1 (Parent, pid=child_pid): Stays in the loop trying to accept other connections.
## Instance 2 (Child, pid=0): Exits loop. Proceeds to initialise the service with the remote host.
## Insys.argv[2] = str(child_instance)
sys.argv[2] = str(child_inst)
pid = os.fork()

if pid == 0:
## Close the unconnected socket instance as it is unused in the service.
self.s.close()

self.o.no = child_inst

## Set the logfiles properly
sarracenia.config.cfglogs(self.o, self.o.component, self.o.config, self.o.logLevel, child_inst)

self.o.no = child_inst
logger.info(f"Starting up service with host {self.remoteHost[0]}")
break

os.execl(sys.executable , sys.executable , *sys.argv )
logger.critical(f"Failed to launch child! {sys.argv=}")

elif pid == -1:
raise logger.exception("Connection could not fork. Exiting.")

Expand All @@ -175,35 +202,33 @@ def __WaitForRemoteConnection__(self):
## Close the connected socket instance as it is unused in the parent
conn.close()
logger.info(f"Forked child from host {self.remoteHost[0]} with instance number {child_inst} and pid {pid}")

child_inst += 1
pass

except Exception:
logger.error(f"Couldn't accept connection. Parent or child failed. Retrying to accept.")
# self.s.close()
# conn.close()
time.sleep(1)

logger.info("Connection accepted with IP %s on port %d. Starting service.", self.remoteHost[0], self.port)

return conn


def on_start(self):
# Set ipadresses in proper format
for IP in self.o.AllowIPs:
IP = ipaddress.ip_address(IP)

# If there are remaining instances, delete their filepaths and exit.
if self.o.no != 1:
pidfilename = sarracenia.config.get_pid_filename(None, self.o.component, self.o.config, self.o.no)
if os.path.exists(pidfilename):
os.unlink(pidfilename)
sys.exit(0)

self.conn = self.__WaitForRemoteConnection__()


if self.o.no == 1:
# Set ipadresses in proper format
for IP in self.o.AllowIPs:
IP = ipaddress.ip_address(IP)

self.conn = self.__WaitForRemoteConnection__()
else:
# Recreate the socket from the connection state file, created by the parent.
conn_filename = sarracenia.config.get_pid_filename(None, self.o.component, self.o.config, self.o.no)
conn_filename = conn_filename.replace('pid','conn')
conn_fd = open(conn_filename)
conn_fd_str = conn_fd.read()
conn_fd.close()
# Remove the .conn file as we don't need it anymore
os.unlink(conn_filename)
self.conn = socket.fromfd(int(conn_fd_str), socket.AF_INET, socket.SOCK_STREAM)
self.conn.settimeout(self.timeout)

def on_stop(self):
logger.info("On stop called. Exiting.")
Expand All @@ -217,18 +242,25 @@ def on_stop(self):
sys.exit(0)


def AddBuffer(self):
def addBuffer(self):
# try:
try:
tmp = self.conn.recv(self.limit)

# Socket returns b'' on disconnect.
if tmp == b'':
raise Exception()

# We don't want to wait on a hanging connection. We use the timeout error to exit out of the reception if there is nothing.
# This in turn makes the whole flow the same as any other sarracenia flow.
except TimeoutError:
return

except Exception as e:
tmp = ''
logger.error(f"Reception has been interrupted. Closing connection and exiting. Error message: {e}")

if tmp == '':
self.stop_requested = True
self.conn.close()
raise Exception()
sys.exit(1)

self.inBuffer = self.inBuffer + tmp

Expand All @@ -237,7 +269,7 @@ def AddBuffer(self):
# logger.warning("Type: %s, Value: %s, [socket.recv(%d)]" % (type, value, self.limit))


def CheckNextMsgStatus(self):
def checkNextMsgStatus(self):

# Only unpack data if a bulletin is received
## When unpacking, the length of the header is vital since it allows the receiver to extract the bulletin contents from the buffer.
Expand All @@ -260,7 +292,7 @@ def CheckNextMsgStatus(self):

def unwrapmsg(self):

status = self.CheckNextMsgStatus()
status = self.checkNextMsgStatus()

if status == 'OK':
(self.header,src_inet,dst_inet,threads,start,length,firsttime,timestamp,future) = \
Expand Down Expand Up @@ -368,12 +400,12 @@ def correctContents(self, bulletin, bulletin_firstchars, lines, missing_ahl, bul

def gather(self, messageCountMax):

self.AddBuffer()
self.addBuffer()

newmsg = []

while True:
status = self.CheckNextMsgStatus()
status = self.checkNextMsgStatus()

if status == 'INCOMPLETE':
break
Expand Down
24 changes: 13 additions & 11 deletions sarracenia/flowcb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,6 @@ def __init__(self, options) -> None:

#queuedriver = os.getenv('SR3_QUEUEDRIVER', 'disk')

if self.o.retry_driver == 'redis':
from sarracenia.redisqueue import RedisQueue
self.download_retry = RedisQueue(options, 'work_retry')
self.post_retry = RedisQueue(options, 'post_retry')
else:
from sarracenia.diskqueue import DiskQueue
self.download_retry_name = 'work_retry_%02d' % options.no
self.download_retry = DiskQueue(options, self.download_retry_name)
self.post_retry_name = 'post_retry_%03d' % options.no
self.post_retry = DiskQueue(options, self.post_retry_name)

logger.debug('logLevel=%s' % self.o.logLevel)


Expand Down Expand Up @@ -176,6 +165,19 @@ def on_housekeeping(self) -> None:
self.download_retry.on_housekeeping()
self.post_retry.on_housekeeping()

def on_start(self) -> None:

if self.o.retry_driver == 'redis':
from sarracenia.redisqueue import RedisQueue
self.download_retry = RedisQueue(self.o, 'work_retry')
self.post_retry = RedisQueue(self.o, 'post_retry')
else:
from sarracenia.diskqueue import DiskQueue
self.download_retry_name = 'work_retry_%02d' % self.o.no
self.download_retry = DiskQueue(self.o, self.download_retry_name)
self.post_retry_name = 'post_retry_%03d' % self.o.no
self.post_retry = DiskQueue(self.o, self.post_retry_name)

def on_stop(self) -> None:
self.download_retry.close()
self.post_retry.close()
Loading