diff --git a/_control/solutions.csv b/_control/solutions.csv index eb626279..fce5e529 100644 --- a/_control/solutions.csv +++ b/_control/solutions.csv @@ -25,4 +25,6 @@ polars,join arrow,groupby arrow,join duckdb,groupby -duckdb,join \ No newline at end of file +duckdb,join +vaex,groupby +vaex,join diff --git a/_launcher/launcher.R b/_launcher/launcher.R index 0820dc37..767cc477 100644 --- a/_launcher/launcher.R +++ b/_launcher/launcher.R @@ -14,7 +14,7 @@ readret = function(x) { file.ext = function(x) { ans = switch( x, - "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", + "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", "vaex"=, "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "clickhouse"="sql", "juliadf"="jl" diff --git a/_launcher/solution.R b/_launcher/solution.R index ec3b70ae..246d257b 100755 --- a/_launcher/solution.R +++ b/_launcher/solution.R @@ -110,7 +110,7 @@ if ("quiet" %in% names(args)) { file.ext = function(x) { ans = switch( x, - "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", + "data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R", "vaex"=, "pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "clickhouse"="sql", "juliadf"="jl" diff --git a/run.conf b/run.conf index 5d5b0b45..52b8e9e5 100644 --- a/run.conf +++ b/run.conf @@ -1,7 +1,7 @@ # task, used in init-setup-iteration.R export RUN_TASKS="groupby join groupby2014" # solution, used in init-setup-iteration.R -export RUN_SOLUTIONS="data.table dplyr pandas pydatatable spark dask juliadf cudf clickhouse polars arrow duckdb" +export RUN_SOLUTIONS="vaex data.table dplyr pandas pydatatable spark dask juliadf cudf clickhouse polars arrow duckdb" # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=true diff --git a/run.sh b/run.sh index c30d73c6..9b4c5b1d 100755 --- a/run.sh +++ b/run.sh @@ -46,6 +46,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "dplyr" ]]; then ./dplyr/upg- if [[ "$RUN_SOLUTIONS" 
=~ "dplyr" ]]; then ./dplyr/ver-dplyr.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "juliadf" ]]; then ./juliadf/upg-juliadf.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "juliadf" ]]; then ./juliadf/ver-juliadf.sh; fi; +if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "vaex" ]]; then ./vaex/upg-vaex.sh; fi; +if [[ "$RUN_SOLUTIONS" =~ "vaex" ]]; then ./vaex/ver-vaex.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "modin" ]]; then ./modin/upg-modin.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "modin" ]]; then ./modin/ver-modin.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "pandas" ]]; then ./pandas/upg-pandas.sh; fi; diff --git a/vaex/groupby-vaex.py b/vaex/groupby-vaex.py new file mode 100755 index 00000000..913b77c3 --- /dev/null +++ b/vaex/groupby-vaex.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python + +print("# groupby-vaex.py", flush=True) + +import os +import gc +import timeit +import vaex + +exec(open("./_helpers/helpers.py").read()) + +ver = vaex.__version__['vaex-core'] +git = '-' +task = "groupby" +solution = "vaex" +fun = ".groupby" +cache = "TRUE" +on_disk = "TRUE" + +data_name = os.environ['SRC_DATANAME'] +src_grp = os.path.join("data", data_name+".csv") +print("loading dataset %s" % data_name, flush=True) + +x = vaex.open(src_grp, convert=True, dtype={"id4":"Int8", "id5":"Int8", "id6":"Int32", "v1":"Int8", "v2":"Int8"}) +print("loaded dataset") +x.ordinal_encode('id1', inplace=True) +x.ordinal_encode('id2', inplace=True) +x.ordinal_encode('id3', inplace=True) + +in_rows = x.shape[0] +print(in_rows, flush=True) + +task_init = timeit.default_timer() +print("grouping...", flush=True) + +question = "sum v1 by id1" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id1').agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, 
question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id1').agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1 by id1:id2" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1','id2']).agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id1','id2']).agg({'v1':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) 
+print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1 mean v3 by id3" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id3').agg({'v1':'sum', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id3').agg({'v1':'sum', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "mean v1:v3 by id4" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id4').agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() 
+t_start = timeit.default_timer() +ans = x.groupby('id4').agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "sum v1:v3 by id6" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id6').agg({'v1':'sum', 'v2':'sum', 'v3':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id6').agg({'v1':'sum', 'v2':'sum', 'v3':'sum'}) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v1.sum(), ans.v2.sum(), ans.v3.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +# question = "median v3 sd v3 by 
id4 id5" # q6 # median function not yet implemented: https://github.com/dask/dask/issues/4362 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id4','id5']).agg({'v3': ['median','std']}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "max v1 - min v2 by id3" # q7 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby('id3').agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']] +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['range_v1_v2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, 
version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby('id3').agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']] +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['range_v1_v2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "largest two v3 by id6" # q8 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6').apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']] +# ans.reset_index(level='id6', inplace=True) +# ans.reset_index(drop=True, inplace=True) # drop because nlargest creates some extra new index field +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6').apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']] +# ans.reset_index(level='id6', inplace=True) +# 
ans.reset_index(drop=True, inplace=True) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['v3'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "regression v1 v2 by id2 id4" # q9 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['r2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans['r2'].sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), 
flush=True) +# print(ans.tail(3), flush=True) +# del ans + +# question = "sum v3 count by id1:id6" # q10 +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'size'}) # column name different than expected, ignore it because: ValueError: Metadata inference failed in `rename`: Original error is below: ValueError('Level values must be unique: [nan, nan] on level 0',) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans.v3.sum(), ans.v1.sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# del ans +# gc.collect() +# t_start = timeit.default_timer() +# ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'size'}) +# print(ans.shape, flush=True) +# t = timeit.default_timer() - t_start +# m = memory_usage() +# t_start = timeit.default_timer() +# chk = [ans.v3.sum(), ans.v1.sum()] +# chkt = timeit.default_timer() - t_start +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +# print(ans.head(3), flush=True) +# print(ans.tail(3), flush=True) +# del ans + +print("grouping finished, took %0.3fs" % (timeit.default_timer()-task_init), flush=True) + +exit(0) diff --git a/vaex/join-vaex.py b/vaex/join-vaex.py new file mode 100755 index 00000000..934500db --- /dev/null +++ b/vaex/join-vaex.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python + +print("# join-vaex.py", flush=True) + +import os +import gc +import timeit 
+import vaex + +exec(open("./_helpers/helpers.py").read()) + +ver = vaex.__version__['vaex-core'] +git = '-' +task = "join" +solution = "vaex" +fun = ".join" +cache = "TRUE" +on_disk = "TRUE" + +data_name = os.environ['SRC_DATANAME'] +src_jn_x = os.path.join("data", data_name+".csv") +y_data_name = join_to_tbls(data_name) +src_jn_y = [os.path.join("data", y_data_name[0]+".csv"), os.path.join("data", y_data_name[1]+".csv"), os.path.join("data", y_data_name[2]+".csv")] +if len(src_jn_y) != 3: + raise Exception("Something went wrong in preparing files used for join") + +print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True) + +print(src_jn_x, src_jn_y) +x = vaex.open(src_jn_x, convert=True, dtype={"id1": "Int16", "id2": "Int32", "id3": "Int32", "v1": "Float32"}) +x.ordinal_encode('id1', inplace=True) +x.ordinal_encode('id2', inplace=True) + +small = vaex.open(src_jn_y[0], convert=True, dtype={"id1": "Int16", "v2": "Float32"}) +small.ordinal_encode('id1', inplace=True) +medium = vaex.open(src_jn_y[1], convert=True, dtype={"id1": "Int16", "id2": "Int32", "v2": "Float32"}) +medium.ordinal_encode('id1', inplace=True) +medium.ordinal_encode('id2', inplace=True) +big = vaex.open(src_jn_y[2], convert=True, dtype={"id1": "Int16", "id2": "Int32", "id3": "Int32", "v2": "Float32"}) +big.ordinal_encode('id1', inplace=True) +big.ordinal_encode('id2', inplace=True) + +in_rows = len(x) +print(in_rows, flush=True) +print(len(small), flush=True) +print(len(medium), flush=True) +print(len(big), flush=True) + +task_init = timeit.default_timer() +print("joining...", flush=True) + +question = "small inner on int" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(small, how='inner', on='id1', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - 
t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(small, how='inner', on='id1', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on int" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], 
solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='left', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='left', on='id2', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id5', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, 
version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(medium, how='inner', on='id5', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "big inner on int" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.join(big, how='inner', on='id3', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.join(big, how='inner', on='id3', rsuffix='_r') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) 
+print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +print("joining finished, took %0.3fs" % (timeit.default_timer()-task_init), flush=True) + +exit(0) diff --git a/vaex/upg-vaex.sh b/vaex/upg-vaex.sh new file mode 100755 index 00000000..e7795c31 --- /dev/null +++ b/vaex/upg-vaex.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e + +echo 'upgrading vaex...' + +source ./vaex/py-vaex/bin/activate + +python -m pip install -U vaex-core vaex-hdf5 > /dev/null diff --git a/vaex/ver-vaex.sh b/vaex/ver-vaex.sh new file mode 100755 index 00000000..280d89f1 --- /dev/null +++ b/vaex/ver-vaex.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -e + +source ./vaex/py-vaex/bin/activate +python -c 'import vaex; open("vaex/VERSION","w").write(vaex.__version__["vaex-core"]); open("vaex/REVISION","w").write("alpha");' > /dev/null