This project is a data processing pipeline designed to categorize scientific datasets using large language models. It includes components for data input, processing, and output, allowing users to efficiently manage and analyze their data.
The pipeline offers two distinct processing modes to accommodate different use cases and reliability requirements:
Sync mode processes records one at a time in real time, with built-in checkpointing and crash recovery. This mode is ideal for:

- Long-running processing jobs that need reliability
- Scenarios where you want to see progress and results in real time
- Cases where you need to resume processing after interruptions

Key features:

- Record-by-record processing: each item is processed individually, not in batches
- Automatic checkpointing: progress is saved every N records (configurable); see the sketch after this list
- Crash recovery: resume from the last checkpoint if processing is interrupted
- Real-time output: results are written as soon as they are processed
- Progress tracking: live progress updates with ETA estimates
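In sketch form, the sync loop looks roughly like the following. This is illustrative only: `categorize`, `save_checkpoint`, and the file layout are stand-ins, not the pipeline's actual API.

```python
import json
import os

def categorize(record):
    """Stand-in for the real LLM categorization call."""
    return {"id": record["_id"], "category": "placeholder"}

def save_checkpoint(pending, results, path="checkpoints/demo_state.json"):
    """Persist remaining work and partial results so a crash can be resumed."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        json.dump({"pending_ids": [r["_id"] for r in pending], "results": results}, f)

records = [{"_id": f"study_{i:03d}", "text": "..."} for i in range(25)]
checkpoint_interval = 10
results = []

for i, record in enumerate(records, start=1):
    results.append(categorize(record))         # record-by-record, not batched
    if i % checkpoint_interval == 0:            # checkpoint every N records
        save_checkpoint(records[i:], results)   # what's left plus what's done so far
```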
Basic sync mode:
python nde_pipeline_topic_categories.py --mode sync --dataset_path data.json --dataset_name zenodo
With custom checkpoint interval (checkpoint every 25 records):
python nde_pipeline_topic_categories.py --mode sync --dataset_path data.json --dataset_name zenodo --checkpoint_interval 25
Resume from checkpoint:
python nde_pipeline_topic_categories.py --mode sync --resume --session_id sync_20240728_143022
List available checkpoint sessions:
python nde_pipeline_topic_categories.py --list_sessions
- Checkpoints are automatically saved to the `checkpoints/` directory
- Each session gets a unique ID (e.g., `sync_20240728_143022`)
- Checkpoint files include (see the inspection snippet below):
  - `{session_id}_state.json`: Processing state and metadata
  - `{session_id}_items.pkl`: Remaining items to process
  - `{session_id}_results.json`: Intermediate results for recovery
- Automatic cleanup of checkpoint files after successful completion
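Because the checkpoint files follow the naming scheme above, you can also inspect sessions by hand. A small sketch (the internal schema of the state file isn't documented here, so this just prints its top-level keys):

```python
import json
from pathlib import Path

# List every session that has a state file in checkpoints/
for state_file in sorted(Path("checkpoints").glob("*_state.json")):
    session_id = state_file.name.replace("_state.json", "")  # e.g. sync_20240728_143022
    with open(state_file) as f:
        state = json.load(f)
    print(session_id, "->", list(state.keys()))
```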
Batch mode uses the OpenAI Batch API for cost-effective processing of large datasets:
- Submit all items as a single batch job
- Monitor batch status until completion
- Retrieve results when the batch is finished
- More cost-effective for large volumes but less real-time feedback
python nde_pipeline_topic_categories.py --mode batch --dataset_path data.json --dataset_name zenodo
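Internally, the batch workflow follows the standard OpenAI Batch API lifecycle. The sketch below shows that generic lifecycle with the `openai` Python client; it is not the pipeline's actual implementation, and the JSONL filename is a placeholder.

```python
import time
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

# 1. Upload a JSONL file of chat-completion requests and submit it as a batch.
batch_input = client.files.create(file=open("batch_requests.jsonl", "rb"), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_input.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)

# 2. Poll until the batch reaches a terminal status.
while batch.status not in ("completed", "failed", "expired", "cancelled"):
    time.sleep(30 * 60)  # comparable to --check_interval 30
    batch = client.batches.retrieve(batch.id)

# 3. Download results once the batch has completed.
if batch.status == "completed" and batch.output_file_id:
    results_jsonl = client.files.content(batch.output_file_id).text
    print(results_jsonl[:200])
```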
- `--mode {sync,batch}`: Choose processing mode (default: batch)
- `--dataset_path`: Path to your dataset file
- `--dataset_name`: Dataset configuration name from column mappings
- `--output`: Custom output filename

- `--enable_checkpointing` / `--disable_checkpointing`: Control checkpointing (enabled by default)
- `--checkpoint_interval N`: Save a checkpoint every N records (default: 10)
- `--session_id`: Custom session ID for checkpointing
- `--resume`: Resume from a checkpoint using the session ID
- `--list_sessions`: List all available checkpoint sessions

- `--batch_ids`: List of existing batch IDs to process
- `--batch_file`: File containing batch IDs (one per line)
- `--check_interval`: Minutes between batch status checks (default: 30)
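For orientation, here is roughly how these flags could be declared with `argparse`. The real definitions live in `nde_pipeline_topic_categories.py` and may differ in detail.

```python
import argparse

parser = argparse.ArgumentParser(description="NDE topic categorization pipeline (illustrative flag set)")
parser.add_argument("--mode", choices=["sync", "batch"], default="batch")
parser.add_argument("--dataset_path")
parser.add_argument("--dataset_name")
parser.add_argument("--output")
parser.add_argument("--enable_checkpointing", dest="checkpointing", action="store_true", default=True)
parser.add_argument("--disable_checkpointing", dest="checkpointing", action="store_false")
parser.add_argument("--checkpoint_interval", type=int, default=10)
parser.add_argument("--session_id")
parser.add_argument("--resume", action="store_true")
parser.add_argument("--list_sessions", action="store_true")
parser.add_argument("--batch_ids", nargs="+")
parser.add_argument("--batch_file")
parser.add_argument("--check_interval", type=int, default=30)

args = parser.parse_args(["--mode", "sync", "--checkpoint_interval", "25"])
print(args.mode, args.checkpointing, args.checkpoint_interval)  # sync True 25
```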
- Clone the Repository:

  git clone <repository-url>
  cd <repository-directory>

- Create a Virtual Environment (optional but recommended):

  python -m venv venv
  source venv/bin/activate  # On Windows use `venv\Scripts\activate`

- Install Dependencies:

  pip install -e .
Sync Mode (recommended for most use cases):
python nde_pipeline_topic_categories.py --mode sync --dataset_path data.json --dataset_name zenodo --output results.json
Batch Mode (for cost optimization):
python nde_pipeline_topic_categories.py --mode batch --dataset_path data.json --dataset_name zenodo --output results.json
Your dataset should be in JSON or CSV format. The pipeline uses column mappings to understand your data structure.
The column mappings are defined in `pipeline/utils/configs/column_mappings.json`. This file specifies how to extract information from your dataset:
- text_columns: Columns containing the text to be processed
- id_column: Column to use as unique identifier (auto-generated if not specified)
- metadata_mapping: Maps your dataset fields to expected field names
Example configuration:
{
"zenodo": {
"text_columns": ["Description"],
"id_column": "_id",
"metadata_mapping": {
"Name": "title",
"URL": "source_url"
}
}
}
Your JSON dataset should look like this:
[
{
"_id": "study_001",
"Name": "Sample Study",
"Description": "This study investigates...",
"URL": "http://example.com"
}
]
The pipeline will:
- Use `_id` as the unique identifier
- Process the `Description` field
- Map `Name` → `title` and `URL` → `source_url`
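Putting the two pieces together, this is roughly how the mapping resolves fields for one record. It is only a sketch; the pipeline's own input classes handle this internally.

```python
import json

# Load the dataset-specific configuration (path as documented above).
with open("pipeline/utils/configs/column_mappings.json") as f:
    mapping = json.load(f)["zenodo"]

record = {
    "_id": "study_001",
    "Name": "Sample Study",
    "Description": "This study investigates...",
    "URL": "http://example.com",
}

record_id = record[mapping["id_column"]]                          # "study_001"
text = " ".join(record[col] for col in mapping["text_columns"])   # the Description text
metadata = {target: record[source]
            for source, target in mapping["metadata_mapping"].items()}

print(record_id, metadata)  # study_001 {'title': 'Sample Study', 'source_url': 'http://example.com'}
```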
The pipeline is designed to be modular and flexible, allowing for various components to work together seamlessly. Here's a general overview of how the pipeline operates:
- Input Handling: The pipeline reads data from the specified dataset file using input classes that inherit from the `DataInput` base class, so different file formats (e.g., CSV, JSON) are processed uniformly.
- Preprocessing: The data is cleaned and formatted by processors that inherit from the `DataProcessor` base class. These processors can be customized to perform various text cleaning and formatting tasks before the data is sent for categorization.
- Categorization: A language model provider that inherits from the `LLMProvider` base class interacts with the language model (locally or via API), sends the cleaned prompts, and receives the categorized results.
- Postprocessing: The results are passed through additional processors that also inherit from the `DataProcessor` base class, allowing normalization and other transformations so that the output categories conform to expected formats.
- Output Handling: Finally, the results are written in the specified output format using output classes that inherit from the `DataOutput` base class, ensuring that processed results are saved in a structured and accessible manner.
By utilizing base classes for input, processing, categorization, and output, the pipeline maintains a high level of flexibility and extensibility, allowing users to customize and adapt the components to their specific needs.
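As a concrete (and hypothetical) illustration of that extensibility, a custom processor or output component could look something like this. The base-class names come from the list above, but the method names (`process`, `write`) are assumptions about the real interface.

```python
import json


class LowercaseProcessor:  # would inherit from DataProcessor in the real pipeline
    """Example preprocessing step: normalize text before categorization."""

    def process(self, record: dict) -> dict:
        record["text"] = record.get("text", "").strip().lower()
        return record


class JsonLinesOutput:  # would inherit from DataOutput in the real pipeline
    """Example output component: append each result as one JSON line."""

    def __init__(self, path: str):
        self.path = path

    def write(self, result: dict) -> None:
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(result) + "\n")


processor = LowercaseProcessor()
output = JsonLinesOutput("example_results.jsonl")
output.write(processor.process({"_id": "study_001", "text": "  This Study Investigates...  "}))
```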
Processing a large research dataset with checkpointing:
# Start processing with conservative checkpoint interval
python nde_pipeline_topic_categories.py --mode sync \
--dataset_path research_papers.json \
--dataset_name zenodo \
--checkpoint_interval 200 \
--output results/research_categorized.json
# If interrupted, resume from checkpoint
python nde_pipeline_topic_categories.py --mode sync \
--resume --session_id sync_20240728_143022
Quick processing for testing:
# Use batch mode for quick, small datasets
python nde_pipeline_topic_categories.py --mode batch \
--dataset_path test_data.json \
--dataset_name zenodo \
--output results/test_results.json
Use Sync Mode when:
- Processing large datasets that may take hours or days
- You need real-time progress feedback and results
- Reliability and crash recovery are important
- You want to process and save results incrementally
- You're running on unreliable infrastructure
Use Batch Mode when:
- You have a stable environment and processing time is predictable
- Cost optimization is a primary concern (batch API is more cost-effective)
- You can wait for all results at once
- The dataset size is manageable (completes within reasonable time)
- Small datasets (< 1000 records): use `--checkpoint_interval` 50-100
- Medium datasets (1000-10000 records): use `--checkpoint_interval` 100-500
- Large datasets (> 10000 records): use `--checkpoint_interval` 500-1000
Smaller intervals provide better crash recovery but create more I/O overhead. Larger intervals are more efficient but mean losing more progress if interrupted.
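A quick back-of-the-envelope calculation shows the trade-off, assuming roughly 2 seconds of model latency per record (adjust for your model and provider):

```python
seconds_per_record = 2  # assumed average LLM latency per record

for interval in (50, 500, 1000):
    redone_minutes = interval * seconds_per_record / 60
    print(f"checkpoint every {interval} records -> up to {redone_minutes:.0f} min of work redone after a crash")
```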
Monitor progress during long-running jobs:
# Check checkpoint sessions
python nde_pipeline_topic_categories.py --list_sessions
# Resume if needed
python nde_pipeline_topic_categories.py --mode sync --resume --session_id <session_id>
Automatic cleanup: Checkpoint files are automatically cleaned up after successful completion. To keep them for debugging, use a custom checkpointer configuration.
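The custom checkpointer configuration itself isn't documented in this section. As a configuration-independent alternative, you can simply copy a session's checkpoint files aside before the run finishes (the session ID below is just an example):

```python
import shutil
from pathlib import Path

session_id = "sync_20240728_143022"  # example session ID
backup_dir = Path("checkpoints_backup")
backup_dir.mkdir(exist_ok=True)

# Copy the session's state/items/results files so automatic cleanup can't remove them.
for f in Path("checkpoints").glob(f"{session_id}*"):
    shutil.copy2(f, backup_dir / f.name)
```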