RushDB is an instant database for modern apps and DS/ML ops built on top of Neo4j. It automates data normalization, manages relationships, and infers data types.
pip install rushdb
from rushdb import RushDB
# Initialize the client
db = RushDB("RUSHDB_API_KEY")
# Create a record
user = db.records.create(
label="USER",
data={
"name": "John Doe",
"email": "[email protected]",
"age": 30
}
)
# Find records
result = db.records.find({
"where": {
"age": {"$gte": 18},
"name": {"$startsWith": "J"}
},
"limit": 10
})
# Work with SearchResult
print(f"Found {len(result)} records out of {result.total} total")
# Iterate over results
for record in result:
print(f"User: {record.get('name')} (Age: {record.get('age')})")
# Check if there are more results
if result.has_more:
print("There are more records available")
# Access specific records
first_user = result[0] if result else None
# Create relationships
company = db.records.create(
label="COMPANY",
data={"name": "Acme Inc."}
)
# Attach records with a relationship
user.attach(
target=company,
options={"type": "WORKS_AT", "direction": "out"}
)
RushDB automatically normalizes nested objects into a graph structure:
# Push nested JSON with automatic relationship creation
db.records.create_many("COMPANY", {
"name": "Google LLC",
"rating": 4.9,
"DEPARTMENT": [{
"name": "Research & Development",
"PROJECT": [{
"name": "Bard AI",
"EMPLOYEE": [{
"name": "Jeff Dean",
"position": "Head of AI Research"
}]
}]
}]
})
RushDB Python SDK uses a modern `SearchResult` container that follows Python SDK best practices similar to boto3, google-cloud libraries, and other popular SDKs.
- Generic type support: Uses Python's typing generics (`SearchResult[T]`), with `RecordSearchResult` as a type alias for `SearchResult[Record]`
- List-like access: Index, slice, and iterate like a regular list
- Search context: Access total count, pagination info, and the original search query
- Boolean conversion: Use in if statements naturally (returns `True` if the result contains any items)
- Pagination support: Built-in pagination information and `has_more` property
# Perform a search
result = db.records.find({
"where": {"status": "active"},
"limit": 10,
"skip": 20
})
# Check if we have results
if result:
print(f"Found {len(result)} records")
# Access search result information
print(f"Total matching records: {result.total}")
print(f"Has more results: {result.has_more}")
print(f"Search query: {result.search_query}")
# Get detailed pagination info
page_info = result.get_page_info()
print(f"Page info: {page_info}")
# Iterate over results
for record in result:
print(f"Record: {record.get('name')}")
# List comprehensions work
names = [r.get('name') for r in result]
# Indexing and slicing
first_record = result[0] if result else None
first_five = result[:5]
# String representation
print(repr(result)) # SearchResult(count=10, total=42)
def __init__(
self,
data: List[T],
total: Optional[int] = None,
search_query: Optional[SearchQuery] = None,
):
"""
Initialize search result.
Args:
data: List of result items
total: Total number of matching records (defaults to len(data) if not provided)
search_query: The search query used to generate this result (defaults to {})
"""
| Property | Type | Description |
|---|---|---|
| `data` | `List[T]` | The list of result items (generic type) |
| `total` | `int` | Total number of matching records |
| `has_more` | `bool` | Whether there are more records available |
| `search_query` | `SearchQuery` | The search query used to generate this result |
| Method | Return Type | Description |
|---|---|---|
| `to_dict()` | `dict` | Returns standardized dict with total, data, search_query |
| `get_page_info()` | `dict` | Returns pagination info including total, loaded, has_more |
Implementation Notes:
- If `search_query` is not provided during initialization, it defaults to an empty dictionary `{}`
- The `has_more` property is calculated by comparing total with loaded records
- The `__bool__` method returns `True` if the result contains any items (`len(data) > 0`)
- `get_page_info()` provides detailed pagination metadata for advanced use cases
# Paginated search using skip/limit in query
def paginate_results(query_base, page_size=10):
current_skip = 0
while True:
# Add pagination to query
query = {**query_base, "limit": page_size, "skip": current_skip}
result = db.records.find(query)
if not result:
break
print(f"Processing {len(result)} records (skip: {current_skip})")
for record in result:
process_record(record)
if not result.has_more:
break
current_skip += len(result)
# Usage
paginate_results({
"where": {"category": "electronics"},
"orderBy": {"created_at": "desc"}
})
The SDK provides a specialized type alias for search results containing Record objects:
# Type alias for record search results
RecordSearchResult = SearchResult[Record]
This type is what's returned by methods like `db.records.find()`, providing type safety and specialized handling for Record objects while leveraging all the functionality of the generic SearchResult class.
The Record class has been enhanced with better data access patterns and utility methods.
# Create a record
user = db.records.create("User", {
"name": "John Doe",
"email": "[email protected]",
"age": 30,
"department": "Engineering"
})
# Safe field access with defaults
name = user.get("name") # "John Doe"
phone = user.get("phone", "Not provided") # "Not provided"
# Get clean user data (excludes internal fields like __id, __label)
user_data = user.get_data()
# Returns: {"name": "John Doe", "email": "[email protected]", "age": 30, "department": "Engineering"}
# Get all data including internal fields
full_data = user.get_data(exclude_internal=False)
# Includes: __id, __label, __proptypes, etc.
# Convenient fields property
fields = user.fields # Same as user.get_data()
# Dictionary conversion
user_dict = user.to_dict() # Clean user data
full_dict = user.to_dict(exclude_internal=False) # All data
# Direct field access
user_name = user["name"] # Direct access
user_id = user["__id"] # Internal field access
# Safe existence checking (no exceptions)
if user.exists():
print("Record is valid and accessible")
user.update({"status": "active"})
else:
print("Record doesn't exist or is not accessible")
# Perfect for validation workflows
def process_record_safely(record):
if not record.exists():
return None
return record.get_data()
# Conditional operations
records = db.records.find({"where": {"status": "pending"}})
for record in records:
if record.exists():
record.update({"processed_at": datetime.now()})
user = db.records.create("User", {"name": "Alice Johnson"})
print(repr(user)) # Record(id='abc-123', label='User')
print(str(user)) # User: Alice Johnson
# For records without names
product = db.records.create("Product", {"sku": "ABC123"})
print(str(product)) # Product (product-id-here)
For comprehensive documentation, tutorials, and examples, please visit:
Documentation includes:
- Complete Records API reference
- Relationship management
- Complex query examples
- Transaction usage
- Vector search capabilities
- Data import tools
- GitHub Issues - Bug reports and feature requests
- Discord Community - Get help from the community
- Email Support - Direct support from the RushDB team
Updates a record by ID, replacing all data.
Signature:
def set(
self,
record_id: str,
data: Dict[str, Any],
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `record_id` (str): ID of the record to update
- `data` (Dict[str, Any]): New record data
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Update entire record data
new_data = {
"name": "Updated Company Name",
"rating": 5.0
}
response = db.records.set(
record_id="record-123",
data=new_data
)
Updates specific fields of a record by ID.
Signature:
def update(
self,
record_id: str,
data: Dict[str, Any],
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `record_id` (str): ID of the record to update
- `data` (Dict[str, Any]): Partial record data to update
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Update specific fields
updates = {
"rating": 4.8,
"status": "active"
}
response = db.records.update(
record_id="record-123",
data=updates
)
Searches for records matching specified criteria.
Signature:
def find(
self,
search_query: Optional[SearchQuery] = None,
record_id: Optional[str] = None,
transaction: Optional[Transaction] = None
) -> RecordSearchResult
Arguments:
- `search_query` (Optional[SearchQuery]): Search query parameters
- `record_id` (Optional[str]): Optional record ID to search from
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `RecordSearchResult`: SearchResult container with matching records and metadata
Example:
# Search for records with complex criteria
search_query = {
"where": {
"$and": [
{"age": {"$gte": 18}},
{"status": "active"},
{"department": "Engineering"}
]
},
"orderBy": {"created_at": "desc"},
"limit": 10
}
result = db.records.find(search_query=search_query)
# Work with SearchResult
print(f"Found {len(result)} out of {result.total} total records")
# Iterate over results
for record in result:
print(f"Employee: {record.get('name')} - {record.get('department')}")
# Check pagination
if result.has_more:
print("More results available")
# Access specific records
first_employee = result[0] if result else None
# List operations
senior_employees = [r for r in result if r.get('age', 0) > 30]
Deletes records matching a query.
Signature:
def delete(
self,
search_query: SearchQuery,
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `search_query` (SearchQuery): Query to match records for deletion
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Delete records matching criteria
search_query = {
"where": {
"status": "inactive",
"lastActive": {"$lt": "2023-01-01"}
}
}
response = db.records.delete(search_query)
Deletes one or more records by ID.
Signature:
def delete_by_id(
self,
id_or_ids: Union[str, List[str]],
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `id_or_ids` (Union[str, List[str]]): Single ID or list of IDs to delete
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Delete single record
response = db.records.delete_by_id("record-123")
# Delete multiple records
response = db.records.delete_by_id([
"record-123",
"record-456",
"record-789"
])
Creates relationships between records.
Signature:
def attach(
self,
source: Union[str, Dict[str, Any]],
target: Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]],
options: Optional[RelationshipOptions] = None,
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `source` (Union[str, Dict[str, Any]]): Source record ID or data
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipOptions]): Relationship options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `type` (Optional[str]): Relationship type
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Create relationship between records
options = RelationshipOptions(
type="HAS_EMPLOYEE",
direction="out"
)
response = db.records.attach(
source="company-123",
target=["employee-456", "employee-789"],
options=options
)
Removes relationships between records.
Signature:
def detach(
self,
source: Union[str, Dict[str, Any]],
target: Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]],
options: Optional[RelationshipDetachOptions] = None,
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `source` (Union[str, Dict[str, Any]]): Source record ID or data
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipDetachOptions]): Detach options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `typeOrTypes` (Optional[Union[str, List[str]]]): Relationship type(s)
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Remove relationships between records
options = RelationshipDetachOptions(
typeOrTypes=["HAS_EMPLOYEE", "MANAGES"],
direction="out"
)
response = db.records.detach(
source="company-123",
target="employee-456",
options=options
)
Imports records from CSV data.
Signature:
def import_csv(
self,
label: str,
data: str,
options: Optional[Dict[str, bool]] = None,
transaction: Optional[Transaction] = None
) -> List[Dict[str, Any]]
Arguments:
- `label` (str): Label for imported records
- `data` (Union[str, bytes]): CSV data to import
- `options` (Optional[Dict[str, bool]]): Import options
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `List[Dict[str, Any]]`: Imported records data
Example:
# Import records from CSV
data = """name,age,department,role
John Doe,30,Engineering,Senior Engineer
Jane Smith,28,Product,Product Manager
Bob Wilson,35,Engineering,Tech Lead"""
records = db.records.import_csv(
label="EMPLOYEE",
data=data,
options={"returnResult": True, "suggestTypes": True}
)
The `Record` class represents a record in RushDB and provides methods for manipulating individual records, including updates, relationships, and deletions.
class Record:
def __init__(self, client: "RushDB", data: Union[Dict[str, Any], None] = None)
Gets the record's unique identifier.
Type: str
Example:
record = db.records.create("USER", {"name": "John"})
print(record.id) # e.g., "1234abcd-5678-..."
Gets the record's property types.
Type: str
Example:
record = db.records.create("USER", {"name": "John", "age": 25})
print(record.proptypes) # Returns property type definitions
Gets the record's label.
Type: str
Example:
record = db.records.create("USER", {"name": "John"})
print(record.label) # "USER"
Gets the record's creation timestamp from its ID.
Type: int
Example:
record = db.records.create("USER", {"name": "John"})
print(record.timestamp) # Unix timestamp in milliseconds
Gets the record's creation date.
Type: datetime
Example:
record = db.records.create("USER", {"name": "John"})
print(record.date) # datetime object
Updates all data for the record.
Signature:
def set(
self,
data: Dict[str, Any],
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `data` (Dict[str, Any]): New record data
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
record = db.records.create("USER", {"name": "John"})
response = record.set({
"name": "John Doe",
"email": "[email protected]",
"age": 30
})
Updates specific fields of the record.
Signature:
def update(
self,
data: Dict[str, Any],
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `data` (Dict[str, Any]): Partial record data to update
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
record = db.records.create("USER", {
"name": "John",
"email": "[email protected]"
})
response = record.update({
"email": "[email protected]"
})
Creates relationships with other records.
Signature:
def attach(
self,
target: Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], "Record", List["Record"]],
options: Optional[RelationshipOptions] = None,
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipOptions]): Relationship options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `type` (Optional[str]): Relationship type
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Create two records
user = db.records.create("USER", {"name": "John"})
group = db.records.create("GROUP", {"name": "Admins"})
# Attach user to group
response = user.attach(
target=group,
options=RelationshipOptions(
type="BELONGS_TO",
direction="out"
)
)
Removes relationships with other records.
Signature:
def detach(
self,
target: Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], "Record", List["Record"]],
options: Optional[RelationshipDetachOptions] = None,
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipDetachOptions]): Detach options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `typeOrTypes` (Optional[Union[str, List[str]]]): Relationship type(s)
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
# Detach user from group
response = user.detach(
target=group,
options=RelationshipDetachOptions(
typeOrTypes="BELONGS_TO",
direction="out"
)
)
Deletes the record.
Signature:
def delete(
self,
transaction: Optional[Transaction] = None
) -> Dict[str, str]
Arguments:
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, str]`: Response data
Example:
user = db.records.create("USER", {"name": "John"})
response = user.delete()
Here's a comprehensive example demonstrating various Record operations:
# Create a new record
user = db.records.create("USER", {
"name": "John Doe",
"email": "[email protected]",
"age": 30
})
# Access properties
print(f"Record ID: {user.id}")
print(f"Label: {user.label}")
print(f"Created at: {user.date}")
# Update record data
user.update({
"age": 31,
"title": "Senior Developer"
})
# Create related records
department = db.records.create("DEPARTMENT", {
"name": "Engineering"
})
project = db.records.create("PROJECT", {
"name": "Secret Project"
})
# Create relationships
user.attach(
target=department,
options=RelationshipOptions(
type="BELONGS_TO",
direction="out"
)
)
user.attach(
target=project,
options=RelationshipOptions(
type="WORKS_ON",
direction="out"
)
)
# Remove relationship
user.detach(
target=project,
options=RelationshipDetachOptions(
typeOrTypes="WORKS_ON",
direction="out"
)
)
# Delete record
user.delete()
Records can be manipulated within transactions for atomic operations:
# Start a transaction
transaction = db.transactions.begin()
try:
# Create user
user = db.records.create(
"USER",
{"name": "John Doe"},
transaction=transaction
)
# Update user
user.update(
{"status": "active"},
transaction=transaction
)
# Create and attach department
dept = db.records.create(
"DEPARTMENT",
{"name": "Engineering"},
transaction=transaction
)
user.attach(
target=dept,
options=RelationshipOptions(type="BELONGS_TO"),
transaction=transaction
)
# Explicitly commit the transaction to make changes permanent
transaction.commit()
except Exception as e:
# Rollback if any error occurs
transaction.rollback()
raise e
# Alternative: Using context manager
with db.transactions.begin() as transaction:
# Perform operations...
user = db.records.create(
"USER",
{"name": "John Doe"},
transaction=transaction
)
# Must explicitly commit - transactions are NOT automatically committed
transaction.commit()
The `PropertiesAPI` class provides methods for managing and querying properties in RushDB.
class PropertiesAPI(BaseAPI):
Retrieves a list of properties based on optional search criteria.
Signature:
def find(
self,
search_query: Optional[SearchQuery] = None,
transaction: Optional[Transaction] = None
) -> List[Property]
Arguments:
- `search_query` (Optional[SearchQuery]): Search query parameters for filtering properties
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `List[Property]`: List of properties matching the search criteria
Example:
# Find all properties
properties = db.properties.find()
# Find properties with specific criteria
query = {
"where": {
"name": {"$startsWith": "user_"}, # Properties starting with 'user_'
"type": "string" # Only string type properties
},
"limit": 10 # Limit to 10 results
}
filtered_properties = db.properties.find(query)
Retrieves a specific property by its ID.
Signature:
def find_by_id(
self,
property_id: str,
transaction: Optional[Transaction] = None
) -> Property
Arguments:
- `property_id` (str): Unique identifier of the property
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Property`: Property details
Example:
# Retrieve a specific property by ID
property_details = db.properties.find_by_id("prop_123456")
Deletes a property by its ID.
Signature:
def delete(
self,
property_id: str,
transaction: Optional[Transaction] = None
) -> None
Arguments:
- `property_id` (str): Unique identifier of the property to delete
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `None`
Example:
# Delete a property
db.properties.delete("prop_123456")
Retrieves values for a specific property with optional sorting and pagination.
Signature:
def values(
self,
property_id: str,
sort: Optional[Literal["asc", "desc"]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
transaction: Optional[Transaction] = None
) -> PropertyValuesData
Arguments:
- `property_id` (str): Unique identifier of the property
- `sort` (Optional[Literal["asc", "desc"]]): Sort order of values
- `skip` (Optional[int]): Number of values to skip (for pagination)
- `limit` (Optional[int]): Maximum number of values to return
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `PropertyValuesData`: Property values data, including optional min/max and list of values
Example:
# Get property values
values_data = db.properties.values(
property_id="prop_age",
sort="desc", # Sort values in descending order
skip=0, # Start from the first value
limit=100 # Return up to 100 values
)
# Access values
print(values_data.get('values', [])) # List of property values
print(values_data.get('min')) # Minimum value (for numeric properties)
print(values_data.get('max')) # Maximum value (for numeric properties)
# Find all properties
all_properties = db.properties.find()
for prop in all_properties:
print(f"Property ID: {prop['id']}")
print(f"Name: {prop['name']}")
print(f"Type: {prop['type']}")
print(f"Metadata: {prop.get('metadata', 'No metadata')}")
print("---")
# Detailed property search
query = {
"where": {
"type": "number", # Only numeric properties
"name": {"$contains": "score"} # Properties with 'score' in name
},
"limit": 5 # Limit to 5 results
}
numeric_score_properties = db.properties.find(query)
# Get values for a specific property
if numeric_score_properties:
first_prop = numeric_score_properties[0]
prop_values = db.properties.values(
property_id=first_prop['id'],
sort="desc",
limit=50
)
print(f"Values for {first_prop['name']}:")
print(f"Min: {prop_values.get('min')}")
print(f"Max: {prop_values.get('max')}")
# Detailed property examination
detailed_prop = db.properties.find_by_id(first_prop['id'])
print("Detailed Property Info:", detailed_prop)
RushDB supports the following property types:
- `"boolean"`: True/False values
- `"datetime"`: Date and time values
- `"null"`: Null/empty values
- `"number"`: Numeric values
- `"string"`: Text values
property = {
"id": "prop_unique_id",
"name": "user_score",
"type": "number",
"metadata": Optional[str] # Optional additional information
}
property_with_value = {
"id": "prop_unique_id",
"name": "user_score",
"type": "number",
"value": 95.5 # Actual property value
}
Properties API methods support optional transactions for atomic operations:
# Using a transaction with explicit commit
transaction = db.transactions.begin()
try:
# Perform multiple property-related operations
property_to_delete = db.properties.find(
{"where": {"name": "temp_property"}},
transaction=transaction
)[0]
db.properties.delete(
property_id=property_to_delete['id'],
transaction=transaction
)
# Explicitly commit the transaction
transaction.commit()
except Exception as e:
# Rollback if any error occurs
transaction.rollback()
raise e
# Alternative: Using context manager (auto-rollback on error)
with db.transactions.begin() as transaction:
# Perform operations
property_to_delete = db.properties.find(
{"where": {"name": "temp_property"}},
transaction=transaction
)[0]
db.properties.delete(
property_id=property_to_delete['id'],
transaction=transaction
)
# Must explicitly commit - transactions are NOT automatically committed
transaction.commit()
When working with the PropertiesAPI, be prepared to handle potential errors:
try:
# Attempt to find or delete a property
property_details = db.properties.find_by_id("non_existent_prop")
except RushDBError as e:
print(f"Error: {e}")
print(f"Error Details: {e.details}")
The `LabelsAPI` class provides methods for discovering and working with record labels in RushDB. Labels are used to categorize and type records, similar to table names in relational databases.
class LabelsAPI(BaseAPI):
Discovers labels (record types) that exist in the database and can optionally filter them based on search criteria.
Signature:
def find(
self,
search_query: Optional[SearchQuery] = None,
transaction: Optional[Transaction] = None
) -> Dict[str, int]
Arguments:
- `search_query` (Optional[SearchQuery]): Search criteria to filter labels
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
- `Dict[str, int]`: Dictionary mapping label names to their record counts
Example:
# Get all labels in the database
all_labels = db.labels.find()
print("Available labels:", all_labels)
# Output: {'USER': 150, 'DEPARTMENT': 12, 'PROJECT': 45, 'COMPANY': 3}
# Search for labels amongst records matching a pattern
from rushdb.models.search_query import SearchQuery
query = SearchQuery(where={"name": {"$contains": "alice"}})
user_labels = db.labels.find(query)
print("Labels for records containing 'alice':", user_labels)
# Output: {'USER': 2, 'EMPLOYEE': 1}
# Discover all record types in the database
all_labels = db.labels.find()
print(f"Database contains {len(all_labels)} record types:")
for label, count in all_labels.items():
print(f" - {label}: {count} records")
# Find labels for records with specific criteria
query = SearchQuery(where={
"status": "active",
"created_date": {"$gte": "2023-01-01"}
})
active_labels = db.labels.find(query)
print("Labels for active records:")
for label, count in active_labels.items():
print(f" - {label}: {count} active records")
# Use with transaction
transaction = db.transactions.begin()
try:
labels_in_tx = db.labels.find(transaction=transaction)
# Process labels...
transaction.commit()
except Exception as e:
transaction.rollback()
raise e
The `RelationshipsAPI` class provides functionality for querying and analyzing relationships between records in RushDB. Relationships represent connections or associations between different records.
class RelationshipsAPI(BaseAPI):
Search for and retrieve relationships matching the specified criteria with support for pagination and transactions.
Signature:
async def find(
self,
search_query: Optional[SearchQuery] = None,
pagination: Optional[PaginationParams] = None,
transaction: Optional[Union[Transaction, str]] = None
) -> List[Relationship]
Arguments:
- `search_query` (Optional[SearchQuery]): Search criteria to filter relationships
- `pagination` (Optional[PaginationParams]): Pagination options with `limit` and `skip`
- `transaction` (Optional[Union[Transaction, str]]): Optional transaction object or ID
Returns:
- `List[Relationship]`: List of relationships matching the search criteria
Example:
import asyncio
from rushdb.models.search_query import SearchQuery
async def main():
# Find all relationships
all_relationships = await db.relationships.find()
print(f"Total relationships: {len(all_relationships)}")
# Find relationships with pagination
pagination = {"limit": 50, "skip": 0}
first_page = await db.relationships.find(pagination=pagination)
# Find specific relationship types
query = SearchQuery(where={"type": "BELONGS_TO"})
belongs_to_rels = await db.relationships.find(search_query=query)
# Find relationships involving specific records
user_query = SearchQuery(where={
"$or": [
{"source_id": "user-123"},
{"target_id": "user-123"}
]
})
user_relationships = await db.relationships.find(search_query=user_query)
# Run the async function
asyncio.run(main())
The `PaginationParams` TypedDict defines pagination options:
class PaginationParams(TypedDict, total=False):
limit: int # Maximum number of relationships to return
skip: int # Number of relationships to skip
import asyncio
from rushdb.models.search_query import SearchQuery
async def explore_relationships():
# Get overview of all relationships
all_rels = await db.relationships.find()
print(f"Database contains {len(all_rels)} relationships")
# Paginate through relationships
page_size = 25
page = 0
while True:
pagination = {"limit": page_size, "skip": page * page_size}
relationships = await db.relationships.find(pagination=pagination)
if not relationships:
break
print(f"Page {page + 1}: {len(relationships)} relationships")
for rel in relationships:
print(f" {rel['source_id']} --[{rel['type']}]--> {rel['target_id']}")
page += 1
if len(relationships) < page_size:
break
# Find relationships by type
query = SearchQuery(where={"type": "WORKS_ON"})
work_relationships = await db.relationships.find(search_query=query)
print(f"Found {len(work_relationships)} 'WORKS_ON' relationships")
# Find relationships within a transaction
transaction = db.transactions.begin()
try:
tx_rels = await db.relationships.find(transaction=transaction)
# Process relationships...
transaction.commit()
except Exception as e:
transaction.rollback()
raise e
# Run the example
asyncio.run(explore_relationships())
Both LabelsAPI and RelationshipsAPI support transactions:
import asyncio
async def transaction_example():
transaction = db.transactions.begin()
try:
# Find labels within transaction
labels = db.labels.find(transaction=transaction)
# Find relationships within transaction
relationships = await db.relationships.find(transaction=transaction)
# Perform operations based on discovered data...
# Explicitly commit the transaction
transaction.commit()
except Exception as e:
# Rollback on any error
transaction.rollback()
raise e
asyncio.run(transaction_example())
Note: The RelationshipsAPI methods are async and require the use of `await` and `asyncio` for proper execution.