From d98bdf5c57e6ef5349e02006c4fdb7b07e2e54b0 Mon Sep 17 00:00:00 2001
From: Christian Weilbach
Date: Wed, 29 Mar 2023 20:39:08 -0700
Subject: [PATCH] Support prefixing the store inside a bucket. Delete in
 batches of 1000.

Every object key is namespaced as "<store-id>_<key>" so that several stores
can live in one bucket. Deleting a store removes only the objects under its
own prefix, using DeleteObjects requests of at most 1000 keys each.

---
Review notes (this section is ignored by `git am`):

* delete-keys: call deleteObjects (the batch API) instead of deleteObject,
  and wrap the key strings in ObjectIdentifier values, which is what
  Delete.Builder.objects expects. ObjectIdentifier is added to the imports.
* -delete-store: use partition-all instead of partition. partition drops the
  trailing partial batch, so a bucket with fewer than 1000 objects would not
  have been emptied at all.
* -delete-store: restrict the deletion to keys under this store's prefix, so
  other stores sharing the bucket are not wiped.
* -keys: match on "<store-id>_" rather than on the bare store-id (store
  "test" no longer matches "test2_..."; a nil store-id no longer throws) and
  strip the prefix from the result, because all other methods re-apply it.
* Leading whitespace and blank context lines were reconstructed; apply with
  `git apply --ignore-whitespace` if the context does not match exactly.
  The position of one blank context line in the -226 hunk is a guess.
* The blob ids on the `index` lines were not regenerated.

 README.md                      |  6 ++-
 src/konserve_s3/core.clj       | 74 +++++++++++++++++++++++++---------
 test/konserve_s3/core_test.clj |  3 +-
 3 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/README.md b/README.md
index cc7674e..7caff90 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,9 @@ For asynchronous execution take a look at the [konserve example](https://github.
 
 
 (def s3-spec {:region "us-west-1"
-              :bucket "konserve-demo"})
+              :bucket "konserve-demo"
+              :store-id "test-store" ;; allows multiple stores per bucket
+              })
 
 (def store (connect-s3-store s3-spec :opts {:sync? true}))
 
