This repository has been archived by the owner on Jun 20, 2022. It is now read-only.

Worker optimization #5

Open · wants to merge 43 commits into base: master
Changes from 28 commits

Commits (43)
7089124
Added dask distributed to required packages
bobluppes Apr 13, 2021
916412d
Fixed syntax error in setup.py
bobluppes Apr 13, 2021
eb33ed6
Added jupyter server proxy to required packages
bobluppes Apr 16, 2021
ffdb453
Added jupyter notebooks for scheduler and worker plugins
bobluppes Apr 16, 2021
245ee28
Removed deprecated workers.ipynb
bobluppes Apr 16, 2021
89ad246
Added dask distributed worker space to .gitignore
bobluppes Apr 16, 2021
2da94fe
Added some cells for testing the connection between a scheduler and w…
bobluppes Apr 23, 2021
80a9a94
Ignore all dask-worker-space directories in project
bobluppes Apr 23, 2021
fba0e9f
Added tornado and flake8 to required packages
bobluppes Apr 23, 2021
847a18b
Basic setup for an accelerated worker in dask distributed
bobluppes Apr 23, 2021
a246de1
accelerated_worker.py substitutes re2 operator
bobluppes Apr 23, 2021
03d1414
Fixed bug in which accelerated worker did not get any arguments at cu…
bobluppes Apr 23, 2021
057d7b8
Added config for Flake8
bobluppes Apr 25, 2021
aae071a
Added data_generator dir to Flake8 exclude since it is a submodule
bobluppes Apr 25, 2021
a68b999
Added workflow to check flake8 on PR
bobluppes Apr 25, 2021
81e90c3
Fixed Flake8 warnings
bobluppes Apr 25, 2021
bc449fe
Added linter action badge to README.md
bobluppes Apr 25, 2021
ca2f51b
Added simple benchmark for multiple accelerated workers to main.py
bobluppes Apr 26, 2021
38b049e
Added tabulate to plots.ipynb
bobluppes Apr 28, 2021
f76ab60
Changed CMakeLists.txt ABI to 1
bobluppes Apr 30, 2021
bf72776
Streamlined testing on AWS
bobluppes May 2, 2021
ba3bbca
Added bokeh to setup.py
bobluppes May 2, 2021
e94e7b3
Added option to spin up vanilla workers from start_worker.py
bobluppes May 5, 2021
9278880
Added batch size benchmark
bobluppes May 5, 2021
e69d0f7
Specify scheduler address on cli for main.py
bobluppes May 5, 2021
08c0880
Fixed lintly E503
bobluppes May 8, 2021
f70761e
Start scheduler explicitly such that cluster is not running in local …
bobluppes May 8, 2021
785c917
Cleaned main.py and extracted scheduler specific methods to helpers.py
bobluppes May 8, 2021
af00024
Remove benchmark dry run
bobluppes May 8, 2021
d69da99
Updated benchmark config
bobluppes May 8, 2021
5be4ab6
Fixed keyerror in main.py
bobluppes May 8, 2021
622f23e
Dont overwrite data dict and set repeats to 1
bobluppes May 8, 2021
23ad475
Set repeats to 5
bobluppes May 8, 2021
75abf66
Made plots for both cluster benchmarks in plots-worker.ipynb
bobluppes May 8, 2021
c4dfc8f
Added newline to end of utils.py
bobluppes May 8, 2021
40e8384
Added option to start an RE2 accelerated worker
bobluppes May 8, 2021
28183cf
Cleaned up main.py and extracted functionalities to benchmarks.py and…
bobluppes May 8, 2021
c9b6159
Added method to warm workers, effectively scattering input parquet files
bobluppes May 8, 2021
e8f1b8d
6 repeats and dont consider first 3 runs in benchmark
bobluppes May 9, 2021
1c05e77
Warm workers in main loop and do consider first 3 runs
bobluppes May 9, 2021
8cf1608
Use underscore variable for dry run result in benchmarks.py
bobluppes May 9, 2021
d3cfeba
Newest plots in plots-worker.ipynb
bobluppes May 12, 2021
702d9a2
Open changes in plots-worker.ipynb
bobluppes Jun 10, 2021
19 changes: 19 additions & 0 deletions .github/workflows/lint.yml
@@ -0,0 +1,19 @@
name: lint

on: [pull_request]

jobs:
flake8:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true
- uses: grantmcconnaughey/[email protected]
with:
# The GitHub API token to create reviews with
token: ${{ secrets.GITHUB_TOKEN }}
# Fail if "new" violations detected or "any", default "new"
failIf: new
# Additional arguments to pass to flake8, default "." (current directory)
args: ""
5 changes: 4 additions & 1 deletion .gitignore
@@ -150,4 +150,7 @@ cython_debug/
*.pdf

# Profiler results
profiler/*.txt
profiler/*.txt

# Dask distributed
dask-worker-space/
1 change: 1 addition & 0 deletions README.md
@@ -1,6 +1,7 @@
# Dask Accelerated

[![test](https://github.com/teratide/dask-accelerated/actions/workflows/test.yml/badge.svg)](https://github.com/teratide/dask-accelerated/actions/workflows/test.yml)
[![lint](https://github.com/teratide/dask-accelerated/actions/workflows/lint.yml/badge.svg)](https://github.com/teratide/dask-accelerated/actions/workflows/lint.yml)

An accelerated version of Dask which substitutes operators in the Dask task graph with an accelerated version.
This new operator can do its evaluation using native libraries or by offloading the computation to an FPGA accelerator.
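As context for the README's description, a toy sketch of what operator substitution in a low-level Dask task graph looks like; the names `double` and `accelerated_double` are purely illustrative and not the project's code:

```python
# Toy illustration of operator substitution in a Dask task graph.
# 'double' and 'accelerated_double' are made-up stand-ins; the real project
# swaps the pandas str.match callable for an RE2- or Tidre-backed filter.
import dask


def double(x):
    return 2 * x


def accelerated_double(x):
    # Pretend this is backed by a native library or an FPGA kernel.
    return x + x


# A hand-written low-level task graph: key -> (callable, *dependencies).
dsk = {
    "a": 21,
    "b": (double, "a"),
}

# Replace every task whose callable is `double` with the accelerated version.
optimized = {
    key: (accelerated_double,) + task[1:]
    if isinstance(task, tuple) and task[0] is double
    else task
    for key, task in dsk.items()
}

print(dask.get(optimized, "b"))  # 42, computed by accelerated_double
```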
3 changes: 0 additions & 3 deletions benchmark/benchmarks.py
@@ -124,6 +124,3 @@ def benchmark_tidre_in_size(in_sizes, batch_size, batch_aggregate, repeats):
}

return data, name



12 changes: 11 additions & 1 deletion benchmark/helpers.py
@@ -2,7 +2,17 @@
from dask_accelerated import helpers


def run_repeats(in_size, batch_size, batch_aggregate, repeats, key, vanilla_filter, re2_filter, tidre_filter=None, tidre_filter_unaligned=None):
def run_repeats(
in_size,
batch_size,
batch_aggregate,
repeats,
key,
vanilla_filter,
re2_filter,
tidre_filter=None,
tidre_filter_unaligned=None
):

# Single run to mitigate caching effects
(res, dur) = helpers.run_vanilla(in_size, batch_size, batch_aggregate)
30 changes: 25 additions & 5 deletions benchmark/main.py
@@ -13,6 +13,7 @@

args = parser.parse_args()


def benchmark_re2(in_sizes, batch_aggregates, repeats):

# Constants when varying a single parameter
@@ -28,10 +29,20 @@ def benchmark_re2(in_sizes, batch_aggregates, repeats):

data = {}

(benchmark_data, benchmark_name) = bench.benchmark_re2_in_size(in_sizes, constant_batch_size_in_benchmark, constant_batch_aggregate, repeats)
(benchmark_data, benchmark_name) = bench.benchmark_re2_in_size(
in_sizes,
constant_batch_size_in_benchmark,
constant_batch_aggregate,
repeats
)
data[benchmark_name] = benchmark_data

(benchmark_data, benchmark_name) = bench.benchmark_re2_batch_size(constant_in_size, constant_batch_size_batch_benchmark, batch_aggregates, repeats)
(benchmark_data, benchmark_name) = bench.benchmark_re2_batch_size(
constant_in_size,
constant_batch_size_batch_benchmark,
batch_aggregates,
repeats
)
data[benchmark_name] = benchmark_data

benchmark_helpers.print_and_store_with_or_without_tidre(data, False)
@@ -52,10 +63,20 @@ def benchmark_tidre(in_sizes, batch_aggregates, repeats):

data = {}

(benchmark_data, benchmark_name) = bench.benchmark_tidre_in_size(in_sizes, constant_batch_size_in_benchmark, constant_batch_aggregate, repeats)
(benchmark_data, benchmark_name) = bench.benchmark_tidre_in_size(
in_sizes,
constant_batch_size_in_benchmark,
constant_batch_aggregate,
repeats
)
data[benchmark_name] = benchmark_data

(benchmark_data, benchmark_name) = bench.benchmark_tidre_batch_size(constant_in_size, constant_batch_size_batch_benchmark, batch_aggregates, repeats)
(benchmark_data, benchmark_name) = bench.benchmark_tidre_batch_size(
constant_in_size,
constant_batch_size_batch_benchmark,
batch_aggregates,
repeats
)
data[benchmark_name] = benchmark_data

benchmark_helpers.print_and_store_with_or_without_tidre(data, True)
@@ -75,4 +96,3 @@ def benchmark_tidre(in_sizes, batch_aggregates, repeats):
end = time.time()

print("Ran all benchmarks in ", (end - start) / 60, " minutes")

2 changes: 1 addition & 1 deletion benchmark/pickler.py
@@ -21,4 +21,4 @@ def load_from_notebooks():
with open(data_root + 'data.pickle', 'rb') as f:
data = pickle.load(f)

return data
return data
14 changes: 11 additions & 3 deletions dask_accelerated/helpers.py
@@ -110,7 +110,9 @@ def get_lazy_result(in_size, batch_size, split_row_groups):
parquet_engine = "pyarrow" # Valid engines: ['fastparquet', 'pyarrow', 'pyarrow-dataset', 'pyarrow-legacy']
file_root = "../data_generator/diving/data-"
file_ext = ".parquet"
regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+[dD][iI][vV][iI][nN][gG][ \t\n]+([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*'
regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+' \
'[dD][iI][vV][iI][nN][gG][ \t\n]+' \
'([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*'

# Load the dataframe
columns = ["value", "string"]
@@ -151,7 +153,11 @@ def run_and_record_durations(dsk, result, substitute_operator):
filter_durations = np.array(substitute_operator.durations)
durations = construct_durations(total_duration_in_seconds, filter_durations)

print("Computed ", res, " in ", total_duration_in_seconds, " seconds\tfilter: ", durations['filter']['total'], " seconds")
print(
"Computed ", res,
" in ", total_duration_in_seconds, " seconds",
"\tfilter: ", durations['filter']['total'], " seconds"
)

return res, durations

@@ -191,7 +197,9 @@ def generate_datasets_if_needed(sizes, chunksize=1e6):
print("Missing datasets found, these will be generated")
match_percentage = 0.05
data_length = 100
regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+[dD][iI][vV][iI][nN][gG][ \t\n]+([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*'
regex = '.*[tT][eE][rR][aA][tT][iI][dD][eE][ \t\n]+' \
'[dD][iI][vV][iI][nN][gG][ \t\n]+' \
'([sS][uU][bB])+[sS][uU][rR][fF][aA][cC][eE].*'
parquet_chunksize = chunksize
parquet_compression = 'none'

3 changes: 0 additions & 3 deletions dask_accelerated/operators.py
@@ -150,9 +150,6 @@ def custom_tidre_unaligned(self, obj, accessor, attr, args, kwargs):
# The number of records in the current batch
number_of_records = obj.size

# The regular expression to be matched
regex = args[0]

# Add some padding to the pandas series, which will be removed from the buffers later
obj_with_padding = pandas.concat([pandas.Series(["a"]), obj])
arr = pyarrow.Array.from_pandas(obj_with_padding)
8 changes: 5 additions & 3 deletions dask_accelerated/optimization.py
@@ -1,6 +1,7 @@
import re
from dask.optimization import SubgraphCallable


# Unwrap the corresponding subgraph_callable in the task graph in order to insert a custom function
def compute_substitute(dsk, key, custom):
str_match = dsk[(key, 0)]
@@ -16,6 +17,7 @@ def compute_substitute(dsk, key, custom):

return SubgraphCallable(new_dsk, call.outkey, call.inkeys, "regex_callable")


# Substitute all string match operators in the graph with the custom re2 operator
def optimize_graph_re2(graph, substitute_function):

@@ -27,7 +29,7 @@ def optimize_graph_re2(graph, substitute_function):
# This key is used to target one of the operators in the task graph
# from which the regex_callable will be constructed
for key in graph.keys():
if re.match(regex, key[0]) != None:
if re.match(regex, key[0]) is not None:
key = key[0] # The keys are tuples and the operator name is the first value
break

@@ -39,9 +41,9 @@ def optimize_graph_re2(graph, substitute_function):

# Substitute the regex_callable if the operator name matches the str-match pattern
for k in dsk:
if re.match(regex, k[0]) != None:
if re.match(regex, k[0]) is not None:
target_op = list(dsk[k])
target_op[0] = regex_callable
dsk[k] = tuple(target_op)

return dsk
return dsk
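For reference, a hedged usage sketch of `optimize_graph_re2`: the parquet path and match pattern below are placeholders, `CustomFilter().custom_tidre` is the substitute used elsewhere in this PR, and how the optimized graph is fed back to Dask is omitted.

```python
# Hedged usage sketch for optimize_graph_re2. Assumes a parquet dataset with
# 'string' and 'value' columns (as in dask_accelerated/helpers.py); the file
# path and regex below are placeholders, not values taken from the project.
import dask.dataframe as dd

from dask_accelerated.operators import CustomFilter
from dask_accelerated.optimization import optimize_graph_re2

df = dd.read_parquet("path/to/data.parquet", engine="pyarrow")

# A lazy filter + aggregation; the str.match tasks end up in the graph under
# keys containing 'str-match', which is what optimize_graph_re2 targets.
lazy_result = df[df["string"].str.match(".*subsurface.*")]["value"].sum()

# Substitute the pandas str.match callable with the accelerated filter.
substitute = CustomFilter().custom_tidre
optimized_dsk = optimize_graph_re2(lazy_result.__dask_graph__(), substitute)
```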
Empty file.
60 changes: 60 additions & 0 deletions dask_accelerated_worker/accelerated_worker.py
@@ -0,0 +1,60 @@
import logging
import re
import pickle
from dask.distributed import Worker
from dask.distributed import worker
from dask.optimization import SubgraphCallable
from dask_accelerated.operators import CustomFilter

logger = logging.getLogger(__name__)


# Create an accelerated worker class based on the original worker class
class AcceleratedWorker(Worker):

def add_task(
self,
key,
function=None,
args=None,
kwargs=None,
task=worker.no_value,
who_has=None,
nbytes=None,
priority=None,
duration=None,
resource_restrictions=None,
actor=False,
**kwargs2,
):
regex = re.compile('.*str-match.*')
if re.match(regex, key) is not None:
# This task matches the operation we want to perform on fpga
func = pickle.loads(function)

substitute_op = CustomFilter().custom_tidre

dsk = func.dsk
vals = dsk[func.outkey]
vals_args = vals[3]
new_vals_args = (vals_args[0], [['_func', substitute_op], vals_args[1][1]])
new_vals = (vals[0], vals[1], vals[2], new_vals_args)
dsk[func.outkey] = new_vals

new_func = SubgraphCallable(dsk, func.outkey, func.inkeys, "regex_callable")
function = pickle.dumps(new_func)

super().add_task(
key,
function,
args,
kwargs,
task,
who_has,
nbytes,
priority,
duration,
resource_restrictions,
actor,
**kwargs2,
)
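A hedged sketch of how such a worker could be started and attached to a scheduler; the address and the `accelerated-<timestamp>` naming are assumptions based on `helpers.remove_non_accelerated_workers`, and the PR's `start_worker.py` remains the actual entry point:

```python
# Hedged sketch: start an AcceleratedWorker and connect it to a scheduler.
# The scheduler address is an assumption; the 'accelerated-<timestamp>' name
# matters because remove_non_accelerated_workers keeps only such workers.
import asyncio
import time

from dask_accelerated_worker.accelerated_worker import AcceleratedWorker


async def main(scheduler_address="tcp://localhost:8786"):
    worker = AcceleratedWorker(
        scheduler_address,
        name=f"accelerated-{int(time.time())}",
        nthreads=1,
    )
    await worker             # start and register with the scheduler
    await worker.finished()  # run until the scheduler shuts it down


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())
```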
65 changes: 65 additions & 0 deletions dask_accelerated_worker/helpers.py
@@ -0,0 +1,65 @@
from dask.distributed import Scheduler
from tornado.ioloop import IOLoop
import asyncio


def get_scheduler():
kwargs = {
'preload': (),
'preload_argv': (),
'interface': None,
'protocol': None,
'scheduler_file': '',
'idle_timeout': None
}

loop = IOLoop.current()
sec = {}
host = ''
port = 8786
dashboard = True
dashboard_address = 8787
dashboard_prefix = ''

scheduler = Scheduler(
loop=loop,
security=sec,
host=host,
port=port,
dashboard=dashboard,
dashboard_address=dashboard_address,
http_prefix=dashboard_prefix,
**kwargs
)

return scheduler, loop


def run_scheduler(scheduler, loop):

async def run():
await scheduler
await scheduler.finished()

loop.run_sync(run)


def remove_non_accelerated_workers(scheduler):

# New event loop to await async remove worker method
loop = asyncio.new_event_loop()

# TODO: fix this
# Somehow this does not always work on the first try
# A quick but messy fix is to run it more than once to
# ensure all non-accelerated workers get removed
for i in range(3):
workers = scheduler.workers
for worker in workers:
# All accelerated workers are called 'accelerated-[timestamp]'
if str(workers[worker].name).split('-')[0] != 'accelerated':
loop.run_until_complete(
scheduler.remove_worker(address=worker)
)

loop.close()
Loading