A pure Python implementation of FlowQuery, a declarative OpenCypher-based query language for virtual graphs and data processing pipelines. This package has full functional fidelity with the TypeScript version.
pip install flowquery
Start the interactive REPL:
flowquery
import asyncio
from flowquery import Runner
runner = Runner("WITH 1 as x RETURN x + 1 as result")
asyncio.run(runner.run())
print(runner.results) # [{'result': 2}]
In Jupyter notebooks, you can use await directly:
from flowquery import Runner
runner = Runner("WITH 1 as x RETURN x + 1 as result")
await runner.run()
print(runner.results) # [{'result': 2}]
- Language Reference (clauses, expressions, functions, graph operations, and more)
- Quick Cheat Sheet
- Full Documentation
- Contributing Guide
- Virtual Graph Demo Notebook — a demo of virtual graph capabilities and custom function extensibility
The query language itself is identical between the TypeScript and Python versions. The only difference is that custom functions are written in Python here instead of TypeScript.
Scalar functions operate on individual values and return a result:
from flowquery.extensibility import Function, FunctionDef
@FunctionDef({
"description": "Doubles a number",
"category": "scalar",
"parameters": [{"name": "value", "description": "Number to double", "type": "number"}],
"output": {"description": "Doubled value", "type": "number"},
})
class Double(Function):
def __init__(self):
super().__init__("double")
self._expected_parameter_count = 1
def value(self):
return self.get_children()[0].value() * 2
Once defined, use it in your queries:
WITH 5 AS num RETURN double(num) AS result
// Returns: [{"result": 10}]
from flowquery.extensibility import Function, FunctionDef
@FunctionDef({
"description": "Reverses a string",
"category": "scalar",
"parameters": [{"name": "text", "description": "String to reverse", "type": "string"}],
"output": {"description": "Reversed string", "type": "string"},
})
class StrReverse(Function):
def __init__(self):
super().__init__("strreverse")
self._expected_parameter_count = 1
def value(self) -> str:
return str(self.get_children()[0].value())[::-1]
Usage:
WITH 'hello' AS s RETURN strreverse(s) AS reversed
// Returns: [{"reversed": "olleh"}]
Aggregate functions process multiple values and return a single result. They require a ReducerElement to track state:
from flowquery.extensibility import AggregateFunction, FunctionDef, ReducerElement
class MinReducerElement(ReducerElement):
def __init__(self):
self._value = None
@property
def value(self):
return self._value
@value.setter
def value(self, val):
self._value = val
@FunctionDef({
"description": "Collects the minimum value",
"category": "aggregate",
"parameters": [{"name": "value", "description": "Value to compare", "type": "number"}],
"output": {"description": "Minimum value", "type": "number"},
})
class MinValue(AggregateFunction):
def __init__(self):
super().__init__("minvalue")
self._expected_parameter_count = 1
def reduce(self, element):
current = self.first_child().value()
if element.value is None or current < element.value:
element.value = current
def element(self):
return MinReducerElement()
Usage:
UNWIND [5, 2, 8, 1, 9] AS num RETURN minvalue(num) AS min
// Returns: [{"min": 1}]
Async providers allow you to create custom data sources that can be used with LOAD JSON FROM:
from flowquery.extensibility import AsyncFunction, FunctionDef
@FunctionDef({
"description": "Provides example data for testing",
"category": "async",
"parameters": [],
"output": {"description": "Example data object", "type": "object"},
})
class GetExampleData(AsyncFunction):
def __init__(self):
super().__init__("getexampledata")
self._expected_parameter_count = 0
async def generate(self):
yield {"id": 1, "name": "Alice"}
yield {"id": 2, "name": "Bob"}
Usage:
LOAD JSON FROM getexampledata() AS data RETURN data.id AS id, data.name AS name
// Returns: [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
Custom functions integrate seamlessly with FlowQuery expressions and can be combined with other functions:
// Using custom function with expressions
WITH 5 * 3 AS num RETURN addhundred(num) + 1 AS result
// Using multiple custom functions together
WITH 2 AS num RETURN triple(num) AS tripled, square(num) AS squared
You can use the built-in functions() function to discover registered functions including your custom ones:
WITH functions() AS funcs
UNWIND funcs AS f
WITH f WHERE f.name = 'double'
RETURN f.name AS name, f.description AS description, f.category AS category
MIT License - see LICENSE for details.