diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..d3d9d9b --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,19 @@ +name: lint + +on: [pull_request] + +jobs: + flake8: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - uses: grantmcconnaughey/lintly-flake8-github-action@v1.0 + with: + # The GitHub API token to create reviews with + token: ${{ secrets.GITHUB_TOKEN }} + # Fail if "new" violations detected or "any", default "new" + failIf: new + # Additional arguments to pass to flake8, default "." (current directory) + args: "" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 70d1329..8fef6f3 100644 --- a/.gitignore +++ b/.gitignore @@ -150,4 +150,7 @@ cython_debug/ *.pdf # Profiler results -profiler/*.txt \ No newline at end of file +profiler/*.txt + +# Dask distributed +dask-worker-space/ \ No newline at end of file diff --git a/README.md b/README.md index f9ed035..016bef1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Dask Accelerated [![test](https://github.com/teratide/dask-accelerated/actions/workflows/test.yml/badge.svg)](https://github.com/teratide/dask-accelerated/actions/workflows/test.yml) +[![lint](https://github.com/teratide/dask-accelerated/actions/workflows/lint.yml/badge.svg)](https://github.com/teratide/dask-accelerated/actions/workflows/lint.yml) An accelerated version of Dask which substitutes operators in the Dask task graph with an accelerated version. This new operator can do it's evaluation using native libraries or by offloading the computation to an FPGA accelerator. diff --git a/benchmark/benchmarks.py b/benchmark/benchmarks.py index 93d534d..6effcba 100644 --- a/benchmark/benchmarks.py +++ b/benchmark/benchmarks.py @@ -124,6 +124,3 @@ def benchmark_tidre_in_size(in_sizes, batch_size, batch_aggregate, repeats): } return data, name - - - diff --git a/benchmark/helpers.py b/benchmark/helpers.py index 671c22c..831a130 100644 --- a/benchmark/helpers.py +++ b/benchmark/helpers.py @@ -2,7 +2,17 @@ from dask_accelerated import helpers -def run_repeats(in_size, batch_size, batch_aggregate, repeats, key, vanilla_filter, re2_filter, tidre_filter=None, tidre_filter_unaligned=None): +def run_repeats( + in_size, + batch_size, + batch_aggregate, + repeats, + key, + vanilla_filter, + re2_filter, + tidre_filter=None, + tidre_filter_unaligned=None +): # Single run to mitigate caching effects (res, dur) = helpers.run_vanilla(in_size, batch_size, batch_aggregate) diff --git a/benchmark/main.py b/benchmark/main.py index a722b6d..fecdb2b 100644 --- a/benchmark/main.py +++ b/benchmark/main.py @@ -13,6 +13,7 @@ args = parser.parse_args() + def benchmark_re2(in_sizes, batch_aggregates, repeats): # Constants when varying a single parameter @@ -28,10 +29,20 @@ def benchmark_re2(in_sizes, batch_aggregates, repeats): data = {} - (benchmark_data, benchmark_name) = bench.benchmark_re2_in_size(in_sizes, constant_batch_size_in_benchmark, constant_batch_aggregate, repeats) + (benchmark_data, benchmark_name) = bench.benchmark_re2_in_size( + in_sizes, + constant_batch_size_in_benchmark, + constant_batch_aggregate, + repeats + ) data[benchmark_name] = benchmark_data - (benchmark_data, benchmark_name) = bench.benchmark_re2_batch_size(constant_in_size, constant_batch_size_batch_benchmark, batch_aggregates, repeats) + (benchmark_data, benchmark_name) = bench.benchmark_re2_batch_size( + constant_in_size, + constant_batch_size_batch_benchmark, + batch_aggregates, + repeats + ) data[benchmark_name] = benchmark_data benchmark_helpers.print_and_store_with_or_without_tidre(data, False) @@ -52,10 +63,20 @@ def benchmark_tidre(in_sizes, batch_aggregates, repeats): data = {} - (benchmark_data, benchmark_name) = bench.benchmark_tidre_in_size(in_sizes, constant_batch_size_in_benchmark, constant_batch_aggregate, repeats) + (benchmark_data, benchmark_name) = bench.benchmark_tidre_in_size( + in_sizes, + constant_batch_size_in_benchmark, + constant_batch_aggregate, + repeats + ) data[benchmark_name] = benchmark_data - (benchmark_data, benchmark_name) = bench.benchmark_tidre_batch_size(constant_in_size, constant_batch_size_batch_benchmark, batch_aggregates, repeats) + (benchmark_data, benchmark_name) = bench.benchmark_tidre_batch_size( + constant_in_size, + constant_batch_size_batch_benchmark, + batch_aggregates, + repeats + ) data[benchmark_name] = benchmark_data benchmark_helpers.print_and_store_with_or_without_tidre(data, True) @@ -75,4 +96,3 @@ def benchmark_tidre(in_sizes, batch_aggregates, repeats): end = time.time() print("Ran all benchmarks in ", (end - start) / 60, " minutes") - diff --git a/benchmark/pickler.py b/benchmark/pickler.py index 5305868..d6fb4d8 100644 --- a/benchmark/pickler.py +++ b/benchmark/pickler.py @@ -21,4 +21,4 @@ def load_from_notebooks(): with open(data_root + 'data.pickle', 'rb') as f: data = pickle.load(f) - return data \ No newline at end of file + return data diff --git a/dask_accelerated/helpers.py b/dask_accelerated/helpers.py index 69ad7e0..9fdc486 100644 --- a/dask_accelerated/helpers.py +++ b/dask_accelerated/helpers.py @@ -110,7 +110,9 @@ def get_lazy_result(in_size, batch_size, split_row_groups): parquet_engine = "pyarrow" # Valid engines: ['fastparquet', 'pyarrow', 'pyarrow-dataset', 'pyarrow-legacy'] file_root = "../data_generator/diving/data-" file_ext = ".parquet" - regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+[dD][iI][vV][iI][nN][gG][ \t\n]+([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*' + regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+' \ + '[dD][iI][vV][iI][nN][gG][ \t\n]+' \ + '([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*' # Load the dataframe columns = ["value", "string"] @@ -151,7 +153,11 @@ def run_and_record_durations(dsk, result, substitute_operator): filter_durations = np.array(substitute_operator.durations) durations = construct_durations(total_duration_in_seconds, filter_durations) - print("Computed ", res, " in ", total_duration_in_seconds, " seconds\tfilter: ", durations['filter']['total'], " seconds") + print( + "Computed ", res, + " in ", total_duration_in_seconds, " seconds", + "\tfilter: ", durations['filter']['total'], " seconds" + ) return res, durations @@ -191,7 +197,9 @@ def generate_datasets_if_needed(sizes, chunksize=1e6): print("Missing datasets found, these will be generated") match_percentage = 0.05 data_length = 100 - regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+[dD][iI][vV][iI][nN][gG][ \t\n]+([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*' + regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+' \ + '[dD][iI][vV][iI][nN][gG][ \t\n]+' \ + '([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*' parquet_chunksize = chunksize parquet_compression = 'none' diff --git a/dask_accelerated/operators.py b/dask_accelerated/operators.py index 5bfb2ed..83f3536 100644 --- a/dask_accelerated/operators.py +++ b/dask_accelerated/operators.py @@ -150,9 +150,6 @@ def custom_tidre_unaligned(self, obj, accessor, attr, args, kwargs): # The number of records in the current batch number_of_records = obj.size - # The regular expression to be matched - regex = args[0] - # Add some padding to the pandas series, which will be removed from the buffers later obj_with_padding = pandas.concat([pandas.Series(["a"]), obj]) arr = pyarrow.Array.from_pandas(obj_with_padding) diff --git a/dask_accelerated/optimization.py b/dask_accelerated/optimization.py index bf4c1be..ee8abbb 100644 --- a/dask_accelerated/optimization.py +++ b/dask_accelerated/optimization.py @@ -1,6 +1,7 @@ import re from dask.optimization import SubgraphCallable + # Unwrap the corresponding subgraph_callable in the task graph in order to insert a custom function def compute_substitute(dsk, key, custom): str_match = dsk[(key, 0)] @@ -16,6 +17,7 @@ def compute_substitute(dsk, key, custom): return SubgraphCallable(new_dsk, call.outkey, call.inkeys, "regex_callable") + # Substitute all string match operators in the graph with the custom re2 operator def optimize_graph_re2(graph, substitute_function): @@ -27,7 +29,7 @@ def optimize_graph_re2(graph, substitute_function): # This key is used to target one of the operators in the task graph # from which the regex_callable will be constructed for key in graph.keys(): - if re.match(regex, key[0]) != None: + if re.match(regex, key[0]) is not None: key = key[0] # The keys are tuples and the operator name is the first value break @@ -39,9 +41,9 @@ def optimize_graph_re2(graph, substitute_function): # Substitute the regex_callable if the operator name matches the str-match pattern for k in dsk: - if re.match(regex, k[0]) != None: + if re.match(regex, k[0]) is not None: target_op = list(dsk[k]) target_op[0] = regex_callable dsk[k] = tuple(target_op) - return dsk \ No newline at end of file + return dsk diff --git a/dask_accelerated_worker/__init__.py b/dask_accelerated_worker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dask_accelerated_worker/accelerated_worker.py b/dask_accelerated_worker/accelerated_worker.py new file mode 100644 index 0000000..86d226f --- /dev/null +++ b/dask_accelerated_worker/accelerated_worker.py @@ -0,0 +1,111 @@ +import logging +import re +import pickle +from dask.distributed import Worker +from dask.distributed import worker +from dask.optimization import SubgraphCallable +from dask_accelerated.operators import CustomFilter + +logger = logging.getLogger(__name__) + + +# Create an RE2 accelerated worker class based on the original worker class +class RE2Worker(Worker): + + def add_task( + self, + key, + function=None, + args=None, + kwargs=None, + task=worker.no_value, + who_has=None, + nbytes=None, + priority=None, + duration=None, + resource_restrictions=None, + actor=False, + **kwargs2, + ): + regex = re.compile('.*str-match.*') + if re.match(regex, key) is not None: + # This task matches the operation we want to perform on fpga + func = pickle.loads(function) + + substitute_op = CustomFilter().custom_re2 + + dsk = func.dsk + vals = dsk[func.outkey] + vals_args = vals[3] + new_vals_args = (vals_args[0], [['_func', substitute_op], vals_args[1][1]]) + new_vals = (vals[0], vals[1], vals[2], new_vals_args) + dsk[func.outkey] = new_vals + + new_func = SubgraphCallable(dsk, func.outkey, func.inkeys, "regex_callable") + function = pickle.dumps(new_func) + + super().add_task( + key, + function, + args, + kwargs, + task, + who_has, + nbytes, + priority, + duration, + resource_restrictions, + actor, + **kwargs2, + ) + + +# Create a tidre accelerated worker class based on the original worker class +class TidreWorker(Worker): + + def add_task( + self, + key, + function=None, + args=None, + kwargs=None, + task=worker.no_value, + who_has=None, + nbytes=None, + priority=None, + duration=None, + resource_restrictions=None, + actor=False, + **kwargs2, + ): + regex = re.compile('.*str-match.*') + if re.match(regex, key) is not None: + # This task matches the operation we want to perform on fpga + func = pickle.loads(function) + + substitute_op = CustomFilter().custom_tidre + + dsk = func.dsk + vals = dsk[func.outkey] + vals_args = vals[3] + new_vals_args = (vals_args[0], [['_func', substitute_op], vals_args[1][1]]) + new_vals = (vals[0], vals[1], vals[2], new_vals_args) + dsk[func.outkey] = new_vals + + new_func = SubgraphCallable(dsk, func.outkey, func.inkeys, "regex_callable") + function = pickle.dumps(new_func) + + super().add_task( + key, + function, + args, + kwargs, + task, + who_has, + nbytes, + priority, + duration, + resource_restrictions, + actor, + **kwargs2, + ) diff --git a/dask_accelerated_worker/benchmarks.py b/dask_accelerated_worker/benchmarks.py new file mode 100644 index 0000000..e843b83 --- /dev/null +++ b/dask_accelerated_worker/benchmarks.py @@ -0,0 +1,118 @@ +from dask_accelerated import helpers +import time + + +def warm_workers(client, scheduler, benchmark_config): + + print('Warming workers... ', end='') + + for in_size in benchmark_config['in_sizes']: + + lazy_result = helpers.get_lazy_result( + in_size, + benchmark_config['batch_size'], + in_size + ) + + graph = lazy_result.__dask_graph__() + + # Scheduler does round robin + # so we can run this for each worker in the pool + for worker in scheduler.workers: + # Dry run + _ = client.get(graph, (lazy_result.__dask_layers__()[0], 0)) + + print('done') + + +def run_all_benchmarks(client, scheduler, data, benchmark_config): + + data_in_size = run_in_benchmark(client, benchmark_config) + data_batch_size = run_batch_benchmark(client, benchmark_config) + + # Count the number of accelerated workers + accelerated_workers = 0 + for worker in scheduler.workers: + if str(scheduler.workers[worker].name).split('-')[0] == 'accelerated': + accelerated_workers += 1 + + data[str(accelerated_workers)] = { + 'in_size': data_in_size, + 'batch_size': data_batch_size + } + + return data + + +def run_in_benchmark(client, benchmark_config): + + data_in_size = {} + + for in_size in benchmark_config['in_sizes']: + + lazy_result = helpers.get_lazy_result( + in_size, + benchmark_config['batch_size'], + benchmark_config['const_batch_aggregate'] + ) + + graph = lazy_result.__dask_graph__() + + # Dry run + # res = client.get(graph, (lazy_result.__dask_layers__()[0], 0)) + + data_in_size[in_size] = 0 + + for i in range(benchmark_config['repeats']): + start = time.time() + res = client.get(graph, (lazy_result.__dask_layers__()[0], 0)) + end = time.time() + + duration = end - start + data_in_size[in_size] += duration + print( + 'In: ', in_size, + '\tBatch: ', benchmark_config['batch_size'] * benchmark_config['const_batch_aggregate'], + '\tComputed ', res, ' in ', duration, ' seconds' + ) + + data_in_size[in_size] = data_in_size[in_size] / benchmark_config['repeats'] + + return data_in_size + + +def run_batch_benchmark(client, benchmark_config): + + data_batch_size = {} + + for batch_aggregate in benchmark_config['batch_aggregates']: + + lazy_result = helpers.get_lazy_result( + benchmark_config['const_in_size'], + benchmark_config['batch_size'], + batch_aggregate + ) + + graph = lazy_result.__dask_graph__() + + # Dry run + # res = client.get(graph, (lazy_result.__dask_layers__()[0], 0)) + + data_batch_size[batch_aggregate] = 0 + + for i in range(benchmark_config['repeats']): + start = time.time() + res = client.get(graph, (lazy_result.__dask_layers__()[0], 0)) + end = time.time() + + duration = end - start + data_batch_size[batch_aggregate] += duration + print( + 'In: ', benchmark_config['const_in_size'], + '\tBatch: ', benchmark_config['batch_size'] * batch_aggregate, + 'Computed ', res, ' in ', duration, ' seconds' + ) + + data_batch_size[batch_aggregate] = data_batch_size[batch_aggregate] / benchmark_config['repeats'] + + return data_batch_size diff --git a/dask_accelerated_worker/helpers.py b/dask_accelerated_worker/helpers.py new file mode 100644 index 0000000..ae4448d --- /dev/null +++ b/dask_accelerated_worker/helpers.py @@ -0,0 +1,79 @@ +from dask.distributed import Scheduler +from tornado.ioloop import IOLoop +from datetime import datetime +import asyncio +import pickle + + +def get_scheduler(): + kwargs = { + 'preload': (), + 'preload_argv': (), + 'interface': None, + 'protocol': None, + 'scheduler_file': '', + 'idle_timeout': None + } + + loop = IOLoop.current() + sec = {} + host = '' + port = 8786 + dashboard = True + dashboard_address = 8787 + dashboard_prefix = '' + + scheduler = Scheduler( + loop=loop, + security=sec, + host=host, + port=port, + dashboard=dashboard, + dashboard_address=dashboard_address, + http_prefix=dashboard_prefix, + **kwargs + ) + + return scheduler, loop + + +def run_scheduler(scheduler, loop): + + async def run(): + await scheduler + await scheduler.finished() + + loop.run_sync(run) + + +def remove_non_accelerated_workers(scheduler): + + # New event loop to await async remove worker method + loop = asyncio.new_event_loop() + + # TODO: fix this + # Somehow this does not always work on the first try + # A quick but messy fix is to run it more than once to + # ensure all non-accelerated workers get removed + for i in range(3): + workers = scheduler.workers + for worker in workers: + # All accelerated workers are called 'accelerated-[timestamp]' + if str(workers[worker].name).split('-')[0] != 'accelerated': + loop.run_until_complete( + scheduler.remove_worker(address=worker) + ) + + loop.close() + + +def save_data(data): + + # Add timestamp to data + timestamp = datetime.now().strftime("%d-%b-%Y_%H:%M:%S") + data['timestamp'] = timestamp + + # Save data to disk + data_root = '../notebooks/' + with open(data_root + 'data-workers.pickle', 'wb') as f: + pickle.dump(data, f, pickle.HIGHEST_PROTOCOL) diff --git a/dask_accelerated_worker/main.py b/dask_accelerated_worker/main.py new file mode 100644 index 0000000..30e9eee --- /dev/null +++ b/dask_accelerated_worker/main.py @@ -0,0 +1,66 @@ +from dask.distributed import Client +from dask_accelerated import helpers +from dask_accelerated_worker import helpers as worker_helpers +import benchmarks +import time +from threading import Thread +import logging + +logger = logging.getLogger(__name__) + + +benchmark_config = { + 'const_in_size': 4096e3, + 'in_sizes': [256e3, 512e3, 1024e3, 2048e3, 4096e3], + 'batch_size': 1e3, + 'const_batch_aggregate': 1e3, + 'batch_aggregates': [64, 128, 256, 512, 1024, 2048, 4096, 8192], + 'repeats': 6 +} + + +def main(): + + # Start a dask scheduler + (scheduler, scheduler_loop) = worker_helpers.get_scheduler() + thread = Thread(target=worker_helpers.run_scheduler, args=(scheduler, scheduler_loop,)) + thread.start() + + # Wait 1 second to ensure the scheduler is running + time.sleep(1) + + # Start a client and connect it to the scheduler + client = Client(scheduler.address) + + print('Dashboard available at', client.dashboard_link) + print('Scheduler address: ', scheduler.address) + + # input("Press Enter to remove non accelerated workers...") + # worker_helpers.remove_non_accelerated_workers(scheduler) + + input("Press Enter to perform benchmarks...") + + # Make sure the desired dataset exists + helpers.generate_datasets_if_needed(benchmark_config['in_sizes'], benchmark_config['batch_size']) + + data = {} + + # Keep running the benchmark until the user quits the client + while True: + + benchmarks.warm_workers(client, scheduler, benchmark_config) + + data = benchmarks.run_all_benchmarks(client, scheduler, data, benchmark_config) + + user_input = input("Press Enter to run again or send 'q' to close the client...") + if user_input == 'q': + break + + worker_helpers.save_data(data) + + # Close the client + client.close() + + +if __name__ == '__main__': + main() diff --git a/dask_accelerated_worker/start_worker.py b/dask_accelerated_worker/start_worker.py new file mode 100644 index 0000000..40bd742 --- /dev/null +++ b/dask_accelerated_worker/start_worker.py @@ -0,0 +1,116 @@ +from dask_accelerated_worker.accelerated_worker import RE2Worker, TidreWorker +from dask.distributed import Worker +from tornado.ioloop import IOLoop +from dask_accelerated_worker.utils import install_signal_handlers +import asyncio +import sys +import signal +import logging +import argparse +import time +logger = logging.getLogger(__name__) + + +parser = argparse.ArgumentParser(description='Dask Accelerated Worker.') +parser.add_argument('scheduler_address', metavar='S', type=str, + help='string containing the ip and port of the scheduler. Example: tcp://127.0.0.1:37983') +parser.add_argument('type', metavar='T', type=str, + help='string containing the type of the worker. Can be `tidre`, `re2`, or `vanilla`.') + +args = parser.parse_args() + + +def main(): + + scheduler_address = args.scheduler_address + + if args.type == 'tidre': + print('Starting Tidre worker') + t = TidreWorker + worker_name = 'accelerated-' + str(time.time()) + elif args.type == 're2': + print('Starting RE2 worker') + t = RE2Worker + worker_name = 'accelerated-' + str(time.time()) + elif args.type == 'vanilla': + print('Starting vanilla worker') + t = Worker + worker_name = 'vanilla-' + str(time.time()) + else: + raise Exception("Worker type not valid.") + + # Start a new worker based on the AcceleratedWorker class + # This worker automatically connects to the scheduler and gets added to the worker pool + kwargs = { + 'preload': (), + 'memory_limit': '0', + 'preload_argv': (), + 'interface': None, + 'protocol': None, + 'reconnect': True, + 'local_directory': None, + 'death_timeout': None, + 'lifetime': None, + 'lifetime_stagger': '0 seconds', + 'lifetime_restart': False + } + + loop = IOLoop.current() + + async_loop = asyncio.get_event_loop() + worker = async_loop.run_until_complete( + t( + scheduler_address, + scheduler_file=None, + nthreads=1, + loop=loop, + resources=None, + security={}, + contact_address=None, + host=None, + port=None, + dashboard=True, + dashboard_address=':0', + name=worker_name, + **kwargs + ) + ) + + nannies = [worker] + nanny = True + + async def close_all(): + # Unregister all workers from scheduler + if nanny: + await asyncio.gather(*[n.close(timeout=2) for n in nannies]) + + signal_fired = False + + def on_signal(signum): + nonlocal signal_fired + signal_fired = True + if signum != signal.SIGINT: + logger.info("Exiting on signal %d", signum) + return asyncio.ensure_future(close_all()) + + async def run(): + await asyncio.gather(*nannies) + await asyncio.gather(*[n.finished() for n in nannies]) + + install_signal_handlers(loop, cleanup=on_signal) + + try: + loop.run_sync(run) + except TimeoutError: + # We already log the exception in nanny / worker. Don't do it again. + if not signal_fired: + logger.info("Timed out starting worker") + sys.exit(1) + except KeyboardInterrupt: + pass + finally: + logger.info("End worker") + + +if __name__ == '__main__': + main() diff --git a/dask_accelerated_worker/utils.py b/dask_accelerated_worker/utils.py new file mode 100644 index 0000000..a0191ed --- /dev/null +++ b/dask_accelerated_worker/utils.py @@ -0,0 +1,30 @@ +from tornado.ioloop import IOLoop + + +def install_signal_handlers(loop=None, cleanup=None): + """ + Install global signal handlers to halt the Tornado IOLoop in case of + a SIGINT or SIGTERM. *cleanup* is an optional callback called, + before the loop stops, with a single signal number argument. + """ + import signal + + loop = loop or IOLoop.current() + + old_handlers = {} + + def handle_signal(sig, frame): + async def cleanup_and_stop(): + try: + if cleanup is not None: + await cleanup(sig) + finally: + loop.stop() + + loop.add_callback_from_signal(cleanup_and_stop) + # Restore old signal handler to allow for a quicker exit + # if the user sends the signal again. + signal.signal(sig, old_handlers[sig]) + + for sig in [signal.SIGINT, signal.SIGTERM]: + old_handlers[sig] = signal.signal(sig, handle_signal) diff --git a/native/CMakeLists.txt b/native/CMakeLists.txt index 4f5db9c..b5d54b4 100644 --- a/native/CMakeLists.txt +++ b/native/CMakeLists.txt @@ -4,7 +4,7 @@ project(dask_native) set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_FLAGS -D_GLIBCXX_USE_CXX11_ABI=0) +set(CMAKE_CXX_FLAGS -D_GLIBCXX_USE_CXX11_ABI=1) set(CMAKE_POSITION_INDEPENDENT_CODE ON) include(FetchContent) diff --git a/notebooks/accelerated-worker.ipynb b/notebooks/accelerated-worker.ipynb new file mode 100644 index 0000000..041438f --- /dev/null +++ b/notebooks/accelerated-worker.ipynb @@ -0,0 +1,308 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client, Scheduler, Worker" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Close the previous scheduler and all associated workers if present\n", + "try:\n", + " client.shutdown()\n", + " client.close()\n", + "except:\n", + " print('Could not shut down old client, was there any to begin with?')\n", + "\n", + "# Start a client in local cluster mode and expose the underlying scheduler\n", + "client = Client()\n", + "scheduler = client.cluster.scheduler\n", + "print('Dashboard available at', client.dashboard_link)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "import logging\n", + "\n", + "no_value = \"--no-value-sentinel--\"\n", + "logger = logging.getLogger(__name__)\n", + "\n", + "# Create an accelerated worker class based on the original worker class\n", + "class AcceleratedWorker(Worker):\n", + " pass\n", + " # def add_task(\n", + " # self,\n", + " # key,\n", + " # function=None,\n", + " # args=None,\n", + " # kwargs=None,\n", + " # task=no_value,\n", + " # who_has=None,\n", + " # nbytes=None,\n", + " # priority=None,\n", + " # duration=None,\n", + " # resource_restrictions=None,\n", + " # actor=False,\n", + " # **kwargs2,\n", + " # ):\n", + " # logger.info('TTTTESTSETSTESTSTETS')\n", + " # super(AcceleratedWorker, self).add_task(\n", + " # key,\n", + " # function,\n", + " # args,\n", + " # kwargs,\n", + " # task,\n", + " # who_has,\n", + " # nbytes,\n", + " # priority,\n", + " # duration,\n", + " # resource_restrictions,\n", + " # actor,\n", + " # **kwargs2,\n", + " # )\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "def install_signal_handlers(loop=None, cleanup=None):\n", + " \"\"\"\n", + " Install global signal handlers to halt the Tornado IOLoop in case of\n", + " a SIGINT or SIGTERM. *cleanup* is an optional callback called,\n", + " before the loop stops, with a single signal number argument.\n", + " \"\"\"\n", + " import signal\n", + "\n", + " loop = loop or IOLoop.current()\n", + "\n", + " old_handlers = {}\n", + "\n", + " def handle_signal(sig, frame):\n", + " async def cleanup_and_stop():\n", + " try:\n", + " if cleanup is not None:\n", + " await cleanup(sig)\n", + " finally:\n", + " loop.stop()\n", + "\n", + " loop.add_callback_from_signal(cleanup_and_stop)\n", + " # Restore old signal handler to allow for a quicker exit\n", + " # if the user sends the signal again.\n", + " signal.signal(sig, old_handlers[sig])\n", + "\n", + " for sig in [signal.SIGINT, signal.SIGTERM]:\n", + " old_handlers[sig] = signal.signal(sig, handle_signal)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Start a new worker based on the AcceleratedWorker class\n", + "# This worker automatically connects to the scheduler and gets added to the worker pool\n", + "# accelerated_worker = await Worker(\n", + "# scheduler.address,\n", + "# validate=True,\n", + "# nthreads=1,\n", + "# memory_limit=4e9,\n", + "# dashboard=True,\n", + "# name='accelerated'\n", + "# )\n", + "\n", + "from tornado.ioloop import IOLoop\n", + "import asyncio\n", + "import sys\n", + "import signal\n", + "\n", + "kwargs = {\n", + " 'preload': (),\n", + " 'memory_limit': '0',\n", + " 'preload_argv': (),\n", + " 'interface': None,\n", + " 'protocol': None,\n", + " 'reconnect': True,\n", + " 'local_directory': None,\n", + " 'death_timeout': None,\n", + " 'lifetime': None,\n", + " 'lifetime_stagger': '0 seconds',\n", + " 'lifetime_restart': False\n", + "}\n", + "\n", + "loop = IOLoop.current()\n", + "\n", + "accelerated_worker = await Worker(\n", + " scheduler.address,\n", + " scheduler_file=None,\n", + " nthreads=1,\n", + " loop=loop,\n", + " resources=None,\n", + " security={},\n", + " contact_address=None,\n", + " host=None,\n", + " port=None,\n", + " dashboard=True,\n", + " dashboard_address=':0',\n", + " name='accelerated',\n", + " **kwargs\n", + ")\n", + "\n", + "nannies = [accelerated_worker]\n", + "\n", + "nanny = True\n", + "\n", + "async def close_all():\n", + " # Unregister all workers from scheduler\n", + " if nanny:\n", + " await asyncio.gather(*[n.close(timeout=2) for n in nannies])\n", + "\n", + "signal_fired = False\n", + "\n", + "def on_signal(signum):\n", + " nonlocal signal_fired\n", + " signal_fired = True\n", + " if signum != signal.SIGINT:\n", + " logger.info(\"Exiting on signal %d\", signum)\n", + " return asyncio.ensure_future(close_all())\n", + "\n", + "async def run():\n", + " await asyncio.gather(*nannies)\n", + " await asyncio.gather(*[n.finished() for n in nannies])\n", + "\n", + "install_signal_handlers(loop, cleanup=on_signal)\n", + "\n", + "try:\n", + " loop.run_sync(run)\n", + "except TimeoutError:\n", + " # We already log the exception in nanny / worker. Don't do it again.\n", + " if not signal_fired:\n", + " logger.info(\"Timed out starting worker\")\n", + " sys.exit(1)\n", + "except KeyboardInterrupt:\n", + " pass\n", + "finally:\n", + " logger.info(\"End worker\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "workers = scheduler.workers\n", + "for worker in workers:\n", + " print(workers[worker].nanny)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Remove the non-accelerated workers\n", + "workers = scheduler.workers\n", + "for worker in workers:\n", + " if worker != accelerated_worker.address:\n", + " await scheduler.remove_worker(address=worker)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Define a simple function and\n", + "# submit a future on the client\n", + "\n", + "# Increment integer values by 1\n", + "def inc(x):\n", + " return x + 1\n", + "\n", + "# Y holds the future to the result\n", + "y = client.submit(inc, 1211)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Print the result of the future when it is ready\n", + "y.result()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/notebooks/plots-worker.ipynb b/notebooks/plots-worker.ipynb new file mode 100644 index 0000000..181beaa --- /dev/null +++ b/notebooks/plots-worker.ipynb @@ -0,0 +1,471 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "from shutil import copyfile\n", + "import pickle" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "with open('./data-workers.pickle', 'rb') as f:\n", + " data = pickle.load(f)\n", + "\n", + "print(data['timestamp'])\n", + "out_dir = './plots/workers/' + data['timestamp']\n", + "\n", + "# Create the directory if it does not yet exist\n", + "if not os.path.exists(out_dir):\n", + " os.makedirs(out_dir)\n", + "\n", + "copyfile('./data-workers.pickle', out_dir + '/data-workers.pickle')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "print(data)\n", + "\n", + "x_vals_in = ['256k', '512k', '1024k', '2048k', '4096k']\n", + "x_vals_batch = ['64k', '128k', '256k', '512k', '1024k', '2048k', '4096k', '8192k']\n", + "\n", + "m_cost_per_second = 0.4 / 60 / 60\n", + "f_cost_per_second = 1.65 / 60 / 60" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# RUNTIME IN SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "plt.plot(x_vals_in, list(data['0']['in_size'].values()), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_in, list(data['1']['in_size'].values()), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_in, list(data['2']['in_size'].values()), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_in, list(data['3']['in_size'].values()), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Number of records')\n", + "plt.ylabel('Total query runtime (seconds)')\n", + "\n", + "plt.xticks(x_vals_in)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Runtime in sizes benchmark - batch size 1M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_in_sizes.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# THROUGHPUT IN SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "in_sizes = [256e3, 512e3, 1024e3, 2048e3, 4096e3]\n", + "in_bytes = np.array([x * 100 * 1 for x in in_sizes])\n", + "\n", + "throughput = {\n", + " '0': np.divide(in_bytes, list(data['0']['in_size'].values())),\n", + " '1': np.divide(in_bytes, list(data['1']['in_size'].values()))\n", + " # '2': np.divide(in_bytes, list(data['2']['in_size'].values())),\n", + " # '3': np.divide(in_bytes, list(data['3']['in_size'].values()))\n", + "}\n", + "\n", + "plt.plot(x_vals_in, list(throughput['0']), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_in, list(throughput['1']), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_in, list(throughput['2']), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_in, list(throughput['3']), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Number of records')\n", + "plt.ylabel('Total query throughput (bytes/s)')\n", + "\n", + "plt.xticks(x_vals_in)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Throughput in sizes benchmark - batch size 1M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_in_sizes_throughput.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# SPEEDUP IN SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "speedup = {\n", + " '0': np.divide(list(data['0']['in_size'].values()), list(data['0']['in_size'].values())),\n", + " '1': np.divide(list(data['0']['in_size'].values()), list(data['1']['in_size'].values()))\n", + " # '2': np.divide(list(data['0']['in_size'].values()), list(data['2']['in_size'].values())),\n", + " # '3': np.divide(list(data['0']['in_size'].values()), list(data['3']['in_size'].values()))\n", + "}\n", + "\n", + "plt.plot(x_vals_in, list(speedup['0']), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_in, list(speedup['1']), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_in, list(speedup['2']), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_in, list(speedup['3']), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Number of records')\n", + "plt.ylabel('Speedup')\n", + "\n", + "plt.xticks(x_vals_in)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Speedup in sizes benchmark - batch size 1M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_in_sizes_speedup.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# COST IN SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "cost = {\n", + " '0': np.multiply(list(data['0']['in_size'].values()), (m_cost_per_second)),\n", + " '1': np.multiply(list(data['1']['in_size'].values()), (f_cost_per_second)),\n", + " # '2': np.multiply(list(data['2']['in_size'].values()), (m_cost_per_second + 2*f_cost_per_second)),\n", + " # '3': np.multiply(list(data['3']['in_size'].values()), (3*f_cost_per_second))\n", + "}\n", + "\n", + "plt.plot(x_vals_in, list(cost['0']), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_in, list(cost['1']), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_in, list(cost['2']), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_in, list(cost['3']), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Number of records')\n", + "plt.ylabel('Cost per query ($)')\n", + "\n", + "plt.xticks(x_vals_in)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Cost in sizes benchmark - batch size 1M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_in_sizes_cost.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# RUNTIME BATCH SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "plt.plot(x_vals_batch, list(data['0']['batch_size'].values()), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_batch, list(data['1']['batch_size'].values()), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_batch, list(data['2']['batch_size'].values()), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_batch, list(data['3']['batch_size'].values()), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Batch size')\n", + "plt.ylabel('Total query runtime (seconds)')\n", + "\n", + "plt.xticks(x_vals_batch)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Runtime batch sizes benchmark - in size 4M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_batch_sizes.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# THROUGHPUT BATCH SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "in_bytes = 100 * 1 * 4096e3\n", + "\n", + "throughput = {\n", + " '0': np.divide(in_bytes, list(data['0']['batch_size'].values())),\n", + " '1': np.divide(in_bytes, list(data['1']['batch_size'].values())),\n", + " # '2': np.divide(in_bytes, list(data['2']['batch_size'].values())),\n", + " # '3': np.divide(in_bytes, list(data['3']['batch_size'].values()))\n", + "}\n", + "\n", + "plt.plot(x_vals_batch, list(throughput['0']), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_batch, list(throughput['1']), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_batch, list(throughput['2']), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_batch, list(throughput['3']), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Batch size')\n", + "plt.ylabel('Total query throughput (bytes/s)')\n", + "\n", + "plt.xticks(x_vals_batch)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Throughput batch sizes benchmark - in size 4M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_batch_sizes_throughput.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# SPEEDUP BATCH SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "speedup = {\n", + " '0': np.divide(list(data['0']['batch_size'].values()), list(data['0']['batch_size'].values())),\n", + " '1': np.divide(list(data['0']['batch_size'].values()), list(data['1']['batch_size'].values())),\n", + " # '2': np.divide(list(data['0']['batch_size'].values()), list(data['2']['batch_size'].values())),\n", + " # '3': np.divide(list(data['0']['batch_size'].values()), list(data['3']['batch_size'].values()))\n", + "}\n", + "\n", + "plt.plot(x_vals_batch, list(speedup['0']), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_batch, list(speedup['1']), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_batch, list(speedup['2']), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_batch, list(speedup['3']), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Batch size')\n", + "plt.ylabel('Speedup')\n", + "\n", + "plt.xticks(x_vals_batch)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Speedup batch sizes benchmark - in size 4M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_batch_sizes_speedup.png')\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# COST BATCH SIZE\n", + "\n", + "fig = plt.figure(figsize=(9,7))\n", + "fig.patch.set_facecolor('white')\n", + "\n", + "cost = {\n", + " '0': np.multiply(list(data['0']['batch_size'].values()), (m_cost_per_second)),\n", + " '1': np.multiply(list(data['1']['batch_size'].values()), (f_cost_per_second)),\n", + " # '2': np.multiply(list(data['2']['batch_size'].values()), (m_cost_per_second + 2*f_cost_per_second)),\n", + " # '3': np.multiply(list(data['3']['batch_size'].values()), (3*f_cost_per_second))\n", + "}\n", + "\n", + "plt.plot(x_vals_batch, list(cost['0']), color='r', marker='x', label='vanilla worker', zorder=3)\n", + "plt.plot(x_vals_batch, list(cost['1']), color='g', marker='o', label='tidre worker', zorder=3)\n", + "# plt.plot(x_vals_batch, list(cost['2']), color='b', marker='>', label='2 accelerated workers', zorder=3)\n", + "# plt.plot(x_vals_batch, list(cost['3']), color='orange', marker='^', label='3 accelerated workers', zorder=3)\n", + "\n", + "# Add xticks on the middle of the group bars\n", + "plt.xlabel('Batch size')\n", + "plt.ylabel('Cost per query ($)')\n", + "\n", + "plt.xticks(x_vals_batch)\n", + "plt.xticks(rotation=-90)\n", + "\n", + "plt.title('Cost batch sizes benchmark - in size 4M')\n", + "\n", + "axes = plt.gca()\n", + "axes.grid(which='both', axis='y', linestyle='--')\n", + "\n", + "# plt.yscale('log')\n", + "\n", + "# Create legend & Show graphic\n", + "plt.legend()\n", + "\n", + "# Save fig as pdf\n", + "plt.savefig(out_dir + '/cluster_batch_sizes_cost.png')\n", + "\n", + "plt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/notebooks/plots.ipynb b/notebooks/plots.ipynb index db43d8d..1caaab9 100644 --- a/notebooks/plots.ipynb +++ b/notebooks/plots.ipynb @@ -10,7 +10,8 @@ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from shutil import copyfile\n", - "from benchmark import pickler" + "from benchmark import pickler\n", + "from tabulate import tabulate" ] }, { @@ -42,7 +43,7 @@ "print(\"Run using tidre: \", tidre)\n", "print(\"Data generated on\", timestamp)\n", "\n", - "# Determing output path for generated plots\n", + "# Determine output path for generated plots\n", "out_dir = './plots/'\n", "if tidre:\n", " out_dir += 'tidre/'\n", @@ -58,6 +59,87 @@ "copyfile('./data.pickle', out_dir + '/data.pickle')" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# All measurements are averaged over 10 runs\n", + "# Before each measurement, a single run is performed and discarded\n", + "# All records contain a string of 100 singly byte ascii characters\n", + "in_sizes = [1e3, 2e3, 4e3, 8e3, 16e3, 32e3, 64e3, 128e3, 256e3, 512e3, 1024e3, 2048e3, 4096e3]\n", + "in_bytes = np.array([x * 100 * 1 for x in in_sizes])\n", + "\n", + "# Varies the number of input records\n", + "# Batch size is kept constant at 1M records\n", + "in_size_runtime = {\n", + " 'in sizes (records)': list(data['in_size']['vanilla_filter'].keys()),\n", + " 'vanilla filter (s)': list(data['in_size']['vanilla_filter'].values()),\n", + " 're2 filter (s)': list(data['in_size']['re2_filter'].values()),\n", + " 'tidre filter (s)': list(data['in_size']['tidre_filter'].values()),\n", + " 'tidre filter unaligned (s)': list(data['in_size']['tidre_filter_unaligned'].values())\n", + "}\n", + "\n", + "in_size_throughput = {\n", + " 'in sizes (records)': list(data['in_size']['vanilla_filter'].keys()),\n", + " 'vanilla filter (bytes/s)': np.divide(in_bytes, list(data['in_size']['vanilla_filter'].values())),\n", + " 're2 filter (bytes/s)': np.divide(in_bytes, list(data['in_size']['re2_filter'].values())),\n", + " 'tidre filter (bytes/s)': np.divide(in_bytes, list(data['in_size']['tidre_filter'].values())),\n", + " 'tidre filter unaligned (bytes/s)': np.divide(in_bytes, list(data['in_size']['tidre_filter_unaligned'].values()))\n", + "}\n", + "\n", + "# Varies the number of records in a single record batch\n", + "# In size is kept constant at 4M records\n", + "batch_size_runtime = {\n", + " 'batch sizes (records)': list(data['batch_size']['vanilla_filter'].keys()),\n", + " 'vanilla filter (s)': list(data['batch_size']['vanilla_filter'].values()),\n", + " 're2 filter (s)': list(data['batch_size']['re2_filter'].values()),\n", + " 'tidre filter (s)': list(data['batch_size']['tidre_filter'].values()),\n", + " 'tidre filter unaligned (s)': list(data['batch_size']['tidre_filter_unaligned'].values())\n", + "}\n", + "\n", + "in_bytes = 4096e3 * 100 * 1\n", + "batch_size_throughput = {\n", + " 'batch sizes (records)': list(data['batch_size']['vanilla_filter'].keys()),\n", + " 'vanilla filter (bytes/s)': np.divide(in_bytes, list(data['batch_size']['vanilla_filter'].values())),\n", + " 're2 filter (bytes/s)': np.divide(in_bytes, list(data['batch_size']['re2_filter'].values())),\n", + " 'tidre filter (bytes/s)': np.divide(in_bytes, list(data['batch_size']['tidre_filter'].values())),\n", + " 'tidre filter unaligned (bytes/s)': np.divide(in_bytes, list(data['batch_size']['tidre_filter_unaligned'].values()))\n", + "}\n", + "\n", + "print('Dask in size benchmark runtime:')\n", + "print(tabulate(in_size_runtime, headers='keys'))\n", + "\n", + "print('\\nDask in size benchmark throughput:')\n", + "print(tabulate(in_size_throughput, headers='keys'))\n", + "\n", + "print('\\n\\nDask batch size runtime:')\n", + "print(tabulate(batch_size_runtime, headers='keys'))\n", + "\n", + "print('\\nDask batch size throughput:')\n", + "print(tabulate(batch_size_throughput, headers='keys'))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "speedup_tidre_re2 = data['in_size']['vanilla_filter']['4M'] / data['in_size']['tidre_filter']['4M']\n", + "\n", + "print(speedup_tidre_re2)" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/notebooks/scheduler-plugin.ipynb b/notebooks/scheduler-plugin.ipynb new file mode 100644 index 0000000..ac70b3b --- /dev/null +++ b/notebooks/scheduler-plugin.ipynb @@ -0,0 +1,213 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "import pickle\n", + "from dask.distributed import Client, Scheduler, SchedulerPlugin" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Close the previous scheduler and all associated workers if present\n", + "try:\n", + " client.shutdown()\n", + " client.close()\n", + "except:\n", + " print('Could not shut down old client, was there any to begin with?')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Start a client in local cluster mode\n", + "client = Client()\n", + "print('Dashboard available at', client.dashboard_link)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Scheduler plugin that accelerates the\n", + "# task graph using dask-accelerated\n", + "class SchedulerOptimizer(SchedulerPlugin):\n", + "\n", + " # Add the scheduler instance to the plugin as an attribute\n", + " # such that we can access the underlying task state\n", + " def __init__(self, scheduler):\n", + " self.scheduler = scheduler\n", + "\n", + " def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwargs):\n", + " for key in keys:\n", + " task_state = scheduler.tasks[key]\n", + " scheduler.logg(task_state)\n", + " rs_func = pickle.loads(task_state.run_spec['function'])\n", + " rs_arg = pickle.loads(task_state.run_spec['args'])\n", + " scheduler.logg(rs_func, rs_arg)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "scheduler = client.cluster.scheduler" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# The client dashboard is able to display the info logs\n", + "# from the scheduler. Unfortunately, the scheduler does not\n", + "# have a method to log data, so we monkey patch the scheduler class\n", + "import logging\n", + "logger = logging.getLogger(\"distributed.scheduler\")\n", + "\n", + "# The dashboard only exposes info logs, so\n", + "# logger.info is used instead of logger.debug\n", + "def log_method(self, *msgs):\n", + " logstring = ''\n", + " for msg in msgs:\n", + " logstring += str(msg) + '\\t'\n", + "\n", + " logger.info(logstring)\n", + "\n", + "# The monkey patched method\n", + "Scheduler.logg = log_method\n", + "\n", + "# TODO: remove\n", + "scheduler.logg('logger test')\n", + "scheduler.logg('logger test', 'multiple inputs')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Register the plugin with the scheduler\n", + "plugin = SchedulerOptimizer(scheduler)\n", + "scheduler.add_plugin(plugin)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Define a simple function and\n", + "# submit a future on the client\n", + "\n", + "# Increment integer values by 1\n", + "def inc(x):\n", + " return x + 1\n", + "\n", + "# Y holds the future to the result\n", + "y = client.submit(inc, 10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Print the result of the future when it is ready\n", + "y.result()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Restart the client in case something went wrong\n", + "# By default this doesnt run so the entire notebook\n", + "# can be executed without restarting the client\n", + "if False:\n", + " client.restart()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/notebooks/worker-plugin.ipynb b/notebooks/worker-plugin.ipynb new file mode 100644 index 0000000..a0e4fa9 --- /dev/null +++ b/notebooks/worker-plugin.ipynb @@ -0,0 +1,165 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client, WorkerPlugin" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Close the previous scheduler and all associated workers if present\n", + "try:\n", + " client.shutdown()\n", + " client.close()\n", + "except:\n", + " print('Could not shut down old client, was there any to begin with?')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# First start a scheduler and worker from cli:\n", + "# $ dask-scheduler\n", + "# $ dask-worker tcp://xxx.xx.xxx.xxx:pppp --nthreads 1 --memory-limit 0 --no-nanny\n", + "\n", + "# Start a client and connect it to the existing scheduler\n", + "client = Client('tcp://145.94.225.114:8786')\n", + "print('Dashboard available at', client.dashboard_link)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Worker plugin that accelerates the incoming part of the\n", + "# task graph using dask-accelerated\n", + "class WorkerOptimizer(WorkerPlugin):\n", + "\n", + " def setup(self, worker):\n", + " print('WorkerOptimizer plugin registered')\n", + "\n", + " def transition(self, key, start, finish, **kwargs):\n", + " print(key, '\\t', start, '\\t', finish)\n", + " print(kwargs, '\\n')\n", + "\n", + " def release_key(self, key, state, cause, reason, report):\n", + " print(key, '\\t', state, '\\t', cause, '\\t', reason, '\\t', report)\n", + "\n", + " def release_dep(self, dep, state, report):\n", + " print(dep, '\\t', state, '\\t', report)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Register the plugin with all current and future workers\n", + "client.register_worker_plugin(WorkerOptimizer)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Define a simple function and\n", + "# submit a future on the client\n", + "\n", + "# Increment integer values by 1\n", + "def inc(x):\n", + " return x + 1\n", + "\n", + "# Y holds the future to the result\n", + "y = client.submit(inc, 10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Print the result of the future when it is ready\n", + "y.result()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Restart the client in case something went wrong\n", + "# By default this doesnt run so the entire notebook\n", + "# can be executed without restarting the client\n", + "if False:\n", + " client.restart()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/profiler/main.py b/profiler/main.py index 7b8b23a..1cbb062 100644 --- a/profiler/main.py +++ b/profiler/main.py @@ -12,6 +12,7 @@ args = parser.parse_args() + def profile_and_save(func, in_size, batch_size, out_file): # Create a new profiler and enable it, @@ -45,5 +46,3 @@ def profile_and_save(func, in_size, batch_size, out_file): if args.tidre: profile_and_save(helpers.run_tidre, in_size, batch_size, 'tidre_prof.txt') - - diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..6421db6 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +[flake8] +exclude = .git,*migrations*,native,data_generator +max-line-length = 119 \ No newline at end of file diff --git a/setup.py b/setup.py index 6273fe3..c5192f7 100644 --- a/setup.py +++ b/setup.py @@ -7,12 +7,17 @@ install_requires=[ 'cython', 'dask', + 'dask[distributed]', 'dask[dataframe]', + 'bokeh', + 'jupyter-server-proxy', 'pyarrow', 'pandas', 'xeger', 'progressbar2', 'matplotlib', - 'jupyter_contrib_nbextensions' + 'jupyter_contrib_nbextensions', + 'flake8', + 'tornado' ] -) \ No newline at end of file +) diff --git a/test/test_re2.py b/test/test_re2.py index b914ed9..791ef87 100644 --- a/test/test_re2.py +++ b/test/test_re2.py @@ -21,7 +21,6 @@ def test_small_input(self): """ self.run_and_assert_equal(16e3, 32e3) - def test_large_input(self): """ Test that the re2 implementation computes the same @@ -50,7 +49,6 @@ def test_small_input(self): """ self.run_and_assert_equal(16e3, 32e3) - def test_large_input(self): """ Test that the re2 implementation computes the same @@ -59,5 +57,6 @@ def test_large_input(self): """ self.run_and_assert_equal(32e3, 16e3) + if __name__ == '__main__': unittest.main() diff --git a/test/test_tidre.py b/test/test_tidre.py index 431d791..7e34505 100644 --- a/test/test_tidre.py +++ b/test/test_tidre.py @@ -21,7 +21,6 @@ def test_small_input(self): """ self.run_and_assert_equal(16e3, 32e3) - def test_large_input(self): """ Test that the tidre implementation computes the same @@ -50,7 +49,6 @@ def test_small_input(self): """ self.run_and_assert_equal(16e3, 32e3) - def test_large_input(self): """ Test that the tidre implementation computes the same @@ -59,5 +57,6 @@ def test_large_input(self): """ self.run_and_assert_equal(32e3, 16e3) + if __name__ == '__main__': unittest.main()