Skip to content

Commit c7be621

Browse files
committed
FUSEX: Avoid null pointer deref and further timeout recovery EOS-6326, EOS-6217
1 parent 6b4480a commit c7be621

File tree

5 files changed

+167
-81
lines changed

5 files changed

+167
-81
lines changed

fusex/data/cachesyncer.cc

+47-30
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <XrdCl/XrdClXRootDResponses.hh>
1313
#include <XrdSys/XrdSysPthread.hh>
1414

15+
#include <algorithm>
1516
#include <vector>
1617

1718
class CollectiveHandler : public XrdCl::ResponseHandler
@@ -68,41 +69,57 @@ int cachesyncer::sync(int fd, interval_tree<uint64_t,
6869
return 0;
6970
}
7071

71-
CollectiveHandler handler(journal.size() + ((truncatesize != -1) ? 1 : 0));
72-
std::map<size_t, bufferll> bufferm;
73-
size_t i = 0;
74-
75-
for (auto itr = journal.begin(); itr != journal.end(); ++itr) {
76-
off_t cacheoff = itr->value + offshift;
77-
size_t size = itr->high - itr->low;
78-
bufferm[i].resize(size);
79-
int bytesRead = pread(fd, bufferm[i].ptr(), size, cacheoff);
80-
81-
if (bytesRead < 0) {
82-
// TODO handle error
83-
return -1;
84-
}
85-
86-
if (bytesRead < (int) size) {
87-
// TODO handle error
72+
const size_t ntot = journal.size();
73+
const size_t nbatch = 256;
74+
size_t nsub = 0;
75+
auto itr = journal.begin();
76+
bool needtrunc = (truncatesize != -1);
77+
78+
while(nsub<ntot || needtrunc) {
79+
const size_t n = std::min(ntot - nsub, nbatch);
80+
const bool dotrunc = (n < nbatch && needtrunc);
81+
CollectiveHandler handler(n + (dotrunc ? 1 : 0));
82+
83+
std::map<size_t, bufferll> bufferm;
84+
size_t i = 0;
85+
while(i<n) {
86+
off_t cacheoff = itr->value + offshift;
87+
size_t size = itr->high - itr->low;
88+
bufferm[i].resize(size);
89+
int bytesRead = pread(fd, bufferm[i].ptr(), size, cacheoff);
90+
91+
if (bytesRead < 0) {
92+
// TODO handle error
93+
return -1;
94+
}
95+
96+
if (bytesRead < (int) size) {
97+
// TODO handle error
98+
}
99+
100+
// do async write
101+
XrdCl::XRootDStatus st = file.Write(itr->low, size, bufferm[i].ptr(), &handler);
102+
++nsub;
103+
104+
if (!st.IsOK()) {
105+
handler.Report(new XrdCl::XRootDStatus(st));
106+
}
107+
108+
++i;
109+
++itr;
88110
}
89111

90-
// do async write
91-
XrdCl::XRootDStatus st = file.Write(itr->low, size, bufferm[i].ptr(), &handler);
92-
93-
if (!st.IsOK()) {
112+
// there might be a truncate call after the writes to be applied
113+
if (dotrunc) {
114+
XrdCl::XRootDStatus st = file.Truncate(truncatesize);
94115
handler.Report(new XrdCl::XRootDStatus(st));
116+
needtrunc = false;
95117
}
96118

97-
i++;
98-
}
99-
100-
// there might be a truncate call after the writes to be applied
101-
if (truncatesize != -1) {
102-
XrdCl::XRootDStatus st = file.Truncate(truncatesize);
103-
handler.Report(new XrdCl::XRootDStatus(st));
119+
handler.Wait();
120+
int rc = handler.WasSuccessful() ? 0 : -1;
121+
if (rc) return rc;
104122
}
105123

106-
handler.Wait();
107-
return handler.WasSuccessful() ? 0 : -1;
124+
return 0;
108125
}

fusex/data/data.cc

+56-33
Original file line numberDiff line numberDiff line change
@@ -1244,8 +1244,7 @@ data::datax::recover_ropen(fuse_req_t req)
12441244
}
12451245

12461246
// Issue a new open requesting also a new TCP connection
1247-
const struct fuse_ctx* ctx = fuse_req_ctx(req);
1248-
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(ctx);
1247+
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(proxy, true);
12491248
newproxy->OpenAsync(newproxy, mRemoteUrlRO.c_str(), targetFlags, mode, 0);
12501249
// wait this time for completion
12511250

@@ -1407,8 +1406,7 @@ data::datax::try_ropen(fuse_req_t req, XrdCl::shared_proxy &proxy,
14071406
}
14081407

14091408
// Issue a new open requesting also a new TCP connection
1410-
const struct fuse_ctx* ctx = fuse_req_ctx(req);
1411-
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(ctx);
1409+
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(proxy, true);
14121410
newproxy->OpenAsync(newproxy, open_url.c_str(), targetFlags, mode, 0);
14131411
// wait this time for completion
14141412

@@ -1557,8 +1555,7 @@ data::datax::try_wopen(fuse_req_t req, XrdCl::shared_proxy &proxy,
15571555

15581556
eos_warning("recover reopening file for writing");
15591557
// Issue a new open requesting also a new TCP connection
1560-
const struct fuse_ctx* ctx = fuse_req_ctx(req);
1561-
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(ctx);
1558+
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(proxy, true);
15621559
newproxy->OpenAsync(newproxy, open_url.c_str(), targetFlags, mode, 0);
15631560
// wait this time for completion
15641561

@@ -1739,8 +1736,7 @@ data::datax::recover_write(fuse_req_t req)
17391736
}
17401737

17411738
// Issue a new open requesting also a new TCP connection
1742-
const struct fuse_ctx* ctx = fuse_req_ctx(req);
1743-
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(ctx);
1739+
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(proxy, true);
17441740

17451741
if (!recover_from_file_cache && !recover_truncate) {
17461742
// we need to open this file because it is not complete locally
@@ -1904,7 +1900,8 @@ data::datax::recover_write(fuse_req_t req)
19041900
}
19051901

19061902
// upload into identical inode using the drop & replace option (repair flag)
1907-
XrdCl::shared_proxy uploadproxy = XrdCl::Proxy::Factory();
1903+
// Issue a new open requesting also a new TCP connection
1904+
XrdCl::shared_proxy uploadproxy = XrdCl::Proxy::Factory(proxy, true);
19081905
uploadproxy->inherit_attached(proxy);
19091906
uploadproxy->inherit_writequeue(uploadproxy, proxy);
19101907

@@ -1913,15 +1910,14 @@ data::datax::recover_write(fuse_req_t req)
19131910
eos_warning("failed to signal begin-flush");
19141911
}
19151912

