FEAT-#6301: Simplify usage of algebra operators to define custom functions #6302
base: master
Conversation
Signed-off-by: Dmitry Chigarev <[email protected]>
```diff
@@ -297,7 +297,7 @@ def register(
         """

         def caller(
-            query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs
+            query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs
```
Changed the order so the function's positional arguments won't conflict with the `broadcast` keyword argument.
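To illustrate the difference, here is a toy, Modin-free sketch (the function names and bodies are illustrative stand-ins, not the actual implementation) of how the two parameter orders behave when extra positional arguments are passed:

```python
# Hypothetical sketch: with `broadcast=False` placed before *args, a stray
# positional argument is silently captured by `broadcast`; placing it after
# *args makes it keyword-only and keeps positional arguments in *args.

def caller_old(query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs):
    return broadcast, args

def caller_new(query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs):
    return broadcast, args

# Old order: the third positional argument lands in `broadcast`.
print(caller_old("qc", "other", 42))  # -> (42, ())
# New order: it lands in *args and `broadcast` keeps its default.
print(caller_new("qc", "other", 42))  # -> (False, (42,))
```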
```python
        -------
        The same type as `df`.
        """
        from modin.pandas import Series
```
I don't really like that inner layers depend on upper layers. I don't see any benefit to these changes other than simplifying the registration of a UDF for users, which, from my point of view, doesn't happen very often. I would like to bring more attention to these changes to decide whether we want them merged or not.
cc @modin-project/modin-core
I agree with @YarShev. I'd rather avoid the dependency on a higher layer.
Maybe we can make the `.apply()` method a separate function and place it somewhere in `modin.pandas.utils`; the function would look something like:
```python
# modin/pandas/utils.py
def apply_operator(df: modin.pandas.DataFrame, operator_cls: type[Operator], func, *args, **kwargs):
    res_qc = operator_cls.register(func)(df._query_compiler, *args, **kwargs)
    return type(df)(query_compiler=res_qc)

# the usage then would be:
from modin.pandas.utils import apply_operator
from modin.core.dataframe.algebra import Reduce

res_df = apply_operator(df, Reduce, func=reduce_func, axis=1)
```
One of the problems I see here is that managing operator-dependent behavior could be quite a pain, as we can no longer use OOP mechanisms (inheritance and overriding) to align with operator-specific logic:
```python
# modin/pandas/utils.py
def _apply_reduce_op(...):
    ...

def _apply_groupby_op(...):
    ...

_operators_dict = {
    Reduce: _apply_reduce_op,
    GroupbyReduce: _apply_groupby_op,
    ...
}

def apply_operator(df: modin.pandas.DataFrame, operator_cls: type[Operator], func, *args, **kwargs):
    return _operators_dict[operator_cls](df, func, *args, **kwargs)
```
Do you have any other suggestions on how to improve this approach (or maybe you have another approach in mind)?
@dchigarev To me, the current approach for using the operators doesn't look very verbose. Will this PR make it much easier to use the operators anywhere?
> will this PR make it much easier to use the operators anywhere
The PR should make the usage of operators much easier for end users of Modin who would like to define their own distributed functions using Modin's operators.
While optimizing customer workloads for Modin, we sometimes see places that would perform much better if rewritten from the pandas API to Modin's operators. However, the API the operators currently provide leads to a lot of complex surrounding code that customers struggle to understand. That inspired us to create a simple method/function that makes using an operator as simple as calling a single function.
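To make the ergonomics concrete for readers of this thread, here is a toy, Modin-free sketch of the register/apply pattern being discussed; every class and name here is an illustrative stand-in, not Modin's real API:

```python
# Toy stand-ins for query compiler, dataframe, and operator (hypothetical names).

class ToyQueryCompiler:
    def __init__(self, data):
        self.data = data

class ToyFrame:
    def __init__(self, query_compiler):
        self._query_compiler = query_compiler

class ToyReduce:
    @classmethod
    def register(cls, func):
        # `register` builds a caller that operates on query compilers.
        def caller(query_compiler, *args, **kwargs):
            return ToyQueryCompiler(func(query_compiler.data, *args, **kwargs))
        return caller

    @classmethod
    def apply(cls, df, func, *args, **kwargs):
        # The convenience layer this PR proposes: unwrap the QC, run, rewrap.
        res_qc = cls.register(func)(df._query_compiler, *args, **kwargs)
        return type(df)(res_qc)

df = ToyFrame(ToyQueryCompiler([1, 2, 3]))

# Before: users register a caller, feed it a query compiler, and rewrap by hand.
res_qc = ToyReduce.register(sum)(df._query_compiler)
res = ToyFrame(res_qc)

# After: one call on the high-level object.
res = ToyReduce.apply(df, sum)
print(res._query_compiler.data)  # -> 6
```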
> I don't really like that inner layers depend on upper layers.
So I made two versions of how we can eliminate this dependency.

- Avoid importing objects from `modin.pandas` at the algebra level, but still allow passing such objects to `Operator.apply()`. This way we get rid of the 'import dependency' on the higher level, meaning that we can easily detach the algebra layer if needed without worrying that it would require bringing stuff from the higher levels for the algebra to work correctly. I've made the changes to align with this approach and pushed them to the branch from this PR.

- Also avoid passing dataframe objects to `Operator.apply()` and rework this method to accept query compilers only. Then add a helper function somewhere at the dataframe level that takes series/dataframes, extracts their QCs, and passes them to `Operator.apply()`. I've implemented this approach in a separate branch. There, users have a function at `modin.pandas.utils.apply_operator` with the following signature:

  ```python
  def apply_operator(operator_cls, *args, **kwargs): ...

  # use case example
  from modin.pandas.utils import apply_operator
  from modin.core.dataframe.algebra import Reduce

  series_obj = apply_operator(Reduce, df, reduce_func, axis=1)
  ```
I don't really like this approach, as `apply_operator()` doesn't provide a meaningful signature and requires referring to `Operator.apply()` for the list of allowed parameters.
@mvashishtha @YarShev thoughts?
@dchigarev I wonder if you could provide an example of a user-defined Modin operator (ideally a real case, even if simplified and anonymized).
> I wonder if you could provide an example of user-defined Modin operator
What we usually use user-defined operators for is to emulate lazy execution for transformations that can't be written as one or two pandas API calls (usually such transformations are performed in a for-loop).
As an example, consider a dataframe with multiple string columns that we want to combine into a single column using a specified separator. Surprisingly, the fastest way to do this in vanilla pandas is simply writing a for-loop:
```
combining multiple string columns using different approaches in pandas:
for-loop: 17.533942513167858
df.apply(join): 21.597900850698352
df.str.cat(): 38.55254930164665
```
pandas code:

```python
import pandas as pd
import numpy as np
from timeit import default_timer as timer

NCOLS = 16
NROWS = 5_000_000

df = pd.DataFrame({f"col{i}": [f"col{i}-{j}" for j in range(NROWS)] for i in range(NCOLS)})

t1 = timer()
res = df.iloc[:, 0]
for col in df.columns[1:]:
    res += "_" + df[col]
print(f"for-loop: {timer() - t1}")

t1 = timer()
res = df.apply(lambda row: "_".join(row), axis=1)
print(f"df.apply(join): {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0].str.cat(df.iloc[:, 1:], sep="_")
print(f"df.str.cat(): {timer() - t1}")
```
Then, when adapting this code to Modin, it turns out that the for-loop approach is very slow due to the number of kernels submitted to Ray, which overwhelms it (each iteration of the for-loop results in 3 separate kernels: 1. `df[col]`; 2. `"_" + df[col]`; 3. `res += ...`). The most performant approach turns out to be submitting this for-loop as a single kernel using the Reduce operator:
```
combining multiple string columns using different approaches in modin:
reduction operator: 2.6848975336179137
batch pipeline API: 2.945119895040989
for-loop: 36.92861177679151
df.apply(join): 8.54124379903078
df.str.cat(): 43.84469765238464
```
modin code
import modin.pandas as pd
import modin.config as cfg
import numpy as np
from timeit import default_timer as timer
cfg.BenchmarkMode.put(True)
# start all the workers
pd.DataFrame([np.arange(cfg.MinPartitionSize.get()) for _ in range(cfg.NPartitions.get() ** 2)]).to_numpy()
NCOLS = 16
NROWS = 5_000_000
df = pd.DataFrame({f"col{i}": [f"col{i}-{j}" for j in range(NROWS)] for i in range(NCOLS)})
from modin.core.dataframe.algebra import Reduce
def reduction(df):
res = df.iloc[:, 0]
for col in df.columns[1:]:
res += "_" + df[col]
return res
t1 = timer()
res = Reduce.apply(df, reduction, axis=1)
print(f"reduction operator: {timer() - t1}")
from modin.experimental.batch import PandasQueryPipeline
t1 = timer()
pipeline = PandasQueryPipeline(df)
pipeline.add_query(reduction, is_output=True)
res = pipeline.compute_batch()
print(f"batch pipeline API: {timer() - t1}")
t1 = timer()
res = df.iloc[:, 0]
for col in df.columns[1:]:
res += "_" + df[col]
print(f"for-loop: {timer() - t1}")
t1 = timer()
res = df.apply(lambda row: "_".join(row), axis=1)
print(f"df.apply(join): {timer() - t1}")
t1 = timer()
res = df.iloc[:, 0].str.cat(df.iloc[:, 1:], sep="_")
print(f"df.str.cat(): {timer() - t1}")
(As I was writing this comment, I found out about the batch API in Modin, which is supposed to serve exactly the same purpose of "emulating" lazy execution. However, it doesn't seem to provide a way to specify the scheme by which kernels are submitted (map, row-wise, column-wise, ...), and it also has some slight overhead compared to the pure user-defined operator approach.)
```python
        left : modin.pandas.DataFrame or modin.pandas.Series
            Left operand.
        right : modin.pandas.DataFrame or modin.pandas.Series
            Right operand.
```
This layer depends on the upper layer in every apply.
The lower layer still takes object(s) from the API layer, namely `modin.pandas.DataFrame`/`Series`. Then there is a `func`, which takes `pandas.DataFrame`/`Series`. Also, there is a kwargs argument that needs to be passed to `cls.register()`. What is `cls.register` for the user? Doesn't this look a little complicated for the user? So many things to understand. I am also thinking that we are trying to simplify things not for the user but for ourselves, for when we rewrite a customer workload to get better performance.
```python
            Left operand.
        right : modin.pandas.DataFrame or modin.pandas.Series
            Right operand.
        func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame
```
The signature is wrong here, as the implementation explicitly passes query compilers as arguments...
Nope, `func` here is a kernel that will be applied to deserialized partitions (pandas dataframes), so the signature is correct.
Follow the track of the `func`: `register(func)` -> `modin_dataframe.apply_full_axis(func, ...)`.
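To illustrate the point (with a toy kernel, not Modin's actual dispatch machinery): by the time a registered kernel runs, it receives plain pandas objects for each deserialized partition, not query compilers:

```python
import pandas as pd

def reduce_kernel(part: pd.DataFrame) -> pd.Series:
    # The kernel sees a pandas DataFrame, matching the documented signature.
    assert isinstance(part, pd.DataFrame)
    return part.sum(axis=1)

# Stand-in for one deserialized partition.
partition = pd.DataFrame({"a": [1, 2], "b": [10, 20]})
print(reduce_kernel(partition).tolist())  # -> [11, 22]
```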
```python
        -------
        The same type as `df`.
        """
        operator = cls.register(func, **kwargs)
```
I wonder what the purpose of `.register` is for a one-off thing.
Unfortunately, that's the only way we can get the `caller` function.
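As a toy illustration of why even a one-off application goes through a register-style factory (the hypothetical `make_caller` below stands in for `Operator.register`; it is not Modin code):

```python
# The factory is the only place where the caller closure is produced,
# so a one-off apply still has to build it before invoking it.
def make_caller(func):
    def caller(data, *args, **kwargs):
        return func(data, *args, **kwargs)
    return caller

caller = make_caller(sum)  # one-off: build the caller...
print(caller([1, 2, 3]))   # ...and call it immediately -> 6
```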
@dchigarev So this is a way to implement some user-defined functions, right? I wonder if we can actually try to take a leaf out of the pandas/numpy book in the way they support UDFs...
What do these changes do?

This PR introduces the `Operator.apply(...)` method, which takes and outputs `modin.pandas.DataFrame` and is able to apply the specified function using the operator's scheme directly to high-level dataframes.

- passes `flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py`
- passes `black --check modin/ asv_bench/benchmarks scripts/doc_checker.py`
- signed commit with `git commit -s`
- `docs/development/architecture.rst` is up-to-date