From 3e959914e99fe120414c1505fe600a7329df93c4 Mon Sep 17 00:00:00 2001 From: jsai28 <54253219+jsai28@users.noreply.github.com> Date: Tue, 15 Apr 2025 22:00:38 -0600 Subject: [PATCH 1/6] added tests --- Cargo.lock | 19 ++ Cargo.toml | 1 + src/bin/main.rs | 6 +- tests/snapshots/test__split_stdout.snap.new | 7 + tests/test.rs | 297 ++++++++++++++++++++ 5 files changed, 327 insertions(+), 3 deletions(-) create mode 100644 tests/snapshots/test__split_stdout.snap.new diff --git a/Cargo.lock b/Cargo.lock index 5fa5b18..bf50f52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1274,6 +1274,7 @@ dependencies = [ "assert_cmd", "datafusion", "insta", + "predicates", "structopt", "tempfile", "thiserror 2.0.12", @@ -1374,6 +1375,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" +dependencies = [ + "num-traits", +] + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2002,6 +2012,12 @@ dependencies = [ "adler2", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "num" version = "0.4.3" @@ -2305,7 +2321,10 @@ checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" dependencies = [ "anstyle", "difflib", + "float-cmp", + "normalize-line-endings", "predicates-core", + "regex", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 076744c..30361a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ datafusion = {version = "46.0.1", features = ["avro"]} structopt = "0.3.26" thiserror = "2.0.12" tokio = { version="1.44.2", features = ["rt-multi-thread"]} +predicates = "3.1.3" [dev-dependencies] assert_cmd = "2" diff --git a/src/bin/main.rs b/src/bin/main.rs index 710c45f..f69ec09 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -86,7 +86,7 @@ pub enum Commands { #[structopt(short,long)] chunks: usize, #[structopt(parse(from_os_str))] - output_dir: Option, + output: Option, }, #[structopt(about = "Concatenate multiple files or all files in a directory")] @@ -131,8 +131,8 @@ async fn main() -> Result<(), DfKitError> { Commands::Reverse { filename, output } => { reverse(&ctx, &filename, output).await?; } - Commands::Split { filename, chunks, output_dir} => { - let out_dir = output_dir.unwrap_or_else(|| env::current_dir().unwrap()); + Commands::Split { filename, chunks, output} => { + let out_dir = output.unwrap_or_else(|| env::current_dir().unwrap()); dfsplit(&ctx, &filename, chunks, &out_dir).await?; } Commands::Cat { files, dir, output } => { diff --git a/tests/snapshots/test__split_stdout.snap.new b/tests/snapshots/test__split_stdout.snap.new new file mode 100644 index 0000000..7e7cca1 --- /dev/null +++ b/tests/snapshots/test__split_stdout.snap.new @@ -0,0 +1,7 @@ +--- +source: tests/test.rs +assertion_line: 335 +expression: "String::from_utf8(output).unwrap()" +--- +Written chunk 1 to /var/folders/w1/2v_jhxb96_dgnjv2nmk2wwnw0000gn/T/.tmpn4qpqD/out/data_1.csv +Written chunk 2 to /var/folders/w1/2v_jhxb96_dgnjv2nmk2wwnw0000gn/T/.tmpn4qpqD/out/data_2.csv diff --git a/tests/test.rs b/tests/test.rs index 33fe3ed..5057028 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -3,6 +3,7 @@ use tempfile::tempdir; use std::fs; use std::path::Path; use insta::assert_snapshot; +use dfkit::utils::parse_file_list; fn write_temp_file(dir: &Path, name: &str, contents: &str) -> std::path::PathBuf { let file_path = dir.join(name); @@ -41,3 +42,299 @@ fn test_view_with_limit() { +-------+-----+ "); } + +#[test] +fn test_query() { + let temp = tempdir().unwrap(); + let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args(["query", input.to_str().unwrap(), "--sql", "SELECT * FROM t WHERE age > 35"]); + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +------+-----+ + | name | age | + +------+-----+ + | bob | 40 | + +------+-----+ + "); +} + +#[test] +fn test_query_with_output() { + let temp = tempdir().unwrap(); + let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let output_path = temp.path().join("out.csv"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "query", + input.to_str().unwrap(), + "--sql", + "SELECT * FROM t WHERE age > 35", + "--output", + output_path.to_str().unwrap(), + ]); + + cmd.assert().success(); + + assert!(output_path.exists(), "Output file was not created"); +} + +#[test] +fn test_convert_csv_to_json() { + let temp = tempdir().unwrap(); + let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let output = temp.path().join("output.json"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "convert", + input.to_str().unwrap(), + output.to_str().unwrap(), + ]); + + cmd.assert().success(); + + let output_contents = fs::read_to_string(&output).unwrap(); + assert!(output_contents.contains("\"name\":\"alice\"") || output_contents.contains("alice")); // Loose check +} + +#[test] +fn test_convert_csv_to_parquet() { + let temp = tempdir().unwrap(); + let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let output = temp.path().join("output.parquet"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "convert", + input.to_str().unwrap(), + output.to_str().unwrap(), + ]); + + cmd.assert().success(); + assert!(output.exists(), "Parquet file not created"); +} + +#[test] +fn test_convert_to_avro_should_fail() { + let temp = tempdir().unwrap(); + let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let output = temp.path().join("output.avro"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "convert", + input.to_str().unwrap(), + output.to_str().unwrap(), + ]); + + cmd.assert() + .failure() + .stderr(predicates::str::contains("Avro write support not implemented")); +} + +#[test] +fn test_describe_command() { + let temp = tempdir().unwrap(); + let input = write_temp_file( + temp.path(), + "input.csv", + "name,age\nalice,30\nbob,40\ncharlie,50\n" + ); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "describe", + input.to_str().unwrap(), + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +------------+---------+------+ + | describe | name | age | + +------------+---------+------+ + | count | 3 | 3.0 | + | null_count | 0 | 0.0 | + | mean | null | 40.0 | + | std | null | 10.0 | + | min | alice | 30.0 | + | max | charlie | 50.0 | + | median | null | 40.0 | + +------------+---------+------+ + "); +} + +#[test] +fn test_schema_command() { + let temp = tempdir().unwrap(); + let input = write_temp_file( + temp.path(), + "input.csv", + "name,age\nalice,30\nbob,40\n" + ); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "schema", + input.to_str().unwrap(), + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +-------------+-----------+-------------+ + | column_name | data_type | is_nullable | + +-------------+-----------+-------------+ + | name | Utf8 | YES | + | age | Int64 | YES | + +-------------+-----------+-------------+ + "); +} + +#[test] +fn test_count_command() { + let temp = tempdir().unwrap(); + let input = write_temp_file( + temp.path(), + "input.csv", + "name,age\nalice,30\nbob,40\ncharlie,50\n" + ); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "count", + input.to_str().unwrap(), + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +----------+ + | count(*) | + +----------+ + | 3 | + +----------+ + "); +} + +#[test] +fn test_sort_command_ascending() { + let temp = tempdir().unwrap(); + let input = write_temp_file( + temp.path(), + "input.csv", + "name,age\nalice,30\nbob,40\ncharlie,50\n" + ); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "sort", + input.to_str().unwrap(), + "--columns", "age", + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +---------+-----+ + | name | age | + +---------+-----+ + | alice | 30 | + | bob | 40 | + | charlie | 50 | + +---------+-----+ + "); +} + +#[test] +fn test_sort_command_descending() { + let temp = tempdir().unwrap(); + let input = write_temp_file( + temp.path(), + "input.csv", + "name,age\nalice,30\nbob,40\ncharlie,50\n" + ); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "sort", + input.to_str().unwrap(), + "--columns", "age", + "--descending", + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +---------+-----+ + | name | age | + +---------+-----+ + | charlie | 50 | + | bob | 40 | + | alice | 30 | + +---------+-----+ + "); +} + +#[test] +fn test_reverse_stdout() { + let temp = tempdir().unwrap(); + let input_path = temp.path().join("input.csv"); + fs::write(&input_path, "name,age\nalice,30\nbob,40\ncharlie,25").unwrap(); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + let output = cmd + .args([ + "reverse", + input_path.to_str().unwrap(), + ]) + .assert() + .success() + .get_output() + .stdout + .clone(); + + assert_snapshot!(String::from_utf8(output).unwrap(), @r###" + +---------+-----+ + | name | age | + +---------+-----+ + | charlie | 25 | + | bob | 40 | + | alice | 30 | + +---------+-----+ + "###); +} + +#[test] +fn test_split_creates_chunks() { + let temp = tempdir().unwrap(); + let input_path = temp.path().join("data.csv"); + let output_dir = temp.path().join("out"); + + // Write input file + fs::write( + &input_path, + "name,age\nalice,30\nbob,40\ncharlie,25\ndave,20\nellen,45\n", + ) + .unwrap(); + + // Run the CLI command + let output = Command::cargo_bin("dfkit") + .unwrap() + .args(&[ + "split", + input_path.to_str().unwrap(), + "--chunks", + "2", + output_dir.to_str().unwrap(), + ]) + .assert() + .success() + .get_output() + .stdout + .clone(); + + // Assert output files exist using parse_file_list + let mut files= parse_file_list(None, Some(output_dir.clone())).unwrap(); + + files.sort(); + + assert_eq!(files.len(), 2); +} From ec04d6b0e0252f47aa69369609b773f96cf1a676 Mon Sep 17 00:00:00 2001 From: jsai28 <54253219+jsai28@users.noreply.github.com> Date: Wed, 16 Apr 2025 15:56:22 -0600 Subject: [PATCH 2/6] added cat test --- tests/snapshots/test__split_stdout.snap.new | 7 ---- tests/test.rs | 42 ++++++++++++++++++++- 2 files changed, 41 insertions(+), 8 deletions(-) delete mode 100644 tests/snapshots/test__split_stdout.snap.new diff --git a/tests/snapshots/test__split_stdout.snap.new b/tests/snapshots/test__split_stdout.snap.new deleted file mode 100644 index 7e7cca1..0000000 --- a/tests/snapshots/test__split_stdout.snap.new +++ /dev/null @@ -1,7 +0,0 @@ ---- -source: tests/test.rs -assertion_line: 335 -expression: "String::from_utf8(output).unwrap()" ---- -Written chunk 1 to /var/folders/w1/2v_jhxb96_dgnjv2nmk2wwnw0000gn/T/.tmpn4qpqD/out/data_1.csv -Written chunk 2 to /var/folders/w1/2v_jhxb96_dgnjv2nmk2wwnw0000gn/T/.tmpn4qpqD/out/data_2.csv diff --git a/tests/test.rs b/tests/test.rs index 5057028..69f90fe 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -316,7 +316,7 @@ fn test_split_creates_chunks() { .unwrap(); // Run the CLI command - let output = Command::cargo_bin("dfkit") + let _ = Command::cargo_bin("dfkit") .unwrap() .args(&[ "split", @@ -338,3 +338,43 @@ fn test_split_creates_chunks() { assert_eq!(files.len(), 2); } + +#[test] +fn test_cat_concatenates_csv_files() { + let temp = tempdir().unwrap(); + let file1 = temp.path().join("part1.csv"); + let file2 = temp.path().join("part2.csv"); + let out_file = temp.path().join("combined.csv"); + + // Create sample CSV files + fs::write(&file1, "name,age\nalice,30\nbob,40\n").unwrap(); + fs::write(&file2, "name,age\ncharlie,25\ndave,20\n").unwrap(); + + let input_files = format!("{},{}", file1.display(), file2.display()); + + // Run the CLI command + let _ = Command::cargo_bin("dfkit") + .unwrap() + .args(&[ + "cat", + "--files", + &input_files, + "--output", + out_file.to_str().unwrap(), + ]) + .assert() + .success() + .get_output() + .stdout + .clone(); + + // Check the output file contents + let result_csv = fs::read_to_string(&out_file).unwrap(); + assert_snapshot!(result_csv, @r" + name,age + alice,30 + bob,40 + charlie,25 + dave,20 + "); +} From b213700e6662398ff80f7c11b3e92dee52d2334b Mon Sep 17 00:00:00 2001 From: jsai28 <54253219+jsai28@users.noreply.github.com> Date: Wed, 16 Apr 2025 16:10:34 -0600 Subject: [PATCH 3/6] test refactor --- tests/test.rs | 78 ++++++++++++++++++++++----------------------------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/tests/test.rs b/tests/test.rs index 69f90fe..87099e3 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,7 +1,7 @@ use assert_cmd::Command; use tempfile::tempdir; use std::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; use insta::assert_snapshot; use dfkit::utils::parse_file_list; @@ -11,10 +11,18 @@ fn write_temp_file(dir: &Path, name: &str, contents: &str) -> std::path::PathBuf file_path } +fn create_basic_csv(dir: &Path) -> PathBuf { + write_temp_file(dir, "input.csv", "name,age\nalice,30\nbob,40\n") +} + +fn create_extended_csv(dir: &Path) -> PathBuf { + write_temp_file(dir, "input.csv", "name,age\nalice,30\nbob,40\ncharlie,50\n") +} + #[test] fn test_view() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args(["view", input.to_str().unwrap()]); let output = cmd.assert().success().get_output().stdout.clone(); @@ -30,7 +38,7 @@ fn test_view() { #[test] fn test_view_with_limit() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args(["view", input.to_str().unwrap(), "-l", "1"]); let output = cmd.assert().success().get_output().stdout.clone(); @@ -46,7 +54,7 @@ fn test_view_with_limit() { #[test] fn test_query() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args(["query", input.to_str().unwrap(), "--sql", "SELECT * FROM t WHERE age > 35"]); let output = cmd.assert().success().get_output().stdout.clone(); @@ -62,7 +70,7 @@ fn test_query() { #[test] fn test_query_with_output() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let output_path = temp.path().join("out.csv"); let mut cmd = Command::cargo_bin("dfkit").unwrap(); @@ -83,7 +91,7 @@ fn test_query_with_output() { #[test] fn test_convert_csv_to_json() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let output = temp.path().join("output.json"); let mut cmd = Command::cargo_bin("dfkit").unwrap(); @@ -102,7 +110,7 @@ fn test_convert_csv_to_json() { #[test] fn test_convert_csv_to_parquet() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let output = temp.path().join("output.parquet"); let mut cmd = Command::cargo_bin("dfkit").unwrap(); @@ -119,7 +127,7 @@ fn test_convert_csv_to_parquet() { #[test] fn test_convert_to_avro_should_fail() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let output = temp.path().join("output.avro"); let mut cmd = Command::cargo_bin("dfkit").unwrap(); @@ -137,11 +145,7 @@ fn test_convert_to_avro_should_fail() { #[test] fn test_describe_command() { let temp = tempdir().unwrap(); - let input = write_temp_file( - temp.path(), - "input.csv", - "name,age\nalice,30\nbob,40\ncharlie,50\n" - ); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args([ @@ -151,28 +155,24 @@ fn test_describe_command() { let output = cmd.assert().success().get_output().stdout.clone(); assert_snapshot!(String::from_utf8(output).unwrap(), @r" - +------------+---------+------+ - | describe | name | age | - +------------+---------+------+ - | count | 3 | 3.0 | - | null_count | 0 | 0.0 | - | mean | null | 40.0 | - | std | null | 10.0 | - | min | alice | 30.0 | - | max | charlie | 50.0 | - | median | null | 40.0 | - +------------+---------+------+ + +------------+-------+--------------------+ + | describe | name | age | + +------------+-------+--------------------+ + | count | 2 | 2.0 | + | null_count | 0 | 0.0 | + | mean | null | 35.0 | + | std | null | 7.0710678118654755 | + | min | alice | 30.0 | + | max | bob | 40.0 | + | median | null | 35.0 | + +------------+-------+--------------------+ "); } #[test] fn test_schema_command() { let temp = tempdir().unwrap(); - let input = write_temp_file( - temp.path(), - "input.csv", - "name,age\nalice,30\nbob,40\n" - ); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args([ @@ -194,11 +194,7 @@ fn test_schema_command() { #[test] fn test_count_command() { let temp = tempdir().unwrap(); - let input = write_temp_file( - temp.path(), - "input.csv", - "name,age\nalice,30\nbob,40\ncharlie,50\n" - ); + let input = create_extended_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args([ @@ -219,11 +215,7 @@ fn test_count_command() { #[test] fn test_sort_command_ascending() { let temp = tempdir().unwrap(); - let input = write_temp_file( - temp.path(), - "input.csv", - "name,age\nalice,30\nbob,40\ncharlie,50\n" - ); + let input = create_extended_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args([ @@ -247,11 +239,7 @@ fn test_sort_command_ascending() { #[test] fn test_sort_command_descending() { let temp = tempdir().unwrap(); - let input = write_temp_file( - temp.path(), - "input.csv", - "name,age\nalice,30\nbob,40\ncharlie,50\n" - ); + let input = create_extended_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args([ @@ -276,7 +264,7 @@ fn test_sort_command_descending() { #[test] fn test_reverse_stdout() { let temp = tempdir().unwrap(); - let input_path = temp.path().join("input.csv"); + let input_path = create_extended_csv(temp.path()); fs::write(&input_path, "name,age\nalice,30\nbob,40\ncharlie,25").unwrap(); let mut cmd = Command::cargo_bin("dfkit").unwrap(); From 053c3d1260b2adbff0634be23afcd28bb44fe090 Mon Sep 17 00:00:00 2001 From: jsai28 <54253219+jsai28@users.noreply.github.com> Date: Wed, 16 Apr 2025 16:58:02 -0600 Subject: [PATCH 4/6] test updates and refactor --- .github/workflows/ci.yml | 2 +- .gitignore | 1 + src/commands.rs | 93 +++---------------------------------- src/utils.rs | 13 ++++++ tests/.test.rs.pending-snap | 31 +++++++++++++ tests/test.rs | 22 ++++++--- 6 files changed, 68 insertions(+), 94 deletions(-) create mode 100644 tests/.test.rs.pending-snap diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21e0eab..4ee5656 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,5 +44,5 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: - files: tarpaulin-report.xml + files: cobertura.xml diff --git a/.gitignore b/.gitignore index 15a18b6..38cfb37 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target .DS_Store .idea/ +cobertura.xml diff --git a/src/commands.rs b/src/commands.rs index 35cbbe2..74b3c4f 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -2,12 +2,10 @@ use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use datafusion::arrow::compute::concat_batches; -use datafusion::common::DataFusionError; -use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::logical_expr::col; use datafusion::prelude::SessionContext; -use crate::utils::{file_type, register_table, DfKitError, FileFormat}; +use crate::utils::{file_type, register_table, write_output, DfKitError}; pub async fn view(ctx: &SessionContext, filename: &Path, limit: Option) -> Result<(), DfKitError> { let df = register_table(&ctx, "t", &filename).await?; @@ -28,15 +26,7 @@ pub async fn query(ctx: &SessionContext, filename: &Path, sql: Option, o let df_sql = ctx.sql(&*sql.unwrap()).await?; if let Some(path) = output { - match file_type { - FileFormat::Csv => df_sql.write_csv(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Parquet => df_sql.write_parquet(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Json => df_sql.write_json(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write support not implemented".to_string()))); - } - }; - + write_output(df_sql, &path, &file_type).await?; println!("File written to: {}, successfully.", path.display()); } else { df_sql.show().await?; @@ -49,14 +39,7 @@ pub async fn convert(ctx: &SessionContext, filename: &Path, output_filename: &Pa let df = register_table(ctx, "t", &filename).await?; let output_file_type = file_type(&output_filename)?; - match output_file_type { - FileFormat::Csv => df.write_csv(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Parquet => df.write_parquet(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Json => df.write_json(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write support not implemented".to_string()))); - } - }; + write_output(df, &output_filename, &output_file_type).await?; Ok(()) } @@ -103,20 +86,7 @@ pub async fn sort( if let Some(out_path) = output { let format = file_type(&out_path)?; - match format { - FileFormat::Csv => { - sorted_df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await? - } - FileFormat::Parquet => { - sorted_df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await? - } - FileFormat::Json => { - sorted_df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await? - } - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into()))); - } - }; + write_output(sorted_df, &out_path, &format).await?; println!("Sorted file written to: {}", out_path.display()); } else { sorted_df.show().await?; @@ -151,28 +121,7 @@ pub async fn reverse( if let Some(out_path) = output { let format = file_type(&out_path)?; - match format { - FileFormat::Csv => { - reversed_df - .write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Parquet => { - reversed_df - .write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Json => { - reversed_df - .write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented( - "Avro write not supported".into(), - ))); - } - }; + write_output(reversed_df, &out_path, &format).await?; println!("Reversed file written to: {}", out_path.display()); } else { reversed_df.show().await?; @@ -202,28 +151,7 @@ pub async fn dfsplit(ctx: &SessionContext, filename: &Path, chunks: usize, outpu let chunk_filename = format!("{}_{}.{}", stem, i + 1, extension); let chunk_path = output_dir.join(chunk_filename); - match format { - FileFormat::Csv => { - chunk_df - .write_csv(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Parquet => { - chunk_df - .write_parquet(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Json => { - chunk_df - .write_json(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented( - "Avro split write not supported".into(), - ))) - } - }; + write_output(chunk_df, &chunk_path, &format).await?; println!("Written chunk {} to {}", i + 1, chunk_path.display()); } @@ -246,14 +174,7 @@ pub async fn cat(ctx: &SessionContext, files: Vec, out_path: &Path) -> } let format = file_type(&out_path)?; - match format { - FileFormat::Csv => final_df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Parquet => final_df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Json => final_df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into()))); - } - }; + write_output(final_df, out_path, &format).await?; println!("Concatenated file written to: {}", out_path.display()); Ok(()) diff --git a/src/utils.rs b/src/utils.rs index 70e613e..bbf1fd0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,6 @@ use std::path::{Path, PathBuf}; use datafusion::arrow::error::ArrowError; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::prelude::*; use datafusion::error::DataFusionError; use thiserror::Error; @@ -86,3 +87,15 @@ pub fn parse_file_list(files: Option, dir: Option) -> Result Result<(), DfKitError> { + match format { + FileFormat::Csv => df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, + FileFormat::Parquet => df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, + FileFormat::Json => df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, + FileFormat::Avro => { + return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into()))); + } + }; + Ok(()) +} + diff --git a/tests/.test.rs.pending-snap b/tests/.test.rs.pending-snap new file mode 100644 index 0000000..9943e63 --- /dev/null +++ b/tests/.test.rs.pending-snap @@ -0,0 +1,31 @@ +{"run_id":"1744843728-685315000","line":361,"new":{"module_name":"test","snapshot_name":"cat_concatenates_csv_files","metadata":{"source":"tests/test.rs","assertion_line":361,"expression":"result_csv"},"snapshot":"name,age\ncharlie,25\ndave,20\nalice,30\nbob,40"},"old":{"module_name":"test","metadata":{},"snapshot":"name,age\nalice,30\nbob,40\ncharlie,25\ndave,20"}} +{"run_id":"1744843968-718417000","line":369,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":61,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":206,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":282,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":253,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":184,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":228,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":45,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":29,"new":null,"old":null} +{"run_id":"1744843968-718417000","line":157,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":206,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":369,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":61,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":282,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":184,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":228,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":253,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":29,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":45,"new":null,"old":null} +{"run_id":"1744844006-306017000","line":157,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":369,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":206,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":157,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":61,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":282,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":184,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":228,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":253,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":29,"new":null,"old":null} +{"run_id":"1744844140-176006000","line":45,"new":null,"old":null} diff --git a/tests/test.rs b/tests/test.rs index 87099e3..3643c71 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -139,7 +139,7 @@ fn test_convert_to_avro_should_fail() { cmd.assert() .failure() - .stderr(predicates::str::contains("Avro write support not implemented")); + .stderr(predicates::str::contains("Avro write not supported")); } #[test] @@ -351,14 +351,22 @@ fn test_cat_concatenates_csv_files() { out_file.to_str().unwrap(), ]) .assert() - .success() - .get_output() - .stdout - .clone(); + .success(); - // Check the output file contents + // Read and sort the output CSV let result_csv = fs::read_to_string(&out_file).unwrap(); - assert_snapshot!(result_csv, @r" + let lines: Vec<&str> = result_csv.lines().collect(); + + let header = lines[0]; + let mut records = lines[1..].to_vec(); + records.sort(); // Sort records alphabetically + + let sorted_result = std::iter::once(header) + .chain(records.into_iter()) + .collect::>() + .join("\n"); + + assert_snapshot!(sorted_result, @r" name,age alice,30 bob,40 From dba65f096a4fac506fc471197dcdebb5e18ee257 Mon Sep 17 00:00:00 2001 From: jsai28 <54253219+jsai28@users.noreply.github.com> Date: Wed, 16 Apr 2025 18:16:54 -0600 Subject: [PATCH 5/6] Delete .test.rs.pending-snap --- tests/.test.rs.pending-snap | 31 ------------------------------- 1 file changed, 31 deletions(-) delete mode 100644 tests/.test.rs.pending-snap diff --git a/tests/.test.rs.pending-snap b/tests/.test.rs.pending-snap deleted file mode 100644 index 9943e63..0000000 --- a/tests/.test.rs.pending-snap +++ /dev/null @@ -1,31 +0,0 @@ -{"run_id":"1744843728-685315000","line":361,"new":{"module_name":"test","snapshot_name":"cat_concatenates_csv_files","metadata":{"source":"tests/test.rs","assertion_line":361,"expression":"result_csv"},"snapshot":"name,age\ncharlie,25\ndave,20\nalice,30\nbob,40"},"old":{"module_name":"test","metadata":{},"snapshot":"name,age\nalice,30\nbob,40\ncharlie,25\ndave,20"}} -{"run_id":"1744843968-718417000","line":369,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":61,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":206,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":282,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":253,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":184,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":228,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":45,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":29,"new":null,"old":null} -{"run_id":"1744843968-718417000","line":157,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":206,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":369,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":61,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":282,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":184,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":228,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":253,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":29,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":45,"new":null,"old":null} -{"run_id":"1744844006-306017000","line":157,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":369,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":206,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":157,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":61,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":282,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":184,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":228,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":253,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":29,"new":null,"old":null} -{"run_id":"1744844140-176006000","line":45,"new":null,"old":null} From bff4880cc1f416ba32c95520e0bd990d06f9d389 Mon Sep 17 00:00:00 2001 From: jsai28 <54253219+jsai28@users.noreply.github.com> Date: Wed, 16 Apr 2025 18:32:37 -0600 Subject: [PATCH 6/6] Update ci.yml --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ee5656..176be24 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,5 +44,6 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: + token: ${{ secrets.CODECOV_TOKEN }} files: cobertura.xml