Skip to content

Commit

Permalink
Merge branch 'fscache' of https://github.com/0chain/zs3server into fe…
Browse files Browse the repository at this point in the history
…at/enterprise-cache
  • Loading branch information
Hitenjain14 committed Aug 20, 2024
2 parents af2bb02 + 82a4f33 commit b4171a5
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 41 deletions.
101 changes: 92 additions & 9 deletions cmd/bucket-listobjects-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package cmd

import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
"github.com/minio/minio/internal/logger"
Expand All @@ -46,12 +48,60 @@ func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
g.Wait()
}

func mergeListObjects(l1, l2 []ObjectInfo) []ObjectInfo {
mergedMap := make(map[string]ObjectInfo)

// Helper function to add/update map entries
addOrUpdate := func(obj ObjectInfo) {
if existingObj, found := mergedMap[obj.Name]; !found || obj.ModTime.After(existingObj.ModTime) {
mergedMap[obj.Name] = obj
}
}
for _, obj := range l1 {
addOrUpdate(obj)
}
for _, obj := range l2 {
addOrUpdate(obj)
}

mergedList := make([]ObjectInfo, 0, len(mergedMap))
for _, obj := range mergedMap {
mergedList = append(mergedList, obj)
}

return mergedList
}

func mergePrefixes(l1, l2 []string) []string {
mergedMap := make(map[string]bool)

// Helper function to add/update map entries
addOrUpdate := func(pre string) {
if _, found := mergedMap[pre]; !found {
mergedMap[pre] = true
}
}
for _, pre := range l1 {
addOrUpdate(pre)
}
for _, pre := range l2 {
addOrUpdate(pre)
}

mergedList := make([]string, 0, len(mergedMap))
for pre, _ := range mergedMap {
mergedList = append(mergedList, pre)
}

return mergedList
}

