import json
from functools import partial
from typing import Any, Literal, Sequence, cast

import chz
from datasets import Dataset, concatenate_datasets, load_dataset

import tinker
from tinker_cookbook import renderers
from tinker_cookbook.recipes.code_rl.code_grading import (
    extract_code_from_model,
    sandbox_check_correctness,
    taco_to_lcb_format,
)
from tinker_cookbook.recipes.code_rl.lcb_utils import fetch_live_code_bench_system_prompt
from tinker_cookbook.rl.problem_env import ProblemEnv, ProblemGroupBuilder, logger
from tinker_cookbook.rl.types import (
    Action,
    EnvGroupBuilder,
    RLDataset,
    RLDatasetBuilder,
    StepResult,
)
from tinker_cookbook.tokenizer_utils import get_tokenizer
from tinker_cookbook.utils import logtree

def _load_deepcoder_split(split: Literal["train", "test"]) -> Dataset:
    """Load and concatenate the DeepCoder subsets for the requested split."""
    if split == "train":
        datasets = [
            cast(
                Dataset,
                load_dataset("agentica-org/DeepCoder-Preview-Dataset", name=name, split="train"),
            )
            for name in ("primeintellect", "taco", "lcbv5")
        ]
    elif split == "test":
        datasets = [
            cast(
                Dataset,
                load_dataset("agentica-org/DeepCoder-Preview-Dataset", name=name, split="test"),
            )
            for name in ("codeforces", "lcbv5")
        ]
    else:
        raise ValueError(f"Unknown split: {split!r}")
    return cast(Dataset, concatenate_datasets(datasets))


def _ensure_dict(metadata: Any) -> dict[str, Any]:
    """Coerce metadata (possibly a JSON string) into a dict, defaulting to {}."""
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            logger.warning("Failed to deserialize metadata: %s", metadata)
            return {}
    if isinstance(metadata, dict):
        return metadata
    return {}


def _normalize_tests(raw_tests: Any, metadata: dict[str, Any]) -> list[dict[str, Any]]:
    """Normalize raw test specs (JSON strings, TACO dicts, or lists) into LCB-style dicts."""
    tests = raw_tests
    if isinstance(tests, str):
        try:
            tests = json.loads(tests)
        except json.JSONDecodeError:
            logger.warning("Failed to deserialize tests. Dropping sample.")
            return []
    if isinstance(tests, dict) and "inputs" in tests and "outputs" in tests:
        # TACO stores parallel "inputs"/"outputs" lists; convert to per-test dicts.
        tests = taco_to_lcb_format(tests)
    if isinstance(tests, dict):
        tests = [tests]

    normalized: list[dict[str, Any]] = []
    for test in tests or []:
        if not isinstance(test, dict):
            continue
        testtype = test.get("testtype") or "stdin_stdout"
        test_metadata = _ensure_dict(test.get("metadata", {}))
        if testtype == "functional":
            # Functional tests call a named entry point; recover it from either the
            # per-test metadata or the sample-level metadata.
            func_name = test_metadata.get("func_name") or metadata.get("func_name")
            if func_name is not None:
                test_metadata["func_name"] = str(func_name)
        normalized.append(
            {
                "input": str(test.get("input", "")),
                "output": str(test.get("output", "")),
                "testtype": testtype,
                "metadata": test_metadata or {"func_name": None},
            }
        )
    return normalized


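# For reference, a stdin/stdout test comes out of _normalize_tests in this shape
# (the values here are illustrative, not drawn from the dataset):
#
#     {
#         "input": "3\n1 2 3\n",
#         "output": "6\n",
#         "testtype": "stdin_stdout",
#         "metadata": {"func_name": None},
#     }

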
def _build_question(example: dict[str, Any]) -> str | None:
    """Build the rendered prompt, preferring a preprocessed question field if available."""
    question = example.get("question") or example.get("prompt") or example.get("problem")
    if not isinstance(question, str) or not question.strip():
        return None
    starter_code = example.get("starter_code")
    if isinstance(starter_code, str) and starter_code.strip():
        return fetch_live_code_bench_system_prompt(question, starter_code)
    return fetch_live_code_bench_system_prompt(question)


class CodeEnv(ProblemEnv):
    def __init__(
        self,
        problem: str,
        tests: list[dict[str, Any]],
        renderer: renderers.Renderer,
        convo_prefix: list[renderers.Message] | None = None,
        format_coef: float = 0.1,
        reward_timeout: int = 6,
    ):
        super().__init__(renderer, convo_prefix, format_coef=format_coef)
        self.problem = problem
        self.tests = tests
        self.reward_timeout = reward_timeout

    def get_question(self) -> str:
        return self.problem

    def check_format(self, sample_str: str) -> bool:
        return extract_code_from_model(sample_str) is not None

    def check_answer(self, sample_str: str) -> bool:
        """Unused: CodeEnv overrides step() and grades via check_sandbox_correctness."""
        return False

    async def check_sandbox_correctness(self, sample_str: str) -> bool:
        """Check whether the extracted code passes all test cases via sandbox execution."""
        code = extract_code_from_model(sample_str)
        if code is None:
            logtree.log_text("No code block detected in response.")
            return False

        try:
            success, _details = await sandbox_check_correctness(
                self.tests, code, timeout=self.reward_timeout
            )
            status = "✓" if success else "✗"
            logtree.log_text(
                f"Sandbox result {status}: {'All tests passed' if success else 'Failed'}"
            )
            return success
        except Exception as exc:
            logger.warning("Sandbox check failed: %s", exc, exc_info=True)
            logtree.log_text(f"Sandbox check failed: {exc}")
            return False

    def get_reference_answer(self) -> str:
        return ""

    async def step(self, action: Action) -> StepResult:
        message, parse_success = self.renderer.parse_response(action)
        content = message["content"]
        format_ok_bool = bool(parse_success) and self.check_format(content)
        correct_answer_bool = await self.check_sandbox_correctness(content)
        format_score = float(format_ok_bool)
        correct_score = float(correct_answer_bool)
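        # Reward shaping as implemented on the next line: a correct solution earns 1.0,
        # and a response without a parseable code block pays a penalty of format_coef,
        # since (format_score - 1.0) is 0 when the format check passes and -1 otherwise.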
        total_reward = self.format_coef * (format_score - 1.0) + correct_score

        logtree.log_text(f"Problem: {self.get_question()}")
        logtree.log_text(f"Response: {content}")
        if reference := self.get_reference_answer():
            logtree.log_text(f"Reference Answer: {reference}")
        logtree.log_text(
            f"Format Valid: {'✓' if format_ok_bool else '✗'}, "
            f"Correct: {'✓' if correct_answer_bool else '✗'}, "
            f"Reward: {total_reward:.2f}"
        )

        return StepResult(
            reward=total_reward,
            episode_done=True,
            next_observation=tinker.ModelInput.empty(),
            next_stop_condition=self.stop_condition,
            metrics={
                "format": format_score,
                "correct": correct_score,
            },
        )


class DeepcoderDataset(RLDataset):
    def __init__(
        self,
        batch_size: int,
        group_size: int,
        renderer: renderers.Renderer,
        convo_prefix: list[renderers.Message] | None = None,
        split: Literal["train", "test"] = "train",
        seed: int = 0,
        format_coef: float = 0.1,
        reward_timeout: int = 6,
    ):
        self.ds = _load_deepcoder_split(split)
        if split == "train":
            self.ds = self.ds.shuffle(seed=seed)
        self.batch_size = batch_size
        # Evaluation needs only one rollout per problem.
        self.group_size = group_size if split == "train" else 1
        self.renderer = renderer
        self.convo_prefix = convo_prefix
        self.format_coef = format_coef
        self.reward_timeout = reward_timeout

    def __len__(self) -> int:
        # Ceiling division: the final batch may be smaller than batch_size.
        return (len(self.ds) + self.batch_size - 1) // self.batch_size

    def get_batch(self, index: int) -> Sequence[EnvGroupBuilder]:
        start = index * self.batch_size
        end = min((index + 1) * self.batch_size, len(self.ds))
        if start >= end:
            raise IndexError("Batch index out of range for DeepcoderDataset")
        builders: list[EnvGroupBuilder] = []
        for row in self.ds.select(range(start, end)):
            builder = self._make_env_group_builder(cast(dict[str, Any], row), self.group_size)
            if builder is not None:
                builders.append(builder)
        return builders

    def _make_env_group_builder(
        self, example: dict[str, Any], group_size: int
    ) -> ProblemGroupBuilder | None:
        metadata = _ensure_dict(example.get("metadata", {}))
        tests = _normalize_tests(example.get("tests") or example.get("ground_truth"), metadata)
        if not tests:
            logger.warning("Skipping sample without valid tests.")
            return None
        question = _build_question(example)
        if question is None:
            logger.warning("Skipping sample without question text.")
            return None
        return ProblemGroupBuilder(
            env_thunk=partial(
                CodeEnv,
                question,
                tests,
                self.renderer,
                convo_prefix=self.convo_prefix,
                format_coef=self.format_coef,
                reward_timeout=self.reward_timeout,
            ),
            num_envs=group_size,
            dataset_name="deepcoder",
        )


@chz.chz
class DeepcoderDatasetBuilder(RLDatasetBuilder):
    batch_size: int
    model_name_for_tokenizer: str
    renderer_name: str
    group_size: int
    convo_prefix: list[renderers.Message] | None = None
    seed: int = 0
    format_coef: float = 0.1
    reward_timeout: int = 6

    async def __call__(self) -> tuple[DeepcoderDataset, DeepcoderDataset]:
        tokenizer = get_tokenizer(self.model_name_for_tokenizer)
        renderer = renderers.get_renderer(self.renderer_name, tokenizer=tokenizer)
        train_ds = DeepcoderDataset(
            batch_size=self.batch_size,
            group_size=self.group_size,
            renderer=renderer,
            convo_prefix=self.convo_prefix,
            split="train",
            seed=self.seed,
            format_coef=self.format_coef,
            reward_timeout=self.reward_timeout,
        )
        test_ds = DeepcoderDataset(
            batch_size=self.batch_size,
            group_size=1,
            renderer=renderer,
            convo_prefix=self.convo_prefix,
            split="test",
            seed=self.seed,
            format_coef=self.format_coef,
            reward_timeout=self.reward_timeout,
        )
        return train_ds, test_ds
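

# A minimal usage sketch, not part of the recipe itself: build both datasets and
# peek at the first training batch. The model and renderer names below are
# illustrative assumptions; substitute the pair your training config actually uses.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        builder = DeepcoderDatasetBuilder(
            batch_size=4,
            model_name_for_tokenizer="meta-llama/Llama-3.1-8B-Instruct",  # assumed name
            renderer_name="llama3",  # assumed renderer
            group_size=8,
        )
        train_ds, test_ds = await builder()
        print(f"{len(train_ds)} train batches, {len(test_ds)} eval batches")
        print(f"first batch: {len(train_ds.get_batch(0))} env group builders")

    asyncio.run(_demo())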