Commit

Merge pull request #160 from 0chain/feat/ftextsearch
dabasov authored Dec 2, 2024
2 parents 48325ee + 3ad8e12 commit 5cd6fe5
Showing 17 changed files with 670 additions and 36 deletions.
10 changes: 10 additions & 0 deletions client-api/s3api/handler.go
@@ -137,6 +137,16 @@ func Handler(w http.ResponseWriter, r *http.Request) {
break
}
JSON(w, 200, statObjectResponse)
case "search":
query := r.URL.Query().Get("query")
searchResObj, err := searchObject(query)
if err != nil {
JSON(w, 500, map[string]string{"error": err.Error()})
break
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(searchResObj)
default:
JSON(w, 500, map[string]string{"message": "feature not avaliable"})
}
28 changes: 28 additions & 0 deletions client-api/s3api/search_object.go
@@ -0,0 +1,28 @@
package s3api

import (
"fmt"
"io"
"log"
"net/http"
"net/url"
)

func searchObject(query string) ([]byte, error) {
u, _ := url.Parse("http://zsearch:3003/search")
q := u.Query()
q.Set("query", query)
u.RawQuery = q.Encode()
log.Println("search url", u.String())
resp, err := http.Get(u.String())
if err != nil {
return nil, fmt.Errorf("failed to make request: %w", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
return body, nil
}
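
searchObject forwards whatever zsearch returns without inspecting the HTTP status, and the url.Parse error is silently discarded. A more defensive variant (a sketch only, not part of this commit) would check both before handing the body back to the handler:

func searchObjectChecked(query string) ([]byte, error) {
	u, err := url.Parse("http://zsearch:3003/search")
	if err != nil {
		return nil, fmt.Errorf("failed to parse search url: %w", err)
	}
	q := u.Query()
	q.Set("query", query)
	u.RawQuery = q.Encode()

	resp, err := http.Get(u.String())
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	defer resp.Body.Close()

	// Surface non-200 responses instead of passing an error payload through as a success.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("zsearch returned status %d", resp.StatusCode)
	}
	return io.ReadAll(resp.Body)
}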
79 changes: 73 additions & 6 deletions cmd/disk-cache.go
@@ -24,6 +24,7 @@ import (
"io"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
@@ -113,12 +114,13 @@ type cacheObjects struct {
// number of accesses after which to cache an object
after int
// commit objects in async manner
commitWriteback bool
commitWritethrough bool
maxCacheFileSize int64
uploadWorkers int
uploadQueueTh int

commitWriteback bool
commitWritethrough bool
maxCacheFileSize int64
uploadWorkers int
uploadQueueTh int
indexSvcUrl string
contentSearchEnable string
// if true migration is in progress from v1 to v2
migrating bool
// retry queue for writeback cache mode to reattempt upload to backend
@@ -191,9 +193,36 @@ func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string,
}
dcache.Delete(ctx, bucket, object)
c.deleteFromListTree(bucket + "/" + object)
if c.contentSearchEnable == "true" {
go c.deleteObjectFromIndex(bucket, object)
}
return objInfoB, errB
}

func (c *cacheObjects) deleteObjectFromIndex(bucket string, object string) {
u, err := url.Parse(c.indexSvcUrl + "/delete")
if err != nil {
log.Println("err parsing url of zsearch delete", err)
return
}
q := u.Query()
q.Set("bucketName", bucket)
q.Set("objName", object)
u.RawQuery = q.Encode()
dreq, err := http.NewRequest(http.MethodDelete, u.String(), nil)
if err != nil {
log.Println("err creating delete req", err)
return
}
client := &http.Client{}
resp, err := client.Do(dreq)
if err != nil {
log.Println("err sending delete file req", err)
return
}
defer resp.Body.Close()
}

// DeleteObjects batch deletes objects in slice, and clears any cached entries
func (c *cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
errs := make([]error, len(objects))
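
deleteObjectFromIndex (above) is fire-and-forget: it runs in its own goroutine, logs transport errors, and never reads the response status, so a failed removal leaves a stale document in the search index with nothing in the gateway logs. A small addition (a sketch, not in this commit) placed just before the deferred body close would make that visible:

// Sketch: report non-200 responses from the zsearch /delete endpoint.
if resp.StatusCode != http.StatusOK {
	log.Println("zsearch delete returned status", resp.StatusCode, "for", bucket+"/"+object)
}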
@@ -984,6 +1013,42 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
time.Sleep(time.Second * time.Duration(retryCnt%10+1))
c.queueWritebackRetry(oi)
}
if c.contentSearchEnable == "true" {
log.Println("indexing file started")
cReader2, _, bErr2 := dcache.Get(ctx, oi.Bucket, oi.Name, nil, http.Header{}, ObjectOptions{})
if bErr2 != nil {
return
}
defer cReader2.Close()
c.indexFile(cReader2, oi.Bucket, oi.Name)
}
}

func (c *cacheObjects) indexFile(body io.ReadCloser, bucket string, object string) {
log.Println("indexing file", bucket+"/"+object)

indexUrl := c.indexSvcUrl + "/zindex"
u, err := url.Parse(indexUrl)
if err != nil {
log.Println("err parsing url of zsearch", err)
return
}
query := u.Query()
query.Set("bucketName", bucket)
query.Set("objName", object)
u.RawQuery = query.Encode()
req, err := http.NewRequest("PUT", u.String(), body)
if err != nil {
log.Println("err creating index req", err)
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Println("err sending file for indexing", err)
return
}
defer resp.Body.Close()
}

func (c *cacheObjects) deleteFromListTree(key string) {
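
Both new index-maintenance calls build a bare &http.Client{}, which has no timeout, and indexFile streams the entire cached object to zsearch in one PUT, so a wedged zsearch or tika container would pin the goroutine and the open cache reader indefinitely. A minimal sketch of a shared client with a deadline, offered as an assumption rather than as part of this change:

// Sketch: one reusable client with an upper bound on each index/delete request.
var indexHTTPClient = &http.Client{Timeout: 5 * time.Minute}

// Used in place of client := &http.Client{} in indexFile and deleteObjectFromIndex:
//   resp, err := indexHTTPClient.Do(req)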
@@ -1024,6 +1089,8 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
maxCacheFileSize: config.MaxCacheFileSize,
uploadWorkers: config.UploadWorkers,
uploadQueueTh: config.UploadQueueTh,
indexSvcUrl: config.IndexSvcUrl,
contentSearchEnable: config.ContentSearchEnable,
cacheStats: newCacheStats(),
listTree: newThreadSafeListTree(),
writeBackUploadBufferCh: make(chan ObjectInfo, 100000),
24 changes: 23 additions & 1 deletion environment/docker-compose.yaml
@@ -50,10 +50,12 @@ services:
MINIO_CACHE_QUOTA: 99 # maximum permitted usage of the cache in percentage
MINIO_CACHE_WATERMARK_LOW: 90 # % of cache quota at which cache eviction stops
MINIO_CACHE_WATERMARK_HIGH: 95 # % of cache quota at which cache eviction starts
MINIO_WRITE_BACK_INTERVAL: 900 # interval in seconds
MINIO_WRITE_BACK_INTERVAL: 900 # interval in seconds for ticker to upload any remaining file to backend
MINIO_MAX_CACHE_FILE_SIZE: 1000000000 # max file size in bytes
MINIO_WRITE_BACK_UPLOAD_WORKERS: 20
MINIO_UPLOAD_QUEUE_TH: 10
INDEX_SVC_URL: "http://zsearch:3003"
CONTENT_SEARCH_ENABLE: "true"
links:
- logsearchapi:logsearchapi
volumes:
@@ -78,7 +80,27 @@ services:
networks:
zs_network:

tika:
image: apache/tika:latest
container_name: tika-server
ports:
- "9998:9998"
networks:
zs_network:

zsearch:
build: ../zsearch
container_name: zsearch
ports:
- 3003:3003
volumes:
- bleve-index:/vindex
networks:
zs_network:


volumes:
bleve-index:
db:
driver: local

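
The compose additions stand up an Apache Tika server next to zsearch, and zsearch's go.mod (further down) pulls in github.com/google/go-tika, so the indexing path presumably runs uploaded objects through Tika for text extraction before handing the text to bleve. The zsearch handler that does this is not among the files loaded in this view; the sketch below only illustrates the assumed Tika call against the tika:9998 endpoint defined above:

// Sketch: extract plain text from an incoming object via the Tika server defined above.
// import "github.com/google/go-tika/tika"
client := tika.NewClient(nil, "http://tika:9998")
text, err := client.Parse(ctx, objectBody) // objectBody is the PUT request body carrying the object bytes
if err != nil {
	log.Println("tika parse failed:", err)
	return
}
// text would then be indexed under the bucket/object path (see the bleve sketch after go.mod).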
1 change: 1 addition & 0 deletions go.work
@@ -6,4 +6,5 @@ use (
.
./client-api
./cmd/gateway/zcn/multipart-test/s3gateway
./zsearch
)
32 changes: 17 additions & 15 deletions internal/config/cache/config.go
@@ -36,21 +36,23 @@ const (

// Config represents cache config settings
type Config struct {
Enabled bool `json:"-"`
Drives []string `json:"drives"`
Expiry int `json:"expiry"`
MaxUse int `json:"maxuse"`
Quota int `json:"quota"`
Exclude []string `json:"exclude"`
After int `json:"after"`
WatermarkLow int `json:"watermark_low"`
WatermarkHigh int `json:"watermark_high"`
Range bool `json:"range"`
CacheCommitMode string `json:"commit"`
WriteBackInterval int `json:"wb_interval"`
MaxCacheFileSize int64 `json:"max_cache_file_size"`
UploadWorkers int `json:"upload_workers"`
UploadQueueTh int `json:"upload_queue_th"`
Enabled bool `json:"-"`
Drives []string `json:"drives"`
Expiry int `json:"expiry"`
MaxUse int `json:"maxuse"`
Quota int `json:"quota"`
Exclude []string `json:"exclude"`
After int `json:"after"`
WatermarkLow int `json:"watermark_low"`
WatermarkHigh int `json:"watermark_high"`
Range bool `json:"range"`
CacheCommitMode string `json:"commit"`
WriteBackInterval int `json:"wb_interval"`
MaxCacheFileSize int64 `json:"max_cache_file_size"`
UploadWorkers int `json:"upload_workers"`
UploadQueueTh int `json:"upload_queue_th"`
IndexSvcUrl string `json:"index_svc_url"`
ContentSearchEnable string `json:"content_search_enable"`
}

// UnmarshalJSON - implements JSON unmarshal interface for unmarshalling
37 changes: 23 additions & 14 deletions internal/config/cache/lookup.go
@@ -38,20 +38,22 @@ const (
Range = "range"
Commit = "commit"

EnvCacheDrives = "MINIO_CACHE_DRIVES"
EnvCacheExclude = "MINIO_CACHE_EXCLUDE"
EnvCacheExpiry = "MINIO_CACHE_EXPIRY"
EnvCacheMaxUse = "MINIO_CACHE_MAXUSE"
EnvCacheQuota = "MINIO_CACHE_QUOTA"
EnvCacheAfter = "MINIO_CACHE_AFTER"
EnvCacheWatermarkLow = "MINIO_CACHE_WATERMARK_LOW"
EnvCacheWatermarkHigh = "MINIO_CACHE_WATERMARK_HIGH"
EnvCacheRange = "MINIO_CACHE_RANGE"
EnvCacheCommit = "MINIO_CACHE_COMMIT"
EnvWriteBackInterval = "MINIO_WRITE_BACK_INTERVAL"
EnvMaxCacheFileSize = "MINIO_MAX_CACHE_FILE_SIZE"
EnvUploadWorkers = "MINIO_WRITE_BACK_UPLOAD_WORKERS"
EnvUploadQueueTh = "MINIO_UPLOAD_QUEUE_TH"
EnvCacheDrives = "MINIO_CACHE_DRIVES"
EnvCacheExclude = "MINIO_CACHE_EXCLUDE"
EnvCacheExpiry = "MINIO_CACHE_EXPIRY"
EnvCacheMaxUse = "MINIO_CACHE_MAXUSE"
EnvCacheQuota = "MINIO_CACHE_QUOTA"
EnvCacheAfter = "MINIO_CACHE_AFTER"
EnvCacheWatermarkLow = "MINIO_CACHE_WATERMARK_LOW"
EnvCacheWatermarkHigh = "MINIO_CACHE_WATERMARK_HIGH"
EnvCacheRange = "MINIO_CACHE_RANGE"
EnvCacheCommit = "MINIO_CACHE_COMMIT"
EnvWriteBackInterval = "MINIO_WRITE_BACK_INTERVAL"
EnvMaxCacheFileSize = "MINIO_MAX_CACHE_FILE_SIZE"
EnvUploadWorkers = "MINIO_WRITE_BACK_UPLOAD_WORKERS"
EnvUploadQueueTh = "MINIO_UPLOAD_QUEUE_TH"
EnvIndexSvcUrl = "INDEX_SVC_URL"
EnvContentSearchEnable = "CONTENT_SEARCH_ENABLE"

EnvCacheEncryptionKey = "MINIO_CACHE_ENCRYPTION_SECRET_KEY"

@@ -260,5 +262,12 @@ func LookupConfig(kvs config.KVS) (Config, error) {
return cfg, config.ErrInvalidUploadQueueTh(err)
}
}

if indexSvcUrl := env.Get(EnvIndexSvcUrl, "http://zsearch:3003"); indexSvcUrl != "" {
cfg.IndexSvcUrl = indexSvcUrl
}
if contentSearchEnable := env.Get(EnvContentSearchEnable, "false"); contentSearchEnable != "" {
cfg.ContentSearchEnable = contentSearchEnable
}
return cfg, nil
}
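
Both new lookups fall back to non-empty defaults ("http://zsearch:3003" and "false"), so the surrounding != "" guards can never be false. ContentSearchEnable also stays a string and is later compared literally against "true" in disk-cache.go, so values like "TRUE" or "1" quietly disable the feature. A sketch of parsing it into a bool once at lookup time (an assumption, not what this commit does, and it would require the Config field to become a bool):

// Sketch: strconv is assumed to be imported; fall back to disabled on an unparseable value.
enabled, perr := strconv.ParseBool(env.Get(EnvContentSearchEnable, "false"))
if perr != nil {
	enabled = false
}
cfg.ContentSearchEnable = enabled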
18 changes: 18 additions & 0 deletions zsearch/Dockerfile
@@ -0,0 +1,18 @@
FROM golang:1.21-alpine

RUN apk update && apk add --no-cache git

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . ./

RUN go build -o /zsearch

# VOLUME ["/vindex"]

EXPOSE 3003

CMD ["/zsearch"]
21 changes: 21 additions & 0 deletions zsearch/delete/handler/handler.go
@@ -0,0 +1,21 @@
package handler

import (
"net/http"
"zsearch/delete/model"
)

func DeleteHandler(delChan chan<- model.DelReq) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
bucketName := r.URL.Query().Get("bucketName")
objName := r.URL.Query().Get("objName")
delReq := model.DelReq{
Path: bucketName + "/" + objName,
Filename: objName,
}

delChan <- delReq
w.WriteHeader(http.StatusOK)
w.Write([]byte("Files deleted successfully"))
}
}
6 changes: 6 additions & 0 deletions zsearch/delete/model/model.go
@@ -0,0 +1,6 @@
package model

type DelReq struct {
Filename string `json:"filename"`
Path string `json:"path"`
}
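
DeleteHandler only enqueues a DelReq and immediately answers 200, so "Files deleted successfully" really means "deletion scheduled"; the worker that drains delChan and updates the bleve index lives in one of the zsearch files not loaded in this view. A hypothetical consumer, sketched purely to show how DelReq is meant to be used:

// Hypothetical worker draining delChan; idx is assumed to be a bleve.Index whose document IDs
// are the bucket/object paths set by the gateway.
go func(idx bleve.Index, delChan <-chan model.DelReq) {
	for req := range delChan {
		if err := idx.Delete(req.Path); err != nil {
			log.Println("failed to remove", req.Path, "from index:", err)
		}
	}
}(idx, delChan)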
40 changes: 40 additions & 0 deletions zsearch/go.mod
@@ -0,0 +1,40 @@
module zsearch

go 1.21

require (
github.com/bbalet/stopwords v1.0.0
github.com/blevesearch/bleve/v2 v2.4.1
github.com/google/go-tika v0.3.1
)

require (
github.com/RoaringBitmap/roaring v1.9.3 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/blevesearch/bleve_index_api v1.1.9 // indirect
github.com/blevesearch/geo v0.1.20 // indirect
github.com/blevesearch/go-faiss v1.0.19 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3 // indirect
github.com/blevesearch/gtreap v0.1.1 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/blevesearch/scorch_segment_api/v2 v2.2.14 // indirect
github.com/blevesearch/segment v0.9.1 // indirect
github.com/blevesearch/snowballstem v0.9.0 // indirect
github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect
github.com/blevesearch/vellum v1.0.10 // indirect
github.com/blevesearch/zapx/v11 v11.3.10 // indirect
github.com/blevesearch/zapx/v12 v12.3.10 // indirect
github.com/blevesearch/zapx/v13 v13.3.10 // indirect
github.com/blevesearch/zapx/v14 v14.3.10 // indirect
github.com/blevesearch/zapx/v15 v15.3.13 // indirect
github.com/blevesearch/zapx/v16 v16.1.4 // indirect
github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/json-iterator/go v0.0.0-20171115153421-f7279a603ede // indirect
github.com/mschoch/smat v0.2.0 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
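
go.mod pins bleve/v2, bbalet/stopwords, and google/go-tika, which matches the flow assumed earlier: Tika extracts text, stopwords trims it, and bleve stores and serves it. The /search handler itself is among the files not loaded here; the sketch below only shows the kind of bleve query that would back it (the names, document structure, and response shape are assumptions):

// Hypothetical core of the /search handler, given the bleve/v2 dependency.
q := bleve.NewMatchQuery(userQuery) // userQuery comes from the ?query= parameter
req := bleve.NewSearchRequest(q)
req.Fields = []string{"*"} // return stored fields alongside the hits
res, err := idx.Search(req) // idx is the bleve.Index persisted under /vindex
if err != nil {
	http.Error(w, err.Error(), http.StatusInternalServerError)
	return
}
json.NewEncoder(w).Encode(res)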
(The remaining 6 changed files were not loaded in this view.)
