# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# File: packages/bigframes/bigframes/core/bytecode.py

import builtins
import dis
import operator
import sys
from types import ModuleType
from typing import Callable, Dict, List, Optional

import bigframes.core.py_expressions as py_exprs
from bigframes.core import expression


class NullMarker:
    """Stand-in for the interpreter's ``NULL`` slot of the call protocol.

    The class object itself (never an instance) is pushed on the simulated
    stack and recognised with an identity check (``item is NullMarker``).
    """


# Operator symbol (``Instruction.argrepr`` of BINARY_OP) -> python operator.
_BINARY_OP_MAP = {
    "+": operator.add,
    "-": operator.sub,
    "*": operator.mul,
    "/": operator.truediv,
    "//": operator.floordiv,
    "%": operator.mod,
    "**": operator.pow,
}

# Comparison symbol (``Instruction.argval`` of COMPARE_OP) -> python operator.
_COMPARE_OP_MAP = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}

# Dedicated binary opcodes used by interpreters that predate BINARY_OP.
# In-place variants map to the same operator: the simulated values are
# immutable expressions, so ``a += b`` and ``a + b`` build the same tree.
_OLD_BINARY_OP_MAP = {
    "BINARY_ADD": operator.add,
    "INPLACE_ADD": operator.add,
    "BINARY_SUBTRACT": operator.sub,
    "INPLACE_SUBTRACT": operator.sub,
    "BINARY_MULTIPLY": operator.mul,
    "INPLACE_MULTIPLY": operator.mul,
    "BINARY_TRUE_DIVIDE": operator.truediv,
    "INPLACE_TRUE_DIVIDE": operator.truediv,
    "BINARY_FLOOR_DIVIDE": operator.floordiv,
    "INPLACE_FLOOR_DIVIDE": operator.floordiv,
    "BINARY_MODULO": operator.mod,
    "INPLACE_MODULO": operator.mod,
    "BINARY_POWER": operator.pow,
    "INPLACE_POWER": operator.pow,
}

# Unary opcodes -> python operator.
_UNARY_OP_MAP = {
    "UNARY_NEGATIVE": operator.neg,
    "UNARY_INVERT": operator.invert,
    "UNARY_POSITIVE": operator.pos,
}

# Instructions that do not change the value stack being simulated.
_SKIPPED_OPNAMES = frozenset({"RESUME", "NOP", "PRECALL", "COPY_FREE_VARS"})

# From 3.11 the low bit of LOAD_GLOBAL's oparg asks for an extra NULL push.
_LOAD_GLOBAL_HAS_NULL_FLAG = sys.version_info >= (3, 11)
# From 3.12 LOAD_METHOD is folded into LOAD_ATTR; the low bit of the oparg
# marks a method-style load that pushes a second (self-or-NULL) slot.
_LOAD_ATTR_HAS_METHOD_FLAG = sys.version_info >= (3, 12)


def _wrap_python_value(value: object) -> expression.Expression:
    """Wrap a resolved global/closure value as a Module or PyObject node."""
    if isinstance(value, ModuleType):
        return py_exprs.Module(value)
    return py_exprs.PyObject(value)


def _operator_call(fn: Callable, operands: List[object]) -> expression.Expression:
    """Build a ``Call`` node applying python operator ``fn`` to ``operands``."""
    return py_exprs.Call(py_exprs.PyObject(fn), tuple(operands))


