Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ shlib-exports-*.txt
/test/test_khash
/test/test_kstring
/test/test_mod
/test/test_hfile_libcurl
/test/test_nibbles
/test/test-parse-reg
/test/test_realn
Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ BUILT_TEST_PROGRAMS = \
test/test-bcf-translate \
test/test-parse-reg \
test/test_introspection \
test/test-bcf_set_variant_type
test/test-bcf_set_variant_type \
test/test_hfile_libcurl

BUILT_THRASH_PROGRAMS = \
test/thrash_threads1 \
Expand Down Expand Up @@ -713,6 +714,7 @@ check test: all $(HTSCODECS_TEST_TARGETS)
else \
REF_PATH=: ./test.pl $(REF_CACHE_TEST_OPTS) $${TEST_OPTS:-} ; \
fi
test/test_hfile_libcurl

test/hts_endian: test/hts_endian.o
$(CC) $(LDFLAGS) -o $@ test/hts_endian.o $(LIBS)
Expand Down Expand Up @@ -805,6 +807,9 @@ test/test_introspection: test/test_introspection.o libhts.a
test/test-bcf_set_variant_type: test/test-bcf_set_variant_type.o libhts.a
$(CC) $(LDFLAGS) -o $@ test/test-bcf_set_variant_type.o libhts.a $(LIBS) -lpthread

test/test_hfile_libcurl: test/test_hfile_libcurl.o libhts.a
$(CC) $(LDFLAGS) -o $@ test/test_hfile_libcurl.o libhts.a $(LIBS) -lpthread

# Extra tests for bundled htscodecs
test_htscodecs_rans4x8: htscodecs/tests/rans4x8
cd htscodecs/tests && srcdir=. && export srcdir && ./rans4x8.test
Expand Down Expand Up @@ -884,6 +889,7 @@ test/test-bcf-sr.o: test/test-bcf-sr.c config.h $(htslib_hts_defs_h) $(htslib_sy
test/test-bcf-translate.o: test/test-bcf-translate.c config.h $(htslib_vcf_h)
test/test_introspection.o: test/test_introspection.c config.h $(htslib_hts_h) $(htslib_hfile_h)
test/test-bcf_set_variant_type.o: test/test-bcf_set_variant_type.c config.h $(htslib_hts_h) vcf.c
test/test_hfile_libcurl.o: test/test_hfile_libcurl.c config.h $(htslib_hfile_h) $(htslib_hts_h)

# Standalone target not added to $(BUILT_TEST_PROGRAMS) as some may not
# have a compiler that compiles as C++ when given a .cpp source file.
Expand Down Expand Up @@ -1015,6 +1021,7 @@ htslib-uninstalled.pc: htslib.pc.tmp


testclean:
-rm -f test/hfile_libcurl.tmp
-rm -f test/*.tmp test/*.tmp.* test/faidx/*.tmp* \
test/longrefs/*.tmp.* test/ref_cache/*.tmp.* test/tabix/*.tmp.* \
test/bgzf_boundaries/*.tmp.* test/*/FAIL* \
Expand Down
196 changes: 194 additions & 2 deletions hfile_libcurl.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ DEALINGS IN THE SOFTWARE. */
# include <sys/select.h>
#endif
#include <assert.h>
#include <time.h>

#include "hfile_internal.h"
#ifdef ENABLE_PLUGINS
Expand Down Expand Up @@ -111,11 +112,13 @@ typedef struct {
unsigned can_seek : 1; // Can (attempt to) seek on this handle
unsigned is_recursive:1; // Opened by hfile_libcurl itself
unsigned tried_seek : 1; // At least one seek has been attempted
unsigned needs_reconnect : 1; // Deferred reconnect after retryable error
int nrunning;
http_headers headers;

off_t delayed_seek; // Location to seek to before reading
off_t last_offset; // Location we're seeking from
off_t stream_pos; // Current position in remote file for retry
char *preserved; // Preserved buffer content on seek
size_t preserved_bytes; // Number of preserved bytes
size_t preserved_size; // Size of preserved buffer
Expand Down Expand Up @@ -227,6 +230,43 @@ static int easy_errno(CURL *easy, CURLcode err)
}
}

static int is_retryable(CURL *easy, CURLcode err)
{
switch (err) {
case CURLE_COULDNT_CONNECT:
case CURLE_SEND_ERROR:
case CURLE_RECV_ERROR:
case CURLE_PARTIAL_FILE:
case CURLE_OPERATION_TIMEDOUT:
case CURLE_GOT_NOTHING:
case CURLE_SSL_CONNECT_ERROR:
#ifdef CURLE_HTTP2
case CURLE_HTTP2:
#endif
#ifdef CURLE_HTTP2_STREAM
case CURLE_HTTP2_STREAM:
#endif
return 1;

case CURLE_HTTP_RETURNED_ERROR: {
long response = 0;
if (curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &response)
== CURLE_OK) {
switch (response) {
case 429: case 500: case 502: case 503: case 504:
return 1;
default:
break;
}
}
return 0;
}

default:
return 0;
}
}

