Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions bin/cvmfs_sync
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import stat
import sys
import time
import errno
import xattr
import queue
import random
import signal
Expand Down Expand Up @@ -62,6 +61,8 @@ def parse_opts():
help="Enable verbose printout.")
parser.add_option("-t", "--max-time", dest="max_time", type="int", default=0,
help="Time threshold for starting new transfers, in minutes.")
parser.add_option("--check-mtime", default=False, action="store_true",
help="Detect file changes with mtime")

opts, args = parser.parse_args()
if len(args) < 2:
Expand Down Expand Up @@ -122,7 +123,7 @@ def graft_filename(filename):
return os.path.join(parent_dir, ".cvmfsgraft-" + fname)


def should_skip(input_url, output_filename, size, output_mode):
def should_skip(input_url, output_filename, size, output_mode, mtime=None):
global g_skip_count
graftfile = graft_filename(output_filename)

Expand All @@ -139,7 +140,13 @@ def should_skip(input_url, output_filename, size, output_mode):
size_ok = size == st.st_size
mode_ok = output_mode == stat.S_IMODE(st.st_mode)

if size_ok and mode_ok:
# Compare mtime if we received it
if mtime:
mtime_ok = mtime == st.st_mtime
else:
mtime_ok = True

if size_ok and mode_ok and mtime_ok:
g_skip_count += 1
return True
elif os.path.exists(graftfile):
Expand All @@ -152,14 +159,14 @@ def should_skip(input_url, output_filename, size, output_mode):
break
except:
pass
if size == graft_size and mode_ok:
if size == graft_size and mode_ok and mtime_ok:
g_skip_count += 1
return True
logging.warning("Graft size/mode mismatch! Regenerating file %s (size %d) -> %s (graft size %d)" % \
(input_url, size, output_filename, graft_size))
logging.warning("Graft size/mode/mtime mismatch! Regenerating file %s (size %d, %d) -> %s (graft size %d, %d)" % \
(input_url, size, mtime, output_filename, graft_size, st.st_mtime))
else:
logging.warning("Size/mode mismatch! Regenerating file %s (size %d) -> %s (size %d)" % \
(input_url, size, output_filename, st.st_size))
logging.warning("Size/mode/mtime mismatch! Regenerating file %s (size %d, %d) -> %s (size %d, %d)" % \
(input_url, size, mtime, output_filename, st.st_size, st.st_mtime))
os.unlink(output_filename)
try:
os.makedirs(os.path.split(output_filename)[0])
Expand All @@ -169,7 +176,7 @@ def should_skip(input_url, output_filename, size, output_mode):
return False


def process_download_file(xrootd_url, output_filename, output_mode, deadline):
def process_download_file(xrootd_url, output_filename, output_mode, mtime, deadline):
"""
Download an entire file to the local host and stream its contents through cvmfs_swissknife
in order to create checksums.
Expand Down Expand Up @@ -228,6 +235,7 @@ def process_download_file(xrootd_url, output_filename, output_mode, deadline):
os.close(writefp)
os.waitpid(pid, 0)
os.chmod(output_filename, output_mode)
os.utime(output_filename, (mtime, mtime))
break
else:
try:
Expand All @@ -241,15 +249,15 @@ def process_download_file(xrootd_url, output_filename, output_mode, deadline):
return next_offset


def process_checksum_file(gridftp_url, xrootd_url, output_filename, output_mode, deadline):
def process_checksum_file(gridftp_url, xrootd_url, output_filename, output_mode, mtime, deadline):
"""
Process and create a CVMFS checksum / graft file.

If a GridFTP server is available, use that to generate a checksum file. Otherwise,
utilize Xrootd to download the file to memory and checksum it locally.
"""
if not gridftp_url:
return process_download_file(xrootd_url, output_filename, output_mode, deadline)
return process_download_file(xrootd_url, output_filename, output_mode, mtime, deadline)

if (deadline > 0) and (time.time() > deadline):
raise Exception("URL %s timed out when still in processing queue." % xrootd_url)
Expand Down Expand Up @@ -284,7 +292,7 @@ def process_checksum_file(gridftp_url, xrootd_url, output_filename, output_mode,
return 0


def process_files(base_url, gridftp_base_url, output_dir, count, ignore=[], include=[], thread_count=2, deadline=0):
def process_files(base_url, gridftp_base_url, output_dir, count, ignore=[], include=[], thread_count=2, deadline=0, compare_mtime=False):
global g_bytes_xfer

current = 0
Expand Down Expand Up @@ -319,11 +327,16 @@ def process_files(base_url, gridftp_base_url, output_dir, count, ignore=[], incl
else:
output_mode = 0o644 # Else 644

if should_skip(input_url, output_filename, size, output_mode):
if compare_mtime:
mtime = statinfo.modtime
else:
mtime = None

if should_skip(input_url, output_filename, size, output_mode, mtime):
continue
if should_ignore_filename(filename):
continue
future = pool.apply_async(process_checksum_file, (gridftp_url, xrootd_url, output_filename, output_mode, deadline))
future = pool.apply_async(process_checksum_file, (gridftp_url, xrootd_url, output_filename, output_mode, mtime, deadline))
future.filename = filename
future.size = size
futures.append(future)
Expand Down Expand Up @@ -524,7 +537,7 @@ def main():
gridftp_src = ''
if len(info) == 2:
src, gridftp_src = info
process_files(src, gridftp_src, dest, concurrency, ignore=opts.ignore, include=opts.include, thread_count=opts.metadata_concurrency, deadline=deadline)
process_files(src, gridftp_src, dest, concurrency, ignore=opts.ignore, include=opts.include, thread_count=opts.metadata_concurrency, deadline=deadline, compare_mtime=opts.check_mtime)

processing_time = time.time() - starttime
print("Total of %.1f GB in %d files processed in %.2f seconds." % (g_bytes_xfer/(1024.**3), len(g_processed_files), processing_time))
Expand Down
3 changes: 3 additions & 0 deletions update-scripts/cvmfs-sync-driver
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class CvmfsSyncDriver(object):
if config.has_option(section, "gridftp_source"):
self.gridftp_source = config.get(section, "gridftp_source")
self.destination = config.get(section, "destination")
self.check_mtime = config.getboolean(section, "check_mtime")

def run(self):
"""
Expand All @@ -73,6 +74,8 @@ class CvmfsSyncDriver(object):
command += ["--ignore", ",".join(self.ignore)]
if self.include:
command += ["--include", ",".join(self.include)]
if self.check_mtime:
command += ["--check-mtime"]
command += ["--verbose"]
source = self.source
if self.gridftp_source:
Expand Down