[smart-meter-analysis] Convert comed meter data from csv to parquet #59
Open
griffinsharps wants to merge 28 commits into main from
Conversation
Co-authored-by: Cursor <cursoragent@cursor.com>
…onth validator

Enhance validate_month_output.py with three preflight checks needed before scaling to full-month execution:

- Duplicate (zip_code, account_identifier, datetime) detection per batch file
- Row count reporting (total + per-file) in the validation report JSON
- Run artifact integrity via the --run-dir flag (plan.json, run_summary.json, manifests, batch summaries)

Add PREFLIGHT_200.md checklist for the 200-file EC2 validation run.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
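A minimal sketch of the --run-dir artifact-integrity preflight, assuming a hypothetical helper name and only the two artifact names listed above (the real check also covers manifests and batch summaries):

```python
from pathlib import Path

# Illustrative subset of the run artifacts the preflight expects to find.
REQUIRED_ARTIFACTS = ["plan.json", "run_summary.json"]

def missing_run_artifacts(run_dir: Path) -> list[str]:
    """Return the names of expected run artifacts absent from run_dir."""
    return [name for name in REQUIRED_ARTIFACTS if not (run_dir / name).exists()]
```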
…s_sorted()

Polars 1.38 removed is_sorted() from Expr. Collect the composite key first, then check sortedness on the resulting Series, which retains the method.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
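A minimal sketch of that workaround, assuming illustrative file/column handling and fixed-width key components so string concatenation preserves composite-key order:

```python
import polars as pl

# Hypothetical input file; the key columns match the PR's composite key.
lf = pl.scan_parquet("batch_00000.parquet")

# Collect the composite key into a Series, which still exposes is_sorted()
# in Polars 1.38+, instead of calling the removed Expr.is_sorted().
keys = (
    lf.select(
        pl.concat_str(
            [
                pl.col("zip_code").cast(pl.Utf8),
                pl.col("account_identifier").cast(pl.Utf8),
                pl.col("datetime").cast(pl.Utf8),
            ],
            separator="|",
        ).alias("composite_key")
    )
    .collect()
    .get_column("composite_key")
)
assert keys.is_sorted(), "composite key is not sorted"
```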
Restores Justfile from main and adds migrate-month recipe.

Usage: just migrate-month 202307

- batch-size 100, workers 6, lazy_sink, --resume
- Reads ~/s3_paths_<YYYYMM>_full.txt, writes to /ebs/.../out_<YYYYMM>_production
- Uses bare python (no uv) for EC2 compatibility

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
.gitignore: block *.txt, .tmp/, archive_quarantine/, tmp_polars_run_*/, and subagent_packages/ from being tracked.

pre-commit: add detect-private-key hook and a local forbid-secrets hook that blocks .env, .secrets, credentials.json, .pem, .key, .p12, .pfx, and .jks files from being committed (even via git add -f).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
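A minimal sketch of what such a local forbid-secrets hook script could look like, assuming pre-commit passes the staged filenames as arguments (the blocked patterns mirror the list above; the actual hook may differ):

```python
#!/usr/bin/env python
"""Hypothetical local pre-commit hook: refuse to commit secret-bearing files."""
import sys
from pathlib import PurePosixPath

BLOCKED_SUFFIXES = {".pem", ".key", ".p12", ".pfx", ".jks"}
BLOCKED_NAMES = {".env", ".secrets", "credentials.json"}

def main(filenames: list[str]) -> int:
    bad = [
        f for f in filenames
        if PurePosixPath(f).suffix in BLOCKED_SUFFIXES
        or PurePosixPath(f).name in BLOCKED_NAMES
    ]
    if bad:
        print("Refusing to commit potential secrets: " + ", ".join(bad))
        return 1
    return 0

if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
```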
Replace hash-based duplicate detection (n_unique, ~400MB/file) with adjacent-key streaming that leverages the required global sort order. Sortedness and uniqueness now share a single PyArrow iter_batches pass in full mode.

Key changes:

- _streaming_sort_and_dup_check: combined sort+dup check via PyArrow batch iteration, O(batch_size) memory, cross-file boundary state
- Per-file datetime stats with merge (_DtStats dataclass)
- Per-file DST stats with merge (_DstFileStats dataclass)
- Enhanced sample mode: strict-increasing check (catches dups in windows)
- Row counts from parquet metadata (O(1), no data scan)
- Phase-based main() architecture (discovery -> metadata -> streaming -> datetime -> DST -> artifacts -> report)
- _fail() typed as NoReturn for mypy narrowing
- Add pyarrow mypy override in pyproject.toml

Removed dead functions: _check_sorted_full, _validate_no_duplicates_file, _validate_datetime_invariants_partition, _validate_dst_option_b_partition, _keys_is_sorted_df

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
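A simplified sketch of the adjacent-key idea. The real _streaming_sort_and_dup_check uses vectorized within-batch comparisons; this plain Python loop only shows the O(batch_size) memory profile and the carry-forward key across batch and file boundaries:

```python
import pyarrow.parquet as pq

KEY_COLS = ["zip_code", "account_identifier", "datetime"]

def check_sorted_unique(paths, batch_size=1_000_000):
    """Single streaming pass over the key columns of all files, in order.

    Because the data must be globally sorted, any duplicate key is adjacent
    to its twin, so one carried key per boundary detects both sort
    violations and duplicates.
    """
    prev_key = None
    for path in paths:
        pf = pq.ParquetFile(path)
        for batch in pf.iter_batches(batch_size=batch_size, columns=KEY_COLS):
            data = batch.to_pydict()
            for key in zip(*(data[name] for name in KEY_COLS)):
                if prev_key is not None:
                    if key < prev_key:
                        raise ValueError(f"sort violation at {key} in {path}")
                    if key == prev_key:
                        raise ValueError(f"duplicate key {key} in {path}")
                prev_key = key
```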
Two fixes in validate_month_output.py:

1. _slice_keys: use lf.collect() instead of the streaming engine for slice reads — streaming may reorder rows, defeating sortedness validation. Slices are small (5K rows x 3 cols), so the default engine is correct and fast.
2. _check_sorted_sample: track prev_end and only perform the cross-slice boundary comparison when off >= prev_end (non-overlapping). Random windows can overlap the head, the tail, and each other, making boundary checks invalid under overlap. Within-slice strict-monotonic checks still run unconditionally.

Also updates remaining collect(streaming=True) calls to collect(engine="streaming") to fix Polars deprecation warnings.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Restores Justfile from main and adds migrate-month YEAR_MONTH recipe:

- Guards against non-EC2 environments (checks /ebs mount)
- Auto-generates S3 input list via aws s3 ls + awk + sort
- Validates non-empty input list before running
- Runs migrate_month_runner.py with standard production params (batch-size 100, workers 6, --resume, lazy_sink)

Usage: just migrate-month 202307

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Annotate migrate_month_runner.py, validate_month_output.py, and Justfile with industry-standard "why" comments for senior code review. Additions include module-level architecture docstrings, function-level design rationale, and parameter tuning explanations. No logic changes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r-data-from-csv-to-parquet
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Refactor migrate-month to use configurable variables (S3_PREFIX, MIGRATE_OUT_BASE, etc.) instead of hardcoded bucket names and usernames, preparing the repo for open-source. Add six recipes: months-from-s3, migrate-months, validate-month, validate-months, and migration-status. Multi-month recipes support fail-fast (default) or continue-on-error mode with per-invocation UTC-timestamped logs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…alidation

Co-authored-by: Cursor <cursoragent@cursor.com>
…alidator

OOM fix: the previous validator loaded entire key columns into memory via pl.read_parquet(), causing OOM on ~272M-row compacted files on the 8 GB EC2. Replace with _validate_adjacent_keys_streaming(), which uses pq.ParquetFile.iter_batches() to read only the 3 key columns in fixed-size batches (default 1M rows). Memory footprint is now O(batch_size) regardless of file size, plus a single carry-forward prev_key tuple per batch boundary. Validation semantics preserved: cross-boundary sort/duplicate checks plus within-batch vectorized Polars shift(1) comparisons. No group_by anywhere.

Also adds:

- _pre_validate_no_existing_compacted(): globs the canonical dir for compacted_*.parquet early in run_compaction(), fails loud before any writes unless --overwrite-compact is set
- Enriched sort_dup_validation JSON: error_location (file, file_idx, batch_idx, row offsets), validator_version, validator_method, batch_size
- --validation-batch-size CLI flag threaded through RunnerConfig -> CompactionConfig (default 1_000_000; lower = less peak memory)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
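A minimal sketch of the pre-validation guard described above, with the directory argument and flag wiring as assumptions:

```python
from pathlib import Path

def pre_validate_no_existing_compacted(canonical_dir: Path, overwrite_compact: bool) -> None:
    """Fail loudly before any writes if compacted output already exists."""
    existing = sorted(canonical_dir.glob("compacted_*.parquet"))
    if existing and not overwrite_compact:
        raise RuntimeError(
            f"{len(existing)} compacted file(s) already in {canonical_dir}; "
            "pass --overwrite-compact to replace them"
        )
```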
bytes_per_row is estimated from compressed on-disk Parquet, but the carry buffer in _stream_write_chunks accumulates uncompressed in-memory rows. For months like 202508 (~2B rows, high compression), rows_per_chunk_raw could reach 272M, requiring 5-10 GiB RAM at peak. Add MAX_ROWS_PER_CHUNK = 50_000_000 as a hard ceiling. Normal months (~50M rows) produce a single output file unchanged; dense months produce ~40 output files of ~200 MB each with ~1.5 GiB peak RSS.

Also adds rows_per_chunk_raw, rows_per_chunk_capped, and max_rows_per_chunk to compaction_plan.json for auditability, and four tests covering the constant value, the cap and no-cap code paths, and plan key presence.

Also adds a !tests/test_*.py exception to .gitignore so that committed unit tests in tests/ are not blocked by the scratch-file exclusion rule.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
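A sketch of the capping arithmetic, with a hypothetical helper name; the plan keys match the ones added to compaction_plan.json above:

```python
MAX_ROWS_PER_CHUNK = 50_000_000  # hard ceiling on rows buffered in memory per chunk

def plan_rows_per_chunk(target_size_bytes: int, bytes_per_row: float) -> dict:
    # bytes_per_row is measured from compressed on-disk Parquet, so the raw
    # estimate can badly overshoot the uncompressed in-memory footprint of
    # the carry buffer; the cap bounds peak RSS for highly compressed months.
    raw = max(1, int(target_size_bytes / bytes_per_row))
    return {
        "rows_per_chunk_raw": raw,
        "rows_per_chunk_capped": min(raw, MAX_ROWS_PER_CHUNK),
        "max_rows_per_chunk": MAX_ROWS_PER_CHUNK,
    }
```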
…g 1 GiB

Replace per-chunk Polars df.write_parquet() with a pq.ParquetWriter that accumulates multiple row groups (each up to ROWS_PER_ROW_GROUP = 50M rows) into the same output file. A new file is opened only when the on-disk size reaches target_size_bytes after a row-group flush. This removes the need to estimate rows_per_chunk from bytes_per_row and eliminates the MAX_ROWS_PER_CHUNK cap, allowing true ~1 GiB output files regardless of compression ratio.

- Rename MAX_ROWS_PER_CHUNK -> ROWS_PER_ROW_GROUP (constant unchanged: 50M)
- Delete _write_compacted_chunk (superseded by flush_row_group closure)
- Rewrite _stream_write_chunks: same carry-loop, new pq.ParquetWriter rollover
- Update run_compaction: remove rows_per_chunk_raw/capped, add estimated_n_output_files
- Update compaction_plan.json keys to match

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
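A simplified sketch of the writer-rollover idea; the function name, output naming, and the shape of the incoming row groups are assumptions, and the carry-loop that builds each row group is omitted:

```python
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq

ROWS_PER_ROW_GROUP = 50_000_000

def write_with_rollover(row_groups, out_dir: Path, schema: pa.Schema,
                        target_size_bytes: int = 1 << 30) -> list[Path]:
    """Write successive row groups, opening a new output file only after the
    current file's on-disk size crosses target_size_bytes."""
    paths: list[Path] = []
    writer = None
    for table in row_groups:  # each table holds up to ROWS_PER_ROW_GROUP rows
        if writer is None:
            path = out_dir / f"compacted_{len(paths):04d}.parquet"
            writer = pq.ParquetWriter(path, schema)
            paths.append(path)
        writer.write_table(table, row_group_size=ROWS_PER_ROW_GROUP)
        if path.stat().st_size >= target_size_bytes:
            writer.close()
            writer = None
    if writer is not None:
        writer.close()
    return paths
```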
…xport restructure script

Pipeline changes (compact_month_output.py, migrate_month_runner.py):

- Drop Hive-style year=/month= prefixes from partition directory names (YYYY/MM)
- Rename compacted output files from compacted_NNNN to part-NNNNN (5-digit, Spark convention)
- Add _write_success_marker() to write _SUCCESS.json after each compaction swap
- Update idempotency guard and pre-validation glob to match part-* pattern
- Bump batch_id zero-padding from 4 to 5 digits (batch_00000)

New script (restructure_for_export.py):

- Copies 49 months of blessed Parquet data to a clean export layout (YYYY/MM/part-NNNNN.parquet)
- Renames compacted_NNNN → part-NNNNN during copy; originals untouched
- Generates _SUCCESS.json per month from pyarrow footer metadata
- Post-copy verification: file count, byte-for-byte size, Parquet readability
- Supports --dry-run and --force flags; safety guard enforces /ebs prefix

Tests: fix _setup_month_dir to use the new plain 2023/07 path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
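A minimal sketch of generating a per-month _SUCCESS.json from Parquet footer metadata; the marker's actual field names are an assumption:

```python
import json
from pathlib import Path
import pyarrow.parquet as pq

def write_success_marker(month_dir: Path) -> Path:
    """Summarize a month's part files from their footers (no data scan)."""
    parts = sorted(month_dir.glob("part-*.parquet"))
    marker = {
        "n_files": len(parts),
        "n_rows": sum(pq.ParquetFile(p).metadata.num_rows for p in parts),
        "total_bytes": sum(p.stat().st_size for p in parts),
        "files": [p.name for p in parts],
    }
    out = month_dir / "_SUCCESS.json"
    out.write_text(json.dumps(marker, indent=2))
    return out
```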
AGENTS.md was untracked despite being the canonical agent/project guide (CLAUDE.md is a symlink to it and stays gitignored). Also adds .cursor/ to .gitignore so Cursor IDE config never accidentally gets committed.
Root-level Python scripts belong in scripts/ per repo layout conventions. Also fixes deprecated typing.Dict/List annotations to satisfy ruff UP035/UP006.
Adds the CODE QUALITY & TESTING section (check, test, lint, lint-fix, format, typecheck, test-coverage) that existed on main but was absent from this branch.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This is a project to convert the ComEd CSV files on S3 into Parquet for efficiency.