[WIP] refactor of dataset builder and executor #537

Open · wants to merge 57 commits into base: main

Commits (57)
d11f89c
ignore __dj__produced_data__
cyruszhang Nov 15, 2024
41dea26
add download framework; add wiki support
cyruszhang Nov 19, 2024
50f8d3d
refactor formatter; add dataset_builder
cyruszhang Nov 22, 2024
817caab
merge with master
cyruszhang Nov 25, 2024
a089de4
add config files and test entry
cyruszhang Nov 26, 2024
5a717d7
initial dataset_builder
cyruszhang Dec 2, 2024
9c79844
Merge branch 'main' into feat/cyruszhang/data-downloader
cyruszhang Dec 2, 2024
ffba7e7
add mixture dataset support; type/subtype
cyruszhang Dec 4, 2024
79ae980
RayExecutor with ExecutorBase
cyruszhang Dec 4, 2024
e6a6e71
get rid of subtype for local dataset; depending on ext for proper rou…
cyruszhang Dec 4, 2024
eb300f0
use source instead of sub_type for remote dataset configs
cyruszhang Dec 4, 2024
456eea1
arxiv downloader return Dataset instead of DJDataset
cyruszhang Dec 4, 2024
c25e40f
rewrite CLI datapath with test cases
cyruszhang Dec 5, 2024
75ffe3f
add executor and dataload strategy logic
cyruszhang Dec 6, 2024
4ec1ef9
Merge branch 'main' into feat/cyruszhang/data-downloader
cyruszhang Dec 6, 2024
4fb6e17
add layered load strategies
cyruszhang Dec 6, 2024
84803cd
Merge branch 'main' into feat/cyruszhang/data-downloader
cyruszhang Dec 9, 2024
cb5b80a
fix circular dependency; add dataset config test
cyruszhang Dec 10, 2024
daf7a85
update dataset_path parsing in config
cyruszhang Dec 10, 2024
7c48892
fix download test case; add wildcard matching for load strategy
cyruszhang Dec 11, 2024
940b44d
add test case for load strategy wild card matching
cyruszhang Dec 11, 2024
b80f991
add more test cases for datapath rewrite logic; fix rewrite to handle…
cyruszhang Dec 11, 2024
0d5d4ba
materialize symlinks for duplicates
cyruszhang Dec 11, 2024
f3a4ec4
add load strategy validation framework
cyruszhang Dec 12, 2024
70fffd2
add DataValidator logic
cyruszhang Dec 16, 2024
bbc303d
data validator as separate pre-processing
cyruszhang Dec 16, 2024
4b6065f
update data validator logic and add/fix test cases
cyruszhang Dec 25, 2024
0b153ab
[nit] rename test
cyruszhang Jan 2, 2025
171b361
[nit] rename test again
cyruszhang Jan 2, 2025
6841d19
add builder test cases; update ds config validation logic
cyruszhang Jan 2, 2025
3128d05
[minor] update test case naming
cyruszhang Jan 2, 2025
7b6b2bd
add support for max_sample_num in dataset configs; add tests
cyruszhang Jan 6, 2025
161f059
fix test cases and update dataset builder code
cyruszhang Jan 6, 2025
8cb322f
merge main
cyruszhang Jan 6, 2025
afe906d
handle weights and sample_nums
cyruszhang Jan 8, 2025
1217e61
support ExecutorType enum
cyruszhang Jan 9, 2025
755abca
Merge branch 'main' into feat/cyruszhang/data-downloader
cyruszhang Jan 9, 2025
5dd17fe
flip on DatasetBuilder; replace formatter
cyruszhang Jan 9, 2025
eb3b123
minor fix
cyruszhang Jan 9, 2025
7c171fb
add ExecutorBase to RayExecutor
cyruszhang Jan 9, 2025
195aff8
Merge branch 'main' into feat/cyruszhang/data-downloader
cyruszhang Jan 21, 2025
dd95df0
fix bugs; use str for executor_type
cyruszhang Jan 23, 2025
530efa8
add add_same_content_to_new_column reference
cyruszhang Jan 23, 2025
3b726bd
ray data defaults to json
cyruszhang Jan 24, 2025
cac8e5e
fix dataset_path bug; add ray config test
cyruszhang Jan 24, 2025
a99c9b5
tests video on ray config
cyruszhang Jan 24, 2025
3c9caf5
add default cfg logic; fix data_mixture demo
cyruszhang Jan 24, 2025
b9f6a99
default executor + local data; fix analyzer bug
cyruszhang Jan 27, 2025
e05f146
Merge branch 'main' into feat/cyruszhang/data-downloader
cyruszhang Jan 27, 2025
acccc01
pass through num_proc param for ray executor when loading dataset
cyruszhang Jan 27, 2025
1823cd6
fix bugs for huggingface dataset loading; add sample config
cyruszhang Jan 27, 2025
2963118
fix typo in configs
cyruszhang Jan 29, 2025
4472aef
remove absolute path logic; remove dup test files
cyruszhang Feb 7, 2025
7964867
update .gitignore for dup files in tests
cyruszhang Feb 7, 2025
96207ba
fix RayDataset schema validation issue
cyruszhang Feb 7, 2025
9b1d738
fix wiki downloader tests
cyruszhang Feb 7, 2025
828e7ba
remove mixture formatter; logic captured in dataloader
cyruszhang Feb 7, 2025
5 changes: 5 additions & 0 deletions .gitignore
@@ -15,3 +15,8 @@ dist
 wandb/
 __pycache__
 .vscode/
+**/__dj__produced_data__/*
+venv/
+
+# dup files created by tests
+tests/ops/data/*dup*
6 changes: 6 additions & 0 deletions configs/datasets/local_json.yaml
@@ -0,0 +1,6 @@
# global parameters
project_name: 'dataset-local-json'
dataset:
  configs:
    - type: 'local'
      path: 'path/to/json/file'
6 changes: 6 additions & 0 deletions configs/datasets/local_parquet.yaml
@@ -0,0 +1,6 @@
# global parameters
project_name: 'dataset-local-parquet'
dataset:
  configs:
    - type: 'local'
      path: 'path/to/parquet/file'
10 changes: 10 additions & 0 deletions configs/datasets/mixture.yaml
@@ -0,0 +1,10 @@
project_name: 'dataset-mixture'
dataset:
  max_sample_num: 10000
  configs:
    - type: 'local'
      weight: 1.0
      path: 'path/to/json/file'
    - type: 'local'
      weight: 1.0
      path: 'path/to/csv/file'
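The weight semantics above can be read as proportional shares of the `max_sample_num` budget. A minimal sketch of that interpretation, assuming proportional allocation (`allocate_samples` is a hypothetical helper for illustration, not part of the PR):

```python
def allocate_samples(weights, max_sample_num):
    """Split a global sample budget across sources in proportion to weight."""
    total = sum(weights)
    # Each source receives its weight's share of the budget, truncated to int.
    return [int(max_sample_num * w / total) for w in weights]

# Two equally weighted sources, as in mixture.yaml, split the budget evenly.
counts = allocate_samples([1.0, 1.0], max_sample_num=10000)
print(counts)  # [5000, 5000]
```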
10 changes: 10 additions & 0 deletions configs/datasets/remote_arxiv.yaml
@@ -0,0 +1,10 @@
# global parameters
project_name: 'dataset-remote-arxiv'
dataset:
  configs:
    - type: 'remote'
      source: 'arxiv'
      lang: 'en'
      dump_date: 'latest'
      force_download: false
      url_limit: 2
11 changes: 11 additions & 0 deletions configs/datasets/remote_commoncrawl.yaml
@@ -0,0 +1,11 @@
# global parameters
project_name: 'dataset-remote-commoncrawl'
dataset:
  configs:
    - type: 'remote'
      source: 'commoncrawl'
      start_snapshot: '2020-50'
      end_snapshot: '2021-04'
      aws: true
      force_download: false
      url_limit: 2
10 changes: 10 additions & 0 deletions configs/datasets/remote_huggingface.yaml
@@ -0,0 +1,10 @@
# global parameters
project_name: 'dataset-remote-huggingface'
dataset:
  configs:
    - type: 'remote'
      source: 'huggingface'
      path: "HuggingFaceFW/fineweb"
      name: "CC-MAIN-2024-10"
      split: "train"
      limit: 1000
10 changes: 10 additions & 0 deletions configs/datasets/remote_modelscope.yaml
@@ -0,0 +1,10 @@
# global parameters
project_name: 'dataset-remote-modelscope'
dataset:
  configs:
    - type: 'remote'
      source: 'modelscope'
      path: 'modelscope/clue'
      subset_name: 'afqmc'
      split: 'train'
      limit: 1000
10 changes: 10 additions & 0 deletions configs/datasets/remote_wiki.yaml
@@ -0,0 +1,10 @@
# global parameters
project_name: 'dataset-remote-wiki'
dataset:
  configs:
    - type: 'remote'
      source: 'wiki'
      lang: 'en'
      dump_date: 'latest'
      force_download: false
      url_limit: 2
18 changes: 18 additions & 0 deletions configs/datasets/validation.yaml
@@ -0,0 +1,18 @@
dataset:
  configs:
    - type: local
      path: path/to/data.json

validators:
  - type: conversation
    min_turns: 2
    max_turns: 20
  - type: required_fields
    required_fields:
      - "text"
      - "metadata"
      - "language"
    field_types:
      text: "str"
      metadata: "dict"
      language: "str"
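A minimal sketch of what the `required_fields` validator configured above amounts to per record (`check_sample` is a hypothetical stand-in for illustration, not the PR's DataValidator classes):

```python
def check_sample(sample, required_fields, field_types):
    """Reject a record that misses required fields or has wrong field types."""
    type_map = {'str': str, 'dict': dict, 'list': list, 'int': int}
    missing = [f for f in required_fields if f not in sample]
    if missing:
        raise ValueError(f'missing required fields: {missing}')
    for field, type_name in field_types.items():
        if field in sample and not isinstance(sample[field], type_map[type_name]):
            raise ValueError(f"field '{field}' must be of type {type_name}")

# A record matching validation.yaml passes silently.
check_sample({'text': 'hello', 'metadata': {'src': 'wiki'}, 'language': 'en'},
             required_fields=['text', 'metadata', 'language'],
             field_types={'text': 'str', 'metadata': 'dict', 'language': 'str'})
```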
22 changes: 22 additions & 0 deletions configs/demo/process-huggingface.yaml
@@ -0,0 +1,22 @@
# Process config example for dataset

# global parameters
project_name: 'demo-process'
dataset:
  configs:
    - type: 'remote'
      source: 'huggingface'
      path: 'hugfaceguy0001/retarded_bar'
      name: 'question'
      split: 'train'

np: 4  # number of subprocesses to process your dataset

export_path: './outputs/demo-process/demo-processed.jsonl'

# process schedule
# a list of several process operators with their arguments
process:
  - language_id_score_filter:
      lang: 'zh'
      min_score: 0.8
6 changes: 3 additions & 3 deletions data_juicer/config/__init__.py
@@ -1,7 +1,7 @@
-from .config import (export_config, get_init_configs, init_configs,
-                     merge_config, prepare_side_configs)
+from .config import (export_config, get_default_cfg, get_init_configs,
+                     init_configs, merge_config, prepare_side_configs)

 __all__ = [
     'init_configs', 'get_init_configs', 'export_config', 'merge_config',
-    'prepare_side_configs'
+    'prepare_side_configs', 'get_default_cfg'
 ]
55 changes: 49 additions & 6 deletions data_juicer/config/config.py
@@ -10,7 +10,7 @@
 import yaml
 from jsonargparse import (ActionConfigFile, ArgumentParser, Namespace,
                           dict_to_namespace, namespace_to_dict)
-from jsonargparse.typehints import ActionTypeHint
+from jsonargparse._typehints import ActionTypeHint
 from jsonargparse.typing import ClosedUnitInterval, NonNegativeInt, PositiveInt
 from loguru import logger

@@ -102,6 +102,13 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
         help='Path to datasets with optional weights(0.0-1.0), 1.0 as '
         'default. Accepted format:<w1> dataset1-path <w2> dataset2-path '
         '<w3> dataset3-path ...')
+    parser.add_argument(
+        '--dataset',
+        type=Union[List[Dict], Dict],
+        default=[],
+        help='Dataset setting to define local/remote datasets; could be a '
+        'dict or a list of dicts; refer to configs/datasets for more '
+        'detailed examples')
     parser.add_argument(
         '--generated_dataset_config',
         type=Dict,
@@ -452,19 +459,22 @@ def init_setup_from_cfg(cfg: Namespace):

     # check and get dataset dir
     if cfg.get('dataset_path', None) and os.path.exists(cfg.dataset_path):
+        logger.info('dataset_path config is set and a valid local path')
         cfg.dataset_path = os.path.abspath(cfg.dataset_path)
         if os.path.isdir(cfg.dataset_path):
             cfg.dataset_dir = cfg.dataset_path
         else:
             cfg.dataset_dir = os.path.dirname(cfg.dataset_path)
-    elif cfg.dataset_path == '':
-        logger.warning('dataset_path is empty by default.')
+    elif cfg.dataset_path == '' and cfg.get('dataset', None):
+        logger.info('dataset_path config is empty; dataset is present')
+        cfg.dataset_dir = ''
     else:
         logger.warning(f'dataset_path [{cfg.dataset_path}] is not a valid '
-                       f'local path. Please check and retry, otherwise we '
-                       f'will treat it as a remote dataset or a mixture of '
-                       f'several datasets.')
+                       f'local path, AND dataset is not present. '
+                       f'Please check and retry, otherwise we '
+                       f'will treat dataset_path as a remote dataset or a '
+                       f'mixture of several datasets.')

         cfg.dataset_dir = ''

     # check number of processes np
@@ -910,3 +920,36 @@ def get_init_configs(cfg: Union[Namespace, Dict]):
         json.dump(cfg, f)
     inited_dj_cfg = init_configs(['--config', temp_file])
     return inited_dj_cfg
+
+
+def get_default_cfg():
+    """Get default config values from config_all.yaml"""
+    cfg = Namespace()
+
+    # Get path to config_all.yaml
+    config_dir = os.path.dirname(os.path.abspath(__file__))
+    default_config_path = os.path.join(config_dir,
+                                       '../../configs/config_all.yaml')
+
+    # Load default values from yaml
+    with open(default_config_path, 'r', encoding='utf-8') as f:
+        defaults = yaml.safe_load(f)
+
+    # Convert to flat dictionary for namespace
+    flat_defaults = {
+        'executor_type': 'default',
+        'ray_address': 'auto',
+        'suffixes': None,
+        'text_keys': 'text',
+        'add_suffix': False,
+        'export_path': './outputs',
+        # Add other top-level keys from config_all.yaml
+        **defaults
+    }
+
+    # Update cfg with defaults
+    for key, value in flat_defaults.items():
+        if not hasattr(cfg, key):
+            setattr(cfg, key, value)
+
+    return cfg
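One subtlety in `get_default_cfg`: because `**defaults` is unpacked last in the `flat_defaults` literal, any key that also appears in `config_all.yaml` overrides the hand-written fallbacks, which only take effect for keys the YAML omits. Python's dict-literal merge order makes this concrete (a standalone illustration, not the PR's code):

```python
# Later entries win in a dict literal, so YAML-loaded defaults override the
# hand-written fallbacks for any key present in both.
fallbacks = {'executor_type': 'default', 'ray_address': 'auto'}
yaml_defaults = {'executor_type': 'ray'}  # pretend this came from config_all.yaml
merged = {**fallbacks, **yaml_defaults}
print(merged['executor_type'])  # 'ray': the YAML value wins
print(merged['ray_address'])    # 'auto': fallback survives for omitted keys
```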
6 changes: 5 additions & 1 deletion data_juicer/core/__init__.py
@@ -1,7 +1,8 @@
 from .adapter import Adapter
 from .analyzer import Analyzer
 from .data import NestedDataset
-from .executor import Executor
+from .executor import Executor, ExecutorFactory, RayExecutor
+from .executor.base import ExecutorBase
 from .exporter import Exporter
 from .monitor import Monitor
 from .tracer import Tracer
@@ -10,7 +11,10 @@
     'Adapter',
     'Analyzer',
     'NestedDataset',
+    'ExecutorFactory',
     'Executor',
+    'RayExecutor',
+    'ExecutorBase',
     'Exporter',
     'Monitor',
     'Tracer',
15 changes: 5 additions & 10 deletions data_juicer/core/analyzer.py
@@ -8,7 +8,7 @@

 from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis
 from data_juicer.config import init_configs
-from data_juicer.format import load_formatter
+from data_juicer.core.data.dataset_builder import DatasetBuilder
 from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS, Filter, load_ops
 from data_juicer.ops.op_fusion import fuse_operators
 from data_juicer.utils import cache_utils
@@ -44,14 +44,9 @@ def __init__(self, cfg: Optional[Namespace] = None):
                         f'[{self.cfg.cache_compress}]')
             cache_utils.CACHE_COMPRESS = self.cfg.cache_compress

-        # setup formatter
-        logger.info('Setting up data formatter...')
-        self.formatter = load_formatter(
-            dataset_path=self.cfg.dataset_path,
-            generated_dataset_config=self.cfg.generated_dataset_config,
-            text_keys=self.cfg.text_keys,
-            suffixes=self.cfg.suffixes,
-            add_suffix=self.cfg.add_suffix)
+        # setup dataset builder
+        logger.info('Setting up dataset builder...')
+        self.dataset_builder = DatasetBuilder(cfg, executor_type='default')

         # prepare exporter and check export path suffix
         # NOTICE: no need to export dataset texts for analyzer
@@ -91,7 +86,7 @@ def run(self,
             load_data_np = self.cfg.np
         if dataset is None:
             logger.info('Loading dataset from data formatter...')
-            dataset = self.formatter.load_dataset(load_data_np, self.cfg)
+            dataset = self.dataset_builder.load_dataset(num_proc=load_data_np)
         else:
             logger.info(f'Using existing dataset {dataset}')
         if self.cfg.auto:
9 changes: 9 additions & 0 deletions data_juicer/core/data/__init__.py
@@ -0,0 +1,9 @@
from .dj_dataset import (DJDataset, NestedDataset,
                         add_same_content_to_new_column,
                         wrap_func_with_nested_access)
from .ray_dataset import RayDataset

__all__ = [
    'DJDataset', 'NestedDataset', 'RayDataset', 'wrap_func_with_nested_access',
    'add_same_content_to_new_column'
]
61 changes: 61 additions & 0 deletions data_juicer/core/data/config_validator.py
@@ -0,0 +1,61 @@
from typing import Dict


class ConfigValidationError(Exception):
    """Custom exception for validation errors"""
    pass


class ConfigValidator:
    """Mixin class for configuration validation"""

    # Define validation rules for each strategy type
    CONFIG_VALIDATION_RULES = {
        'required_fields': [],  # Fields that must be present
        'optional_fields': [],  # Fields that are optional
        'field_types': {},  # Expected types for fields
        'custom_validators': {}  # Custom validation functions
    }

    def validate_config(self, ds_config: Dict) -> None:
        """
        Validate the configuration dictionary.

        Args:
            ds_config: Configuration dictionary to validate

        Raises:
            ConfigValidationError: If validation fails
        """
        # Check required fields
        missing_fields = [
            field for field in self.CONFIG_VALIDATION_RULES['required_fields']
            if field not in ds_config
        ]
        if missing_fields:
            raise ConfigValidationError(
                f"Missing required fields: {', '.join(missing_fields)}")

        # Optional fields need no special checks

        # Check field types
        for field, expected_type in self.CONFIG_VALIDATION_RULES[
                'field_types'].items():
            if field in ds_config:
                value = ds_config[field]
                if not isinstance(value, expected_type):
                    raise ConfigValidationError(
                        f"Field '{field}' must be of "
                        f"type '{expected_type.__name__}', "
                        f"got '{type(value).__name__}'")

        # Run custom validators
        for field, validator in self.CONFIG_VALIDATION_RULES[
                'custom_validators'].items():
            if field in ds_config:
                try:
                    validator(ds_config[field])
                except Exception as e:
                    raise ConfigValidationError(
                        f"Validation failed for field '{field}': {str(e)}")
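A usage sketch for the mixin: a subclass narrows CONFIG_VALIDATION_RULES and inherits validate_config unchanged. The ConfigValidator body here is a trimmed standalone copy so the example runs on its own; LocalJsonConfig and must_be_local are invented for illustration and are not names from the PR:

```python
from typing import Dict


class ConfigValidationError(Exception):
    pass


class ConfigValidator:
    # Trimmed standalone copy of the mixin, kept runnable for this demo.
    CONFIG_VALIDATION_RULES = {
        'required_fields': [],
        'field_types': {},
        'custom_validators': {},
    }

    def validate_config(self, ds_config: Dict) -> None:
        rules = self.CONFIG_VALIDATION_RULES
        missing = [f for f in rules['required_fields'] if f not in ds_config]
        if missing:
            raise ConfigValidationError(f'Missing required fields: {missing}')
        for field, expected in rules['field_types'].items():
            if field in ds_config and not isinstance(ds_config[field], expected):
                raise ConfigValidationError(
                    f"Field '{field}' must be of type '{expected.__name__}'")
        for field, validator in rules['custom_validators'].items():
            if field in ds_config:
                validator(ds_config[field])


def must_be_local(value):
    # Custom validator: this hypothetical strategy only accepts type 'local'.
    if value != 'local':
        raise ValueError("expected type 'local'")


class LocalJsonConfig(ConfigValidator):
    # Hypothetical strategy subclass: overriding the rules is the whole API.
    CONFIG_VALIDATION_RULES = {
        'required_fields': ['type', 'path'],
        'field_types': {'type': str, 'path': str},
        'custom_validators': {'type': must_be_local},
    }


LocalJsonConfig().validate_config({'type': 'local', 'path': 'path/to/json/file'})
```

Passing a config without 'path' raises ConfigValidationError, which is how a load strategy would surface a bad dataset entry early.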