static int multi_errno(CURLMcode errm)
{
switch (errm) {
Expand Down Expand Up @@ -255,10 +295,16 @@ static struct {
char *auth_path;
khash_t(auth_map) *auth_map;
int allow_unencrypted_auth_header;
int retry_max; // Max retry attempts (HTS_RETRY_MAX, default 3)
long retry_delay_ms; // Initial retry delay in ms (HTS_RETRY_DELAY, default 500)
long retry_max_delay_ms; // Max retry delay in ms (HTS_RETRY_MAX_DELAY, default 60000)
long low_speed_limit; // Bytes/sec threshold (HTS_LOW_SPEED_LIMIT, default 1)
long low_speed_time; // Seconds below threshold (HTS_LOW_SPEED_TIME, default 60)
pthread_mutex_t auth_lock;
pthread_mutex_t share_lock;
} curl = { { 0, 0, NULL }, NULL, NULL, NULL, 0, PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER };
} curl = { { 0, 0, NULL }, NULL, NULL, NULL, 0,
3, 500, 60000, 1, 60,
PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };

static void share_lock(CURL *handle, curl_lock_data data,
curl_lock_access access, void *userptr) {
Expand Down Expand Up @@ -772,14 +818,79 @@ static size_t header_callback(void *contents, size_t size, size_t nmemb,
}


static void refresh_retry_config(void)
{
const char *val;
if ((val = getenv("HTS_RETRY_MAX")) != NULL)
curl.retry_max = atoi(val);
if ((val = getenv("HTS_RETRY_DELAY")) != NULL)
curl.retry_delay_ms = atol(val);
if ((val = getenv("HTS_RETRY_MAX_DELAY")) != NULL)
curl.retry_max_delay_ms = atol(val);
if ((val = getenv("HTS_LOW_SPEED_LIMIT")) != NULL)
curl.low_speed_limit = atol(val);
if ((val = getenv("HTS_LOW_SPEED_TIME")) != NULL)
curl.low_speed_time = atol(val);
}

static void retry_sleep(long delay_ms)
{
#ifdef _WIN32
Sleep(delay_ms);
#else
struct timespec ts;
ts.tv_sec = delay_ms / 1000;
ts.tv_nsec = (delay_ms % 1000) * 1000000L;
nanosleep(&ts, NULL);
#endif
}

static int retry_reconnect(hFILE_libcurl *fp, off_t pos)
{
int attempt;
long delay = curl.retry_delay_ms;
int save_can_seek;

for (attempt = 0; attempt < curl.retry_max; attempt++) {
hts_log_warning("Retrying connection (attempt %d/%d) at offset %lld",
attempt + 1, curl.retry_max, (long long) pos);
retry_sleep(delay);

save_can_seek = fp->can_seek;
if (restart_from_position(fp, pos) == 0) {
fp->needs_reconnect = 0;
return 0;
}
// restart_from_position sets can_seek=0 on failure; restore it
fp->can_seek = save_can_seek;

// Exponential backoff
delay *= 2;
if (delay > curl.retry_max_delay_ms)
delay = curl.retry_max_delay_ms;
}

return -1;
}

static ssize_t libcurl_read(hFILE *fpv, void *bufferv, size_t nbytes)
{
hFILE_libcurl *fp = (hFILE_libcurl *) fpv;
char *buffer = (char *) bufferv;
off_t to_skip = -1;
ssize_t got = 0;
CURLcode err;
int retry_attempts = 0;

// Handle deferred reconnection from a previous retryable error
if (fp->needs_reconnect) {
if (retry_reconnect(fp, fp->stream_pos) < 0) {
errno = EIO;
return -1;
}
}

retry:
if (fp->delayed_seek >= 0) {
assert(fp->base.offset == fp->delayed_seek);

Expand All @@ -799,6 +910,7 @@ static ssize_t libcurl_read(hFILE *fpv, void *bufferv, size_t nbytes)
} else {
fp->last_offset = fp->delayed_seek = -1;
}
fp->stream_pos += bytes;
return bytes;
}

Expand Down Expand Up @@ -852,10 +964,42 @@ static ssize_t libcurl_read(hFILE *fpv, void *bufferv, size_t nbytes)
fp->buffer.len = 0;

if (fp->finished && fp->final_result != CURLE_OK) {
if (is_retryable(fp->easy, fp->final_result)
&& curl.retry_max > 0) {
if (got > 0) {
// Return partial data; defer reconnection to next call
fp->needs_reconnect = 1;
fp->stream_pos += got;
return got;
}
// No data; retry inline
if (retry_attempts < curl.retry_max) {
long delay = curl.retry_delay_ms;
int save_can_seek = fp->can_seek;
int i;
for (i = 0; i < retry_attempts; i++) {
delay *= 2;
if (delay > curl.retry_max_delay_ms) {
delay = curl.retry_max_delay_ms;
break;
}
}
hts_log_warning("Retrying read (attempt %d/%d) at offset %lld",
retry_attempts + 1, curl.retry_max,
(long long) fp->stream_pos);
retry_sleep(delay);
if (restart_from_position(fp, fp->stream_pos) == 0) {
retry_attempts++;
goto retry;
}
fp->can_seek = save_can_seek;
}
}
errno = easy_errno(fp->easy, fp->final_result);
return -1;
}

fp->stream_pos += got;
return got;
}

Expand Down Expand Up @@ -971,6 +1115,7 @@ static off_t libcurl_seek(hFILE *fpv, off_t offset, int whence)
preserve_buffer_content(fp);
}
fp->delayed_seek = pos;
fp->stream_pos = pos;
return pos;
}