@@ -40,7 +42,7 @@ For asynchronous execution take a look at the [konserve example](https://github.
 
 (k/bassoc store :binbar (byte-array (range 10)) {:sync? true})
 (k/bget store :binbar (fn [{:keys [input-stream]}]
-                               (map byte (slurp input-stream)))
+                        (map byte (slurp input-stream)))
        {:sync? true})
 ```
 
diff --git a/src/konserve_s3/core.clj b/src/konserve_s3/core.clj
index 2976174..589315a 100644
--- a/src/konserve_s3/core.clj
+++ b/src/konserve_s3/core.clj
@@ -25,7 +25,8 @@
             ListObjectsRequest ListObjectsResponse
             GetObjectRequest GetObjectResponse
             PutObjectRequest PutObjectRequest
-            CopyObjectRequest DeleteObjectRequest HeadObjectRequest
+            CopyObjectRequest Delete DeleteObjectRequest DeleteObjectsRequest
+            HeadObjectRequest ObjectIdentifier
             NoSuchBucketException NoSuchKeyException]
            [software.amazon.awssdk.core.sync RequestBody]))
 
@@ -124,13 +125,34 @@
                           (.key key)
                           (.build))))
 
-(def ^:const default-bucket "konserve")
+(defn delete-keys
+  "Deletes every key in `keys` (a collection of key strings) from `bucket`
+  with one DeleteObjects request. S3 accepts at most 1000 keys per request,
+  so callers have to batch. Does nothing when `keys` is empty."
+  [client bucket keys]
+  (let [object-ids (mapv (fn [^String key]
+                           (-> (ObjectIdentifier/builder)
+                               (.key key)
+                               (.build)))
+                         keys)]
+    (when (seq object-ids)
+      (.deleteObjects client (-> (DeleteObjectsRequest/builder)
+                                 (.bucket bucket)
+                                 (.delete (-> (Delete/builder)
+                                              (.objects ^java.util.Collection object-ids)
+                                              (.build)))
+                                 (.build))))))
 
 (extend-protocol PBackingLock
   Boolean
   (-release [_ env]
     (if (:sync? env) nil (go-try- nil))))
 
+(defn ->key
+  "Prefixes `key` with `store-id` so several stores can share one bucket."
+  [store-id key]
+  (str store-id "_" key))
+
 (defrecord S3Blob [bucket key data fetched-object]
   PBackingBlob
   (-sync [_ env]
@@ -192,25 +214,25 @@
     (async+sync (:sync? env) *default-sync-translation*
                 (go-try- (swap! data assoc :value blob)))))
 
-(defrecord S3Bucket [client bucket]
+(defrecord S3Bucket [client bucket store-id]
   PBackingStore
   (-create-blob [this store-key env]
     (async+sync (:sync? env) *default-sync-translation*
-                (go-try- (S3Blob. this store-key (atom {}) (atom nil)))))
+                (go-try- (S3Blob. this (->key store-id store-key) (atom {}) (atom nil)))))
   (-delete-blob [_ store-key env]
     (async+sync (:sync? env) *default-sync-translation*
-                (go-try- (delete client bucket store-key))))
+                (go-try- (delete client bucket (->key store-id store-key)))))
   (-blob-exists? [_ store-key env]
     (async+sync (:sync? env) *default-sync-translation*
-                (go-try- (exists? client bucket store-key))))
+                (go-try- (exists? client bucket (->key store-id store-key)))))
   (-copy [_ from to env]
     (async+sync (:sync? env) *default-sync-translation*
-                (go-try- (copy client bucket from to))))
+                (go-try- (copy client bucket (->key store-id from) (->key store-id to)))))
   (-atomic-move [_ from to env]
     (async+sync (:sync? env) *default-sync-translation*
                 (go-try-
-                 (copy client bucket from to)
-                 (delete client bucket from))))
+                 (copy client bucket (->key store-id from) (->key store-id to))
+                 (delete client bucket (->key store-id from)))))
   (-migratable [_ _key _store-key env]
     (if (:sync? env) nil (go-try- nil)))
   (-migrate [_ _migration-key _key-vec _serializer _read-handlers _write-handlers env]
@@ -226,21 +248,34 @@
     (async+sync (:sync? env) *default-sync-translation*
                 (go-try- (when (bucket-exists? client bucket)
                            (info "This will delete all konserve files, but won't delete the bucket. You can use konserve-s3.core/delete-bucket if you intend to delete the bucket as well.")
-                           (doseq [key (filter (fn [^String key]
-                                                 (or (.endsWith key ".ksv")
-                                                     (.endsWith key ".ksv.new")
-                                                     (.endsWith key ".ksv.backup")))
-                                               (list-objects client bucket))]
-                             (delete client bucket key))
+                           ;; Only objects under this store's prefix are removed, so
+                           ;; other stores sharing the bucket stay intact.
+                           ;; partition-all also emits the final batch of < 1000 keys.
+                           (let [prefix (->key store-id "")
+                                 own-keys (filter (fn [^String key]
+                                                    (.startsWith key prefix))
+                                                  (list-objects client bucket))]
+                             (doseq [keys (partition-all 1000 own-keys)]
+                               (delete-keys client bucket keys)))
                            (.close client)))))
   (-keys [_ env]
     (async+sync (:sync? env) *default-sync-translation*
-                (go-try- (list-objects client bucket)))))
+                (go-try- (let [prefix (->key store-id "")]
+                           (->> (list-objects client bucket)
+                                (filter (fn [^String key]
+                                          (and (.startsWith key prefix)
+                                               (or (.endsWith key ".ksv")
+                                                   (.endsWith key ".ksv.new")
+                                                   (.endsWith key ".ksv.backup")))))
+                                ;; Strip the prefix again: all other methods
+                                ;; expect plain store keys and apply ->key.
+                                (map (fn [^String key]
+                                       (subs key (count prefix))))))))))
 
 
 (defn connect-store [s3-spec & {:keys [opts] :as params}]
   (let [complete-opts (merge {:sync? true} opts)
-        backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec))
+        backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec) (:store-id s3-spec))
         config (merge {:opts complete-opts
                        :config {:sync-blob? true
                                 :in-place? false
@@ -258,7 +293,7 @@
 
 (defn delete-store [s3-spec & {:keys [opts]}]
   (let [complete-opts (merge {:sync? true} opts)
-        backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec))]
+        backing (S3Bucket. (s3-client s3-spec) (:bucket s3-spec) (:store-id s3-spec))]
     (-delete-store backing complete-opts)))
 
 (comment
@@ -266,7 +301,8 @@
   (require '[konserve.core :as k])
 
   (def s3-spec {:region "us-west-1"
-                :bucket "konserve-s3-test2"
+                :bucket "konserve-s3"
+                :store-id "test2"
                 :x-ray? true
                 :access-key "ACCESS_KEY"
                 :password "SECRET"})
diff --git a/test/konserve_s3/core_test.clj b/test/konserve_s3/core_test.clj
index 374af62..f92c1f6 100644
--- a/test/konserve_s3/core_test.clj
+++ b/test/konserve_s3/core_test.clj
@@ -4,7 +4,8 @@
             [konserve.compliance-test :refer [compliance-test]]
             [konserve-s3.core :refer [connect-store release delete-store]]))
 
-(def s3-spec {:region "us-west-1"})
+(def s3-spec {:region "us-west-1"
+              :store-id "test-store"})
 
 (deftest s3-compliance-sync-test
   (let [s3-spec (assoc s3-spec :bucket "konserve-s3-sync-test")