Skip to content

Added support for merge #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions RocksDbSharp/DbOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -588,5 +588,15 @@ public DbOptions PrepareForBulkLoad()
return this;
}

/// <summary>
/// Sets Merge Operator for the database.
/// </summary>
public DbOptions SetMergeOperator(IMergeOperator mergeOperator)
{
if (mergeOperator == null)
throw new ArgumentNullException(nameof(mergeOperator));
Native.Instance.rocksdb_options_set_merge_operator(Handle, mergeOperator.Handle);
return this;
}
}
}
98 changes: 98 additions & 0 deletions RocksDbSharp/MergeOperator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Runtime.InteropServices;

namespace RocksDbSharp
{
public interface IMergeOperator
{
IntPtr Handle { get; }
}

public abstract class MergeOperatorBase : IMergeOperator, IDisposable
{
public IntPtr Handle { get; private set; }
private GCHandle gcHandle;
public string Name { get; private set; }
private IntPtr NamePtr { get; set; }

protected unsafe MergeOperatorBase(string name)
{
if (name == null)
throw new ArgumentNullException(nameof(name));

Name = name;
gcHandle = GCHandle.Alloc(this);
Handle = Native.Instance.rocksdb_mergeoperator_create(GCHandle.ToIntPtr(gcHandle), CallbackDestructor, CallbackFullMerge, CallbackPartialMerge, CallbackDeleteValue, CallbackName);
NamePtr = Marshal.StringToHGlobalAnsi(Name);
}

protected virtual void OnDestroy()
{
Handle = IntPtr.Zero;
gcHandle.Free();
Marshal.FreeHGlobal(NamePtr);
NamePtr = IntPtr.Zero;
}
protected abstract byte[] OnFullMerge(byte[] key, byte[] existingValue, byte[][] operands, out bool success);
protected abstract byte[] OnPartialMerge(byte[] key, byte[][] operands, out bool success);

public void Dispose()
{
if (Handle != IntPtr.Zero)
Native.Instance.rocksdb_mergeoperator_destroy(Handle);
}

private static void CallbackDestructor(IntPtr target) => ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).OnDestroy();

private unsafe static byte* CallbackFullMerge(IntPtr target, byte* key, UIntPtr keyLength, byte* existingValue, UIntPtr existingValueLength, byte** operandsList, UIntPtr* operandsLengthList, int operandsCount, out bool success, out UIntPtr newValueLength)
{
var keyArr = new byte[(uint)keyLength];

Marshal.Copy((IntPtr)key, keyArr, 0, keyArr.Length);
byte[] existingValueArr = null;
if ((IntPtr)existingValue != IntPtr.Zero)
{
existingValueArr = new byte[(uint)existingValueLength];
Marshal.Copy((IntPtr)existingValue, existingValueArr, 0, existingValueArr.Length);
}

var operands = new byte[operandsCount][];
for (int i = 0; i < operandsCount; i++)
{
var operand = new byte[(uint)operandsLengthList[i]];
Marshal.Copy((IntPtr)operandsList[i], operand, 0, operand.Length);
operands[i] = operand;
}

var ret = ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).OnFullMerge(keyArr, existingValueArr, operands, out success);
var arr = Marshal.AllocHGlobal(ret.Length);
Marshal.Copy(ret, 0, arr, ret.Length);
newValueLength = (UIntPtr)ret.Length;
return (byte*)arr;
}

private unsafe static byte* CallbackPartialMerge(IntPtr target, byte* key, UIntPtr keyLength, byte** operandsList, UIntPtr* operandsLengthList, int operandsCount, out bool success, out UIntPtr newValueLength)
{
var keyArr = new byte[(uint)keyLength];
Marshal.Copy((IntPtr)key, keyArr, 0, keyArr.Length);

var operands = new byte[operandsCount][];
for (int i = 0; i < operandsCount; i++)
{
var operand = new byte[(uint)operandsLengthList[i]];
Marshal.Copy((IntPtr)operandsList[i], operand, 0, operand.Length);
operands[i] = operand;
}

var ret = ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).OnPartialMerge(keyArr, operands, out success);
var arr = Marshal.AllocHGlobal(ret.Length);
Marshal.Copy(ret, 0, arr, ret.Length);
newValueLength = (UIntPtr)ret.Length;
return (byte*)arr;
}

private static void CallbackDeleteValue(IntPtr target, IntPtr value, UIntPtr valueLength) => Marshal.FreeHGlobal(value);

private static IntPtr CallbackName(IntPtr target) => ((MergeOperatorBase)GCHandle.FromIntPtr(target).Target).NamePtr;
}
}
37 changes: 37 additions & 0 deletions RocksDbSharp/Native.Marshaled.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,43 @@ public void rocksdb_put(
}
}

