Skip to content

Commit

Permalink
curvefs: change the data organization format
Browse files Browse the repository at this point in the history
Signed-off-by: swj <[email protected]>
  • Loading branch information
201341 authored and wuhongsong committed Apr 25, 2023
1 parent e2610d1 commit 423d86d
Show file tree
Hide file tree
Showing 45 changed files with 375 additions and 115 deletions.
3 changes: 3 additions & 0 deletions curvefs/conf/tools.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ s3.bucket_name=bucket
s3.blocksize=4194304
s3.chunksize=67108864
s3.useVirtualAddressing=false
# s3 objectPrefix: 0 means no prefix, 1 means inode prefix,
# 2 or any other value means hash prefix
s3.objectPrefix=0
# statistic info in xattr; hardlinks are not supported when enabled
enableSumInDir=true

Expand Down
1 change: 1 addition & 0 deletions curvefs/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message S3Info {
required string bucketname = 4;
required uint64 blockSize = 5;
required uint64 chunkSize = 6;
optional uint32 objectPrefix = 7;
}

enum PartitionStatus {
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ void SetFuseClientS3Option(FuseClientOption *clientOption,
const S3InfoOption &fsS3Opt) {
clientOption->s3Opt.s3ClientAdaptorOpt.blockSize = fsS3Opt.blockSize;
clientOption->s3Opt.s3ClientAdaptorOpt.chunkSize = fsS3Opt.chunkSize;
clientOption->s3Opt.s3ClientAdaptorOpt.objectPrefix = fsS3Opt.objectPrefix;
clientOption->s3Opt.s3AdaptrOpt.s3Address = fsS3Opt.s3Address;
clientOption->s3Opt.s3AdaptrOpt.ak = fsS3Opt.ak;
clientOption->s3Opt.s3AdaptrOpt.sk = fsS3Opt.sk;
Expand All @@ -329,6 +330,7 @@ void S3Info2FsS3Option(const curvefs::common::S3Info& s3,
fsS3Opt->bucketName = s3.bucketname();
fsS3Opt->blockSize = s3.blocksize();
fsS3Opt->chunkSize = s3.chunksize();
fsS3Opt->objectPrefix = s3.has_objectprefix() ? s3.objectprefix() : 0;
}

} // namespace common
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ struct S3ClientAdaptorOption {
uint32_t baseSleepUs;
uint32_t maxReadRetryIntervalMs;
uint32_t readRetryIntervalMs;
uint32_t objectPrefix;
DiskCacheOption diskCacheOpt;
};

Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ S3ClientAdaptorImpl::Init(
chunkFlushThreads_ = option.chunkFlushThreads;
maxReadRetryIntervalMs_ = option.maxReadRetryIntervalMs;
readRetryIntervalMs_ = option.readRetryIntervalMs;
objectPrefix_ = option.objectPrefix;
client_ = client;
inodeManager_ = inodeManager;
mdsClient_ = mdsClient;
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class S3ClientAdaptor {
virtual std::shared_ptr<S3Client> GetS3Client() = 0;
virtual uint64_t GetBlockSize() = 0;
virtual uint64_t GetChunkSize() = 0;
virtual uint32_t GetObjectPrefix() = 0;
virtual bool HasDiskCache() = 0;
};

Expand Down Expand Up @@ -150,6 +151,10 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
// Returns the chunk size stored in chunkSize_.
uint64_t GetChunkSize() {
return chunkSize_;
}
// Returns the object-prefix mode stored in objectPrefix_ (assigned from
// option.objectPrefix in Init()).
uint32_t GetObjectPrefix() {
return objectPrefix_;
}

// Returns the shared FsCacheManager held in fsCacheManager_.
std::shared_ptr<FsCacheManager> GetFsCacheManager() {
return fsCacheManager_;
}
Expand Down Expand Up @@ -254,6 +259,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
uint32_t throttleBaseSleepUs_;
uint32_t maxReadRetryIntervalMs_;
uint32_t readRetryIntervalMs_;
uint32_t objectPrefix_;
Thread bgFlushThread_;
std::atomic<bool> toStop_;
std::mutex mtx_;
Expand Down
10 changes: 7 additions & 3 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ int FileCacheManager::ReadKVRequest(
uint64_t chunkIndex = 0, chunkPos = 0, blockIndex = 0, blockPos = 0;
uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize();
uint64_t blockSize = s3ClientAdaptor_->GetBlockSize();
uint32_t objectPrefix = s3ClientAdaptor_->GetObjectPrefix();
GetBlockLoc(req->offset, &chunkIndex, &chunkPos, &blockIndex,
&blockPos);

Expand Down Expand Up @@ -580,7 +581,7 @@ int FileCacheManager::ReadKVRequest(
assert(blockPos >= objectOffset);
std::string name = curvefs::common::s3util::GenObjName(
req->chunkId, blockIndex, req->compaction, req->fsId,
req->inodeId);
req->inodeId, objectPrefix);
char *currentBuf = dataBuf + req->readOffset + readBufOffset;

// read from localcache -> remotecache -> s3
Expand Down Expand Up @@ -630,12 +631,14 @@ void FileCacheManager::PrefetchForBlock(const S3ReadRequest &req,
uint64_t chunkSize,
uint64_t startBlockIndex) {
uint32_t prefetchBlocks = s3ClientAdaptor_->GetPrefetchBlocks();
uint32_t objectPrefix = s3ClientAdaptor_->GetObjectPrefix();
std::vector<std::pair<std::string, uint64_t>> prefetchObjs;

uint64_t blockIndex = startBlockIndex;
for (uint32_t i = 0; i < prefetchBlocks; i++) {
std::string name = curvefs::common::s3util::GenObjName(
req.chunkId, blockIndex, req.compaction, req.fsId, req.inodeId);
req.chunkId, blockIndex, req.compaction,
req.fsId, req.inodeId, objectPrefix);
uint64_t maxReadLen = (blockIndex + 1) * blockSize;
uint64_t needReadLen = maxReadLen > fileLen
? fileLen - blockIndex * blockSize
Expand Down Expand Up @@ -2297,6 +2300,7 @@ CURVEFS_ERROR DataCache::PrepareFlushTasks(uint64_t inodeId,
// generate flush task
uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize();
uint64_t blockSize = s3ClientAdaptor_->GetBlockSize();
uint32_t objectPrefix = s3ClientAdaptor_->GetObjectPrefix();
uint64_t blockPos = chunkPos_ % blockSize;
uint64_t blockIndex = chunkPos_ / blockSize;
uint64_t remainLen = len_;
Expand All @@ -2306,7 +2310,7 @@ CURVEFS_ERROR DataCache::PrepareFlushTasks(uint64_t inodeId,

// generate flush to disk or s3 task
std::string objectName = curvefs::common::s3util::GenObjName(
*chunkId, blockIndex, 0, fsId, inodeId);
*chunkId, blockIndex, 0, fsId, inodeId, objectPrefix);
int ret = 0;
uint64_t start = butil::cpuwide_time_us();
auto context = std::make_shared<PutObjectAsyncContext>();
Expand Down
91 changes: 70 additions & 21 deletions curvefs/src/client/s3/disk_cache_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ namespace client {
#define CACHE_READ_DIR "cacheread"

void DiskCacheBase::Init(std::shared_ptr<PosixWrapper> wrapper,
const std::string cacheDir) {
const std::string cacheDir, uint32_t objectPrefix) {
cacheDir_ = cacheDir;
posixWrapper_ = wrapper;
objectPrefix_ = objectPrefix;
}

int DiskCacheBase::CreateIoDir(bool writreDir) {
struct stat statFile;
bool ret;
std::string FullDirPath;

Expand Down Expand Up @@ -85,6 +85,37 @@ std::string DiskCacheBase::GetCacheIoFullDir() {
return fullPath;
}

/**
 * @brief Create every missing directory on the way to the given path.
 *
 * The last path component (everything after the final '/') is treated as a
 * file name and is not created; only its parent directories are.
 *
 * @param dir path whose parent directories should exist afterwards
 * @return 0 on success (including when nothing had to be created),
 *         -errno of the failing mkdir otherwise; errno is left set to the
 *         same value for callers that inspect it
 */
int DiskCacheBase::CreateDir(const std::string dir) {
    // Strip the trailing component so that only directories remain.
    std::string dirPath = dir;
    const size_t p = dir.find_last_of('/');
    if (p != std::string::npos) {
        dirPath.erase(p);
    }

    std::vector<std::string> names;
    ::curve::common::SplitString(dirPath, "/", &names);
    // No components left: nothing to create (the root dir must already exist).
    if (names.empty()) {
        return 0;
    }

    std::string path;
    for (size_t i = 0; i < names.size(); ++i) {
        // Keep relative paths relative: no leading '/' before the first
        // component unless the input itself was absolute.
        if (0 == i && dirPath[0] != '/') {
            path += names[i];
        } else {
            path += "/";
            path += names[i];
        }

        if (IsFileExist(path)) {
            continue;
        }
        // Directories need 0755 permission, otherwise later accesses fail
        // with "Permission denied".
        if (posixWrapper_->mkdir(path.c_str(), 0755) < 0) {
            // Capture errno right away: logging below may overwrite it.
            const int err = errno;
            // Another thread/process may have created the directory between
            // the existence check and mkdir; that is not a failure, and the
            // remaining components still have to be created.
            if (err == EEXIST) {
                continue;
            }
            LOG(WARNING) << "mkdir " << path << " failed. " << strerror(err);
            errno = err;
            return -err;
        }
    }
    return 0;
}

int DiskCacheBase::LoadAllCacheFile(std::set<std::string> *cachedObj) {
std::string cachePath = GetCacheIoFullDir();
bool ret = IsFileExist(cachePath);
Expand All @@ -94,26 +125,44 @@ int DiskCacheBase::LoadAllCacheFile(std::set<std::string> *cachedObj) {
}

VLOG(3) << "LoadAllCacheFile start, dir: " << cachePath;
DIR *cacheDir = NULL;
struct dirent *cacheDirent = NULL;
cacheDir = posixWrapper_->opendir(cachePath.c_str());
if (!cacheDir) {
LOG(ERROR) << "LoadAllCacheFile, opendir error, errno = " << errno;
return -1;
}
while ((cacheDirent = posixWrapper_->readdir(cacheDir)) != NULL) {
if ((!strncmp(cacheDirent->d_name, ".", 1)) ||
(!strncmp(cacheDirent->d_name, "..", 2)))
continue;
std::string fileName = cacheDirent->d_name;
cachedObj->emplace(fileName);
VLOG(9) << "LoadAllCacheFile obj, name = " << fileName;
}
std::function<bool(const std::string &path,
std::set<std::string> *cacheObj)> listDir;

int rc = posixWrapper_->closedir(cacheDir);
if (rc < 0) {
LOG(ERROR) << "LoadAllCacheFile, opendir error, errno = " << errno;
return rc;
// Recursively walks `path`, inserting the bare name (no directory part) of
// every regular file into `cacheObj`. Any entry that is not a regular file is
// treated as a sub-directory and descended into. Returns false if a directory
// cannot be opened or closed, or if any nested walk fails.
listDir = [&listDir, this](const std::string &path,
                           std::set<std::string> *cacheObj) -> bool {
    DIR *dir = posixWrapper_->opendir(path.c_str());
    if (dir == nullptr) {
        LOG(ERROR) << "LoadAllCacheFile Opendir error, path =" << path;
        return false;
    }

    bool ok = true;
    struct dirent *ent = nullptr;
    while ((ent = posixWrapper_->readdir(dir)) != nullptr) {
        // Skips ".", ".." and every other dot-prefixed entry; this keeps the
        // prefix match the previous strncmp(d_name, ".", 1) test performed.
        if (ent->d_name[0] == '.') {
            continue;
        }
        if (ent->d_type == DT_REG) {
            const std::string fileName(ent->d_name);
            VLOG(9) << "LoadAllCacheFile obj, name = " << fileName;
            cacheObj->emplace(fileName);
        } else if (!listDir(path + '/' + ent->d_name, cacheObj)) {
            // Stop walking, but fall through to closedir so the handle is
            // not leaked when a nested directory fails.
            ok = false;
            break;
        }
    }

    const int ret = posixWrapper_->closedir(dir);
    if (ret < 0) {
        LOG(ERROR) << "close dir " << path << ", error = " << errno;
        ok = false;
    }
    return ok;
};
ret = listDir(cachePath, cachedObj);
if (!ret) {
return -1;
}
VLOG(3) << "LoadAllCacheReadFile end, dir: " << cachePath;
return 0;
Expand Down
6 changes: 4 additions & 2 deletions curvefs/src/client/s3/disk_cache_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <string>
#include <list>
#include <set>
#include <vector>
#include <memory>

#include "curvefs/src/common/wrap_posix.h"
Expand All @@ -45,23 +46,24 @@ class DiskCacheBase {
DiskCacheBase() {}
virtual ~DiskCacheBase() {}
virtual void Init(std::shared_ptr<PosixWrapper> wrapper,
const std::string cacheDir);
const std::string cacheDir, uint32_t objectPrefix);
/**
* @brief Create Read/Write Cache Dir.
*/
virtual int CreateIoDir(bool writreDir);
virtual bool IsFileExist(const std::string file);
virtual int CreateDir(const std::string name);
/**
* @brief Get Read/Write Cache full Dir(include CacheDir_).
*/
virtual std::string GetCacheIoFullDir();

virtual int LoadAllCacheFile(std::set<std::string> *cachedObj);
uint32_t objectPrefix_;

private:
std::string cacheIoDir_;
std::string cacheDir_;

// file system operation encapsulation
std::shared_ptr<PosixWrapper> posixWrapper_;
};
Expand Down
18 changes: 12 additions & 6 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "curvefs/src/client/s3/client_s3_adaptor.h"
#include "curvefs/src/client/s3/disk_cache_manager.h"
#include "curvefs/src/common/s3util.h"

namespace curvefs {

Expand Down Expand Up @@ -65,6 +66,7 @@ DiskCacheManager::DiskCacheManager(std::shared_ptr<PosixWrapper> posixWrapper,
safeRatio_ = 0;
diskUsedInit_ = false;
maxUsableSpaceBytes_ = 0;
objectPrefix_ = 0;
// cannot limit the size,
// because cache is been delete must after upload to s3
cachedObjName_ = std::make_shared<
Expand All @@ -85,10 +87,10 @@ int DiskCacheManager::Init(std::shared_ptr<S3Client> client,
maxUsableSpaceBytes_ = option.diskCacheOpt.maxUsableSpaceBytes;
maxFileNums_ = option.diskCacheOpt.maxFileNums;
cmdTimeoutSec_ = option.diskCacheOpt.cmdTimeoutSec;

cacheWrite_->Init(client_, posixWrapper_, cacheDir_,
option.diskCacheOpt.asyncLoadPeriodMs, cachedObjName_);
cacheRead_->Init(posixWrapper_, cacheDir_);
objectPrefix_ = option.objectPrefix;
cacheWrite_->Init(client_, posixWrapper_, cacheDir_, objectPrefix_,
option.diskCacheOpt.asyncLoadPeriodMs, cachedObjName_);
cacheRead_->Init(posixWrapper_, cacheDir_, objectPrefix_);
int ret;
ret = CreateDir();
if (ret < 0) {
Expand Down Expand Up @@ -390,8 +392,12 @@ void DiskCacheManager::TrimCache() {
}

VLOG(6) << "obj will be removed01: " << cacheKey;
cacheReadFile = cacheReadFullDir + "/" + cacheKey;
cacheWriteFile = cacheWriteFullDir + "/" + cacheKey;
cacheReadFile = cacheReadFullDir + "/" +
curvefs::common::s3util::GenPathByObjName(
cacheKey, objectPrefix_);
cacheWriteFile = cacheWriteFullDir + "/" +
curvefs::common::s3util::GenPathByObjName(
cacheKey, objectPrefix_);
struct stat statFile;
int ret = 0;
ret = posixWrapper_->stat(cacheWriteFile.c_str(), &statFile);
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/s3/disk_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class DiskCacheManager {
uint32_t safeRatio_;
uint64_t maxUsableSpaceBytes_;
uint64_t maxFileNums_;
uint32_t objectPrefix_;
// used bytes of disk cache
std::atomic<int64_t> usedBytes_;
// used ratio of the file system in disk cache
Expand Down
16 changes: 13 additions & 3 deletions curvefs/src/client/s3/disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ namespace curvefs {
namespace client {

void DiskCacheRead::Init(std::shared_ptr<PosixWrapper> posixWrapper,
const std::string cacheDir) {
const std::string cacheDir, uint32_t objectPrefix) {
posixWrapper_ = posixWrapper;
DiskCacheBase::Init(posixWrapper, cacheDir);
DiskCacheBase::Init(posixWrapper, cacheDir, objectPrefix);
}

int DiskCacheRead::ReadDiskFile(const std::string name, char *buf,
Expand Down Expand Up @@ -85,7 +85,7 @@ int DiskCacheRead::LinkWriteToRead(const std::string fileName,
const std::string fullWriteDir,
const std::string fullReadDir) {
VLOG(6) << "LinkWriteToRead start. name = " << fileName;
std::string fullReadPath, fullWritePath;
std::string fullReadPath, fullWritePath, dirPath;
fullWritePath = fullWriteDir + "/" + fileName;
fullReadPath = fullReadDir + "/" + fileName;
int ret;
Expand All @@ -94,6 +94,16 @@ int DiskCacheRead::LinkWriteToRead(const std::string fileName,
<< ", file = " << fullWritePath;
return -1;
}

if (objectPrefix_ != 0) {
ret = CreateDir(fullReadPath);
if (ret < 0 && errno != EEXIST) {
LOG(ERROR) << "Mkdir error. ret = " << ret << ", errno = " << errno
<< ", path is " << fullReadPath;
return -1;
}
}

ret = posixWrapper_->link(fullWritePath.c_str(), fullReadPath.c_str());
if (ret < 0 &&
errno != EEXIST ) {
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DiskCacheRead : public DiskCacheBase {
DiskCacheRead() {}
virtual ~DiskCacheRead() {}
virtual void Init(std::shared_ptr<PosixWrapper> posixWrapper,
const std::string cacheDir);
const std::string cacheDir, uint32_t objectPrefix);
virtual int ReadDiskFile(const std::string name, char *buf, uint64_t offset,
uint64_t length);
virtual int WriteDiskFile(const std::string fileName, const char *buf,
Expand Down
Loading

0 comments on commit 423d86d

Please sign in to comment.