Skip to content

Commit

Permalink
std.parallelism support added
Browse files Browse the repository at this point in the history
bench test output:
```bash
Write benchmark:
  Normal:   42 μs and 4 hnsecs
  Parallel: 26 μs and 8 hnsecs
Read benchmark:
  Normal:   36 μs and 2 hnsecs
  Parallel: 11 μs
List benchmark:
  Normal:   31 μs and 1 hnsec
  Parallel: 17 μs and 8 hnsecs
1 modules passed unittests
```
  • Loading branch information
kassane committed Oct 17, 2024
1 parent 08f9766 commit 8f76be9
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 14 deletions.
32 changes: 31 additions & 1 deletion bindings/d/source/opendal/operator.d
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@ module opendal.operator;
import std.string: toStringz;
import std.exception: enforce;
import std.conv: to;
import std.parallelism: task, TaskPool;

/// OpenDAL-C binding for D. (unsafe/@system)
private import opendal.opendal_c;

struct Operator
{
private opendal_operator* op;
private TaskPool taskPool;
private bool enabledParallelism;

this(string scheme, OperatorOptions options) @trusted
this(string scheme, OperatorOptions options, bool useParallel = false) @trusted
{
auto result = opendal_operator_new(scheme.toStringz, options.options);
enforce(result.op !is null, "Failed to create Operator");
enforce(result.error is null, "Error in Operator");
op = result.op;
enabledParallelism = useParallel;

if (enabledParallelism)
taskPool = new TaskPool();
}

void write(string path, ubyte[] data) @trusted
Expand All @@ -45,6 +52,27 @@ struct Operator
enforce(error is null, "Error writing data");
}

void writeParallel(string path, ubyte[] data) @safe
{
auto t = task!((Operator* op, string p, ubyte[] d) { op.write(p, d); })(&this, path, data);
taskPool.put(t);
t.yieldForce();
}

ubyte[] readParallel(string path) @trusted
{
auto t = task!((Operator* op, string p) { return op.read(p); })(&this, path);
taskPool.put(t);
return t.yieldForce();
}

Entry[] listParallel(string path) @trusted
{
auto t = task!((Operator* op, string p) { return op.list(p); })(&this, path);
taskPool.put(t);
return t.yieldForce();
}

ubyte[] read(string path) @trusted
{
auto result = opendal_operator_read(op, path.toStringz);
Expand Down Expand Up @@ -112,6 +140,8 @@ struct Operator
{
if (op !is null)
opendal_operator_free(op);
if (enabledParallelism)
taskPool.stop();
}
}

Expand Down
92 changes: 79 additions & 13 deletions bindings/d/source/opendal/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,15 @@

module opendal;

version (D_BetterC)
{
version (LDC)
{
pragma(LDC_no_moduleinfo);
pragma(LDC_no_typeinfo);
}
}

public import opendal.operator;

version (unittest)
{
@("Test basic Operator creation")
unittest
@safe unittest
{
/* Initialize a operator for "memory" backend, with no options */
auto options = new OperatorOptions();
OperatorOptions options = new OperatorOptions();
Operator op = Operator("memory", options);

/* Prepare some data to be written */
Expand All @@ -48,10 +39,85 @@ version (unittest)
/* We can read it out, make sure the data is the same */
auto read_bytes = op.read("/testpath");
assert(read_bytes.length == 24);
assert(cast(string)read_bytes.idup == data);
}

/* Lets print it out */
@("Benchmark parallel and normal functions")
@safe unittest
{
import std.exception: assertNotThrown;
import std.file: tempDir;
import std.path: buildPath;
import std.datetime.stopwatch: StopWatch;
import std.stdio: writeln;

writeln(cast(string)read_bytes.idup);
auto options = new OperatorOptions();
options.set("root", tempDir);
auto op = Operator("fs", options, true);

auto testPath = buildPath(tempDir, "benchmark_test.txt");
auto testData = cast(ubyte[])"Benchmarking OpenDAL async and normal functions".dup;

// Benchmark write operations
StopWatch sw;

sw.start();
assertNotThrown(op.write(testPath, testData));
sw.stop();
auto normalWriteTime = sw.peek();

sw.reset();
sw.start();
assertNotThrown(op.writeParallel(testPath, testData));
sw.stop();
auto parallelWriteTime = sw.peek();

// Benchmark read operations
sw.reset();
sw.start();
auto normalReadData = op.read(testPath);
sw.stop();
auto normalReadTime = sw.peek();

sw.reset();
sw.start();
auto parallelReadData = op.readParallel(testPath);
sw.stop();
auto parallelReadTime = sw.peek();

// Benchmark list operations
sw.reset();
sw.start();
op.list(tempDir);
sw.stop();
auto normalListTime = sw.peek();

sw.reset();
sw.start();
op.listParallel(tempDir);
sw.stop();
auto parallelListTime = sw.peek();

// Print benchmark results
writeln("Write benchmark:");
writeln(" Normal: ", normalWriteTime);
writeln(" Parallel: ", parallelWriteTime);

writeln("Read benchmark:");
writeln(" Normal: ", normalReadTime);
writeln(" Parallel: ", parallelReadTime);

writeln("List benchmark:");
writeln(" Normal: ", normalListTime);
writeln(" Parallel: ", parallelListTime);

// Verify data integrity
assert(normalReadData == testData);
assert(parallelReadData == testData);

// Clean up
op.removeObject(testPath);
assert(!op.exists(testPath));
}

}

0 comments on commit 8f76be9

Please sign in to comment.