public void rocksdb_merge(
/*rocksdb_t**/ IntPtr db,
/*const rocksdb_writeoptions_t**/ IntPtr writeOptions,
string key,
string val,
out IntPtr errptr,
ColumnFamilyHandle cf = null,
Encoding encoding = null)
{
unsafe
{
if (encoding == null)
encoding = Encoding.UTF8;
fixed (char* k = key, v = val)
{
int klength = key.Length;
int vlength = val.Length;
int bklength = encoding.GetByteCount(k, klength);
int bvlength = encoding.GetByteCount(v, vlength);
var buffer = Marshal.AllocHGlobal(bklength + bvlength);
byte* bk = (byte*)buffer.ToPointer();
encoding.GetBytes(k, klength, bk, bklength);
byte* bv = bk + bklength;
encoding.GetBytes(v, vlength, bv, bvlength);

if (cf == null)
rocksdb_merge(db, writeOptions, bk, (ulong)bklength, bv, (ulong)bvlength, out errptr);
else
rocksdb_merge_cf(db, writeOptions, cf.Handle, bk, (ulong)bklength, bv, (ulong)bvlength, out errptr);
#if DEBUG
Zero(bk, bklength);
#endif
Marshal.FreeHGlobal(buffer);
}
}
}

public string rocksdb_get(
/*rocksdb_t**/ IntPtr db,
/*const rocksdb_readoptions_t**/ IntPtr read_options,
Expand Down
41 changes: 25 additions & 16 deletions RocksDbSharp/Native.Raw.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ namespace RocksDbSharp
public delegate void WriteBatchIteratePutCallback(IntPtr s, /*(const char*)*/ IntPtr k, /*(size_t)*/ ulong klen, /*(const char*)*/ IntPtr v, /*(size_t)*/ ulong vlen);
//void (*deleted)(void*, const char* k, /*(size_t)*/ ulong klen)
public delegate void WriteBatchIterateDeleteCallback(IntPtr s, /*(const char*)*/ IntPtr k, /*(size_t)*/ ulong klen);
public delegate void MergeOperatorDestructorCallback(IntPtr s);
//char* (*full_merge)(void*, /*const*/ byte* key, size_t key_length, const char* existing_value, size_t existing_value_length, const char* const* operands_list, const size_t* operands_list_length, int num_operands, unsigned char* success, size_t* new_value_length),
public unsafe delegate /*char* */ byte* MergeOperatorFullMergeCallback(IntPtr s, /*const byte* */ byte* key, /*(size_t)*/UIntPtr key_length,
/*const char* */ byte* existing_value, /* size_t existing_value_length */UIntPtr existing_value_length,
/*const char* const* */ byte** operands_list, /*const size_t**/UIntPtr* operands_length_list, int num_operands,
/*unsigned char* */ [MarshalAs(UnmanagedType.U1), Out]out bool success, /*size_t* */out UIntPtr new_value_length);
//char* (*partial_merge)(void*, /*const*/ byte* key, size_t key_length, const char* const* operands_list, const size_t* operands_list_length, int num_operands, unsigned char* success, size_t* new_value_length),
public unsafe delegate /*char* */ byte* MergeOperatorPartialMergeCallback(IntPtr s, /*const byte* */ byte* key, /*(size_t)*/UIntPtr key_length,
/*const char* const* */ byte** operands_list, /*const size_t**/UIntPtr* operands_length_list, int num_operands, /*unsigned char* */[MarshalAs(UnmanagedType.U1), Out]out bool success/* IntPtr success*/, /*size_t* */out UIntPtr new_value_length);
//void (*delete_value)(void*, /*const*/ byte* value, size_t value_length),
public delegate void MergeOperatorDeleteValueCallback(IntPtr s, /*const byte* */ IntPtr key, /*(size_t)*/UIntPtr key_length);
//const char* (*name)(void*));
public delegate /*const char* */ IntPtr MergeOperatorNameCallback(IntPtr s);

public abstract partial class Native
{
/* BEGIN c.h */
Expand Down Expand Up @@ -219,11 +233,20 @@ public unsafe abstract void rocksdb_merge(
/*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*const*/ byte* key,
ulong keylen, /*const*/ byte* val, ulong vallen, out IntPtr errptr);

public abstract void rocksdb_merge(
/*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*const*/ byte[] key,
long keylen, /*const*/ byte[] val, long vallen, out IntPtr errptr);

public unsafe abstract void rocksdb_merge_cf(
/*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions,
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*const*/ byte* key,
ulong keylen, /*const*/ byte* val, ulong vallen, out IntPtr errptr);

public abstract void rocksdb_merge_cf(
/*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions,
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family, /*const*/ byte[] key,
long keylen, /*const*/ byte[] val, long vallen, out IntPtr errptr);

public abstract void rocksdb_write(
/*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions,
/*(rocksdb_writebatch_t*)*/ IntPtr writeBatch, out IntPtr errptr);
Expand Down Expand Up @@ -1116,26 +1139,12 @@ public abstract void rocksdb_filterpolicy_destroy(
#endregion

#region Merge Operator
#if ROCKSDB_MERGE_OPERATOR

public abstract /* rocksdb_mergeoperator_t* */ IntPtr rocksdb_mergeoperator_create(
void* state, void (*destructor)(void*),
char* (*full_merge)(void*, /*const*/ byte* key, size_t key_length,
const char* existing_value,
size_t existing_value_length,
const char* const* operands_list,
const size_t* operands_list_length, int num_operands,
unsigned char* success, size_t* new_value_length),
char* (*partial_merge)(void*, /*const*/ byte* key, size_t key_length,
const char* const* operands_list,
const size_t* operands_list_length, int num_operands,
unsigned char* success, size_t* new_value_length),
void (*delete_value)(void*, /*const*/ byte* value, size_t value_length),
const char* (*name)(void*));
IntPtr state, MergeOperatorDestructorCallback destructor, MergeOperatorFullMergeCallback full_merge, MergeOperatorPartialMergeCallback partial_merge, MergeOperatorDeleteValueCallback delete_value, MergeOperatorNameCallback name);
public abstract void rocksdb_mergeoperator_destroy(
rocksdb_mergeoperator_t*);
/*rocksdb_mergeoperator_t*)*/IntPtr mergeoperator);

#endif
#endregion

#region Read options
Expand Down
30 changes: 30 additions & 0 deletions RocksDbSharp/Native.Wrap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,36 @@ public void rocksdb_put(
throw new RocksDbException(errptr);
}

public void rocksdb_merge(
/*rocksdb_t**/ IntPtr db,
/*const rocksdb_writeoptions_t**/ IntPtr writeOptions,
string key,
string val,
ColumnFamilyHandle cf = null,
Encoding encoding = null)
{
rocksdb_merge(db, writeOptions, key, val, out IntPtr errptr, cf, encoding);
if (errptr != IntPtr.Zero)
throw new RocksDbException(errptr);
}

public void rocksdb_merge(
IntPtr db,
IntPtr writeOptions,
byte[] key,
long keyLength,
byte[] value,
long valueLength,
ColumnFamilyHandle cf)
{
IntPtr errptr;
if (cf == null)
rocksdb_merge(db, writeOptions, key, keyLength, value, valueLength, out errptr);
else
rocksdb_merge_cf(db, writeOptions, cf.Handle, key, keyLength, value, valueLength, out errptr);
if (errptr != IntPtr.Zero)
throw new RocksDbException(errptr);
}

public string rocksdb_get(
/*rocksdb_t**/ IntPtr db,
Expand Down
15 changes: 15 additions & 0 deletions RocksDbSharp/RocksDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ public void Put(byte[] key, long keyLength, byte[] value, long valueLength, Colu
Native.Instance.rocksdb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf);
}

public void Merge(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null)
{
Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding);
}