def _compile_bytecode_to_py_expr(func: Callable) -> Optional[expression.Expression]:
    """Symbolically execute ``func``'s bytecode into a python-level expression.

    Only straight-line code is supported: every instruction that is not
    explicitly handled (jumps, loops, keyword calls, subscripts, ...) aborts
    the translation.

    Args:
        func: The python function or lambda to translate.

    Returns:
        An expression tree made of ``UnboundVariableExpression``, ``PyObject``,
        ``Module``, ``GetAttr`` and ``Call`` nodes describing the returned
        value, or ``None`` if the function cannot be translated.
    """
    try:
        instructions = list(dis.get_instructions(func))
    except Exception:
        # Best effort: anything `dis` cannot disassemble is unsupported.
        return None

    globals_dict = func.__globals__
    builtins_dict = builtins.__dict__

    # Snapshot of the values captured by the function's closure, by name.
    closure_dict: Dict[str, object] = {}
    if func.__closure__:
        for var, cell in zip(func.__code__.co_freevars, func.__closure__):
            try:
                closure_dict[var] = cell.cell_contents
            except ValueError:
                # Empty cell: the captured variable has no value yet.
                pass

    stack: List[object] = []
    # Expressions assigned to local names by STORE_FAST.  Without this a
    # later LOAD_FAST of the local would be mistaken for an input variable.
    local_values: Dict[str, object] = {}

    def pop_operands(count: int) -> Optional[List[object]]:
        """Pop ``count`` real values; None if too few or a NULL is among them."""
        if count > len(stack):
            return None
        split = len(stack) - count
        operands = stack[split:]
        if any(item is NullMarker for item in operands):
            return None
        del stack[split:]
        return operands

    for inst in instructions:
        opname = inst.opname

        if opname in _SKIPPED_OPNAMES:
            continue

        elif opname.startswith("LOAD_FAST"):
            # Fused instructions (LOAD_FAST_LOAD_FAST and, on 3.14,
            # LOAD_FAST_BORROW_LOAD_FAST_BORROW) carry a tuple of names.
            names = inst.argval if isinstance(inst.argval, tuple) else (inst.argval,)
            for name in names:
                if name in local_values:
                    stack.append(local_values[name])
                else:
                    stack.append(expression.UnboundVariableExpression(name))

        elif opname in ("LOAD_CONST", "LOAD_SMALL_INT"):
            stack.append(py_exprs.PyObject(inst.argval))

        elif opname == "LOAD_GLOBAL":
            name = inst.argval
            if name in globals_dict:
                stack.append(_wrap_python_value(globals_dict[name]))
            elif name in builtins_dict:
                stack.append(_wrap_python_value(builtins_dict[name]))
            else:
                stack.append(expression.UnboundVariableExpression(name))
            # The real interpreter puts the NULL before (3.11/3.12) or after
            # (3.13+) the value; the CALL handler below accepts both orders,
            # so a single layout is used here.
            if _LOAD_GLOBAL_HAS_NULL_FLAG and (inst.arg or 0) & 1:
                stack.append(NullMarker)

        elif opname == "LOAD_DEREF":
            # Closure variables are resolved to the value captured right now,
            # the same way globals are.
            if inst.argval not in closure_dict:
                return None
            stack.append(_wrap_python_value(closure_dict[inst.argval]))

        elif opname in ("LOAD_ATTR", "LOAD_METHOD"):
            operands = pop_operands(1)
            if operands is None:
                return None
            target = operands[0]
            stack.append(py_exprs.GetAttr(target, inst.argval))
            is_method_load = opname == "LOAD_METHOD" or (
                _LOAD_ATTR_HAS_METHOD_FLAG and bool((inst.arg or 0) & 1)
            )
            if is_method_load:
                # Module attributes are plain functions (no self); anything
                # else is assumed to be a method bound to its target.
                if isinstance(target, py_exprs.Module):
                    stack.append(NullMarker)
                else:
                    stack.append(target)

        elif opname == "PUSH_NULL":
            stack.append(NullMarker)

        elif opname == "BINARY_OP":
            operands = pop_operands(2)
            if operands is None:
                return None
            op_symbol = inst.argrepr
            if not op_symbol and isinstance(inst.argval, str):
                op_symbol = inst.argval
            # "+=" and friends are treated like their plain counterpart.
            if op_symbol and op_symbol.endswith("="):
                op_symbol = op_symbol[:-1]
            if op_symbol not in _BINARY_OP_MAP:
                return None
            stack.append(_operator_call(_BINARY_OP_MAP[op_symbol], operands))

        # Support older Python versions compatibility
        elif opname in _OLD_BINARY_OP_MAP:
            operands = pop_operands(2)
            if operands is None:
                return None
            stack.append(_operator_call(_OLD_BINARY_OP_MAP[opname], operands))

        elif opname == "COMPARE_OP":
            operands = pop_operands(2)
            if operands is None:
                return None
            if inst.argval not in _COMPARE_OP_MAP:
                return None
            stack.append(_operator_call(_COMPARE_OP_MAP[inst.argval], operands))

        elif opname in _UNARY_OP_MAP:
            operands = pop_operands(1)
            if operands is None:
                return None
            stack.append(_operator_call(_UNARY_OP_MAP[opname], operands))

        elif opname == "CALL_INTRINSIC_1":
            # Unary plus is the only intrinsic that is translated.
            if inst.argrepr != "INTRINSIC_UNARY_POSITIVE":
                return None
            operands = pop_operands(1)
            if operands is None:
                return None
            stack.append(_operator_call(operator.pos, operands))

        elif opname in ("CALL", "CALL_METHOD"):
            args = pop_operands(inst.arg or 0)
            if args is None or len(stack) < 2:
                return None
            # Two slots sit below the arguments: the callable plus either
            # NULL or ``self``.  Depending on the interpreter version the
            # NULL comes first or second, so both layouts are accepted.
            second = stack.pop()
            first = stack.pop()
            if first is NullMarker:
                first, second = second, NullMarker
            if first is NullMarker:
                return None
            if second is not NullMarker:
                # Method call: ``self`` becomes the first positional argument.
                args = [second] + args
            stack.append(py_exprs.Call(first, tuple(args)))

        elif opname == "CALL_FUNCTION":
            # Pre-3.11 plain call: only the callable sits below the arguments.
            args = pop_operands(inst.arg or 0)
            if args is None:
                return None
            callee = pop_operands(1)
            if callee is None:
                return None
            stack.append(py_exprs.Call(callee[0], tuple(args)))

        elif opname == "RETURN_VALUE":
            result = pop_operands(1)
            if result is None:
                return None
            return result[0]

        elif opname == "STORE_FAST":
            value = pop_operands(1)
            if value is None:
                return None
            local_values[inst.argval] = value[0]

        elif opname == "POP_TOP":
            if stack:
                stack.pop()

        else:
            # Unknown or unsupported instruction (control flow, etc.).
            return None

    return None


