Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt-in Line-Protocol for Compatibility Mode #123

Merged
merged 2 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion influxdb/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Client {
/// Add authorization token to [`Client`](crate::Client)
///
/// This is designed for influxdb 2.0's backward-compatible API which
/// requires authrozation by default. You can create such token from
/// requires authorization by default. You can create such token from
/// console of influxdb 2.0 .
pub fn with_token<S>(mut self, token: S) -> Self
where
Expand Down
28 changes: 25 additions & 3 deletions influxdb/src/query/line_proto_term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,22 @@ impl LineProtoTerm<'_> {
match self {
Measurement(x) => Self::escape_any(x, &COMMAS_SPACES),
TagKey(x) | FieldKey(x) => Self::escape_any(x, &COMMAS_SPACES_EQUALS),
FieldValue(x) => Self::escape_field_value(x),
FieldValue(x) => Self::escape_field_value(x, false),
TagValue(x) => Self::escape_tag_value(x),
}
}

fn escape_field_value(v: &Type) -> String {
pub fn escape_v2(self) -> String {
use LineProtoTerm::*;
match self {
Measurement(x) => Self::escape_any(x, &COMMAS_SPACES),
TagKey(x) | FieldKey(x) => Self::escape_any(x, &COMMAS_SPACES_EQUALS),
FieldValue(x) => Self::escape_field_value(x, true),
TagValue(x) => Self::escape_tag_value(x),
}
}

fn escape_field_value(v: &Type, use_v2: bool) -> String {
use Type::*;
match v {
Boolean(v) => {
Expand All @@ -43,7 +53,13 @@ impl LineProtoTerm<'_> {
.to_string(),
Float(v) => v.to_string(),
SignedInteger(v) => format!("{}i", v),
UnsignedInteger(v) => format!("{}u", v),
UnsignedInteger(v) => {
if use_v2 {
format!("{}u", v)
} else {
format!("{}i", v)
}
}
Text(v) => format!(r#""{}""#, Self::escape_any(v, &QUOTES_SLASHES)),
}
}
Expand Down Expand Up @@ -112,6 +128,12 @@ mod test {
assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#);
assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#);

assert_eq!(FieldValue(&Type::UnsignedInteger(0)).escape(), r#"0i"#);
assert_eq!(FieldValue(&Type::UnsignedInteger(83)).escape(), r#"83i"#);

assert_eq!(FieldValue(&Type::UnsignedInteger(0)).escape_v2(), r#"0u"#);
assert_eq!(FieldValue(&Type::UnsignedInteger(83)).escape_v2(), r#"83u"#);

assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#);
assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#);
assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#);
Expand Down
30 changes: 29 additions & 1 deletion influxdb/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,36 @@ pub trait Query {
/// ```
fn build(&self) -> Result<ValidQuery, Error>;

/// Like [build] but with additional support for unsigned integers in the line protocol.
/// Please note, this crate can only interact with InfluxDB 2.0 in compatibility mode
/// and does not natively support InfluxDB 2.0.
///
/// # Examples
///
/// ```rust
/// use influxdb::{Query, Timestamp};
/// use influxdb::InfluxDbWriteable;
///
/// let use_v2 = true;
///
/// let invalid_query = Timestamp::Nanoseconds(0).into_query("measurement").build_with_opts(use_v2);
/// assert!(invalid_query.is_err());
///
/// let valid_query = Timestamp::Nanoseconds(0).into_query("measurement").add_field("myfield1", 11).build_with_opts(use_v2);
/// assert!(valid_query.is_ok());
/// ```
fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error>;

fn get_type(&self) -> QueryType;
}

impl<Q: Query> Query for &Q {
fn build(&self) -> Result<ValidQuery, Error> {
Q::build(self)
Q::build_with_opts(self, false)
}

fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
Q::build_with_opts(self, use_v2)
}

fn get_type(&self) -> QueryType {
Expand All @@ -130,6 +154,10 @@ impl<Q: Query> Query for Box<Q> {
Q::build(self)
}

fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
Q::build_with_opts(self, use_v2)
}

fn get_type(&self) -> QueryType {
Q::get_type(self)
}
Expand Down
4 changes: 4 additions & 0 deletions influxdb/src/query/read_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl Query for ReadQuery {
Ok(ValidQuery(self.queries.join(";")))
}

fn build_with_opts(&self, _use_v2: bool) -> Result<ValidQuery, Error> {
Ok(ValidQuery(self.queries.join(";")))
}

fn get_type(&self) -> QueryType {
QueryType::ReadQuery
}
Expand Down
79 changes: 74 additions & 5 deletions influxdb/src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ where

impl Query for WriteQuery {
fn build(&self) -> Result<ValidQuery, Error> {
self.build_with_opts(false)
}

fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
if self.fields.is_empty() {
return Err(Error::InvalidQueryError {
error: "fields cannot be empty".to_string(),
Expand All @@ -173,10 +177,20 @@ impl Query for WriteQuery {
.tags
.iter()
.map(|(tag, value)| {
let escaped_tag_key = if use_v2 {
LineProtoTerm::TagKey(tag).escape_v2()
} else {
LineProtoTerm::TagKey(tag).escape()
};
let escaped_tag_value = if use_v2 {
LineProtoTerm::TagValue(value).escape_v2()
} else {
LineProtoTerm::TagValue(value).escape()
};
format!(
"{tag}={value}",
tag = LineProtoTerm::TagKey(tag).escape(),
value = LineProtoTerm::TagValue(value).escape(),
tag = escaped_tag_key,
value = escaped_tag_value,
)
})
.collect::<Vec<String>>()
Expand All @@ -189,18 +203,34 @@ impl Query for WriteQuery {
.fields
.iter()
.map(|(field, value)| {
let escaped_field_key = if use_v2 {
LineProtoTerm::FieldKey(field).escape_v2()
} else {
LineProtoTerm::FieldKey(field).escape()
};
let escaped_field_value = if use_v2 {
LineProtoTerm::FieldValue(value).escape_v2()
} else {
LineProtoTerm::FieldValue(value).escape()
};
format!(
"{field}={value}",
field = LineProtoTerm::FieldKey(field).escape(),
value = LineProtoTerm::FieldValue(value).escape(),
field = escaped_field_key,
value = escaped_field_value,
)
})
.collect::<Vec<String>>()
.join(",");

let escaped_measurement = if use_v2 {
LineProtoTerm::Measurement(&self.measurement).escape_v2()
} else {
LineProtoTerm::Measurement(&self.measurement).escape()
};

Ok(ValidQuery(format!(
"{measurement}{tags} {fields} {time}",
measurement = LineProtoTerm::Measurement(&self.measurement).escape(),
measurement = escaped_measurement,
tags = tags,
fields = fields,
time = self.timestamp
Expand All @@ -224,6 +254,17 @@ impl Query for Vec<WriteQuery> {
Ok(ValidQuery(qlines.join("\n")))
}

fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
let mut qlines = Vec::new();

for q in self {
let valid_query = q.build_with_opts(use_v2)?;
qlines.push(valid_query.0);
}

Ok(ValidQuery(qlines.join("\n")))
}

fn get_type(&self) -> QueryType {
QueryType::WriteQuery(
self.get(0)
Expand Down Expand Up @@ -267,6 +308,22 @@ mod tests {
.add_field("temperature_unsigned", 82u64)
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap(),
"weather temperature=82i,wind_strength=3.7,temperature_unsigned=82i 11"
);
}

#[test]
fn test_write_builder_multiple_fields_with_v2() {
let query = Timestamp::Hours(11)
.into_query("weather".to_string())
.add_field("temperature", 82)
.add_field("wind_strength", 3.7)
.add_field("temperature_unsigned", 82u64)
.build_with_opts(true);

assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap(),
Expand All @@ -282,6 +339,18 @@ mod tests {
.add_tag("wind_strength", <Option<u64>>::None)
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(query.unwrap(), "weather temperature=82i 11");
}

#[test]
fn test_write_builder_optional_fields_with_v2() {
let query = Timestamp::Hours(11)
.into_query("weather".to_string())
.add_field("temperature", 82u64)
.add_tag("wind_strength", <Option<u64>>::None)
.build_with_opts(true);

assert!(query.is_ok(), "Query was empty");
assert_eq!(query.unwrap(), "weather temperature=82u 11");
}
Expand Down
Loading