public void Merge(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null)
{
Merge(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions);
}

public void Merge(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null)
{
Native.Instance.rocksdb_merge(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf);
}

public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null)
{
IntPtr iteratorHandle = cf == null
Expand Down
15 changes: 14 additions & 1 deletion tests/RocksDbSharpTest/FunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,24 @@ public void FunctionalTest()

var options = new DbOptions()
.SetCreateIfMissing(true)
.EnableStatistics();
.EnableStatistics()
.SetMergeOperator(new TestConcatMergeOperator());

// Using standard open
using (var db = RocksDb.Open(options, path))
{
db.Put("mergable", "a");
db.Merge("mergable", "b");
db.Merge("mergable", "c");
Assert.Equal("abc", db.Get("mergable"));
db.Remove("mergable");

db.Merge("mergable2", "a");
db.Merge("mergable2", "b");
db.Merge("mergable2", "c");
Assert.Equal("abc", db.Get("mergable2"));
db.Remove("mergable2");

// With strings
string value = db.Get("key");
db.Put("key", "value");
Expand Down
42 changes: 42 additions & 0 deletions tests/RocksDbSharpTest/TestConcatMergeOperator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Linq;
using RocksDbSharp;

namespace RocksDbSharpTest
{
public sealed class TestConcatMergeOperator : MergeOperatorBase
{
public TestConcatMergeOperator() : base("CONCAT") { }

protected override byte[] OnFullMerge(byte[] key, byte[] existingValue, byte[][] operands, out bool success)
{
var pos = 0;
var arr = new byte[(existingValue?.Length ?? 0) + operands.Sum(x => x.Length)];
if (existingValue != null)
{
Buffer.BlockCopy(existingValue, 0, arr, 0, existingValue.Length);
pos = existingValue.Length;
}
foreach (var operand in operands)
{
Buffer.BlockCopy(operand, 0, arr, pos, operand.Length);
pos += operand.Length;
}
success = true;
return arr;
}

protected override byte[] OnPartialMerge(byte[] key, byte[][] operands, out bool success)
{
var arr = new byte[operands.Sum(x => x.Length)];
var pos = 0;
foreach (var operand in operands)
{
Buffer.BlockCopy(operand, 0, arr, pos, operand.Length);
pos += operand.Length;
}
success = true;
return arr;
}
}
}