From c1285ae0d1274caff72a051f882612ba62d3eecb Mon Sep 17 00:00:00 2001 From: "Maarten A. Breddels" Date: Mon, 21 Jun 2021 11:21:13 +0200 Subject: [PATCH 1/6] Vaex solution --- _control/solutions.csv | 4 +- _launcher/launcher.R | 2 +- _launcher/solution.R | 2 +- run.conf | 2 +- run.sh | 2 + vaex/groupby-vaex.py | 302 +++++++++++++++++++++++++++++++++++++++++ vaex/join-vaex.py | 185 +++++++++++++++++++++++++ vaex/upg-vaex.sh | 8 ++ vaex/ver-vaex.sh | 5 + 9 files changed, 508 insertions(+), 4 deletions(-) create mode 100755 vaex/groupby-vaex.py create mode 100755 vaex/join-vaex.py create mode 100755 vaex/upg-vaex.sh create mode 100755 vaex/ver-vaex.sh diff --git a/_control/solutions.csv b/_control/solutions.csv index eb626279..fce5e529 100644 --- a/_control/solutions.csv +++ b/_control/solutions.csv @@ -25,4 +25,6 @@ polars,join arrow,groupby arrow,join duckdb,groupby -duckdb,join \ No newline at end of file +duckdb,join +vaex,groupby +vaex,join diff --git a/_launcher/launcher.R b/_launcher/launcher.R index 0820dc37..767cc477 100644 --- a/_launcher/launcher.R +++ b/_launcher/launcher.R @@ -14,7 +14,7 @@ readret = function(x) { file.ext = function(x) { ans = switch( x, - "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", + "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", "vaex"=, "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "clickhouse"="sql", "juliadf"="jl" diff --git a/_launcher/solution.R b/_launcher/solution.R index ec3b70ae..246d257b 100755 --- a/_launcher/solution.R +++ b/_launcher/solution.R @@ -110,7 +110,7 @@ if ("quiet" %in% names(args)) { file.ext = function(x) { ans = switch( x, - "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", + "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", "vaex"=, "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "clickhouse"="sql", "juliadf"="jl" diff --git a/run.conf b/run.conf index 5d5b0b45..52b8e9e5 100644 --- a/run.conf +++ b/run.conf 
@@ -1,7 +1,7 @@ # task, used in init-setup-iteration.R export RUN_TASKS="groupby join groupby2014" # solution, used in init-setup-iteration.R -export RUN_SOLUTIONS="data.table dplyr pandas pydatatable spark dask juliadf cudf clickhouse polars arrow duckdb" +export RUN_SOLUTIONS="vaex data.table dplyr pandas pydatatable spark dask juliadf cudf clickhouse polars arrow duckdb" # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=true diff --git a/run.sh b/run.sh index c30d73c6..9b4c5b1d 100755 --- a/run.sh +++ b/run.sh @@ -46,6 +46,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "dplyr" ]]; then ./dplyr/upg- if [[ "$RUN_SOLUTIONS" =~ "dplyr" ]]; then ./dplyr/ver-dplyr.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "juliadf" ]]; then ./juliadf/upg-juliadf.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "juliadf" ]]; then ./juliadf/ver-juliadf.sh; fi; +if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "vaex" ]]; then ./vaex/upg-vaex.sh; fi; +if [[ "$RUN_SOLUTIONS" =~ "vaex" ]]; then ./vaex/ver-vaex.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "modin" ]]; then ./modin/upg-modin.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "modin" ]]; then ./modin/ver-modin.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "pandas" ]]; then ./pandas/upg-pandas.sh; fi; diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py new file mode 100755 index 00000000..c36e2ee3 --- /dev/null +++ b/vaex/groupby-vaex.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python + +print("# groupby-vaex.py", flush=True) + +import os +import gc +import timeit +import vaex + +exec(open("./_helpers/helpers.py").read()) + +ver = vaex.__version__['vaex-core'] +git = '-' +task = "groupby" +solution = "vaex" +fun = ".groupby" +cache = "TRUE" +on_disk = "TRUE" + +# vaex.cache.redis() + +data_name = os.environ['SRC_GRP_LOCAL'] +src_grp = os.path.join("data", data_name+".csv") +print("loading dataset %s" % data_name, flush=True) + +x = vaex.open(src_grp, convert=True) +print("loaded dataset") 
+x.ordinal_encode('id1', inplace=True) +x.ordinal_encode('id2', inplace=True) +x.ordinal_encode('id3', inplace=True) + +task_init = timeit.default_timer() +print("grouping...", flush=True) + +question = "sum v1 by id1" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1']).agg({'v1': 'sum'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1']).agg({'v1': 'sum'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans + +question = "sum v1 by id1:id2" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) +t = timeit.default_timer() - t_start 
+m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1 mean v3 by id3" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "mean v1:v3 by id4" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] 
+chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1:v3 by id6" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, 
out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +# exact median not implemented +# question = "median v3 sd v3 by id4 id5" # q6 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# we need to see how we do this +# question = "max v1 - min v2 by id3" # q7 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - 
x['v2'])[['range_v1_v2']] +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['range_v1_v2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['range_v1_v2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + + +# maybe we can do this with first +# question = "largest two v3 by id6" # q8 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) +# ans.reset_index(drop=True, inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, 
cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) +# # ans.reset_index(drop=True, inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# can do this with corr +# question = "regression v1 v2 by id2 id4" # q9 +# #ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).corr().iloc[0::2][['v2']]**2 # slower, 76s vs 47s on 1e8 1e2 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['r2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = 
memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['r2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# # del ans + +# segfault +# question = "sum v3 count by id1:id6" # q10 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum(), ans['v1'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) +# # ans.reset_index(inplace=True) +# # print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum(), ans['v1'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +print("grouping finished, took 
%0.fs" % (timeit.default_timer()-task_init), flush=True) + +exit(0) diff --git a/vaex/join-vaex.py b/vaex/join-vaex.py new file mode 100755 index 00000000..b6e90d63 --- /dev/null +++ b/vaex/join-vaex.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python + +print("# join-vaex.py", flush=True) + +import os +import gc +import timeit +import vaex + +exec(open("./_helpers/helpers.py").read()) + +ver = vaex.__version__['vaex-core'] +git = 'alpha' +task = "join" +solution = "vaex" +fun = ".join" +cache = "TRUE" +on_disk = "FALSE" + +data_name = os.environ['SRC_JN_LOCAL'] +src_jn_x = os.path.join("data", data_name+".csv") +y_data_name = join_to_tbls(data_name) +src_jn_y = [os.path.join("data", y_data_name[0]+".csv"), os.path.join("data", y_data_name[1]+".csv"), os.path.join("data", y_data_name[2]+".csv")] +if len(src_jn_y) != 3: + raise Exception("Something went wrong in preparing files used for join") + +print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True) + +print(src_jn_x, src_jn_y) +x = vaex.open(src_jn_x, convert=True) +x.ordinal_encode('id1', inplace=True) +x.ordinal_encode('id2', inplace=True) +x.ordinal_encode('id3', inplace=True) + + +small = vaex.open(src_jn_y[0], convert=True) +small.ordinal_encode('id1', inplace=True) +medium = vaex.open(src_jn_y[1], convert=True) +medium.ordinal_encode('id1', inplace=True) +medium.ordinal_encode('id2', inplace=True) +big = vaex.open(src_jn_y[2], convert=True) +big.ordinal_encode('id1', inplace=True) +big.ordinal_encode('id2', inplace=True) +big.ordinal_encode('id3', inplace=True) +print(len(x), flush=True) +print(len(small), flush=True) +print(len(medium), flush=True) +print(len(big), flush=True) + +task_init = timeit.default_timer() +print("joining...", flush=True) + +question = "small inner on int" # q1; vaex's join defaults to how='left', so every "inner" question passes how='inner' explicitly +gc.collect() +t_start = timeit.default_timer() +ans = x.join(small, how='inner', on='id1', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m 
= memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(small, how='inner', on='id1', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on int" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id2', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id2', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, 
data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='left', on='id2', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='left', on='id2', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id5', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, 
in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id5', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "big inner on int" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(big, how='inner', on='id3', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(big, how='inner', on='id3', rsuffix='_r') +#print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, 
cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +print("joining finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) + +exit(0) diff --git a/vaex/upg-vaex.sh b/vaex/upg-vaex.sh new file mode 100755 index 00000000..e7795c31 --- /dev/null +++ b/vaex/upg-vaex.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e + +echo 'upgrading vaex...' + +source ./vaex/py-vaex/bin/activate + +python -m pip install -U vaex-core vaex-hdf5 > /dev/null diff --git a/vaex/ver-vaex.sh b/vaex/ver-vaex.sh new file mode 100755 index 00000000..280d89f1 --- /dev/null +++ b/vaex/ver-vaex.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -e + +source ./vaex/py-vaex/bin/activate # same virtualenv that upg-vaex.sh upgrades vaex in +python -c 'import vaex; open("vaex/VERSION","w").write(vaex.__version__["vaex-core"]); open("vaex/REVISION","w").write("alpha");' > /dev/null From a8f241fdadaa9d761360bec6cb2c8f1da38967eb Mon Sep 17 00:00:00 2001 From: Jovan Veljanoski Date: Wed, 30 Jun 2021 18:12:01 +0200 Subject: [PATCH 2/6] refactor the vaex-groupby script --- vaex/groupby-vaex.py | 341 ++++++++++--------------------------------- 1 file changed, 74 insertions(+), 267 deletions(-) diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py index c36e2ee3..55d6cffb 100755 --- a/vaex/groupby-vaex.py +++ b/vaex/groupby-vaex.py @@ -6,6 +6,7 @@ import gc import timeit import vaex +vaex.multithreading.thread_count_default = 8 exec(open("./_helpers/helpers.py").read()) @@ -19,284 +20,90 @@ # vaex.cache.redis() -data_name = os.environ['SRC_GRP_LOCAL'] +data_name = os.environ['SRC_DATANAME'] src_grp = os.path.join("data", data_name+".csv") print("loading dataset %s" % data_name, flush=True) -x = vaex.open(src_grp, convert=True) +x = vaex.open(src_grp, convert=True, dtype={"id4":"Int32", "id5":"Int32", "id6":"Int32", "v1":"Int32", "v2":"Int32"}) print("loaded dataset") x.ordinal_encode('id1', inplace=True) x.ordinal_encode('id2', inplace=True) x.ordinal_encode('id3', 
inplace=True) -task_init = timeit.default_timer() -print("grouping...", flush=True) - -question = "sum v1 by id1" # q1 -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id1']).agg({'v1': 'sum'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id1']).agg({'v1': 'sum'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans - -question = "sum v1 by id1:id2" # q2 -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum()] -chkt = 
timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans - -question = "sum v1 mean v3 by id3" # q3 -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v3'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v3'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans - -question = "mean v1:v3 by id4" # q4 -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, 
in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans -question = "sum v1:v3 by id6" # q5 -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, 
fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans +# Questions +def question_1(): + return x.groupby(['id1']).agg({'v1': 'sum'}) + +def question_2(): + return x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) + +def question_3(): + return x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}) + +def question_4(): + return x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) + +def question_5(): + return x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) + +def question_6(): + return x.groupby(['id4','id5']).agg({'v3': ['median','std']}) + +def question_7(): + return x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] + +def question_8(): + return x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) + +def question_9(): + return x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) + +def question_10(): + return x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) + +# Generic benchmark function - to improve code readability +def benchmark(func, question, chk_sum_cols): + gc.collect() + t_start = timeit.default_timer() + ans = func() + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + del ans + gc.collect() + t_start = timeit.default_timer() + ans = func() + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start 
+ m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + print(ans.head(3), flush=True) + print(ans.tail(3), flush=True) + del ans -# exact median not implemented -# question = "median v3 sd v3 by id4 id5" # q6 -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# del ans -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# print(ans.head(3), flush=True) -# print(ans.tail(3), flush=True) -# del ans -# we need to see 
how we do this -# question = "max v1 - min v2 by id3" # q7 -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['range_v1_v2'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# del ans -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['range_v1_v2'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# print(ans.head(3), flush=True) -# print(ans.tail(3), flush=True) -# del ans - - -# maybe we can do this with first -# question = "largest two v3 by id6" # q8 -# gc.collect() -# t_start = timeit.default_timer() -# ans = x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) -# ans.reset_index(drop=True, inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['v3'].sum()] -# chkt = timeit.default_timer() - 
t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# del ans -# gc.collect() -# t_start = timeit.default_timer() -# ans = x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) -# # ans.reset_index(drop=True, inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['v3'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# print(ans.head(3), flush=True) -# print(ans.tail(3), flush=True) -# del ans - -# can do this with corr -# question = "regression v1 v2 by id2 id4" # q9 -# #ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).corr().iloc[0::2][['v2']]**2 # slower, 76s vs 47s on 1e8 1e2 -# gc.collect() -# t_start = timeit.default_timer() -# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['r2'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# del ans -# gc.collect() -# t_start = timeit.default_timer() -# ans = 
x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['r2'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# print(ans.head(3), flush=True) -# print(ans.tail(3), flush=True) -# # del ans - -# segfault -# question = "sum v3 count by id1:id6" # q10 -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['v3'].sum(), ans['v1'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# del ans -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) -# # ans.reset_index(inplace=True) -# # print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['v3'].sum(), ans['v1'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, 
git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# print(ans.head(3), flush=True) -# print(ans.tail(3), flush=True) -# del ans +task_init = timeit.default_timer() +print("grouping...", flush=True) -print("grouping finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) +benchmark(question_1, question="sum v1 by id1", chk_sum_cols=['v1']) +benchmark(question_2, question="sum v1 by id1:id2", chk_sum_cols=['v1']) +benchmark(question_3, question="sum v1 mean v3 by id3", chk_sum_cols=['v1', 'v3']) +benchmark(question_4, question="mean v1:v3 by id4", chk_sum_cols=['v1', 'v2', 'v3']) +benchmark(question_5, question="sum v1:v3 by id6", chk_sum_cols=['v1', 'v2', 'v3']) +# benchmark(question_6, question="median v3 sd v3 by id4 id5", chk_sum_cols=['v3_median', 'v3_std']) +# benchmark(question_7, question="max v1 - min v2 by id3", chk_sum_cols=['range_v1_v2']) +# benchmark(question_8, question="largest two v3 by id6", chk_sum_cols=['v3']) +# benchmark(question_9, question="regression v1 v2 by id2 id4", chk_sum_cols=['r2']) +# benchmark(question_10, question="sum v3 count by id1:id6", chk_sum_cols=['v3', 'v1']) + +print("grouping finished, took %0.3fs" % (timeit.default_timer()-task_init), flush=True) exit(0) From 13d7700e34b2804b1b8050b2b9d811f750526c33 Mon Sep 17 00:00:00 2001 From: Jovan Veljanoski Date: Wed, 7 Jul 2021 17:10:20 +0200 Subject: [PATCH 3/6] Refactor the join-vaex script to make it more readable --- vaex/groupby-vaex.py | 69 +++++++++------- vaex/join-vaex.py | 191 +++++++++++++------------------------------ 2 files changed, 98 insertions(+), 162 deletions(-) diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py index 55d6cffb..68f72b91 100755 --- a/vaex/groupby-vaex.py +++ b/vaex/groupby-vaex.py @@ -29,66 +29,79 @@ x.ordinal_encode('id1', inplace=True) x.ordinal_encode('id2', inplace=True) x.ordinal_encode('id3', inplace=True) +x.ordinal_encode('id4', inplace=True) 
+x.ordinal_encode('id5', inplace=True) +x.ordinal_encode('id6', inplace=True) + + +# Generic benchmark function - to improve code readability +def benchmark(func, question, chk_sum_cols): + gc.collect() + t_start = timeit.default_timer() + ans = func() + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + del ans + gc.collect() + t_start = timeit.default_timer() + ans = func() + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + print(ans.head(3), flush=True) + print(ans.tail(3), flush=True) + del ans # Questions def question_1(): return x.groupby(['id1']).agg({'v1': 'sum'}) + def question_2(): return x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) + def question_3(): return x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}) + def question_4(): return x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) + def question_5(): return x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) + def question_6(): return x.groupby(['id4','id5']).agg({'v3': ['median','std']}) + def question_7(): return x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - 
x['v2'])[['range_v1_v2']] + def question_8(): return x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) + def question_9(): return x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) + def question_10(): return x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) -# Generic benchmark function - to improve code readability -def benchmark(func, question, chk_sum_cols): - gc.collect() - t_start = timeit.default_timer() - ans = func() - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - del ans - gc.collect() - t_start = timeit.default_timer() - ans = func() - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - print(ans.head(3), flush=True) - print(ans.tail(3), flush=True) - del ans - task_init = timeit.default_timer() print("grouping...", flush=True) @@ -99,7 +112,7 @@ def benchmark(func, question, chk_sum_cols): benchmark(question_4, question="mean v1:v3 by id4", chk_sum_cols=['v1', 'v2', 'v3']) benchmark(question_5, question="sum v1:v3 by id6", chk_sum_cols=['v1', 'v2', 'v3']) # 
benchmark(question_6, question="median v3 sd v3 by id4 id5", chk_sum_cols=['v3_median', 'v3_std']) -# benchmark(question_7, question="max v1 - min v2 by id3", chk_sum_cols=['range_v1_v2']) +# benchmark(question_7, question="max v1 - min v2 by id3", chk_sum_cols=['range_v1_v2'])с # benchmark(question_8, question="largest two v3 by id6", chk_sum_cols=['v3']) # benchmark(question_9, question="regression v1 v2 by id2 id4", chk_sum_cols=['r2']) # benchmark(question_10, question="sum v3 count by id1:id6", chk_sum_cols=['v3', 'v1']) diff --git a/vaex/join-vaex.py b/vaex/join-vaex.py index b6e90d63..3c03efc6 100755 --- a/vaex/join-vaex.py +++ b/vaex/join-vaex.py @@ -17,7 +17,7 @@ cache = "TRUE" on_disk = "FALSE" -data_name = os.environ['SRC_JN_LOCAL'] +data_name = os.environ['SRC_DATANAME'] src_jn_x = os.path.join("data", data_name+".csv") y_data_name = join_to_tbls(data_name) src_jn_y = [os.path.join("data", y_data_name[0]+".csv"), os.path.join("data", y_data_name[1]+".csv"), os.path.join("data", y_data_name[2]+".csv")] @@ -27,158 +27,81 @@ print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True) print(src_jn_x, src_jn_y) -x = vaex.open(src_jn_x, convert=True) +x = vaex.open(src_jn_x, convert=True, dtype={"id1": "Int8", "id2": "Int32", "id3": "Int32", "v2": "Float32"}) x.ordinal_encode('id1', inplace=True) x.ordinal_encode('id2', inplace=True) x.ordinal_encode('id3', inplace=True) -small = vaex.open(src_jn_y[0], convert=True) +small = vaex.open(src_jn_y[0], convert=True, dtype={"id1": "Int16", "v2": "Float32"}) small.ordinal_encode('id1', inplace=True) -medium = vaex.open(src_jn_y[1], convert=True) +medium = vaex.open(src_jn_y[1], convert=True, dtype={"id1": "Int16", "id2": "Int32", "v2": "Float32"}) medium.ordinal_encode('id1', inplace=True) medium.ordinal_encode('id2', inplace=True) -big = vaex.open(src_jn_y[2], convert=True) +big = vaex.open(src_jn_y[2], convert=True, dtype={"id1": "Int16", "id2": 
"Int32", "id3": "Int32", "v2": "Float32"}) big.ordinal_encode('id1', inplace=True) big.ordinal_encode('id2', inplace=True) big.ordinal_encode('id3', inplace=True) + print(len(x), flush=True) print(len(small), flush=True) print(len(medium), flush=True) print(len(big), flush=True) +def benchmark(func, question, chk_sum_cols): + gc.collect() + t_start = timeit.default_timer() + ans = func() + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + del ans + gc.collect() + t_start = timeit.default_timer() + ans = func() + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + print(ans.head(3), flush=True) + print(ans.tail(3), flush=True) + del ans + + +def question_1(): + return x.join(small, how='inner', on='id1', rsuffix='_r') + + +def question_2(): + return x.join(medium, how='inner', on='id2', rsuffix='_r') + + +def question_3(): + return x.join(medium, how='left', on='id2', rsuffix='_r') + + +def question_4(): + return x.join(medium, how='inner', on='id5', rsuffix='_r') + + +def question_5(): + return x.join(big, how='inner', on='id3', rsuffix='_r') + + task_init = timeit.default_timer() print("joining...", flush=True) -question = "small inner 
on int" # q1 -gc.collect() -t_start = timeit.default_timer() -ans = x.join(small, on='id1', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.join(small, on='id1', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans - -question = "medium inner on int" # q2 -gc.collect() -t_start = timeit.default_timer() -ans = x.join(medium, on='id2', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.join(medium, on='id2', rsuffix='_r') -#print(ans.shape, flush=True) -t = 
timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans - -question = "medium outer on int" # q3 -gc.collect() -t_start = timeit.default_timer() -ans = x.join(medium, how='left', on='id2', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.join(medium, how='left', on='id2', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans - -question = "medium inner on factor" # q4 -gc.collect() -t_start = timeit.default_timer() -ans = x.join(medium, on='id5', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - 
t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.join(medium, on='id5', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans - -question = "big inner on int" # q5 -gc.collect() -t_start = timeit.default_timer() -ans = x.join(big, on='id3', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.join(big, on='id3', rsuffix='_r') -#print(ans.shape, flush=True) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, 
data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -print(ans.head(3), flush=True) -print(ans.tail(3), flush=True) -del ans +benchmark(question_1, question="small inner on int", chk_sum_cols=['v1', 'v2']) +benchmark(question_2, question="medium inner on int", chk_sum_cols=['v1', 'v2']) +benchmark(question_3, question="medium outer on int", chk_sum_cols=['v1', 'v2']) +benchmark(question_4, question="medium inner on factor", chk_sum_cols=['v1', 'v2']) +benchmark(question_5, question="big inner on int", chk_sum_cols=['v1', 'v2']) print("joining finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) From 18f0ba5f22fbc5e7c497f6c4f1144531130aaf59 Mon Sep 17 00:00:00 2001 From: "Maarten A. Breddels" Date: Wed, 9 Feb 2022 15:44:34 +0100 Subject: [PATCH 4/6] Refactor a bit --- vaex/groupby-vaex.py | 87 ++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py index 68f72b91..65b338b7 100755 --- a/vaex/groupby-vaex.py +++ b/vaex/groupby-vaex.py @@ -6,7 +6,6 @@ import gc import timeit import vaex -vaex.multithreading.thread_count_default = 8 exec(open("./_helpers/helpers.py").read()) @@ -18,69 +17,38 @@ cache = "TRUE" on_disk = "TRUE" -# vaex.cache.redis() - data_name = os.environ['SRC_DATANAME'] src_grp = os.path.join("data", data_name+".csv") print("loading dataset %s" % data_name, flush=True) -x = vaex.open(src_grp, convert=True, dtype={"id4":"Int32", "id5":"Int32", "id6":"Int32", "v1":"Int32", "v2":"Int32"}) +x = vaex.open(src_grp, convert=True, dtype={"id4":"Int8", "id5":"Int8", "id6":"Int32", "v1":"Int8", "v2":"Int8"}) print("loaded dataset") -x.ordinal_encode('id1', inplace=True) -x.ordinal_encode('id2', inplace=True) -x.ordinal_encode('id3', inplace=True) 
-x.ordinal_encode('id4', inplace=True) -x.ordinal_encode('id5', inplace=True) -x.ordinal_encode('id6', inplace=True) - - -# Generic benchmark function - to improve code readability -def benchmark(func, question, chk_sum_cols): - gc.collect() - t_start = timeit.default_timer() - ans = func() - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - del ans - gc.collect() - t_start = timeit.default_timer() - ans = func() - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - print(ans.head(3), flush=True) - print(ans.tail(3), flush=True) - del ans +with vaex.cache.disk(): + x.ordinal_encode('id1', inplace=True) + x.ordinal_encode('id2', inplace=True) + x.ordinal_encode('id3', inplace=True) + print(x.dtypes) # Questions def question_1(): - return x.groupby(['id1']).agg({'v1': 'sum'}) + return x.groupby(['id1'], agg={'v1': 'sum'}) def question_2(): - return x.groupby(['id1', 'id2']).agg({'v1': 'sum'}) + return x.groupby(['id1', 'id2'], agg={'v1': 'sum'}) def question_3(): - return x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}) + return x.groupby(['id3'], agg={'v1': 'sum', 'v3': 'mean'}) def question_4(): - return 
x.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) + return x.groupby(['id4'], agg={'v1':'mean', 'v2':'mean', 'v3':'mean'}) def question_5(): - return x.groupby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) + return x.groupby(['id6'], agg={'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) def question_6(): @@ -102,6 +70,39 @@ def question_9(): def question_10(): return x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) +# Generic benchmark function - to improve code readability +def benchmark(func, question, chk_sum_cols): + with vaex.progress.tree(title=question) as progress: + gc.collect() + t_start = timeit.default_timer() + with progress.add("iter1"): + ans = func() + if verbose: + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + del ans + gc.collect() + t_start = timeit.default_timer() + with progress.add("iter2"): + ans = func() + if verbose: + print(ans.shape, flush=True) + t = timeit.default_timer() - t_start + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans[col].sum() for col in chk_sum_cols] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) + if verbose: + print(ans.head(3), flush=True) + print(ans.tail(3), flush=True) + del ans + task_init = timeit.default_timer() print("grouping...", flush=True) From 
17ee4f1e983d3f32ebc7801ec41d73ce500b11c3 Mon Sep 17 00:00:00 2001 From: Jovan Veljanoski Date: Fri, 11 Feb 2022 17:24:04 +0100 Subject: [PATCH 5/6] refactor to comply with repo standards --- vaex/groupby-vaex.py | 349 ++++++++++++++++++++++++++++++++----------- vaex/join-vaex.py | 183 ++++++++++++++++------- 2 files changed, 396 insertions(+), 136 deletions(-) diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py index 65b338b7..a452efef 100755 --- a/vaex/groupby-vaex.py +++ b/vaex/groupby-vaex.py @@ -29,94 +29,275 @@ x.ordinal_encode('id3', inplace=True) print(x.dtypes) - -# Questions -def question_1(): - return x.groupby(['id1'], agg={'v1': 'sum'}) - - -def question_2(): - return x.groupby(['id1', 'id2'], agg={'v1': 'sum'}) - - -def question_3(): - return x.groupby(['id3'], agg={'v1': 'sum', 'v3': 'mean'}) - - -def question_4(): - return x.groupby(['id4'], agg={'v1':'mean', 'v2':'mean', 'v3':'mean'}) - - -def question_5(): - return x.groupby(['id6'], agg={'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) - - -def question_6(): - return x.groupby(['id4','id5']).agg({'v3': ['median','std']}) - - -def question_7(): - return x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] - - -def question_8(): - return x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2) - - -def question_9(): - return x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2']**2})) - - -def question_10(): - return x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}) - -# Generic benchmark function - to improve code readability -def benchmark(func, question, chk_sum_cols): - with vaex.progress.tree(title=question) as progress: - gc.collect() - t_start = timeit.default_timer() - with progress.add("iter1"): - ans = func() - if verbose: - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = 
timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - del ans - gc.collect() - t_start = timeit.default_timer() - with progress.add("iter2"): - ans = func() - if verbose: - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - if verbose: - print(ans.head(3), flush=True) - print(ans.tail(3), flush=True) - del ans - +in_rows = x.shape[0] +print(in_rows, flush=True) task_init = timeit.default_timer() print("grouping...", flush=True) -benchmark(question_1, question="sum v1 by id1", chk_sum_cols=['v1']) -benchmark(question_2, question="sum v1 by id1:id2", chk_sum_cols=['v1']) -benchmark(question_3, question="sum v1 mean v3 by id3", chk_sum_cols=['v1', 'v3']) -benchmark(question_4, question="mean v1:v3 by id4", chk_sum_cols=['v1', 'v2', 'v3']) -benchmark(question_5, question="sum v1:v3 by id6", chk_sum_cols=['v1', 'v2', 'v3']) -# benchmark(question_6, question="median v3 sd v3 by id4 id5", chk_sum_cols=['v3_median', 'v3_std']) -# benchmark(question_7, question="max v1 - min v2 by id3", chk_sum_cols=['range_v1_v2'])с -# benchmark(question_8, question="largest two v3 by id6", chk_sum_cols=['v3']) -# benchmark(question_9, question="regression v1 v2 by id2 id4", chk_sum_cols=['r2']) -# benchmark(question_10, question="sum 
v3 count by id1:id6", chk_sum_cols=['v3', 'v1']) +question = "sum v1 by id1" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id1').agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id1').agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1 by id1:id2" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1','id2']).agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1','id2']).agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = 
timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1 mean v3 by id3" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id3').agg({'v1':'sum', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id3').agg({'v1':'sum', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "mean v1:v3 by id4" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id4').agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() 
+t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id4').agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1:v3 by id6" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id6').agg({'v1':'sum', 'v2':'sum', 'v3':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id6').agg({'v1':'sum', 'v2':'sum', 'v3':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] 
+chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +# question = "median v3 sd v3 by id4 id5" # q6 # median function not yet implemented: https://github.com/dask/dask/issues/4362 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "max v1 - min v2 by id3" # q7 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby('id3').agg({'v1':'max', 
'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']] +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['range_v1_v2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby('id3').agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']] +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['range_v1_v2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "largest two v3 by id6" # q8 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6').apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']] +# ans.reset_index(level='id6', inplace=True) +# ans.reset_index(drop=True, inplace=True) # drop because nlargest creates some extra new index field +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], 
out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6').apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']] +# ans.reset_index(level='id6', inplace=True) +# ans.reset_index(drop=True, inplace=True) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "regression v1 v2 by id2 id4" # q9 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['r2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() 
- t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['r2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "sum v3 count by id1:id6" # q10 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'size'}) # column name different than expected, ignore it because: ValueError: Metadata inference failed in `rename`: Original error is below: ValueError('Level values must be unique: [nan, nan] on level 0',) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans.v3.sum(), ans.v1.sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'size'}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans.v3.sum(), ans.v1.sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# 
print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans print("grouping finished, took %0.3fs" % (timeit.default_timer()-task_init), flush=True) diff --git a/vaex/join-vaex.py b/vaex/join-vaex.py index 3c03efc6..4679fe02 100755 --- a/vaex/join-vaex.py +++ b/vaex/join-vaex.py @@ -43,65 +43,144 @@ big.ordinal_encode('id2', inplace=True) big.ordinal_encode('id3', inplace=True) -print(len(x), flush=True) +in_rows = len(x) +print(in_rows, flush=True) print(len(small), flush=True) print(len(medium), flush=True) print(len(big), flush=True) -def benchmark(func, question, chk_sum_cols): - gc.collect() - t_start = timeit.default_timer() - ans = func() - print(ans.shape, flush=True) - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - del ans - gc.collect() - t_start = timeit.default_timer() - ans = func() - t = timeit.default_timer() - t_start - m = memory_usage() - t_start = timeit.default_timer() - chk = [ans[col].sum() for col in chk_sum_cols] - chkt = timeit.default_timer() - t_start - write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) - print(ans.head(3), flush=True) - print(ans.tail(3), flush=True) - del ans - - -def question_1(): - return x.join(small, how='inner', on='id1', rsuffix='_r') - - -def question_2(): - return x.join(medium, how='inner', on='id2', rsuffix='_r') - - -def question_3(): - return x.join(medium, how='left', 
on='id2', rsuffix='_r') - - -def question_4(): - return x.join(medium, how='inner', on='id5', rsuffix='_r') - - -def question_5(): - return x.join(big, how='inner', on='id3', rsuffix='_r') - - task_init = timeit.default_timer() print("joining...", flush=True) -benchmark(question_1, question="small inner on int", chk_sum_cols=['v1', 'v2']) -benchmark(question_2, question="medium inner on int", chk_sum_cols=['v1', 'v2']) -benchmark(question_3, question="medium outer on int", chk_sum_cols=['v1', 'v2']) -benchmark(question_4, question="medium inner on factor", chk_sum_cols=['v1', 'v2']) -benchmark(question_5, question="big inner on int", chk_sum_cols=['v1', 'v2']) +question = "small inner on int" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(small, how='inner', on='id1', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(small, how='inner', on='id1', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on int" # q2 +gc.collect() 
+t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='left', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='left', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = 
timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id5', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id5', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "big inner on int" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(big, how='inner', on='id3', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - 
t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(big, how='inner', on='id3', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans print("joining finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) From 06d93eb62e3f8745234d0b703cc201d808983f76 Mon Sep 17 00:00:00 2001 From: Jovan Veljanoski Date: Tue, 15 Feb 2022 18:52:04 +0100 Subject: [PATCH 6/6] small improvements --- vaex/groupby-vaex.py | 8 +++----- vaex/join-vaex.py | 9 +++------ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py index a452efef..913b77c3 100755 --- a/vaex/groupby-vaex.py +++ b/vaex/groupby-vaex.py @@ -23,11 +23,9 @@ x = vaex.open(src_grp, convert=True, dtype={"id4":"Int8", "id5":"Int8", "id6":"Int32", "v1":"Int8", "v2":"Int8"}) print("loaded dataset") -with vaex.cache.disk(): - x.ordinal_encode('id1', inplace=True) - x.ordinal_encode('id2', inplace=True) - x.ordinal_encode('id3', inplace=True) - print(x.dtypes) +x.ordinal_encode('id1', inplace=True) +x.ordinal_encode('id2', inplace=True) 
+x.ordinal_encode('id3', inplace=True) in_rows = x.shape[0] print(in_rows, flush=True) diff --git a/vaex/join-vaex.py b/vaex/join-vaex.py index 4679fe02..934500db 100755 --- a/vaex/join-vaex.py +++ b/vaex/join-vaex.py @@ -10,12 +10,12 @@ exec(open("./_helpers/helpers.py").read()) ver = vaex.__version__['vaex-core'] -git = 'alpha' +git = '-' task = "join" solution = "vaex" fun = ".join" cache = "TRUE" -on_disk = "FALSE" +on_disk = "TRUE" data_name = os.environ['SRC_DATANAME'] src_jn_x = os.path.join("data", data_name+".csv") @@ -27,11 +27,9 @@ print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True) print(src_jn_x, src_jn_y) -x = vaex.open(src_jn_x, convert=True, dtype={"id1": "Int8", "id2": "Int32", "id3": "Int32", "v2": "Float32"}) +x = vaex.open(src_jn_x, convert=True, dtype={"id1": "Int16", "id2": "Int32", "id3": "Int32", "v1": "Float32"}) x.ordinal_encode('id1', inplace=True) x.ordinal_encode('id2', inplace=True) -x.ordinal_encode('id3', inplace=True) - small = vaex.open(src_jn_y[0], convert=True, dtype={"id1": "Int16", "v2": "Float32"}) small.ordinal_encode('id1', inplace=True) @@ -41,7 +39,6 @@ big = vaex.open(src_jn_y[2], convert=True, dtype={"id1": "Int16", "id2": "Int32", "id3": "Int32", "v2": "Float32"}) big.ordinal_encode('id1', inplace=True) big.ordinal_encode('id2', inplace=True) -big.ordinal_encode('id3', inplace=True) in_rows = len(x) print(in_rows, flush=True)