AEMO Data Snippets

Dividing large AEMO Data CSVs into parquet partitions

This script can be run via the command line to divide a large AEMO data CSV (e.g. from the Monthly Data Archive, such as rebids in BIDPEROFFER) into Parquet partitions. This is advantageous for using packages such as Dask to analyse such data.

It assumes that the header (i.e. column names) for a single data table is located in the file's second line.

Requirements

Written using Python 3.11. Uses pathlib and type annotations, so it probably needs at least Python > 3.5.
# Python script (executable via CLI) to create parquet partitions
# for large AEMO data CSVs. Assumes the table header is in the file's
# second line and that only one table type is in the file
#
# Copyright (C) 2023 Abhijith Prakash
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
import argparse
import logging
import sys
from pathlib import Path

import pandas as pd
from tqdm import tqdm
+
+
def arg_parser():
    """Parse command-line arguments for the chunking script.

    Returns:
        argparse.Namespace with attributes ``file`` (str, required),
        ``output_dir`` (str, required) and ``chunksize`` (int, default 10**6).
    """
    description = (
        "Chunk large monthly AEMO data table CSVs into parquet partitions. "
        + "Assumes that the table header is in the 2nd row"
    )
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "-file", type=str, required=True, help="File to process. Must be CSV"
    )
    parser.add_argument(
        "-output_dir",
        type=str,
        required=True,
        help=(
            "Directory to write parquet chunks to. "
            "Will be created if it does not exist"
        ),
    )
    parser.add_argument(
        "-chunksize",
        type=int,
        default=10**6,
        help="Size of each DataFrame chunk (# of lines). Default 10^6",
    )
    args = parser.parse_args()
    return args
+
+
def get_columns(file_path: Path) -> pd.Index:
    """Return the table's column names without reading any data rows.

    ``header=1`` takes the file's second line as the header (the first line
    is skipped, matching the CLI description "table header is in the 2nd
    row"); ``nrows=0`` stops pandas from loading any data.

    Args:
        file_path: Path to the AEMO CSV.

    Returns:
        pd.Index of column names.
    """
    col_df = pd.read_csv(file_path, header=1, nrows=0)
    return col_df.columns
+
+
def estimate_size_of_lines(file_path: Path, columns: pd.Index) -> float:
    """Estimate the in-memory size of one data row, in bytes.

    Samples up to 1000 data rows (``skiprows=2`` jumps past the two
    header lines) and divides total DataFrame memory usage by the number
    of sampled rows.

    Bug fix: the original signature was ``columns=pd.Index`` — a keyword
    default bound to the class object — where a type annotation
    ``columns: pd.Index`` was clearly intended; ``columns`` is now required.

    Args:
        file_path: Path to the AEMO CSV.
        columns: Column index to apply to the sampled rows.

    Returns:
        Estimated bytes per data row as a float.
    """
    sample_size = 1000
    sample = pd.read_csv(file_path, skiprows=2, nrows=sample_size, header=None)
    sample.columns = columns
    total_size = sample.memory_usage().sum()
    size_per_line = total_size / len(sample)
    return size_per_line
+
+
def chunk_file(file_path: Path, output_dir: Path, chunksize: int) -> None:
    """Split a large AEMO CSV into numbered parquet partitions.

    Writes ``<stem>_chunk<i>.parquet`` files into ``output_dir``, with a
    tqdm progress bar whose total is the CSV's on-disk size.

    Args:
        file_path: CSV to split; the process exits if the suffix is not .csv.
        output_dir: Existing directory to write partitions into.
        chunksize: Number of lines per DataFrame chunk.
    """
    if file_path.suffix.lower() != ".csv":
        logging.error("File is not a CSV")
        sys.exit()  # was bare exit(); sys.exit is the script-safe form
    cols = get_columns(file_path)
    size_per_line = estimate_size_of_lines(file_path, cols)
    file_size = file_path.stat().st_size
    file_stem = file_path.stem
    with pd.read_csv(
        file_path, chunksize=chunksize, skiprows=2, header=None
    ) as reader:
        with tqdm(total=file_size, desc="Progress estimate based on file size") as pbar:
            for i, chunk in enumerate(reader):
                chunk.columns = cols
                out_file = Path(file_stem + f"_chunk{i}.parquet")
                chunk.to_parquet(output_dir / out_file)
                # Progress is estimated: in-memory DataFrame bytes are roughly
                # twice the CSV text they came from, hence the division by 2.
                # See here for comparison of pandas DataFrame size vs CSV size:
                # https://stackoverflow.com/questions/18089667/how-to-estimate-how-much-memory-a-pandas-dataframe-will-need#32970117
                pbar.update((size_per_line * chunksize) / 2)
+
def main():
    """CLI entry point: validate paths, then chunk the CSV into parquet files."""
    logging.basicConfig(format="\n%(levelname)s:%(message)s", level=logging.INFO)
    args = arg_parser()
    f = Path(args.file)
    output_dir = Path(args.output_dir)
    if not output_dir.exists():
        output_dir.mkdir(parents=True)
    # Bug fix: the original tested len(...) > 1, so a single pre-existing
    # chunk slipped through; ANY existing chunk of this file should abort.
    elif any(output_dir.glob(f.stem + "*.parquet")):
        logging.error("Pre-existing chunks of this file in output directory. Exiting.")
        sys.exit()
    if not f.exists():
        logging.error("Path does not exist")
        sys.exit()
    if not f.is_file():
        logging.error("Path provided does not point to a file")
        sys.exit()
    chunk_file(f, output_dir, args.chunksize)
+
if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    main()