Skip to content

Commit

Permalink
working with http for citypage flow
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed Aug 11, 2024
1 parent 80cd92a commit 1b54d15
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 18 deletions.
31 changes: 24 additions & 7 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -403,22 +403,39 @@ It's named */this/20160123/pattern/RAW_MERGER_GRIB/directory* if the notificati
acceptSizeWrong: <boolean> (default: False)
-------------------------------------------

When a file is downloaded and its size does not match the one advertised, it is
normally rejected, as a failure. This option accepts the file even with the wrong
size. helpful when file is changing frequently, and there is some queueing, so
the file is changed by the time it is retrieved.
When acceptSizeWrong is set to True, the download accepts the file even if it's size
does not match what is in the notification message received. This is helpful when
resources are changing frequently, and there is some queueing, so the file is changed
by the time it is retrieved.

In the default case (acceptSizeWrong set to False), size mismatch is
considered a download failure. Sarracenia then checks if the
what was downloaded matches what is on the upstream server currently.

If the modification date on the upstream server is newer than in the message::

2024-08-11 00:00:47,978 [INFO] sarracenia.flow download upstream resource is newer, so message https://hpfx.collab.science.gc.ca //20240811/WXO-DD/citypage_weather/xml/NB/s0000653_e.xml is obsolete. Discarding.

If it does match the upstream size, than no error has occurred on download,
it's just that the size of the message announcing the new resource does not
match what is currently available. There is no point retrying the download.
In the both cases, the file downloaded and the corresponding message are both
discarded.

If the check of the upstream server fails, or the retrieve itself has failed,
then it puts the resource on the retry queue for later attempts.


attempts <count> (default: 3)
-----------------------------

The **attempts** option indicates how many times to
attempt downloading the data before giving up. The default of 3 should be appropriate
in most cases. When the **retry** option is false, the file is then dropped immediately.
attempt downloading the data before giving up. The default of 3 should be appropriate
in most cases. When the **retry** option is false, the file is then dropped immediately.

When The **retry** option is set (default), a failure to download after prescribed number
of **attempts** (or send, in a sender) will cause the notification message to be added to a queue file
for later retry. When there are no notification messages ready to consume from the AMQP queue,
for later retry. When there are no notification messages ready to consume from the AMQP queue,
the retry queue will be queried.


Expand Down
10 changes: 5 additions & 5 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ def do_download(self) -> None:
logger.warning("downloading again, attempt %d" % i)

ok = self.download(msg, self.o)
if ok==1:
if ok == 1:
logger.debug("downloaded ok: %s" % new_path)
msg.setReport(201, "Download successful" )
# if content is present, but downloaded anyways, then it is no good, and should not be forwarded.
Expand All @@ -1882,7 +1882,7 @@ def do_download(self) -> None:
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
break
elif ok==-1:
elif ok == -1:
logger.debug("download failed permanently, discarding transfer: %s" % new_path)
msg.setReport(410, "message received for content that is no longer available" )
self.worklist.rejected.append(msg)
Expand Down Expand Up @@ -2195,13 +2195,13 @@ def download(self, msg, options) -> int:
mtime = sarracenia.timestr2flt(msg['pubTime'])

if current_stat and current_stat.st_mtime > mtime:
logger.error( f'upstream source has changed, message obsolete. ' )
logger.info( f'upstream resource is newer, so message {msg.getIDStr()} is obsolete. Discarding.' )
retval=-1
elif current_stat and current_stat.st_size == len_written:
logger.warning( f'downloads ok, but upstream source not as announced. Perhaps AcceptSizeWrong? ' )
logger.info( f'download matches upstream source, {msg.getIDStr()} but not announcement. Discarding.' )
retval=-1
else:
logger.error( f"unexplained size discrepancy, will retry later" )
logger.error( f"unexplained size discrepancy in {msg.getIDStr()}, will retry later" )
elif len_written > block_length:
logger.error( f'download more {len_written} than expected {block_length} bytes for {new_inflight_path}' )
else:
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/transfer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def read_writelocal(self,
# 2022/12/02 - pas should see a lot of these messages in HPC case from now on...

if not self.o.acceptSizeWrong and length != 0 and rw_length != length:
logger.warning(
logger.debug(
"util/writelocal mismatched file length writing %s. Message said to expect %d bytes. Got %d bytes."
% (local_file, length, rw_length))

Expand Down
2 changes: 1 addition & 1 deletion sarracenia/transfer/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def getcwd(self):
return self.cwd

def stat(self,path,message=None):
spath = path if path[0] == '/' else self.cwd + '/' + path
spath = path if path[0] == '/' else self.path + '/' + path
return sarracenia.stat(spath)

# ls
Expand Down
7 changes: 3 additions & 4 deletions sarracenia/transfer/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,15 @@ def __open__(self, path, remote_offset=0, length=0):
def stat(self,path,msg) -> sarracenia.filemetadata.FmdStat:
st = sarracenia.filemetadata.FmdStat()

#logger.debug( f" baseUrl:{msg['baseUrl']}, path:{self.path}, cwd:{self.cwd}, path:{path} " )

url = msg['baseUrl']
if msg['baseUrl'][-1] != '/' and self.cwd[0] != '/' :
if msg['baseUrl'][-1] != '/' and self.path[0] != '/' :
url += '/'
url += self.cwd
logger.critical( f" 1 url={url}")
url += self.path
if url[-1] != '/':
url += '/'
url += path
logger.critical( f" 2 url={url}")

try:
resp = requests.head(url)
Expand Down

0 comments on commit 1b54d15

Please sign in to comment.