// Validate all the ListObjects query arguments, returns an APIErrorCode
// if one of the args do not meet the required conditions.
// Special conditions required by MinIO server are as below
// - delimiter if set should be equal to '/', otherwise the request is rejected.
// - marker if set should have a common prefix with 'prefix' param, otherwise
// the request is rejected.
// - delimiter if set should be equal to '/', otherwise the request is rejected.
// - marker if set should have a common prefix with 'prefix' param, otherwise
// the request is rejected.
func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int) APIErrorCode {
// Max keys cannot be negative.
if maxKeys < 0 {
Expand Down Expand Up @@ -200,6 +250,12 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
// NOTE: It is recommended that this API to be used for application development.
// MinIO continues to support ListObjectsV1 for supporting legacy tools.
func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
fmt.Println("ListObjectsV2Handler")
st := time.Now()
defer func() {
elapsed := time.Since(st).Milliseconds()
fmt.Printf("ListObjectsV2Handler took %d ms\n", elapsed)
}()
ctx := newContext(r, w, "ListObjectsV2")

defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
Expand Down Expand Up @@ -235,9 +291,15 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
}

var (
listObjectsV2Info ListObjectsV2Info
err error
listObjectsV2Info ListObjectsV2Info
listObjectsV2InfoCache ListObjectsV2Info
err error
errC error
)
listObjectsV2Cache := objectAPI.ListObjectsV2
if api.CacheAPI() != nil {
listObjectsV2Cache = api.CacheAPI().ListObjectsV2
}

if r.Header.Get(xMinIOExtract) == "true" && strings.Contains(prefix, archivePattern) {
// Inititate a list objects operation inside a zip file based in the input params
Expand All @@ -247,11 +309,18 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
// On success would return back ListObjectsInfo object to be
// marshaled into S3 compatible XML header.
listObjectsV2Info, err = objectAPI.ListObjectsV2(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
listObjectsV2InfoCache, errC = listObjectsV2Cache(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
}
if err != nil {
if err != nil || errC != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// fmt.Printf("Harsh listObjectsV2Infooo %+v\n", listObjectsV2Info)
// fmt.Printf("Harsh listObjectsV2InfoCache %+v\n", listObjectsV2InfoCache)
mergeObjects := mergeListObjects(listObjectsV2Info.Objects, listObjectsV2InfoCache.Objects)
mergePrefixes := mergePrefixes(listObjectsV2Info.Prefixes, listObjectsV2InfoCache.Prefixes)
listObjectsV2Info.Objects = mergeObjects
listObjectsV2Info.Prefixes = mergePrefixes

concurrentDecryptETag(ctx, listObjectsV2Info.Objects)

Expand Down Expand Up @@ -306,8 +375,13 @@ func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http
// This implementation of the GET operation returns some or all (up to 1000)
// of the objects in a bucket. You can use the request parameters as selection
// criteria to return a subset of the objects in a bucket.
//
func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
fmt.Println("ListObjectsV1Handler")
st := time.Now()
defer func() {
elapsed := time.Since(st).Milliseconds()
fmt.Printf("ListObjectsV1Handler took %d ms\n", elapsed)
}()
ctx := newContext(r, w, "ListObjectsV1")

defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
Expand Down Expand Up @@ -340,15 +414,24 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
}

listObjects := objectAPI.ListObjects

listObjectsCache := objectAPI.ListObjects
if api.CacheAPI() != nil {
listObjectsCache = api.CacheAPI().ListObjects
}
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsInfo object to be
// marshaled into S3 compatible XML header.
listObjectsInfo, err := listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
listObjectsInfoCache, errC := listObjectsCache(ctx, bucket, prefix, marker, delimiter, maxKeys)
if err != nil || errC != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
mergeObjects := mergeListObjects(listObjectsInfo.Objects, listObjectsInfoCache.Objects)
mergePrefixes := mergePrefixes(listObjectsInfo.Prefixes, listObjectsInfoCache.Prefixes)

listObjectsInfo.Objects = mergeObjects
listObjectsInfo.Prefixes = mergePrefixes

concurrentDecryptETag(ctx, listObjectsInfo.Objects)

Expand Down
24 changes: 22 additions & 2 deletions cmd/disk-cache-backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,22 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
go cache.purgeWait(ctx)
go cache.cleanupStaleUploads(ctx)
if cache.commitWriteback {
go cache.scanCacheWritebackFailures(ctx)
go func() {
tickInterval := time.Duration(config.WriteBackInterval) * time.Second
fmt.Println("write back time interval", tickInterval)
ticker := time.NewTicker(tickInterval)
defer ticker.Stop()
defer close(cache.retryWritebackCh)
for {
select {
case <-ticker.C:
cache.scanCacheWritebackFailures(ctx)
case <-ctx.Done():
return
}
}
}()
//go cache.scanCacheWritebackFailures(ctx)
}
cache.diskSpaceAvailable(0) // update if cache usage is already high.
cache.NewNSLockFn = func(cachePath string) RWLocker {
Expand Down Expand Up @@ -1068,6 +1083,7 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
// Get returns ObjectInfo and reader for object from disk cache
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
fmt.Println("cacheObjPath getCacheSHADir", cacheObjPath)
cLock := c.NewNSLockFn(cacheObjPath)
lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
Expand Down Expand Up @@ -1168,6 +1184,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
go func() {
if writebackInProgress(objInfo.UserDefined) {
cacheObjPath = getCacheWriteBackSHADir(c.dir, bucket, object)
fmt.Println("cacheObjPath getCacheSHADir", cacheObjPath)
}
filePath := pathJoin(cacheObjPath, cacheFile)
err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw)
Expand Down Expand Up @@ -1225,20 +1242,23 @@ func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {

// queues writeback upload failures on server startup
func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
defer close(c.retryWritebackCh)
fmt.Println("scan cache write back failures")
//defer close(c.retryWritebackCh) // don't close the channel
filterFn := func(name string, typ os.FileMode) error {
if name == minioMetaBucket {
// Proceed to next file.
return nil
}
cacheDir := pathJoin(c.dir, name)
fmt.Println("cachedir to be uploaded in backend", cacheDir)
meta, _, _, err := c.statCachedMeta(ctx, cacheDir)
if err != nil {
return nil
}

objInfo := meta.ToObjectInfo()
status, ok := objInfo.UserDefined[writeBackStatusHeader]
fmt.Println("status of file", status)
if !ok || status == CommitComplete.String() {
return nil
}
Expand Down
Loading

0 comments on commit b4171a5

Please sign in to comment.