def dis_to_expr(
    func: Callable, unpack_mode: bool = False
) -> Optional[expression.Expression]:
    """
    Try to convert a python function to a BigQuery expression.

    With ``unpack_mode=False`` SQL columns are addressed as attributes of a
    single python argument (e.g. ``row.col1``); with ``unpack_mode=True`` they
    are addressed as separate arguments (e.g. ``col1``).

    This is "best effort" - if the function contains operations that cannot
    be converted to BigQuery expressions, it will return None.
    """
    try:
        py_expr = _compile_bytecode_to_py_expr(func)
        if py_expr is None:
            return None
        return py_exprs.resolve_py_exprs(py_expr, unpack_mode=unpack_mode)
    except Exception:
        # Deliberate catch-all: any failure means "cannot translate".
        return None
+ +from __future__ import annotations + +import dataclasses +import itertools +from types import ModuleType +from typing import Callable, Hashable, Mapping, Tuple + +import bigframes.operations.python_op_maps as python_op_maps +from bigframes import dtypes +from bigframes.core import identifiers +from bigframes.core.expression import ( + Expression, + OpExpression, + UnboundVariableExpression, + const, +) +from bigframes.operations import NUMPY_TO_BINOP, NUMPY_TO_OP, generic_ops, numeric_ops + +_CALLABLE_TO_OP = { + **NUMPY_TO_OP, + **NUMPY_TO_BINOP, +} + +_BUILTIN_CALLABLES = { + str: generic_ops.AsTypeOp(dtypes.STRING_DTYPE), + abs: numeric_ops.abs_op, +} + + +@dataclasses.dataclass(frozen=True) +class GetAttr(Expression): + input: Expression + attr: str + + @property + def column_references( + self, + ) -> Tuple[identifiers.ColumnId, ...]: + return self.input.column_references + + @property + def free_variables(self) -> set[str]: + return self.input.free_variables + + @property + def is_const(self) -> bool: + return False + + @property + def children(self): + return (self.input,) + + @property + def nullable(self) -> bool: + return True + + @property + def is_resolved(self) -> bool: + return False + + @property + def output_type(self) -> dtypes.ExpressionType: + raise ValueError(f"Type of expression {self} has not been fixed.") + + @property + def is_bijective(self) -> bool: + # TODO: Mark individual functions as bijective? 
+ return False + + @property + def deterministic(self) -> bool: + return True + + def transform_children(self, t: Callable[[Expression], Expression]) -> Expression: + new_input = t(self.input) + if new_input != self.input: + return dataclasses.replace(self, input=new_input) + return self + + def bind_variables( + self, bindings: Mapping[str, Expression], allow_partial_bindings: bool = False + ) -> GetAttr: + return GetAttr( + self.input.bind_variables( + bindings, allow_partial_bindings=allow_partial_bindings + ), + self.attr, + ) + + def bind_refs( + self, + bindings: Mapping[identifiers.ColumnId, Expression], + allow_partial_bindings: bool = False, + ) -> GetAttr: + return GetAttr( + self.input.bind_refs( + bindings, allow_partial_bindings=allow_partial_bindings + ), + self.attr, + ) + + +@dataclasses.dataclass(frozen=True) +class Module(Expression): + """An expression representing a module reference.""" + + module: ModuleType + + @property + def is_const(self) -> bool: + return True + + @property + def column_references(self) -> Tuple[identifiers.ColumnId, ...]: + return () + + @property + def nullable(self) -> bool: + return True # type: ignore + + @property + def is_resolved(self) -> bool: + return False + + @property + def output_type(self) -> dtypes.ExpressionType: + raise ValueError("Module expression does not have a type.") + + def bind_variables( + self, bindings: Mapping[str, Expression], allow_partial_bindings: bool = False + ) -> Expression: + return self + + def bind_refs( + self, + bindings: Mapping[identifiers.ColumnId, Expression], + allow_partial_bindings: bool = False, + ) -> Module: + return self + + @property + def is_bijective(self) -> bool: + # () <-> value + return True + + def transform_children(self, t: Callable[[Expression], Expression]) -> Expression: + return self + + +@dataclasses.dataclass(frozen=True) +class PyObject(Expression): + """An expression representing a module reference.""" + + value: Hashable + + @property + def 
is_const(self) -> bool: + return True + + @property + def column_references(self) -> Tuple[identifiers.ColumnId, ...]: + return () + + @property + def nullable(self) -> bool: + return True # type: ignore + + @property + def is_resolved(self) -> bool: + return False + + @property + def output_type(self) -> dtypes.ExpressionType: + raise ValueError("PyObject expression does not have a type.") + + def bind_variables( + self, bindings: Mapping[str, Expression], allow_partial_bindings: bool = False + ) -> Expression: + return self + + def bind_refs( + self, + bindings: Mapping[identifiers.ColumnId, Expression], + allow_partial_bindings: bool = False, + ) -> PyObject: + return self + + @property + def is_bijective(self) -> bool: + # () <-> value + return True + + def transform_children(self, t: Callable[[Expression], Expression]) -> Expression: + return self + + +@dataclasses.dataclass(frozen=True) +class Call(Expression): + """An expression representing a scalar constant.""" + + # TODO: Further constrain? + callable: Expression + inputs: Tuple[Expression, ...] + + @property + def column_references( + self, + ) -> Tuple[identifiers.ColumnId, ...]: + return tuple( + itertools.chain.from_iterable( + map(lambda x: x.column_references, self.children) + ) + ) + + @property + def free_variables(self) -> set[str]: + return set( + itertools.chain.from_iterable( + map(lambda x: x.free_variables, self.children) + ) + ) + + @property + def is_const(self) -> bool: + return False + + @property + def children(self): + return (self.callable, *self.inputs) + + @property + def nullable(self) -> bool: + return True + + @property + def is_resolved(self) -> bool: + return False + + @property + def output_type(self) -> dtypes.ExpressionType: + raise ValueError(f"Type of expression {self} has not been fixed.") + + @property + def is_bijective(self) -> bool: + # TODO: Mark individual functions as bijective? 
+ return False + + @property + def deterministic(self) -> bool: + return True + + def transform_children(self, t: Callable[[Expression], Expression]) -> Expression: + return dataclasses.replace( + self, + callable=t(self.callable), + inputs=tuple(t(input) for input in self.inputs), + ) + + def bind_variables( + self, bindings: Mapping[str, Expression], allow_partial_bindings: bool = False + ) -> Call: + return Call( + callable=self.callable.bind_variables( + bindings, allow_partial_bindings=allow_partial_bindings + ), + inputs=tuple( + input.bind_variables( + bindings, allow_partial_bindings=allow_partial_bindings + ) + for input in self.inputs + ), + ) + + def bind_refs( + self, + bindings: Mapping[identifiers.ColumnId, Expression], + allow_partial_bindings: bool = False, + ) -> Call: + return Call( + callable=self.callable.bind_refs( + bindings, allow_partial_bindings=allow_partial_bindings + ), + inputs=tuple( + input.bind_refs(bindings, allow_partial_bindings=allow_partial_bindings) + for input in self.inputs + ), + ) + + +# TODO: Mode that resolves free variable attrs as columns +def resolve_py_exprs(expression: Expression, unpack_mode: bool = False) -> Expression: + """Replace all PyObject, attribute, call expressions. 
Bottom-up.""" + + def resolve_expr_if_call(expression: Expression) -> Expression: + if isinstance(expression, Call): + return resolve_call(expression) + return expression + + # this function assumes attrs that become callables have been resolved + # also, we don't yet handle resolving attrs that are column accesses + def resolve_attrs(expression: Expression) -> Expression: + if isinstance(expression, GetAttr): + if isinstance(expression.input, Module): + # resolves things like Math.pi + return PyObject(getattr(expression.input.module, expression.attr)) + if not unpack_mode and isinstance( + expression.input, UnboundVariableExpression + ): + return UnboundVariableExpression(expression.attr) + return expression + + def resolve_pyobjs(expression: Expression) -> Expression: + if isinstance(expression, PyObject): + return const(expression.value) + return expression + + wo_calls = expression.bottom_up(resolve_expr_if_call) + wo_attrs = wo_calls.bottom_up(resolve_attrs) + wo_pyobjs = wo_attrs.bottom_up(resolve_pyobjs) + return wo_pyobjs + + +def resolve_call(call: Call) -> Expression: + callable = call.callable + if isinstance(callable, GetAttr): + attr = callable.attr + if isinstance(callable.input, Module): + fn = getattr(callable.input.module, attr) + if fn in python_op_maps.PYTHON_TO_BIGFRAMES: + op = python_op_maps.PYTHON_TO_BIGFRAMES[fn] + return OpExpression(op, call.inputs) + if fn in _CALLABLE_TO_OP: + op = _CALLABLE_TO_OP[fn] + return OpExpression(op, call.inputs) + elif isinstance(callable, PyObject): + if callable.value in python_op_maps.PYTHON_TO_BIGFRAMES: + op = python_op_maps.PYTHON_TO_BIGFRAMES[callable.value] + return OpExpression(op, call.inputs) + if callable.value in _BUILTIN_CALLABLES: + return OpExpression(_BUILTIN_CALLABLES[callable.value], call.inputs) + + raise NotImplementedError( + f"No implementation available for call expression: {call}" + ) diff --git a/packages/bigframes/tests/unit/core/test_bytecode.py 
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# File: packages/bigframes/tests/unit/core/test_bytecode.py

import math

import bigframes.core.expression as ex
import bigframes.operations as ops
from bigframes.core.bytecode import dis_to_expr


def test_dis_to_expr_simple_arithmetic():
    """A row attribute plus a constant compiles to an add op."""
    compiled = dis_to_expr(lambda row: row.x + 1, unpack_mode=False)

    assert compiled is not None
    assert compiled == ops.add_op.as_expr(ex.free_var("x"), ex.const(1))


def test_dis_to_expr_unpack_mode():
    """In unpack mode each argument is a variable of its own."""
    compiled = dis_to_expr(lambda col1, col2: col1 * col2, unpack_mode=True)

    assert compiled is not None
    assert compiled == ops.mul_op.as_expr(ex.free_var("col1"), ex.free_var("col2"))


def test_dis_to_expr_math_function():
    """A call to a ``math`` module function compiles to the matching op."""
    compiled = dis_to_expr(lambda row: math.sin(row.x), unpack_mode=False)

    assert compiled is not None
    assert compiled == ops.numeric_ops.sin_op.as_expr(ex.free_var("x"))


def test_dis_to_expr_negation():
    """Unary minus compiles to the negation op."""
    compiled = dis_to_expr(lambda row: -row.x, unpack_mode=False)

    assert compiled is not None
    assert compiled == ops.numeric_ops.neg_op.as_expr(ex.free_var("x"))


def test_dis_to_expr_comparison():
    """An equality test between two row attributes compiles to the eq op."""
    compiled = dis_to_expr(lambda row: row.x == row.y, unpack_mode=False)

    assert compiled is not None
    assert compiled == ops.comparison_ops.eq_op.as_expr(
        ex.free_var("x"), ex.free_var("y")
    )


def test_dis_to_expr_unsupported():
    """Control flow or unsupported structures should return None."""

    def func_with_loop(row):
        res = 0
        for val in range(int(row.x)):
            res += val
        return res

    assert dis_to_expr(func_with_loop, unpack_mode=False) is None


# Module-level global read by the test below.
global_none_val = None


def test_dis_to_expr_global_none():
    """A global variable explicitly set to None resolves to a None constant."""
    compiled = dis_to_expr(lambda row: row.x == global_none_val, unpack_mode=False)

    assert compiled is not None
    assert compiled == ops.comparison_ops.eq_op.as_expr(
        ex.free_var("x"), ex.const(None)
    )