Skip to content

callawaycloud/aio-salesforce

Repository files navigation

aio-sf

An async Salesforce library for Python.

Features

✅ Supported APIs

  • Bulk API 2.0 - Efficient querying of large datasets
  • Describe API - Field metadata and object descriptions
  • SOQL Query API - Standard Salesforce queries
  • SObjects Collections API - CRUD on collections of SObjects (up to 2000 records at a time)
  • Tooling API - Development and deployment tools
  • Bulk API 1.0 - Legacy bulk operations
  • Streaming API - Real-time event streaming

✅ Supported Authentication Strategies

  • OAuth Client Credentials - Automatic authentication
  • Static Token - Existing access tokens
  • Refresh Token - Refresh token flow
  • SFDX CLI - Login by grabbing a token from the SFDX CLI
  • Password Authentication - Password + ST authentication (soap login)

🚀 Export Features

  • Parquet Export - Efficient columnar storage with schema mapping
  • CSV Export - Simple text format export
  • Resume Support - Resume interrupted queries using job IDs
  • Streaming Processing - Memory-efficient processing of large datasets

Installation

Core (Connection Only)

uv add aio-sf
# or: pip install aio-sf

With Export Capabilities

uv add "aio-sf[exporter]"
# or: pip install "aio-sf[exporter]"

Quick Start

Authentication & Connection

import asyncio
import os
from aio_sf import SalesforceClient, ClientCredentialsAuth

async def main():
    auth = ClientCredentialsAuth(
        client_id=os.getenv('SF_CLIENT_ID'),
        client_secret=os.getenv('SF_CLIENT_SECRET'),
        instance_url=os.getenv('SF_INSTANCE_URL'),
    )
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        print(f"✅ Connected to: {sf.instance_url}")

        sobjects = await sf.describe.list_sobjects()
        print(sobjects[0]["name"])

        contact_describe = await sf.describe.sobject("Contact")

        # retrieve first 5 "creatable" fields on contact
        queryable_fields = [
            field.get("name", "")
            for field in contact_describe["fields"]
            if field.get("createable")
        ][:5]

        query = f"SELECT {', '.join(queryable_fields)} FROM Contact LIMIT 5"
        print(query)

        query_result = await sf.query.soql(query)
        # Loop over records using async iteration
        # or: await query_result.collect_all() to collect all records into a list
        async for record in query_result:
            print(record.get("AccountId"))

        # Create a new Account
        await sf.collections.insert(
            sobject_type="Account",
            records=[{"Name": "Test Account"}]
        )

asyncio.run(main())

Exporter

The Exporter library contains a streamlined and "opinionated" way to export data from Salesforce to various formats.

3. Export to Parquet

from aio_sf.exporter import bulk_query, write_query_to_parquet

async def main():
    # ... authentication code from above ...
    
    async with SalesforceClient(auth_strategy=auth) as sf:
        # Query with proper schema
        query_result = await bulk_query(
            sf=sf,
            soql_query="SELECT Id, Name, Email, CreatedDate FROM Contact"
        )
        
        # Export to Parquet
        write_query_to_parquet(
            query_result=query_result,
            file_path="contacts.parquet"
        )
        
        print(f"✅ Exported {len(query_result)} contacts to Parquet")

License

MIT License

About

Modern Async Salesforce Client

Topics

Resources

License

Stars

Watchers

Forks

Languages