📝 Explain behavior under multi-processing #2431

Open
falexwolf opened this issue Feb 6, 2025 · 0 comments

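This is an example that calls ln.track() in the parent process and then saves one artifact per worker from a multiprocessing pool: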

import multiprocessing as mp
from pathlib import Path

import lamindb as ln
import pandas as pd


def process_chunk(chunk_id: int) -> str:
    # Create a simple DataFrame
    df = pd.DataFrame(
        {"id": range(chunk_id * 10, (chunk_id + 1) * 10), "value": range(10)}
    )

    # Save it as an artifact
    key = f"chunk_{chunk_id}.parquet"
    artifact = ln.Artifact.from_df(df, key=key).save()
    return artifact.key


def test_tracked_multiprocessing():
    ln.track()

    # Initialize multiprocessing pool
    n_processes = 2
    with mp.Pool(processes=n_processes) as pool:
        # Process chunks in parallel
        chunk_keys = pool.map(process_chunk, range(n_processes))

    # Verify results
    # Each process should have created its own artifact with unique run
    print(chunk_keys)
    artifacts = [ln.Artifact.get(key=key) for key in chunk_keys]

    # Check that we got the expected number of artifacts
    assert len(artifacts) == n_processes

    # Verify each artifact has its own unique run
    runs = [artifact.run for artifact in artifacts]
    run_ids = [run.id for run in runs]
    print(run_ids)
    assert len(set(run_ids)) == n_processes  # all runs should be unique

    # Verify each run has the correct start and finish times
    for run in runs:
        print(run)
        assert run.started_at is not None
        assert run.finished_at is not None
        assert run.started_at < run.finished_at

    # Clean up test artifacts
    for artifact in artifacts:
        artifact.delete(permanent=True)


if __name__ == "__main__":
    test_tracked_multiprocessing()

falexwolf self-assigned this Feb 6, 2025