Commit 22b3fe7: add codepipeline (#142)
2 files changed: +635 −0

---
title: Code Data Synthesis Pipeline
icon: mdi:code-braces
createTime: 2025/11/21 18:59:51
permalink: /en/guide/codepipeline/
---

# Code Data Synthesis and Filtering Pipeline

## 1. Overview

The code pipeline is designed to process different types of code data, including pretraining code corpora, instruction fine‑tuning data, and general code generation datasets. Functionally, it can be divided into three categories:

1. **Pretraining Code Filtering Pipeline**: Applies heuristic and statistical filters to raw pretraining code, removing auto‑generated, low‑quality, or unexpected file types to obtain a high‑quality pretraining corpus.
2. **Code SFT Synthesis Pipeline**: Uses existing code as seeds, generates natural‑language instructions from the code, then regenerates code from those instructions, combined with quality evaluation and sandbox execution to construct instruction–code pairs for code instruction fine‑tuning.
3. **Code Generation Dataset Pipeline**: Takes dialogue messages or templates as input, enhances them into high‑quality instructions, generates the corresponding code, and filters samples via quality scoring and sandbox execution to build datasets for code generation tasks.

All three pipelines follow the same design principle of **unified data storage + composable operators**:

- **Unified storage (`FileStorage`)**: All intermediate results are written to cache files, which makes debugging and resume‑from‑checkpoint easier.
- **Decoupled operators**: Each step is an independent `Operator` that you can add, remove, reorder, or replace as needed (see the sketch after this list).
- **Multiple use cases**:
  - Pretraining code corpus cleaning (CPU‑only, no LLM required)
  - API‑based synthesis of instruction–code pairs for SFT
  - Enhanced instruction & code generation for code‑centric datasets
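As a minimal sketch of this operator pattern (names and signatures are taken from the pipeline example in Section 4 and should be treated as assumptions rather than a definitive API), each operator reads one or more columns from the current cache file and writes its outputs back through `FileStorage`:

```python
# Minimal sketch of the shared operator pattern; names and signatures
# follow the Section 4 example and are assumptions, not a fixed API.
op = CodeCodeToInstructionGenerator(llm_serving=llm_serving)
op.run(
    storage=storage.step(),              # each step() reads/writes one cache file
    input_key="raw_code",                # column to read
    output_key="generated_instruction",  # column to write
)
```

Because every operator follows this `run(storage=..., input_key=..., output_key=...)` shape, swapping one filter or generator for another is a local change that leaves the rest of the pipeline untouched.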

The following sections describe the inputs, core operators, and logic for each pipeline.

## 2. Quick Start

### Step 1: Install DataFlow

```shell
pip install open-dataflow
```

### Step 2: Create a new working directory

```shell
mkdir run_dataflow
cd run_dataflow
```

### Step 3: Initialize DataFlow

```shell
dataflow init
```

After initialization, the `run_dataflow` directory will contain several example pipelines (paths may differ slightly). For this document, the relevant ones are:

```text
run_dataflow/pipelines/api_pipelines/code_gen_dataset_pipeline.py
run_dataflow/pipelines/api_pipelines/code_code_to_sft_data_pipeline.py
run_dataflow/pipelines/cpu_pipelines/code_pt_filter.py
```

### Step 4: Configure API key and LLM serving (API pipelines only)

For Linux and macOS:

```shell
export DF_API_KEY="sk-xxxxx"
```

For Windows (PowerShell):

```powershell
$env:DF_API_KEY = "sk-xxxxx"
```

In code‑based API pipelines, `APILLMServing_request` is typically configured as:

```python
self.llm_serving = APILLMServing_request(
    api_url="https://api.openai.com/v1/chat/completions",
    model_name="gpt-4o",
    max_workers=10,  # or 100, depending on the pipeline
)
```

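Note that the pipeline classes also accept an externally constructed serving object (see the `llm_serving` constructor argument in the Section 4 example), so you can swap in a different serving backend without modifying any operators.
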
### Step 5: Run the example pipelines

From the `run_dataflow` directory you can run any of the following:

```bash
# 1) Pretraining code filtering (CPU)
python pipelines/cpu_pipelines/code_pt_filter.py

# 2) Synthesize SFT instruction–code pairs from existing code
python pipelines/api_pipelines/code_code_to_sft_data_pipeline.py

# 3) Generate a code dataset from raw conversations / templates
python pipelines/api_pipelines/code_gen_dataset_pipeline.py
```

## 3. Data Flow and Pipeline Logic

### 1. **Common Inputs and FileStorage**

All three pipelines use `FileStorage` to manage input and cached data, differing only in their default input files:

- **PTCodeFilter_CPUPipeline**
  - `first_entry_file_name="../example_data/CodePipeline/code_input.jsonl"`
- **CodeSFTSynthesis_APIPipeline**
  - `first_entry_file_name="../example_data/CodePipeline/code_synthesis_input.jsonl"`
- **CodeGenDataset_APIPipeline**
  - `first_entry_file_name="../example_data/CodePipeline/raw_code.jsonl"`

In real‑world usage, you only need to replace these example paths with your own JSON/JSONL files; the rest of the operators can remain unchanged, as in the sketch below.

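```python
# Point the pipeline at your own JSONL file; the other constructor
# arguments follow the Section 4 example. The path below is hypothetical.
self.storage = FileStorage(
    first_entry_file_name="/data/my_code_corpus.jsonl",
    cache_path="./cache",
    file_name_prefix="dataflow_cache_step",
    cache_type="jsonl",
)
```
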
---

### 2. **Pretraining Code Filtering Pipeline (PTCodeFilter_CPUPipeline)**

This pipeline (in `pipelines/cpu_pipelines/code_pt_filter.py`) is intended for **multi‑dimensional filtering and quality assessment of large‑scale code corpora without using any LLMs**. The default input typically provides:

- **`lines`**: Code split into lines, used for line‑level and length‑related filters.
- **`text`**: Full code text, used for composition analysis, encoded‑data detection, and document‑level quality checks.
- **`dataframe`**: Structured metadata such as file path and language type, used for file‑type filtering.

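An input record might look like the following (an illustrative sketch; the exact schema of the example `code_input.jsonl` may differ):

```json
{
  "lines": ["def add(a, b):", "    return a + b"],
  "text": "def add(a, b):\n    return a + b",
  "dataframe": {"path": "utils/math.py", "language": "Python"}
}
```
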
The main operators are listed below; a condensed wiring sketch follows the list.

1. **Auto‑generated code filter: `CodeAutoGeneratedFilter`**
   - **Parameters**: `min_score=1.0, max_score=1.0`
   - **Input**: `lines`
   - **Output key**: `autogen_filter_label`
   - **Function**: Detects and filters code that appears to be automatically generated (e.g., large boilerplate sections, generator markers).

2. **Code length filter: `CodeLengthSampleFilter`**
   - **Parameters**: `min_score=1.0, max_score=1.0`
   - **Input**: `lines`
   - **Output key**: `length_filter_label`
   - **Function**: Filters samples that are unusually short or long based on line/character counts.

3. **Text composition filter: `CodeTextCompositionFilter`**
   - **Parameters**: `min_score=1.0, max_score=1.0`
   - **Input**: `text`
   - **Output key**: `text_composition_filter_label`
   - **Function**: Filters samples whose character composition is abnormal (e.g., too much non‑code content).

4. **Encoded‑data filter: `CodeEncodedDataFilter`**
   - **Parameters**: `min_score=1.0, max_score=1.0`
   - **Input**: `text`
   - **Output key**: `encoded_data_filter_label`
   - **Function**: Detects and filters large blocks of encoded data, such as Base64 or long hexadecimal blobs.

5. **Document quality filter: `CodeDocumentQualityFilter`**
   - **Parameters**: `min_score=1.0, max_score=1.0`
   - **Thresholds** (example configuration):
     - `min_num_chars=100`, `max_num_chars=100000`
     - `min_num_words=10`, `max_num_words=50000`
     - `max_frac_duplicate_lines=0.3`
     - `max_frac_duplicate_2gram` through `max_frac_duplicate_5gram`, each `0.3`
     - `max_frac_curly_bracket=0.1`
     - `max_frac_all_caps_words=0.3`
     - `min_entropy_unigram=2.0`
   - **Input**: `text`
   - **Output key**: `doc_quality_filter_label`
   - **Function**: Combines length, redundancy, and entropy metrics to filter low‑quality code documents.

6. **File‑type content filter: `CodeFileTypeContentFilter`**
   - **Input**: `dataframe`
   - **Output key**: `file_type_filter_label`
   - **Function**: Filters out files whose type or content does not match the desired set (e.g., non‑code or unwanted languages).

7. **Generic score filter (optional): `CodeGenericScoreFilter`**
   - Commented out in the example code; you can aggregate multiple scores into a single field (e.g., `quality_score`) and then apply this operator as a unified threshold.

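Putting these together, a condensed sketch of how the CPU pipeline could be wired (the `run(...)` convention follows the Section 4 example; exact constructor signatures are assumptions):

```python
# Condensed sketch of PTCodeFilter_CPUPipeline; parameter names and the
# run(...) convention follow the Section 4 example and are assumptions.
class PTCodeFilter_CPUPipeline:
    def __init__(self):
        self.storage = FileStorage(
            first_entry_file_name="../example_data/CodePipeline/code_input.jsonl",
            cache_path="./cache",
            file_name_prefix="dataflow_cache_step",
            cache_type="jsonl",
        )
        self.autogen_filter = CodeAutoGeneratedFilter(min_score=1.0, max_score=1.0)
        self.length_filter = CodeLengthSampleFilter(min_score=1.0, max_score=1.0)

    def forward(self):
        # Each filter reads a column and writes a pass/fail label column.
        self.autogen_filter.run(
            storage=self.storage.step(),
            input_key="lines",
            output_key="autogen_filter_label",
        )
        self.length_filter.run(
            storage=self.storage.step(),
            input_key="lines",
            output_key="length_filter_label",
        )
        # The remaining filters (text composition, encoded data, document
        # quality, file type) chain on in the same way.
```
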
---

### 3. **Code SFT Synthesis Pipeline (CodeSFTSynthesis_APIPipeline)**

This pipeline (in `pipelines/api_pipelines/code_code_to_sft_data_pipeline.py`) is used to **synthesize high‑quality instruction–code pairs from existing code**, suitable for instruction fine‑tuning of code models.

#### 3.1 Input and storage

- Default input file: `../example_data/CodePipeline/code_synthesis_input.jsonl`
- Typical fields:
  - **`raw_code`**: Raw code snippets that serve as seeds for synthesis.

`FileStorage` writes each step’s outputs to separate cache files, enabling debugging and incremental reruns.

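For reference, a seed record can be as simple as the following (illustrative only):

```json
{"raw_code": "def fib(n):\n    return n if n < 2 else fib(n - 1) + fib(n - 2)"}
```
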
#### 3.2 Main operators and data flow

1. **Code → Instruction: `CodeCodeToInstructionGenerator`**
   - **Input**: `raw_code`
   - **Output key**: `generated_instruction`
   - **Function**: Uses an LLM to generate natural‑language instructions or task descriptions from code, yielding human‑readable SFT instructions that are tightly aligned with the code.

2. **Instruction → Code: `CodeInstructionToCodeGenerator`**
   - **Input**: `generated_instruction`
   - **Output key**: `generated_code`
   - **Function**: Regenerates code from the instructions, both validating instruction clarity and producing instruction–code pairs.

3. **Quality evaluation: `CodeQualitySampleEvaluator`**
   - **Inputs**: `generated_instruction`, `generated_code`
   - **Function**: Uses an LLM to evaluate consistency, clarity, and executability, and outputs quality scores and feedback.

4. **Score‑based filtering: `CodeQualityScoreFilter`**
   - **Parameters**: `min_score=0.0`, `max_score=10.0`
   - **Inputs**: `generated_instruction`, `generated_code`
   - **Output key**: `quality_score_filter_label`
   - **Function**: Tags or filters out low‑quality samples based on the evaluation scores.

5. **Sandbox execution: `CodeSandboxSampleEvaluator`**
   - **Parameters**: `language='python'`
   - **Input**: `generated_code`
   - **Function**: Executes code in an isolated environment to detect syntax errors or obvious runtime issues.

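The complete wiring of these five steps is shown in the pipeline example in Section 4.
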
---

### 4. **Code Generation Dataset Pipeline (CodeGenDataset_APIPipeline)**

This pipeline (in `pipelines/api_pipelines/code_gen_dataset_pipeline.py`) is geared towards **constructing high‑quality “instruction + code” datasets from raw conversations or template messages**.

#### 4.1 Input and storage

- Default input file: `../example_data/CodePipeline/raw_code.jsonl`
- Typical fields:
  - **`messages`**: Raw dialogue turns, templates, or coarse‑grained task descriptions used as seeds for instruction enhancement.

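A seed record might look like this (an assumed chat‑style shape; the example file may use a different schema):

```json
{"messages": [{"role": "user", "content": "Write a function that deduplicates a list while preserving order."}]}
```
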
#### 4.2 Main operators and data flow

The main operators are listed below; a wiring sketch follows the list.

1. **Instruction enhancement: `CodeEnhancementInstructionGenerator`**
   - **Input**: `messages`
   - **Output key**: `generated_instruction`
   - **Function**: Converts rough inputs (e.g., chat history, short prompts) into clear, structured, and high‑quality instructions suitable for code generation.

2. **Instruction → Code: `CodeInstructionToCodeGenerator`**
   - **Input**: `generated_instruction`
   - **Output key**: `generated_code`
   - **Function**: Generates code from instructions to form instruction–code samples.

3. **Quality evaluation: `CodeQualitySampleEvaluator`**
   - **Inputs**: `generated_instruction`, `generated_code`
   - **Function**: Evaluates instruction–code consistency, completeness, and overall quality.

4. **High‑score filtering: `CodeQualityScoreFilter`**
   - **Parameters**: `min_score=7.0`, `max_score=10.0`
   - **Inputs**: `generated_instruction`, `generated_code`
   - **Output keys**:
     - `quality_score`: Numeric quality score
     - `quality_feedback`: Textual feedback from the LLM
     - `quality_score_filter_label`: Pass/fail label
   - **Function**: Keeps only high‑quality samples that are directly usable for training or evaluation.

5. **Sandbox execution: `CodeSandboxSampleEvaluator`**
   - **Parameters**: `language='python'`
   - **Input**: `generated_code`
   - **Function**: Verifies that generated code is syntactically and (in simple cases) semantically valid.

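By analogy with the SFT pipeline example in Section 4, the forward pass of this pipeline could be wired roughly as follows (attribute names are hypothetical; the `run(...)` key arguments mirror the operator descriptions above):

```python
# Sketch of the CodeGenDataset_APIPipeline forward pass; attribute names
# are hypothetical, and the run(...) conventions follow the Section 4 example.
def forward(self):
    self.instruction_enhancer.run(       # CodeEnhancementInstructionGenerator
        storage=self.storage.step(),
        input_key="messages",
        output_key="generated_instruction",
    )
    self.code_generator.run(             # CodeInstructionToCodeGenerator
        storage=self.storage.step(),
        input_key="generated_instruction",
        output_key="generated_code",
    )
    self.pair_evaluator.run(             # CodeQualitySampleEvaluator
        storage=self.storage.step(),
        input_instruction_key="generated_instruction",
        input_code_key="generated_code",
    )
    self.score_filter.run(               # CodeQualityScoreFilter(min_score=7.0, max_score=10.0)
        storage=self.storage.step(),
        input_instruction_key="generated_instruction",
        input_code_key="generated_code",
        output_key="quality_score_filter_label",
    )
    self.sandbox_evaluator.run(          # CodeSandboxSampleEvaluator(language="python")
        storage=self.storage.step(),
        input_key="generated_code",
    )
```
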
## 4. Pipeline Example

The following example shows how to initialize a code SFT synthesis pipeline and sequentially run the main operators.

```python
# Imports follow DataFlow's example scripts; the module path for the code
# operators is an assumption and may differ across DataFlow versions.
from dataflow.operators.code import (
    CodeCodeToInstructionGenerator,
    CodeInstructionToCodeGenerator,
    CodeQualitySampleEvaluator,
    CodeQualityScoreFilter,
    CodeSandboxSampleEvaluator,
)
from dataflow.serving import APILLMServing_request
from dataflow.utils.storage import FileStorage
from dataflow.core import LLMServingABC


class CodeSFTSynthesis_APIPipeline:
    def __init__(self, llm_serving: LLMServingABC | None = None):
        self.storage = FileStorage(
            first_entry_file_name="../example_data/CodePipeline/code_synthesis_input.jsonl",
            cache_path="./cache",
            file_name_prefix="dataflow_cache_step",
            cache_type="jsonl",
        )

        self.llm_serving = llm_serving or APILLMServing_request(
            api_url="https://api.openai.com/v1/chat/completions",
            model_name="gpt-4o",
            max_workers=100,
        )

        self.instruction_synthesizer_step1 = CodeCodeToInstructionGenerator(llm_serving=self.llm_serving)
        self.code_generator_step2 = CodeInstructionToCodeGenerator(llm_serving=self.llm_serving)
        self.pair_evaluator_step3 = CodeQualitySampleEvaluator(llm_serving=self.llm_serving)
        self.score_filter_step4 = CodeQualityScoreFilter(
            llm_serving=self.llm_serving,
            min_score=0.0,
            max_score=10.0,
        )
        self.sandbox_evaluator_step5 = CodeSandboxSampleEvaluator(language="python")

    def forward(self):
        # Step 1: code -> natural-language instruction
        self.instruction_synthesizer_step1.run(
            storage=self.storage.step(),
            input_key="raw_code",
            output_key="generated_instruction",
        )
        # Step 2: instruction -> regenerated code
        self.code_generator_step2.run(
            storage=self.storage.step(),
            input_key="generated_instruction",
            output_key="generated_code",
        )
        # Step 3: LLM-based quality evaluation of the pair
        self.pair_evaluator_step3.run(
            storage=self.storage.step(),
            input_instruction_key="generated_instruction",
            input_code_key="generated_code",
        )
        # Step 4: score-based filtering on the evaluation results
        self.score_filter_step4.run(
            storage=self.storage.step(),
            input_instruction_key="generated_instruction",
            input_code_key="generated_code",
            output_key="quality_score_filter_label",
        )
        # Step 5: sandbox execution of the generated code
        self.sandbox_evaluator_step5.run(
            storage=self.storage.step(),
            input_key="generated_code",
        )


if __name__ == "__main__":
    pl = CodeSFTSynthesis_APIPipeline()
    pl.forward()
```
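
After a run, each `storage.step()` call will have produced one intermediate JSONL cache file under `./cache` (named with the `dataflow_cache_step` prefix), so you can inspect the output of any individual step or resume from it.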