Add Vaex #243 (Open)
maartenbreddels wants to merge 6 commits into h2oai:master from vaexio:vaex
Commits (6):
- c1285ae Vaex solution (maartenbreddels)
- a8f241f refactor the vaex-groupby script (JovanVeljanoski)
- 13d7700 Refactor the join-vaex script to make it more readable (JovanVeljanoski)
- 18f0ba5 Refactor a bit (maartenbreddels)
- 17ee4f1 refactor to comply with repo standards (JovanVeljanoski)
- 06d93eb small improvements (JovanVeljanoski)
Changed file (+2 lines, filename not shown in this excerpt):

@@ -25,4 +25,6 @@ polars,join
 arrow,groupby
 arrow,join
 duckdb,groupby
 duckdb,join
+vaex,groupby
+vaex,join
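The file extended above is a plain `solution,task` registry, one comma-separated pair per line, which the PR extends with the two vaex entries. Parsing such a list is trivial; `parse_solutions` below is a hypothetical illustration, not a helper from the repository:

```python
def parse_solutions(text):
    """Parse 'solution,task' lines into (solution, task) pairs, skipping blanks."""
    pairs = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        solution, task = line.split(",")
        pairs.append((solution, task))
    return pairs

entries = parse_solutions("duckdb,groupby\nduckdb,join\nvaex,groupby\nvaex,join\n")
print(entries)
# → [('duckdb', 'groupby'), ('duckdb', 'join'), ('vaex', 'groupby'), ('vaex', 'join')]
```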
New file (+123 lines): groupby-vaex.py

#!/usr/bin/env python

print("# groupby-vaex.py", flush=True)

import os
import gc
import timeit
import vaex

exec(open("./_helpers/helpers.py").read())

ver = vaex.__version__['vaex-core']
git = '-'
task = "groupby"
solution = "vaex"
fun = ".groupby"
cache = "TRUE"
on_disk = "TRUE"

data_name = os.environ['SRC_DATANAME']
src_grp = os.path.join("data", data_name + ".csv")
print("loading dataset %s" % data_name, flush=True)

x = vaex.open(src_grp, convert=True, dtype={"id4": "Int8", "id5": "Int8", "id6": "Int32", "v1": "Int8", "v2": "Int8"})
print("loaded dataset")
with vaex.cache.disk():
    x.ordinal_encode('id1', inplace=True)
    x.ordinal_encode('id2', inplace=True)
    x.ordinal_encode('id3', inplace=True)
print(x.dtypes)


# Questions
def question_1():
    return x.groupby(['id1'], agg={'v1': 'sum'})


def question_2():
    return x.groupby(['id1', 'id2'], agg={'v1': 'sum'})


def question_3():
    return x.groupby(['id3'], agg={'v1': 'sum', 'v3': 'mean'})


def question_4():
    return x.groupby(['id4'], agg={'v1': 'mean', 'v2': 'mean', 'v3': 'mean'})


def question_5():
    return x.groupby(['id6'], agg={'v1': 'sum', 'v2': 'sum', 'v3': 'sum'})


def question_6():
    return x.groupby(['id4', 'id5']).agg({'v3': ['median', 'std']})


def question_7():
    return x.groupby(['id3']).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']]


def question_8():
    return x[['id6', 'v3']].sort_values('v3', ascending=False).groupby(['id6']).head(2)


def question_9():
    return x[['id2', 'id4', 'v1', 'v2']].groupby(['id2', 'id4']).apply(lambda x: vaex.Series({'r2': x.corr()['v1']['v2'] ** 2}))


def question_10():
    return x.groupby(['id1', 'id2', 'id3', 'id4', 'id5', 'id6']).agg({'v3': 'sum', 'v1': 'count'})


# Generic benchmark function - to improve code readability
def benchmark(func, question, chk_sum_cols):
    with vaex.progress.tree(title=question) as progress:
        gc.collect()
        t_start = timeit.default_timer()
        with progress.add("iter1"):
            ans = func()
            if verbose:
                print(ans.shape, flush=True)
        t = timeit.default_timer() - t_start
        m = memory_usage()
        t_start = timeit.default_timer()
        chk = [ans[col].sum() for col in chk_sum_cols]
        chkt = timeit.default_timer() - t_start
        write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
        del ans
        gc.collect()
        t_start = timeit.default_timer()
        with progress.add("iter2"):
            ans = func()
            if verbose:
                print(ans.shape, flush=True)
        t = timeit.default_timer() - t_start
        m = memory_usage()
        t_start = timeit.default_timer()
        chk = [ans[col].sum() for col in chk_sum_cols]
        chkt = timeit.default_timer() - t_start
        write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
        if verbose:
            print(ans.head(3), flush=True)
            print(ans.tail(3), flush=True)
        del ans


task_init = timeit.default_timer()
print("grouping...", flush=True)

benchmark(question_1, question="sum v1 by id1", chk_sum_cols=['v1'])
benchmark(question_2, question="sum v1 by id1:id2", chk_sum_cols=['v1'])
benchmark(question_3, question="sum v1 mean v3 by id3", chk_sum_cols=['v1', 'v3'])
benchmark(question_4, question="mean v1:v3 by id4", chk_sum_cols=['v1', 'v2', 'v3'])
benchmark(question_5, question="sum v1:v3 by id6", chk_sum_cols=['v1', 'v2', 'v3'])
# benchmark(question_6, question="median v3 sd v3 by id4 id5", chk_sum_cols=['v3_median', 'v3_std'])
# benchmark(question_7, question="max v1 - min v2 by id3", chk_sum_cols=['range_v1_v2'])
# benchmark(question_8, question="largest two v3 by id6", chk_sum_cols=['v3'])
# benchmark(question_9, question="regression v1 v2 by id2 id4", chk_sum_cols=['r2'])
# benchmark(question_10, question="sum v3 count by id1:id6", chk_sum_cols=['v3', 'v1'])

print("grouping finished, took %0.3fs" % (timeit.default_timer() - task_init), flush=True)

exit(0)
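The `benchmark` function above follows a two-run (cold, then warm) pattern: collect garbage, time the query, then separately time a checksum over the result columns. That pattern can be sketched with only the standard library; `measure` and the toy workload below are hypothetical stand-ins for `benchmark` and the vaex queries, and the vaex-specific parts (progress tree, `memory_usage`, `write_log`) are omitted:

```python
import gc
import timeit

def measure(func, chk_func, runs=2):
    """Time `func` over independent runs, collecting garbage before each run
    so earlier allocations do not skew the timing; checksum the answer with
    `chk_func` under a separate timer, mirroring chk/chk_time_sec above."""
    results = []
    for run in range(1, runs + 1):
        gc.collect()
        t_start = timeit.default_timer()
        ans = func()
        time_sec = timeit.default_timer() - t_start
        t_start = timeit.default_timer()
        chk = chk_func(ans)
        chk_time_sec = timeit.default_timer() - t_start
        results.append({"run": run, "time_sec": time_sec,
                        "chk": chk, "chk_time_sec": chk_time_sec})
        del ans
    return results

runs = measure(lambda: [v * v for v in range(1000)], chk_func=sum)
print([r["chk"] for r in runs])
# → [332833500, 332833500]
```

Separating query time from checksum time matters for lazy engines like vaex, where materializing a result for the checksum can itself trigger computation.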
New file (+108 lines): join-vaex.py

#!/usr/bin/env python

print("# join-vaex.py", flush=True)

import os
import gc
import timeit
import vaex

exec(open("./_helpers/helpers.py").read())

ver = vaex.__version__['vaex-core']
git = 'alpha'
task = "join"
solution = "vaex"
fun = ".join"
cache = "TRUE"
on_disk = "FALSE"

data_name = os.environ['SRC_DATANAME']
src_jn_x = os.path.join("data", data_name + ".csv")
y_data_name = join_to_tbls(data_name)
src_jn_y = [os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv")]
if len(src_jn_y) != 3:
    raise Exception("Something went wrong in preparing files used for join")

print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True)

print(src_jn_x, src_jn_y)
x = vaex.open(src_jn_x, convert=True, dtype={"id1": "Int8", "id2": "Int32", "id3": "Int32", "v2": "Float32"})
x.ordinal_encode('id1', inplace=True)
x.ordinal_encode('id2', inplace=True)
x.ordinal_encode('id3', inplace=True)


small = vaex.open(src_jn_y[0], convert=True, dtype={"id1": "Int16", "v2": "Float32"})
small.ordinal_encode('id1', inplace=True)
medium = vaex.open(src_jn_y[1], convert=True, dtype={"id1": "Int16", "id2": "Int32", "v2": "Float32"})
medium.ordinal_encode('id1', inplace=True)
medium.ordinal_encode('id2', inplace=True)
big = vaex.open(src_jn_y[2], convert=True, dtype={"id1": "Int16", "id2": "Int32", "id3": "Int32", "v2": "Float32"})
big.ordinal_encode('id1', inplace=True)
big.ordinal_encode('id2', inplace=True)
big.ordinal_encode('id3', inplace=True)

print(len(x), flush=True)
print(len(small), flush=True)
print(len(medium), flush=True)
print(len(big), flush=True)


def benchmark(func, question, chk_sum_cols):
    gc.collect()
    t_start = timeit.default_timer()
    ans = func()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    t_start = timeit.default_timer()
    chk = [ans[col].sum() for col in chk_sum_cols]
    chkt = timeit.default_timer() - t_start
    write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
    del ans
    gc.collect()
    t_start = timeit.default_timer()
    ans = func()
    t = timeit.default_timer() - t_start
    m = memory_usage()
    t_start = timeit.default_timer()
    chk = [ans[col].sum() for col in chk_sum_cols]
    chkt = timeit.default_timer() - t_start
    write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
    print(ans.head(3), flush=True)
    print(ans.tail(3), flush=True)
    del ans


def question_1():
    return x.join(small, how='inner', on='id1', rsuffix='_r')


def question_2():
    return x.join(medium, how='inner', on='id2', rsuffix='_r')


def question_3():
    return x.join(medium, how='left', on='id2', rsuffix='_r')


def question_4():
    return x.join(medium, how='inner', on='id5', rsuffix='_r')


def question_5():
    return x.join(big, how='inner', on='id3', rsuffix='_r')


task_init = timeit.default_timer()
print("joining...", flush=True)

benchmark(question_1, question="small inner on int", chk_sum_cols=['v1', 'v2'])
benchmark(question_2, question="medium inner on int", chk_sum_cols=['v1', 'v2'])
benchmark(question_3, question="medium outer on int", chk_sum_cols=['v1', 'v2'])
benchmark(question_4, question="medium inner on factor", chk_sum_cols=['v1', 'v2'])
benchmark(question_5, question="big inner on int", chk_sum_cols=['v1', 'v2'])

print("joining finished, took %0.3fs" % (timeit.default_timer() - task_init), flush=True)

exit(0)
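Each question above passes `rsuffix='_r'` so that right-hand columns whose names clash with left-hand ones (both tables carry `v2`) are renamed instead of colliding. The idea can be illustrated in pure Python; `inner_join` below is a hypothetical hash-join sketch of that renaming behaviour, not the vaex implementation:

```python
def inner_join(left, right, on, rsuffix="_r"):
    """Inner-join two lists of row dicts on a key column, appending `rsuffix`
    to any right-hand column name that clashes with a left-hand one."""
    # Build a hash index over the (typically smaller) right table.
    index = {}
    for row in right:
        index.setdefault(row[on], []).append(row)
    out = []
    for lrow in left:
        for rrow in index.get(lrow[on], []):
            merged = dict(lrow)
            for col, val in rrow.items():
                if col == on:
                    continue  # the join key appears once
                merged[col + rsuffix if col in lrow else col] = val
            out.append(merged)
    return out

x_rows = [{"id1": 1, "v2": 10.0}, {"id1": 2, "v2": 20.0}]
small_rows = [{"id1": 1, "v2": 0.5}, {"id1": 3, "v2": 0.7}]
print(inner_join(x_rows, small_rows, on="id1"))
# → [{'id1': 1, 'v2': 10.0, 'v2_r': 0.5}]
```

Only `id1 == 1` appears in both inputs, so the inner join keeps one row, with the right table's `v2` surviving as `v2_r`.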
New file (+8 lines, filename not shown): vaex upgrade script

#!/bin/bash
set -e

echo 'upgrading vaex...'

source ./vaex/py-vaex/bin/activate

python -m pip install -U vaex-core vaex-hdf5 > /dev/null
New file (+5 lines, filename not shown): vaex version script

#!/bin/bash
set -e

source ./pandas/py-pandas/bin/activate
python -c 'import vaex; open("vaex/VERSION","w").write(vaex.__version__["vaex-core"]); open("vaex/REVISION","w").write("alpha");' > /dev/null
Review comment from @jangorecki: One of the goals was to keep the scripts matching across languages/solutions. Using helpers makes the comparison blurrier and less reproducible interactively, line by line. This has already been proposed at least twice before, in Python datatable and Julia DF. I see no way to satisfy those goals while using helpers: the helpers would have to be the same in each language, and that would require investigating the overhead they introduce in each language. I would therefore suggest keeping the structure the same as the other solutions are currently written.
Note that I am no longer maintaining the project, as I am no longer an employee of h2o.
Thank you @jangorecki, we will do it.
Hey @jangorecki,
We refactored the code, so it should now be in the exact same style as the rest of the repo.
I appreciate that you are no longer employed by h2o and not actively maintaining the project, but could you possibly point us to whom we should contact regarding any further reviews, actions, etc.?
Thank you, and thank you for putting this all together.
Not sure, but @srini-x has had some recent activity in this repo, so he could possibly help with that. Otherwise, contacting h2o support or reaching out on Twitter should work.