You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
import multiprocessing as mp
from pathlib import Path

import lamindb as ln
import pandas as pd


def process_chunk(chunk_id: int) -> str:
    """Save a small DataFrame as an artifact for ``chunk_id`` and return its key.

    Intended to run inside a pool worker process. The DataFrame has 10 rows:
    an ``id`` column covering ``chunk_id * 10`` .. ``chunk_id * 10 + 9`` and a
    ``value`` column covering ``0`` .. ``9``.

    Args:
        chunk_id: Index of the chunk; determines the id range and the key.

    Returns:
        The key of the saved artifact, ``f"chunk_{chunk_id}.parquet"``.
    """
    df = pd.DataFrame(
        {"id": range(chunk_id * 10, (chunk_id + 1) * 10), "value": range(10)}
    )
    key = f"chunk_{chunk_id}.parquet"
    artifact = ln.Artifact.from_df(df, key=key).save()
    return artifact.key


def test_tracked_multiprocessing() -> None:
    """Check that artifacts saved from pool workers are each linked to their own run.

    Starts tracking in the parent process, saves one artifact per worker via
    ``process_chunk``, then asserts that every artifact is linked to a
    distinct run whose ``started_at`` precedes its ``finished_at``. All
    artifacts that were looked up are deleted permanently afterwards, even
    when an assertion or a lookup fails.
    """
    ln.track()

    n_processes = 2
    with mp.Pool(processes=n_processes) as pool:
        # NOTE(review): Pool.map does not guarantee that each chunk is handled
        # by a different worker process; one worker may process both chunks.
        chunk_keys = pool.map(process_chunk, range(n_processes))
    print(chunk_keys)

    artifacts = []
    try:
        # Artifact.get's positional argument is a uid/id, not a key, so the
        # lookup by key must use a keyword expression. Collect incrementally
        # so that already-found artifacts are still cleaned up if a later
        # lookup raises.
        for key in chunk_keys:
            artifacts.append(ln.Artifact.get(key=key))

        # One artifact per chunk.
        assert len(artifacts) == n_processes

        runs = [artifact.run for artifact in artifacts]
        run_ids = [run.id for run in runs]
        print(run_ids)
        assert len(set(run_ids)) == n_processes  # all runs should be unique

        # Every run must have been started and finished, in that order.
        for run in runs:
            print(run)
            assert run.started_at is not None
            assert run.finished_at is not None
            assert run.started_at < run.finished_at
    finally:
        # Remove test artifacts from storage regardless of the test outcome.
        for artifact in artifacts:
            artifact.delete(permanent=True)
# Run the reproducer when executed as a script. The guard is required for
# multiprocessing: with the "spawn" start method, workers re-import this
# module and must not re-run the test.
if __name__ == "__main__":
    test_tracked_multiprocessing()
The text was updated successfully, but these errors were encountered:
This is an example:
The text was updated successfully, but these errors were encountered: