
[smart-meter-analysis] Convert comed meter data from csv to parquet#59

Open
griffinsharps wants to merge 28 commits into main from
43-convert-coned-meter-data-from-csv-to-parquet

Conversation

@griffinsharps
Contributor

This PR converts the ComEd smart-meter CSV files on S3 to Parquet for storage and query efficiency.

Griffin Sharps and others added 10 commits January 26, 2026 23:22
Co-authored-by: Cursor <cursoragent@cursor.com>
…onth validator

Enhance validate_month_output.py with three preflight checks needed before
scaling to full-month execution:
- Duplicate (zip_code, account_identifier, datetime) detection per batch file
- Row count reporting (total + per-file) in validation report JSON
- Run artifact integrity via --run-dir flag (plan.json, run_summary.json,
  manifests, batch summaries)

Add PREFLIGHT_200.md checklist for 200-file EC2 validation run.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…s_sorted()

Polars 1.38 removed is_sorted() from Expr. Collect the composite key first,
then check sortedness on the resulting Series, which still provides the method.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Restores Justfile from main and adds migrate-month recipe.
Usage: just migrate-month 202307
- batch-size 100, workers 6, lazy_sink, --resume
- Reads ~/s3_paths_<YYYYMM>_full.txt, writes to /ebs/.../out_<YYYYMM>_production
- Uses bare python (no uv) for EC2 compatibility

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
.gitignore: block *.txt, .tmp/, archive_quarantine/, tmp_polars_run_*/,
subagent_packages/ from being tracked.

pre-commit: add detect-private-key hook and a local forbid-secrets hook
that blocks .env, .secrets, credentials.json, .pem, .key, .p12, .pfx,
.jks files from being committed (even via git add -f).
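A hedged sketch of what the corresponding `.pre-commit-config.yaml` fragment could look like (`detect-private-key` is a real hook id in pre-commit-hooks; the local hook's name, entry, and file pattern here are assumptions, not the repo's actual config):

```yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: detect-private-key
  - repo: local
    hooks:
      - id: forbid-secrets
        name: forbid committing secret files
        entry: bash -c 'echo "secret-looking file staged; refusing commit" && exit 1'
        language: system
        files: '(^|/)(\.env|\.secrets|credentials\.json)$|\.(pem|key|p12|pfx|jks)$'
```

Because pre-commit hooks run at commit time on the staged file list, they fire even when a gitignored file was staged with `git add -f`.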

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace hash-based duplicate detection (n_unique ~400MB/file) with
adjacent-key streaming that leverages the required global sort order.
Sortedness and uniqueness now share a single PyArrow iter_batches
pass in full mode.

Key changes:
- _streaming_sort_and_dup_check: combined sort+dup via PyArrow
  batch iteration, O(batch_size) memory, cross-file boundary state
- Per-file datetime stats with merge (_DtStats dataclass)
- Per-file DST stats with merge (_DstFileStats dataclass)
- Enhanced sample mode: strict-increasing check (catches dups in windows)
- Row counts from parquet metadata (O(1), no data scan)
- Phase-based main() architecture (discovery -> metadata -> streaming
  -> datetime -> DST -> artifacts -> report)
- _fail() typed as NoReturn for mypy narrowing
- Add pyarrow mypy override in pyproject.toml

Removed dead functions: _check_sorted_full, _validate_no_duplicates_file,
_validate_datetime_invariants_partition, _validate_dst_option_b_partition,
_keys_is_sorted_df
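The adjacent-key idea can be sketched in pure Python (function and field names here are hypothetical, not the module's actual API): because the data is required to be globally sorted, a duplicate must sit next to its twin, so one linear pass with a single carried key replaces the ~400 MB/file hash-based n_unique approach.

```python
def adjacent_key_check(batches, prev_key=None):
    """Streaming sort+duplicate check over an iterator of key batches.

    Each batch is a list of (zip_code, account_identifier, datetime)
    tuples. prev_key carries state across batch (and file) boundaries,
    so peak memory is O(batch_size) regardless of total row count.
    """
    for batch in batches:
        for key in batch:
            if prev_key is not None and key <= prev_key:
                kind = "duplicate" if key == prev_key else "unsorted"
                return {"ok": False, "error": kind, "at": key}
            prev_key = key
    return {"ok": True, "last_key": prev_key}

# Sorted, unique keys spanning a "file boundary" (two batches): passes.
print(adjacent_key_check([[("a", "1", "t1"), ("a", "1", "t2")],
                          [("a", "2", "t1")]]))
```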

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Two fixes in validate_month_output.py:

1. _slice_keys: use lf.collect() instead of streaming engine for slice
   reads — streaming may reorder rows, defeating sortedness validation.
   Slices are small (5K rows x 3 cols) so default engine is correct and fast.

2. _check_sorted_sample: track prev_end and only perform cross-slice
   boundary comparison when off >= prev_end (non-overlapping). Random
   windows can overlap head/tail/each other, making boundary checks
   invalid under overlap. Within-slice strict-monotonic checks still
   run unconditionally.
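A simplified sketch of the corrected sample-mode logic (names and shapes are assumptions; the real validator reads Parquet slices rather than an in-memory list):

```python
def check_sorted_sample(keys, windows):
    """Validate sortedness over sampled (offset, length) windows that may
    overlap (head, tail, random). Within-window strict-increasing checks
    always run; cross-window boundary checks run only when the next
    window starts at or after the previous window's end."""
    prev_end, last_key = None, None
    for off, n in sorted(windows):
        window = keys[off:off + n]
        # Strict monotonicity inside the window also catches duplicates.
        for a, b in zip(window, window[1:]):
            assert a < b, f"violation inside window at offset {off}"
        # Boundary comparison is only valid for non-overlapping windows.
        if last_key is not None and off >= prev_end:
            assert last_key < window[0], f"boundary violation before offset {off}"
        prev_end, last_key = off + n, window[-1]

# Overlapping windows no longer trigger false boundary failures.
check_sorted_sample(list(range(100)), [(0, 10), (5, 10), (90, 10)])
```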

Also updates remaining collect(streaming=True) calls to
collect(engine="streaming") to fix Polars deprecation warnings.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Restores Justfile from main and adds migrate-month YEAR_MONTH recipe:
- Guards against non-EC2 environments (checks /ebs mount)
- Auto-generates S3 input list via aws s3 ls + awk + sort
- Validates non-empty input list before running
- Runs migrate_month_runner.py with standard production params
  (batch-size 100, workers 6, --resume, lazy_sink)

Usage: just migrate-month 202307
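A sketch of what such a recipe could look like in a Justfile (variable names, bucket layout, and runner flags beyond those listed above are assumptions, not the repo's actual recipe):

```just
# Hypothetical sketch: guard, generate input list, validate, run.
migrate-month YEAR_MONTH:
    @test -d /ebs || { echo "migrate-month must run on EC2 (/ebs missing)"; exit 1; }
    aws s3 ls "s3://$S3_BUCKET/comed/{{YEAR_MONTH}}/" | awk '{print $4}' | sort \
        > ~/s3_paths_{{YEAR_MONTH}}_full.txt
    @test -s ~/s3_paths_{{YEAR_MONTH}}_full.txt || { echo "empty input list"; exit 1; }
    python migrate_month_runner.py --resume --batch-size 100 --workers 6 ...
```

The `/ebs` existence check is a cheap proxy for "are we on the provisioned EC2 box", and `test -s` fails fast on an empty S3 listing before any expensive work starts.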

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@griffinsharps griffinsharps linked an issue Feb 6, 2026 that may be closed by this pull request
40 tasks
@griffinsharps griffinsharps self-assigned this Feb 6, 2026
@griffinsharps griffinsharps added the enhancement New feature or request label Feb 6, 2026
@griffinsharps griffinsharps changed the title 43 convert comed meter data from csv to parquet [smart-meter-analysis] 43 convert comed meter data from csv to parquet Feb 6, 2026
@griffinsharps griffinsharps changed the title [smart-meter-analysis] 43 convert comed meter data from csv to parquet [smart-meter-analysis] Convert comed meter data from csv to parquet Feb 6, 2026
Griffin Sharps and others added 15 commits February 6, 2026 21:41
Annotate migrate_month_runner.py, validate_month_output.py, and Justfile
with industry-standard "why" comments for senior code review. Additions
include module-level architecture docstrings, function-level design
rationale, and parameter tuning explanations. No logic changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Refactor migrate-month to use configurable variables (S3_PREFIX,
MIGRATE_OUT_BASE, etc.) instead of hardcoded bucket names and
usernames, preparing the repo for open-source. Add six recipes:
months-from-s3, migrate-months, validate-month, validate-months,
and migration-status. Multi-month recipes support fail-fast (default)
or continue-on-error mode with per-invocation UTC-timestamped logs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…alidation

Co-authored-by: Cursor <cursoragent@cursor.com>
…alidator

OOM fix: the previous validator loaded entire key columns into memory via
pl.read_parquet(), causing OOM on ~272M-row compacted files on the 8 GB EC2.

Replace with _validate_adjacent_keys_streaming() which uses
pq.ParquetFile.iter_batches() to read only the 3 key columns in fixed-size
batches (default 1M rows). Memory footprint is now O(batch_size) regardless
of file size, plus a single carry-forward prev_key tuple per batch boundary.

Validation semantics preserved: cross-boundary sort/duplicate checks plus
within-batch vectorized Polars shift(1) comparisons. No group_by anywhere.

Also adds:
- _pre_validate_no_existing_compacted(): globs canonical dir for
  compacted_*.parquet early in run_compaction(), fails loud before any
  writes unless --overwrite-compact is set
- Enriched sort_dup_validation JSON: error_location (file, file_idx,
  batch_idx, row offsets), validator_version, validator_method, batch_size
- --validation-batch-size CLI flag threaded through RunnerConfig ->
  CompactionConfig (default 1_000_000; lower = less peak memory)
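The pre-validation guard amounts to a glob plus a loud failure; a minimal sketch under assumed names (the real function lives in the runner and wires into the CLI flag):

```python
import tempfile
from pathlib import Path


def pre_validate_no_existing_compacted(canonical_dir, overwrite_compact=False):
    """Fail loudly before any writes if compacted outputs already exist,
    unless the caller explicitly opted in to overwriting them."""
    existing = sorted(Path(canonical_dir).glob("compacted_*.parquet"))
    if existing and not overwrite_compact:
        raise RuntimeError(
            f"{len(existing)} compacted file(s) already in {canonical_dir}; "
            "pass --overwrite-compact to replace them"
        )
    return existing

d = tempfile.mkdtemp()
print(pre_validate_no_existing_compacted(d))  # clean dir: []
Path(d, "compacted_00000.parquet").touch()
# Now the guard refuses unless overwrite_compact=True.
```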

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
bytes_per_row is estimated from compressed on-disk Parquet, but the carry
buffer in _stream_write_chunks accumulates uncompressed in-memory rows.
For months like 202508 (~2B rows, high compression), rows_per_chunk_raw
could reach 272M, requiring 5-10 GiB RAM at peak.

Add MAX_ROWS_PER_CHUNK = 50_000_000 as a hard ceiling.  Normal months
(~50M rows) produce a single output file unchanged; dense months produce
~40 output files of ~200 MB each with ~1.5 GiB peak RSS.
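The cap is a one-line min(); a sketch with the constant from this commit (the helper name and signature are hypothetical):

```python
MAX_ROWS_PER_CHUNK = 50_000_000  # hard ceiling from this commit

def rows_per_chunk(target_size_bytes: int, bytes_per_row: float) -> int:
    # bytes_per_row is measured on compressed on-disk Parquet, so for
    # highly compressed months the raw estimate balloons; cap it to
    # bound the uncompressed in-memory carry buffer.
    raw = int(target_size_bytes / bytes_per_row)
    return min(raw, MAX_ROWS_PER_CHUNK)

print(rows_per_chunk(1 << 30, 4.0))   # raw 268,435,456 rows -> capped at 50,000,000
print(rows_per_chunk(1 << 30, 32.0))  # raw 33,554,432 rows -> under the cap, unchanged
```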

Also adds rows_per_chunk_raw, rows_per_chunk_capped, and max_rows_per_chunk
to compaction_plan.json for auditability, and four tests covering the
constant value, the cap and no-cap code paths, and plan key presence.

Also adds !tests/test_*.py exception to .gitignore so that committed
unit tests in tests/ are not blocked by the scratch-file exclusion rule.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…g 1 GiB

Replace per-chunk Polars df.write_parquet() with a pq.ParquetWriter that
accumulates multiple row groups (each up to ROWS_PER_ROW_GROUP = 50M rows)
into the same output file. A new file is opened only when the on-disk size
reaches target_size_bytes after a row-group flush. This removes the need to
estimate rows_per_chunk from bytes_per_row and eliminates the MAX_ROWS_PER_CHUNK
cap, allowing true ~1 GiB output files regardless of compression ratio.

- Rename MAX_ROWS_PER_CHUNK -> ROWS_PER_ROW_GROUP (constant unchanged: 50M)
- Delete _write_compacted_chunk (superseded by flush_row_group closure)
- Rewrite _stream_write_chunks: same carry-loop, new pq.ParquetWriter rollover
- Update run_compaction: remove rows_per_chunk_raw/capped, add estimated_n_output_files
- Update compaction_plan.json keys to match
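The rollover decision can be simulated without PyArrow to show why file sizes now track the target regardless of compression ratio (the function below is an illustrative model, not the rewritten _stream_write_chunks):

```python
ROWS_PER_ROW_GROUP = 50_000_000  # constant carried over from MAX_ROWS_PER_CHUNK

def plan_rollover(row_group_bytes, target_size_bytes):
    """Model the ParquetWriter rollover: row groups are flushed into the
    current file, and a new file is opened only when on-disk size reaches
    the target after a flush. Returns row-group counts per output file."""
    files, groups_in_file, size = [], 0, 0
    for rg_size in row_group_bytes:
        groups_in_file += 1
        size += rg_size
        if size >= target_size_bytes:
            files.append(groups_in_file)
            groups_in_file, size = 0, 0
    if groups_in_file:  # final partial file
        files.append(groups_in_file)
    return files

# Four ~400 MB row groups against a 1000 MB target: files of 3 and 1 groups.
print(plan_rollover([400] * 4, 1000))  # [3, 1]
```

Because the size check uses actual bytes written rather than an estimated rows-per-chunk, output files land near the target even when compression varies wildly between months.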

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…xport restructure script

Pipeline changes (compact_month_output.py, migrate_month_runner.py):
- Drop Hive-style year=/month= prefixes from partition directory names (YYYY/MM)
- Rename compacted output files from compacted_NNNN to part-NNNNN (5-digit, Spark convention)
- Add _write_success_marker() to write _SUCCESS.json after each compaction swap
- Update idempotency guard and pre-validation glob to match part-* pattern
- Bump batch_id zero-padding from 4 to 5 digits (batch_00000)

New script (restructure_for_export.py):
- Copies 49 months of blessed Parquet data to a clean export layout (YYYY/MM/part-NNNNN.parquet)
- Renames compacted_NNNN → part-NNNNN during copy; originals untouched
- Generates _SUCCESS.json per month from pyarrow footer metadata
- Post-copy verification: file count, byte-for-byte size, Parquet readability
- Supports --dry-run and --force flags; safety guard enforces /ebs prefix

Tests: fix _setup_month_dir to use new plain 2023/07 path
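The rename rule from the export restructure can be captured in a small path-mapping helper (name and signature hypothetical):

```python
import re

def export_path(year_month: str, src_name: str) -> str:
    """Map a blessed file into the clean export layout, e.g.
    ("202307", "compacted_0007.parquet") -> "2023/07/part-00007.parquet"
    Drops the Hive-style year=/month= prefixes and re-pads to 5 digits."""
    m = re.fullmatch(r"compacted_(\d+)\.parquet", src_name)
    if not m:
        raise ValueError(f"unexpected source file name: {src_name}")
    return (f"{year_month[:4]}/{year_month[4:]}/"
            f"part-{int(m.group(1)):05d}.parquet")

print(export_path("202307", "compacted_0007.parquet"))  # 2023/07/part-00007.parquet
```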

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
AGENTS.md was untracked despite being the canonical agent/project guide
(CLAUDE.md is a symlink to it and stays gitignored). Also adds .cursor/
to .gitignore so Cursor IDE config never accidentally gets committed.
Griffin Sharps and others added 3 commits March 2, 2026 20:03
Root-level Python scripts belong in scripts/ per repo layout conventions.
Also fixes deprecated typing.Dict/List annotations to satisfy ruff UP035/UP006.
Adds the CODE QUALITY & TESTING section (check, test, lint, lint-fix,
format, typecheck, test-coverage) that existed on main but was absent
from this branch.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Labels

enhancement New feature or request

Development

Successfully merging this pull request may close these issues.

Convert ComEd meter data from csv to parquet