1913+
std::string tmpUrl = mRemoteUrlRW;
19161914
// add the repair flag to drop existing locations and select new ones
1917-
mRemoteUrlRW += "&eos.repair=1";
1915+
tmpUrl += "&eos.repair=1";
19181916
// request enough space for this recovery upload
1919-
mRemoteUrlRW += "&eos.bookingsize=0";
1917+
tmpUrl += "&eos.bookingsize=0";
19201918
eos_warning("re-opening with repair flag for recovery %s",
1921-
mRemoteUrlRW.c_str());
1922-
int rc = try_wopen(req, uploadproxy, mRemoteUrlRW);
1923-
mRemoteUrlRW.erase(mRemoteUrlRW.length() -
1924-
std::string("&eos.repair=1").length());
1919+
tmpUrl.c_str());
1920+
int rc = try_wopen(req, uploadproxy, tmpUrl);
19251921

19261922
// put back the flush indicator
19271923
if (req && begin_flush(req)) {
@@ -2641,7 +2637,7 @@ data::datax::peek_pread(fuse_req_t req, char*& buf, size_t count, off_t offset)
26412637
int tret = 0;
26422638

26432639
// call recovery for an open
2644-
if ((tret = TryRecovery(req, false))) {
2640+
if ((tret = TryRecovery(req, true))) {
26452641
mRecoveryStack.push_back(eos_log(LOG_SILENT,
26462642
"status='%s' errno='%d' hint='failed TryRecovery'",
26472643
status.ToString().c_str(), tret));
@@ -2702,7 +2698,7 @@ data::datax::peek_pread(fuse_req_t req, char*& buf, size_t count, off_t offset)
27022698
mRecoveryStack.push_back(eos_log(LOG_SILENT,
27032699
"status='%s' hint='will TryRecovery'",
27042700
status.ToString().c_str()));
2705-
recovery = TryRecovery(req, false);
2701+
recovery = TryRecovery(req, mFile->has_xrdioro(req) ? false : true);
27062702

27072703
if (recovery) {
27082704
// recovery failed
@@ -3026,6 +3022,18 @@ data::datax::set_remote(const std::string& hostport,
30263022
}
30273023
}
30283024

3025+
/* -------------------------------------------------------------------------- */
3026+
std::string
3027+
/* -------------------------------------------------------------------------- */
3028+
data::datax::get_remote(bool isRW)
3029+
/* -------------------------------------------------------------------------- */
3030+
{
3031+
if (isRW) {
3032+
return mRemoteUrlRW;
3033+
}
3034+
return mRemoteUrlRO;
3035+
}
3036+
30293037
/* -------------------------------------------------------------------------- */
30303038
void
30313039
data::datax::dump_recovery_stack()
@@ -3355,33 +3363,48 @@ data::dmap::ioflush(ThreadAssistant& assistant)
33553363
// let's see if the initial OpenAsync got a timeout, this we should retry always
33563364
XrdCl::XRootDStatus status = fit->second->opening_state();
33573365
bool rescue = true;
3366+
bool canreissue = true;
3367+
const std::string opname =
3368+
(fit->second->state() == XrdCl::Proxy::CLOSEFAILED) ? "CloseAsync" : "OpenAsync";
3369+
3370+
if (fit->second->state() == XrdCl::Proxy::CLOSEFAILED &&
3371+
fit->second->opening_state().code == XrdCl::errOperationExpired) {
3372+
// to trigger a new TCP connection next time
3373+
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(fit->second, true);
3374+
canreissue = false;
3375+
}
33583376

3359-
if (
3377+
if (canreissue && (
33603378
(status.code == XrdCl::errConnectionError) ||
33613379
(status.code == XrdCl::errSocketTimeout) ||
33623380
(status.code == XrdCl::errOperationExpired) ||
3363-
(status.code == XrdCl::errSocketDisconnected)) {
3364-
eos_static_warning("re-issuing OpenAsync request after timeout - ino:%16lx err-code:%d",
3365-
(*it)->id(), status.code);
3381+
(status.code == XrdCl::errSocketDisconnected))){
3382+
eos_static_warning("re-issuing %s request after timeout - ino:%16lx err-code:%d",
3383+
opname.c_str(),(*it)->id(), status.code);
33663384
// Recover such errors by force creation of a new XrdCl
3367-
// File object and a new TCP connection to avoid pilling
3368-
// up requests on a "blocked" TCP due, for example, to a
3369-
// slow close operation.
3370-
fuse_id id = fit->second->fuseid();
3371-
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(nullptr, &id);
3372-
newproxy->OpenAsync(newproxy, fit->second->url(), fit->second->flags(),
3373-
fit->second->mode(), 0);
3385+
// File object. Also try to use a new TCP connection for our
3386+
// fuseid(), to avoid piling up requests on a "blocked" TCP due,
3387+
// for example, to a slow close operation. A new connection may
3388+
// not always be used for the re-issued open below, e.g. if the
3389+
// process identified by the proxy's fuseid() has exited, the
3390+
// processCache will not increment the connection counter.
3391+
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory(fit->second, true);
3392+
// For the open use the url from the ioctx (datax) object rather than
3393+
// the previous proxy. The previous proxy may have used url options
3394+
// we don't want here, such as eos.repair.
3395+
newproxy->OpenAsync(newproxy, (*it)->get_remote(true),
3396+
fit->second->flags(), fit->second->mode(), 0);
33743397
newproxy->inherit_attached(fit->second);
33753398
newproxy->inherit_protocol(fit->second);
33763399
map[fit->first] = newproxy;
33773400
continue;
33783401
} else {
3379-
eos_static_warning("OpenAsync failed - trying recovery - ino:%16lx err-code:%d",
3380-
(*it)->id(), status.code);
3381-
33823402
if (status.errNo == kXR_noserver) {
33833403
int tret = 0;
33843404

3405+
eos_static_warning("%s failed - trying recovery - ino:%16lx err-code:%d",
3406+
opname.c_str(),(*it)->id(), status.code);
3407+
33853408
if (!(tret = (*it)->TryRecovery(0, true))) {
33863409
(*it)->recoverystack().push_back
33873410
(eos_static_log(LOG_SILENT, "hint='success TryRecovery'"));
@@ -3402,8 +3425,8 @@ data::dmap::ioflush(ThreadAssistant& assistant)
34023425
}
34033426
}
34043427

3405-
eos_static_warning("giving up OpenAsync request - ino:%16lx err-code:%d",
3406-
(*it)->id(), status.code);
3428+
eos_static_warning("giving up %s request - ino:%16lx err-code:%d",
3429+
opname.c_str(),(*it)->id(), status.code);
34073430

34083431
if (status.errNo == kXR_overQuota) {
34093432
// don't preserve these files, they got an application error beforehand

fusex/data/data.hh

+2
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ public:
139139
fuse_req_t req,
140140
bool isRW);
141141

142+
std::string get_remote(bool isRW);
143+
142144
// IO bridge interface
143145
ssize_t pread(fuse_req_t req, void* buf, size_t count, off_t offset);
144146
ssize_t pwrite(fuse_req_t req, const void* buf, size_t count, off_t offset);

0 commit comments

Comments
 (0)