From ebf78aa6c9ca235b00099b9ceb70cf34e414765f Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sat, 11 Jan 2025 18:03:34 -0500 Subject: [PATCH] feat: Updates to plugin API (#25799) * Add uint64_field to LineBuilder * Add bool_field to LineBuilder * Change query_rows to query in Python API * Update error output for test wal_plugin --- influxdb3/src/commands/test.rs | 2 +- influxdb3/tests/server/cli.rs | 2 +- influxdb3_processing_engine/src/plugins.rs | 6 ++++-- influxdb3_py_api/src/system_py.rs | 16 +++++++++++++++- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/influxdb3/src/commands/test.rs b/influxdb3/src/commands/test.rs index 57423aa150b..fd8c09d030a 100644 --- a/influxdb3/src/commands/test.rs +++ b/influxdb3/src/commands/test.rs @@ -76,7 +76,7 @@ pub async fn command(config: Config) -> Result<(), Box> { None => { let file_path = plugin_config .input_file - .context("either input_lp or input_file must be provided")?; + .context("either --lp or --file must be provided")?; std::fs::read_to_string(file_path).context("unable to read input file")? } }; diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index 4ca330d0f1f..5226e51d26a 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -911,7 +911,7 @@ def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("arg1: " + args["arg1"]) query_params = {"host": args["host"]} - query_result = influxdb3_local.query_rows("SELECT * FROM cpu where host = $host", query_params) + query_result = influxdb3_local.query("SELECT * FROM cpu where host = $host", query_params) influxdb3_local.info("query result: " + str(query_result)) for table_batch in table_batches: diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index ab1e4a354aa..63ed01c2b37 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -470,7 +470,9 @@ def process_writes(influxdb3_local, table_batches, args=None): cpu_valid = LineBuilder("cpu")\ .tag("host", "A")\ - .int64_field("f1", 10) + .int64_field("f1", 10)\ + .uint64_field("f2", 20)\ + .bool_field("f3", True) influxdb3_local.write_to_db("foodb", cpu_valid) cpu_invalid = LineBuilder("cpu")\ @@ -509,7 +511,7 @@ def process_writes(influxdb3_local, table_batches, args=None): // the lines should still come through in the output because that's what Python sent let expected_foodb_lines = vec![ - "cpu,host=A f1=10i".to_string(), + "cpu,host=A f1=10i,f2=20u,f3=t".to_string(), "cpu,host=A f1=\"not_an_int\"".to_string(), ]; assert_eq!( diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index fc2c87cb105..e1ec4a3f444 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -116,7 +116,7 @@ impl PyPluginCallApi { } #[pyo3(signature = (query, args=None))] - fn query_rows( + fn query( &self, query: String, args: Option>, @@ -282,6 +282,14 @@ class LineBuilder: self.tags[key] = str(value) return self + def uint64_field(self, key: str, value: int) -> 'LineBuilder': + """Add an unsigned integer field to the line protocol.""" + self._validate_key(key, "field") + if value < 0: + raise ValueError(f"uint64 field '{key}' cannot be negative") + self.fields[key] = f"{value}u" + return self + def int64_field(self, key: str, value: int) -> 'LineBuilder': """Add an integer field to the line protocol.""" self._validate_key(key, "field") @@ -303,6 +311,12 @@ class LineBuilder: self.fields[key] = f'"{escaped_value}"' return self + def bool_field(self, key: str, value: bool) -> 'LineBuilder': + """Add a boolean field to the line protocol.""" + self._validate_key(key, "field") + self.fields[key] = 't' if value else 'f' + return self + def time_ns(self, timestamp_ns: int) -> 'LineBuilder': """Set the timestamp in nanoseconds.""" self._timestamp_ns = timestamp_ns