From 8f76be976726a02d84106635a5e9addc577d636f Mon Sep 17 00:00:00 2001 From: Matheus Catarino Date: Thu, 17 Oct 2024 15:09:40 -0300 Subject: [PATCH] `std.parallelism` support added MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bench test output: ```bash Write benchmark: Normal: 42 μs and 4 hnsecs Parallel: 26 μs and 8 hnsecs Read benchmark: Normal: 36 μs and 2 hnsecs Parallel: 11 μs List benchmark: Normal: 31 μs and 1 hnsec Parallel: 17 μs and 8 hnsecs 1 modules passed unittests ``` --- bindings/d/source/opendal/operator.d | 32 +++++++++- bindings/d/source/opendal/package.d | 92 ++++++++++++++++++++++++---- 2 files changed, 110 insertions(+), 14 deletions(-) diff --git a/bindings/d/source/opendal/operator.d b/bindings/d/source/opendal/operator.d index a62a6755d82..b994c82d760 100644 --- a/bindings/d/source/opendal/operator.d +++ b/bindings/d/source/opendal/operator.d @@ -22,6 +22,7 @@ module opendal.operator; import std.string: toStringz; import std.exception: enforce; import std.conv: to; +import std.parallelism: task, TaskPool; /// OpenDAL-C binding for D. (unsafe/@system) private import opendal.opendal_c; @@ -29,13 +30,19 @@ private import opendal.opendal_c; struct Operator { private opendal_operator* op; + private TaskPool taskPool; + private bool enabledParallelism; - this(string scheme, OperatorOptions options) @trusted + this(string scheme, OperatorOptions options, bool useParallel = false) @trusted { auto result = opendal_operator_new(scheme.toStringz, options.options); enforce(result.op !is null, "Failed to create Operator"); enforce(result.error is null, "Error in Operator"); op = result.op; + enabledParallelism = useParallel; + + if (enabledParallelism) + taskPool = new TaskPool(); } void write(string path, ubyte[] data) @trusted @@ -45,6 +52,27 @@ struct Operator enforce(error is null, "Error writing data"); } + void writeParallel(string path, ubyte[] data) @safe + { + auto t = task!((Operator* op, string p, ubyte[] d) { op.write(p, d); })(&this, path, data); + taskPool.put(t); + t.yieldForce(); + } + + ubyte[] readParallel(string path) @trusted + { + auto t = task!((Operator* op, string p) { return op.read(p); })(&this, path); + taskPool.put(t); + return t.yieldForce(); + } + + Entry[] listParallel(string path) @trusted + { + auto t = task!((Operator* op, string p) { return op.list(p); })(&this, path); + taskPool.put(t); + return t.yieldForce(); + } + ubyte[] read(string path) @trusted { auto result = opendal_operator_read(op, path.toStringz); @@ -112,6 +140,8 @@ struct Operator { if (op !is null) opendal_operator_free(op); + if (enabledParallelism) + taskPool.stop(); } } diff --git a/bindings/d/source/opendal/package.d b/bindings/d/source/opendal/package.d index 6a44e0e6c9c..18a2abf331d 100644 --- a/bindings/d/source/opendal/package.d +++ b/bindings/d/source/opendal/package.d @@ -19,24 +19,15 @@ module opendal; -version (D_BetterC) -{ - version (LDC) - { - pragma(LDC_no_moduleinfo); - pragma(LDC_no_typeinfo); - } -} - public import opendal.operator; version (unittest) { @("Test basic Operator creation") - unittest + @safe unittest { /* Initialize a operator for "memory" backend, with no options */ - auto options = new OperatorOptions(); + OperatorOptions options = new OperatorOptions(); Operator op = Operator("memory", options); /* Prepare some data to be written */ @@ -48,10 +39,85 @@ version (unittest) /* We can read it out, make sure the data is the same */ auto read_bytes = op.read("/testpath"); assert(read_bytes.length == 24); + assert(cast(string)read_bytes.idup == data); + } - /* Lets print it out */ + @("Benchmark parallel and normal functions") + @safe unittest + { + import std.exception: assertNotThrown; + import std.file: tempDir; + import std.path: buildPath; + import std.datetime.stopwatch: StopWatch; import std.stdio: writeln; - writeln(cast(string)read_bytes.idup); + auto options = new OperatorOptions(); + options.set("root", tempDir); + auto op = Operator("fs", options, true); + + auto testPath = buildPath(tempDir, "benchmark_test.txt"); + auto testData = cast(ubyte[])"Benchmarking OpenDAL async and normal functions".dup; + + // Benchmark write operations + StopWatch sw; + + sw.start(); + assertNotThrown(op.write(testPath, testData)); + sw.stop(); + auto normalWriteTime = sw.peek(); + + sw.reset(); + sw.start(); + assertNotThrown(op.writeParallel(testPath, testData)); + sw.stop(); + auto parallelWriteTime = sw.peek(); + + // Benchmark read operations + sw.reset(); + sw.start(); + auto normalReadData = op.read(testPath); + sw.stop(); + auto normalReadTime = sw.peek(); + + sw.reset(); + sw.start(); + auto parallelReadData = op.readParallel(testPath); + sw.stop(); + auto parallelReadTime = sw.peek(); + + // Benchmark list operations + sw.reset(); + sw.start(); + op.list(tempDir); + sw.stop(); + auto normalListTime = sw.peek(); + + sw.reset(); + sw.start(); + op.listParallel(tempDir); + sw.stop(); + auto parallelListTime = sw.peek(); + + // Print benchmark results + writeln("Write benchmark:"); + writeln(" Normal: ", normalWriteTime); + writeln(" Parallel: ", parallelWriteTime); + + writeln("Read benchmark:"); + writeln(" Normal: ", normalReadTime); + writeln(" Parallel: ", parallelReadTime); + + writeln("List benchmark:"); + writeln(" Normal: ", normalListTime); + writeln(" Parallel: ", parallelListTime); + + // Verify data integrity + assert(normalReadData == testData); + assert(parallelReadData == testData); + + // Clean up + op.removeObject(testPath); + assert(!op.exists(testPath)); } + }