diff --git a/RocksDbSharp/DbOptions.cs b/RocksDbSharp/DbOptions.cs index 2031f88..5b09e45 100644 --- a/RocksDbSharp/DbOptions.cs +++ b/RocksDbSharp/DbOptions.cs @@ -588,5 +588,15 @@ public DbOptions PrepareForBulkLoad() return this; } + /// + /// Sets Merge Operator for the database. + /// + public DbOptions SetMergeOperator(IMergeOperator mergeOperator) + { + if (mergeOperator == null) + throw new ArgumentNullException(nameof(mergeOperator)); + Native.Instance.rocksdb_options_set_merge_operator(Handle, mergeOperator.Handle); + return this; + } } } diff --git a/RocksDbSharp/MergeOperator.cs b/RocksDbSharp/MergeOperator.cs new file mode 100644 index 0000000..dd2db34 --- /dev/null +++ b/RocksDbSharp/MergeOperator.cs @@ -0,0 +1,98 @@ +using System; +using System.Runtime.InteropServices; + +namespace RocksDbSharp +{ + public interface IMergeOperator + { + IntPtr Handle { get; } + } + + public abstract class MergeOperatorBase : IMergeOperator, IDisposable + { + public IntPtr Handle { get; private set; } + private GCHandle gcHandle; + public string Name { get; private set; } + private IntPtr NamePtr { get; set; } + + protected unsafe MergeOperatorBase(string name) + { + if (name == null) + throw new ArgumentNullException(nameof(name)); + + Name = name; + gcHandle = GCHandle.Alloc(this); + Handle = Native.Instance.rocksdb_mergeoperator_create(GCHandle.ToIntPtr(gcHandle), CallbackDestructor, CallbackFullMerge, CallbackPartialMerge, CallbackDeleteValue, CallbackName); + NamePtr = Marshal.StringToHGlobalAnsi(Name); + } + + protected virtual void OnDestroy() + { + Handle = IntPtr.Zero; + gcHandle.Free(); + Marshal.FreeHGlobal(NamePtr); + NamePtr = IntPtr.Zero; + } + protected abstract byte[] OnFullMerge(byte[] key, byte[] existingValue, byte[][] operands, out bool success); + protected abstract byte[] OnPartialMerge(byte[] key, byte[][] operands, out bool success); + + public void Dispose() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_mergeoperator_destroy(Handle); + } + + private static void CallbackDestructor(IntPtr target) => ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).OnDestroy(); + + private unsafe static byte* CallbackFullMerge(IntPtr target, byte* key, UIntPtr keyLength, byte* existingValue, UIntPtr existingValueLength, byte** operandsList, UIntPtr* operandsLengthList, int operandsCount, out bool success, out UIntPtr newValueLength) + { + var keyArr = new byte[(uint)keyLength]; + + Marshal.Copy((IntPtr)key, keyArr, 0, keyArr.Length); + byte[] existingValueArr = null; + if ((IntPtr)existingValue != IntPtr.Zero) + { + existingValueArr = new byte[(uint)existingValueLength]; + Marshal.Copy((IntPtr)existingValue, existingValueArr, 0, existingValueArr.Length); + } + + var operands = new byte[operandsCount][]; + for (int i = 0; i < operandsCount; i++) + { + var operand = new byte[(uint)operandsLengthList[i]]; + Marshal.Copy((IntPtr)operandsList[i], operand, 0, operand.Length); + operands[i] = operand; + } + + var ret = ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).OnFullMerge(keyArr, existingValueArr, operands, out success); + var arr = Marshal.AllocHGlobal(ret.Length); + Marshal.Copy(ret, 0, arr, ret.Length); + newValueLength = (UIntPtr)ret.Length; + return (byte*)arr; + } + + private unsafe static byte* CallbackPartialMerge(IntPtr target, byte* key, UIntPtr keyLength, byte** operandsList, UIntPtr* operandsLengthList, int operandsCount, out bool success, out UIntPtr newValueLength) + { + var keyArr = new byte[(uint)keyLength]; + Marshal.Copy((IntPtr)key, keyArr, 0, keyArr.Length); + + var operands = new byte[operandsCount][]; + for (int i = 0; i < operandsCount; i++) + { + var operand = new byte[(uint)operandsLengthList[i]]; + Marshal.Copy((IntPtr)operandsList[i], operand, 0, operand.Length); + operands[i] = operand; + } + + var ret = ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).OnPartialMerge(keyArr, operands, out success); + var arr = Marshal.AllocHGlobal(ret.Length); + Marshal.Copy(ret, 0, arr, ret.Length); + newValueLength = (UIntPtr)ret.Length; + return (byte*)arr; + } + + private static void CallbackDeleteValue(IntPtr target, IntPtr value, UIntPtr valueLength) => Marshal.FreeHGlobal(value); + + private static IntPtr CallbackName(IntPtr target) => ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).NamePtr; + } +} diff --git a/RocksDbSharp/Native.Marshaled.cs b/RocksDbSharp/Native.Marshaled.cs index 81b5329..02c5f54 100644 --- a/RocksDbSharp/Native.Marshaled.cs +++ b/RocksDbSharp/Native.Marshaled.cs @@ -82,6 +82,43 @@ public void rocksdb_put( } } + public void rocksdb_merge( + /*rocksdb_t**/ IntPtr db, + /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, + string key, + string val, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + unsafe + { + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) + { + int klength = key.Length; + int vlength = val.Length; + int bklength = encoding.GetByteCount(k, klength); + int bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + byte* bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + if (cf == null) + rocksdb_merge(db, writeOptions, bk, (ulong)bklength, bv, (ulong)bvlength, out errptr); + else + rocksdb_merge_cf(db, writeOptions, cf.Handle, bk, (ulong)bklength, bv, (ulong)bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + public string rocksdb_get( /*rocksdb_t**/ IntPtr db, /*const rocksdb_readoptions_t**/ IntPtr read_options, diff --git a/RocksDbSharp/Native.Raw.cs b/RocksDbSharp/Native.Raw.cs index e770d37..36f5bc7 100644 --- a/RocksDbSharp/Native.Raw.cs +++ b/RocksDbSharp/Native.Raw.cs @@ -43,6 +43,20 @@ namespace RocksDbSharp public delegate void WriteBatchIteratePutCallback(IntPtr s, /*(const char*)*/ IntPtr k, /*(size_t)*/ ulong klen, /*(const char*)*/ IntPtr v, /*(size_t)*/ ulong vlen); //void (*deleted)(void*, const char* k, /*(size_t)*/ ulong klen) public delegate void WriteBatchIterateDeleteCallback(IntPtr s, /*(const char*)*/ IntPtr k, /*(size_t)*/ ulong klen); +public delegate void MergeOperatorDestructorCallback(IntPtr s); +//char* (*full_merge)(void*, /*const*/ byte* key, size_t key_length, const char* existing_value, size_t existing_value_length, const char* const* operands_list, const size_t* operands_list_length, int num_operands, unsigned char* success, size_t* new_value_length), +public unsafe delegate /*char* */ byte* MergeOperatorFullMergeCallback(IntPtr s, /*const byte* */ byte* key, /*(size_t)*/UIntPtr key_length, +/*const char* */ byte* existing_value, /* size_t existing_value_length */UIntPtr existing_value_length, +/*const char* const* */ byte** operands_list, /*const size_t**/UIntPtr* operands_length_list, int num_operands, +/*unsigned char* */ [MarshalAs(UnmanagedType.U1), Out]out bool success, /*size_t* */out UIntPtr new_value_length); +//char* (*partial_merge)(void*, /*const*/ byte* key, size_t key_length, const char* const* operands_list, const size_t* operands_list_length, int num_operands, unsigned char* success, size_t* new_value_length), +public unsafe delegate /*char* */ byte* MergeOperatorPartialMergeCallback(IntPtr s, /*const byte* */ byte* key, /*(size_t)*/UIntPtr key_length, +/*const char* const* */ byte** operands_list, /*const size_t**/UIntPtr* operands_length_list, int num_operands, /*unsigned char* */[MarshalAs(UnmanagedType.U1), Out]out bool success/* IntPtr success*/, /*size_t* */out UIntPtr new_value_length); +//void (*delete_value)(void*, /*const*/ byte* value, size_t value_length), +public delegate void MergeOperatorDeleteValueCallback(IntPtr s, /*const byte* */ IntPtr key, /*(size_t)*/UIntPtr key_length); +//const char* (*name)(void*)); +public delegate /*const char* */ IntPtr MergeOperatorNameCallback(IntPtr s); + public abstract partial class Native { /* BEGIN c.h */ @@ -219,11 +233,20 @@ public unsafe abstract void rocksdb_merge( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*const*/ byte* key, ulong keylen, /*const*/ byte* val, ulong vallen, out IntPtr errptr); +public abstract void rocksdb_merge( + /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*const*/ byte[] key, + long keylen, /*const*/ byte[] val, long vallen, out IntPtr errptr); + public unsafe abstract void rocksdb_merge_cf( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*const*/ byte* key, ulong keylen, /*const*/ byte* val, ulong vallen, out IntPtr errptr); +public abstract void rocksdb_merge_cf( + /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, + /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*const*/ byte[] key, + long keylen, /*const*/ byte[] val, long vallen, out IntPtr errptr); + public abstract void rocksdb_write( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*(rocksdb_writebatch_t*)*/ IntPtr writeBatch, out IntPtr errptr); @@ -1116,26 +1139,12 @@ public abstract void rocksdb_filterpolicy_destroy( #endregion #region Merge Operator -#if ROCKSDB_MERGE_OPERATOR public abstract /* rocksdb_mergeoperator_t* */ IntPtr rocksdb_mergeoperator_create( - void* state, void (*destructor)(void*), - char* (*full_merge)(void*, /*const*/ byte* key, size_t key_length, - const char* existing_value, - size_t existing_value_length, - const char* const* operands_list, - const size_t* operands_list_length, int num_operands, - unsigned char* success, size_t* new_value_length), - char* (*partial_merge)(void*, /*const*/ byte* key, size_t key_length, - const char* const* operands_list, - const size_t* operands_list_length, int num_operands, - unsigned char* success, size_t* new_value_length), - void (*delete_value)(void*, /*const*/ byte* value, size_t value_length), - const char* (*name)(void*)); + IntPtr state, MergeOperatorDestructorCallback destructor, MergeOperatorFullMergeCallback full_merge, MergeOperatorPartialMergeCallback partial_merge, MergeOperatorDeleteValueCallback delete_value, MergeOperatorNameCallback name); public abstract void rocksdb_mergeoperator_destroy( - rocksdb_mergeoperator_t*); + /*rocksdb_mergeoperator_t*)*/IntPtr mergeoperator); -#endif #endregion #region Read options diff --git a/RocksDbSharp/Native.Wrap.cs b/RocksDbSharp/Native.Wrap.cs index 32eb951..c14bd06 100644 --- a/RocksDbSharp/Native.Wrap.cs +++ b/RocksDbSharp/Native.Wrap.cs @@ -104,6 +104,36 @@ public void rocksdb_put( throw new RocksDbException(errptr); } + public void rocksdb_merge( + /*rocksdb_t**/ IntPtr db, + /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, + string key, + string val, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + rocksdb_merge(db, writeOptions, key, val, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_merge( + IntPtr db, + IntPtr writeOptions, + byte[] key, + long keyLength, + byte[] value, + long valueLength, + ColumnFamilyHandle cf) + { + IntPtr errptr; + if (cf == null) + rocksdb_merge(db, writeOptions, key, keyLength, value, valueLength, out errptr); + else + rocksdb_merge_cf(db, writeOptions, cf.Handle, key, keyLength, value, valueLength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } public string rocksdb_get( /*rocksdb_t**/ IntPtr db, diff --git a/RocksDbSharp/RocksDb.cs b/RocksDbSharp/RocksDb.cs index 2b54559..1851b36 100644 --- a/RocksDbSharp/RocksDb.cs +++ b/RocksDbSharp/RocksDb.cs @@ -170,6 +170,21 @@ public void Put(byte[] key, long keyLength, byte[] value, long valueLength, Colu Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); } + public void Merge(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) + { + Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); + } + + public void Merge(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Merge(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); + } + + public void Merge(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); + } + public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { IntPtr iteratorHandle = cf == null diff --git a/tests/RocksDbSharpTest/FunctionalTests.cs b/tests/RocksDbSharpTest/FunctionalTests.cs index cc2872b..b0b649c 100644 --- a/tests/RocksDbSharpTest/FunctionalTests.cs +++ b/tests/RocksDbSharpTest/FunctionalTests.cs @@ -23,11 +23,24 @@ public void FunctionalTest() var options = new DbOptions() .SetCreateIfMissing(true) - .EnableStatistics(); + .EnableStatistics() + .SetMergeOperator(new TestConcatMergeOperator()); // Using standard open using (var db = RocksDb.Open(options, path)) { + db.Put("mergable", "a"); + db.Merge("mergable", "b"); + db.Merge("mergable", "c"); + Assert.Equal("abc", db.Get("mergable")); + db.Remove("mergable"); + + db.Merge("mergable2", "a"); + db.Merge("mergable2", "b"); + db.Merge("mergable2", "c"); + Assert.Equal("abc", db.Get("mergable2")); + db.Remove("mergable2"); + // With strings string value = db.Get("key"); db.Put("key", "value"); diff --git a/tests/RocksDbSharpTest/TestConcatMergeOperator.cs b/tests/RocksDbSharpTest/TestConcatMergeOperator.cs new file mode 100644 index 0000000..9c4ac19 --- /dev/null +++ b/tests/RocksDbSharpTest/TestConcatMergeOperator.cs @@ -0,0 +1,42 @@ +using System; +using System.Linq; +using RocksDbSharp; + +namespace RocksDbSharpTest +{ + public sealed class TestConcatMergeOperator : MergeOperatorBase + { + public TestConcatMergeOperator() : base("CONCAT") { } + + protected override byte[] OnFullMerge(byte[] key, byte[] existingValue, byte[][] operands, out bool success) + { + var pos = 0; + var arr = new byte[(existingValue?.Length ?? 0) + operands.Sum(x => x.Length)]; + if (existingValue != null) + { + Buffer.BlockCopy(existingValue, 0, arr, 0, existingValue.Length); + pos = existingValue.Length; + } + foreach (var operand in operands) + { + Buffer.BlockCopy(operand, 0, arr, pos, operand.Length); + pos += operand.Length; + } + success = true; + return arr; + } + + protected override byte[] OnPartialMerge(byte[] key, byte[][] operands, out bool success) + { + var arr = new byte[operands.Sum(x => x.Length)]; + var pos = 0; + foreach (var operand in operands) + { + Buffer.BlockCopy(operand, 0, arr, pos, operand.Length); + pos += operand.Length; + } + success = true; + return arr; + } + } +}