
Modify Python UDF types for RACO compatibility #885

Open
BrandonHaynes opened this issue Mar 7, 2017 · 8 comments

@BrandonHaynes
Member

We discussed modifying the Python UDF functionality in the following ways:

  • Modify apply-based Python UDFs to be of type NamedTuple -> Tuple, flatmap-based Python UDFs to be of type NamedTuple -> Tuple list, and aggregate UDFs to be of type NamedTuple list -> state -> state
  • Modify the catalog to store the type of UDF being registered (apply, flatmap, or aggregate) and expose it in the REST API
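The three proposed signatures could be sketched in Python roughly as follows. This is a hypothetical illustration only: `InputRow`, the field names, and the UDF bodies are all made up; only the type shapes matter.

```python
from collections import namedtuple
from typing import List, Tuple

# Hypothetical input schema; not part of the actual Myria API.
InputRow = namedtuple("InputRow", ["x", "y"])

def apply_udf(row: InputRow) -> Tuple:
    """apply: NamedTuple -> Tuple (exactly one output tuple per input tuple)."""
    return (row.x + row.y,)

def flatmap_udf(row: InputRow) -> List[Tuple]:
    """flatmap: NamedTuple -> Tuple list (zero or more output tuples)."""
    return [(row.x,), (row.y,)]

def aggregate_udf(rows: List[InputRow], state: Tuple) -> Tuple:
    """aggregate: NamedTuple list -> state -> state."""
    total, count = state
    for row in rows:
        total += row.x
        count += 1
    return (total, count)
```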
@senderista
Contributor

Does an aggregate ever actually need to be passed an input state, or does it just output a state (assuming that the Python worker is always passed all the tuples associated with the state it is computing, in a single batch)?

@BrandonHaynes
Member Author

You're probably right, assuming that @parmitam isn't using any of the state initialization logic for anything meaningful.

@BrandonHaynes
Member Author

Is it possible to just infer the flatmap UDF case by examining the value returned during each invocation? E.g.:

result = udf(input_tuple)
if isinstance(result, list):
    do_flatmap_stuff(result)
elif isinstance(result, tuple):
    do_apply_stuff(result)
else:
    raise SomeException()

@parmitam
Contributor

parmitam commented Mar 8, 2017

Not really.
A Python UDF may return a list or tuple object (which is then pickled and stored as a blob-type attribute).
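A minimal sketch of the ambiguity (`histogram_udf` is hypothetical): an apply-style UDF whose single output *value* happens to be a list would be misclassified as a flatmap by the isinstance check proposed above, since the list is data destined for a pickled blob attribute, not multiple output tuples.

```python
import pickle

def histogram_udf(pixels):
    """An apply-style UDF whose single output value happens to be a list."""
    counts = [0] * 4
    for p in pixels:
        counts[p % 4] += 1
    return counts  # a list, but NOT a flatmap result

result = histogram_udf([0, 1, 1, 2])
blob = pickle.dumps(result)  # stored as a blob-type attribute

# isinstance(result, list) is True here even though the intent is "apply",
# so return-type inference alone cannot distinguish the two UDF kinds.
```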

@parmitam
Contributor

parmitam commented Mar 8, 2017

Also, the state-initializing logic just initializes the state variable, which is then passed to the Python UDF as part of the input tuple. So we'd have: input tuple -> output tuple (apply); input tuple -> multiple tuples (flatmap); input TupleBatch -> output tuple (aggregate).
The input tuple is a named tuple in each of the above.

@senderista
Contributor

I don't quite understand what is meant by "the state variable...is then passed to the python udf as part of the input tuple". For aggregates, isn't the state always a single tuple, distinct from the input tuples that belong to the aggregate group?

@parmitam
Contributor

parmitam commented Mar 8, 2017

uda foo(img) {
  --init
  [0 as _a, null as _val];
  --update
  [_a + 1, pySum(img)];
  --output
  [pyMean(_val, _a)];
};

T1 = SCAN(Images);
MeanImg = [FROM T1 EMIT foo(T1.img) as meanImg, T1.subjId, T1.imgId];

The tuple passed to pySum is (_val, img), where _val = null initially; the output is _val (whatever pySum returns).

The tuple sent to pyMean is (_val, _a).

@senderista
Contributor

Oh, maybe the confusion here is that there is only one input tuple in each TupleBatch? I think I recall you mentioning that before when describing your image processing pipeline. So maybe in that case it makes sense to pass in a single tuple to the Python aggregator consisting of the one input tuple in the batch and the aggregator's initial state, but what about the general case? Don't we need two arguments, one for a single tuple representing the aggregator's initial state, and one for the input TupleBatch? Also, what about cases where the number of input tuples associated with a group exceeds the maximum size of a TupleBatch? In that case, do successive calls to the Python aggregator pass in the current state with the next TupleBatch of input tuples?
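One way the general case could work, sketched under the assumption that the state is threaded through one aggregator call per TupleBatch (a fold). `py_agg`, `run_group`, the batch size, and the sum/count state shape are all hypothetical.

```python
def py_agg(state, batch):
    """aggregate: state -> TupleBatch -> state."""
    total, count = state
    for (value,) in batch:
        total += value
        count += 1
    return (total, count)

def run_group(tuples, batch_size=2):
    """Feed a group's tuples to the aggregator one TupleBatch at a time,
    passing the current state into each successive call."""
    state = (0, 0)  # initial state
    for i in range(0, len(tuples), batch_size):
        state = py_agg(state, tuples[i:i + batch_size])
    return state

total, count = run_group([(1,), (2,), (3,), (4,), (5,)])
```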