Expand All @@ -982,6 +1127,7 @@ static off_t libcurl_seek(hFILE *fpv, off_t offset, int whence)
}

fp->tried_seek = 1;
fp->stream_pos = pos;
return pos;
}

Expand Down Expand Up @@ -1177,6 +1323,8 @@ libcurl_open(const char *url, const char *modes, http_headers *headers)
kstring_t in_header = {0, 0, NULL};
long response;

refresh_retry_config();

is_recursive = strchr(modes, 'R') != NULL;

if ((s = strpbrk(modes, "rwa+")) != NULL) {
Expand Down Expand Up @@ -1204,7 +1352,9 @@ libcurl_open(const char *url, const char *modes, http_headers *headers)
fp->paused = fp->closing = fp->finished = fp->perform_again = 0;
fp->can_seek = 1;
fp->tried_seek = 0;
fp->needs_reconnect = 0;
fp->delayed_seek = fp->last_offset = -1;
fp->stream_pos = 0;
fp->preserved = NULL;
fp->preserved_bytes = fp->preserved_size = 0;
fp->is_recursive = is_recursive;
Expand Down Expand Up @@ -1249,6 +1399,12 @@ libcurl_open(const char *url, const char *modes, http_headers *headers)
}
}
err |= curl_easy_setopt(fp->easy, CURLOPT_USERAGENT, curl.useragent.s);
if (curl.low_speed_limit > 0 && curl.low_speed_time > 0) {
err |= curl_easy_setopt(fp->easy, CURLOPT_LOW_SPEED_LIMIT,
curl.low_speed_limit);
err |= curl_easy_setopt(fp->easy, CURLOPT_LOW_SPEED_TIME,
curl.low_speed_time);
}
if (fp->headers.callback) {
if (add_callback_headers(fp) != 0) goto error;
}
Expand Down Expand Up @@ -1285,10 +1441,32 @@ libcurl_open(const char *url, const char *modes, http_headers *headers)
}

if (fp->finished && fp->final_result != CURLE_OK) {
if (is_retryable(fp->easy, fp->final_result)
&& curl.retry_max > 0) {
long delay = curl.retry_delay_ms;
int attempt, save_can_seek;
for (attempt = 0; attempt < curl.retry_max; attempt++) {
hts_log_warning("Retrying open (attempt %d/%d)",
attempt + 1, curl.retry_max);
retry_sleep(delay);
save_can_seek = fp->can_seek;
if (restart_from_position(fp, 0) == 0) {
if (!fp->finished || fp->final_result == CURLE_OK)
goto open_ok;
if (!is_retryable(fp->easy, fp->final_result))
break;
}
fp->can_seek = save_can_seek;
delay *= 2;
if (delay > curl.retry_max_delay_ms)
delay = curl.retry_max_delay_ms;
}
}
errno = easy_errno(fp->easy, fp->final_result);
goto error_remove;
}

open_ok:
if (fp->headers.redirect) {
if (response >= 300 && response < 400) { // redirection
kstring_t new_url = {0, 0, NULL};
Expand Down Expand Up @@ -1552,6 +1730,20 @@ int PLUGIN_GLOBAL(hfile_plugin_init,_libcurl)(struct hFILE_plugin *self)
curl.allow_unencrypted_auth_header = 1;
}

{
const char *val;
if ((val = getenv("HTS_RETRY_MAX")) != NULL)
curl.retry_max = atoi(val);
if ((val = getenv("HTS_RETRY_DELAY")) != NULL)
curl.retry_delay_ms = atol(val);
if ((val = getenv("HTS_RETRY_MAX_DELAY")) != NULL)
curl.retry_max_delay_ms = atol(val);
if ((val = getenv("HTS_LOW_SPEED_LIMIT")) != NULL)
curl.low_speed_limit = atol(val);
if ((val = getenv("HTS_LOW_SPEED_TIME")) != NULL)
curl.low_speed_time = atol(val);
}

info = curl_version_info(CURLVERSION_NOW);
ksprintf(&curl.useragent, "htslib/%s libcurl/%s", version, info->version);

Expand Down
3 changes: 2 additions & 1 deletion synced_bcf_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,8 @@ static void bcf_sr_destroy1(bcf_sr_t *reader, int closefile)
if ( reader->bcf_idx ) hts_idx_destroy(reader->bcf_idx);
bcf_hdr_destroy(reader->header);
if (closefile) {
hts_close(reader->file);
if (hts_close(reader->file) < 0)
hts_log_error("Error on closing %s", reader->fname);
}
if ( reader->itr ) tbx_itr_destroy(reader->itr);
int j;
Expand Down
Loading