diff --git a/gaia-ccsds-c2a/src/access/cmd/schema.rs b/gaia-ccsds-c2a/src/access/cmd/schema.rs index dda835d..5ba5ce6 100644 --- a/gaia-ccsds-c2a/src/access/cmd/schema.rs +++ b/gaia-ccsds-c2a/src/access/cmd/schema.rs @@ -2,7 +2,7 @@ use anyhow::{ensure, Result}; use funty::{Floating, Integral}; use structpack::{ FloatingField, GenericFloatingField, GenericIntegralField, IntegralField, NumericField, - SizedField, + NumericValue, SizedField, }; use tlmcmddb::{cmd as cmddb, Component}; @@ -13,20 +13,47 @@ pub struct Metadata { pub component_name: String, pub command_name: String, pub cmd_id: u16, + pub description: String, } #[derive(Debug, Clone)] pub struct CommandSchema { - pub sized_parameters: Vec, + pub sized_parameters: Vec, pub static_size: usize, pub has_trailer_parameter: bool, } +#[derive(Debug, Clone)] +pub struct ParameterField { + pub value: NumericField, + pub description: String, +} + +impl SizedField for ParameterField { + type Value<'a> = NumericValue; + + fn read<'a>(&self, bytes: &'a [u8]) -> Result> { + self.value.read(bytes) + } + + fn write(&self, bytes: &mut [u8], value: Self::Value<'_>) -> Result<()> { + self.value.write(bytes, value) + } + + fn last_bit_exclusive(&self) -> usize { + self.value.last_bit_exclusive() + } + + fn bit_len(&self) -> usize { + self.value.bit_len() + } +} + impl CommandSchema { pub fn build_writer<'b>( &'b self, bytes: &'b mut [u8], - ) -> Writer<'b, std::slice::Iter<'b, NumericField>> { + ) -> Writer<'b, std::slice::Iter<'b, ParameterField>> { Writer::new( self.sized_parameters.iter(), self.static_size, @@ -76,6 +103,7 @@ impl<'a> Iterator for Iter<'a> { component_name: self.name.to_string(), command_name: command.name.to_string(), cmd_id: command.code, + description: command.description.to_string(), }; return build_schema(command) .map(|schema| Some((metadata, schema))) @@ -92,7 +120,10 @@ fn build_schema(db: &cmddb::Command) -> Result { for parameter in params_iter.by_ref() { if let Some(field) = build_numeric_field(static_size_bits, parameter) { static_size_bits += field.bit_len(); - sized_parameters.push(field); + sized_parameters.push(ParameterField { + value: field, + description: parameter.description.clone(), + }); } else { // raw parameter is present has_trailer_parameter = true; diff --git a/gaia-ccsds-c2a/src/access/tlm/schema.rs b/gaia-ccsds-c2a/src/access/tlm/schema.rs index ef0ff76..928489d 100644 --- a/gaia-ccsds-c2a/src/access/tlm/schema.rs +++ b/gaia-ccsds-c2a/src/access/tlm/schema.rs @@ -61,7 +61,16 @@ impl FloatingFieldSchema { } } -pub enum FieldSchema { +pub struct FieldSchema { + pub metadata: FieldMetadata, + pub value: FieldValueSchema, +} + +pub struct FieldMetadata { + pub description: String, +} + +pub enum FieldValueSchema { Integral(IntegralFieldSchema), Floating(FloatingFieldSchema), } @@ -135,39 +144,44 @@ fn build_field_schema( let converter = build_integral_converter(&field.conversion_info); Ok(( &field.name, - match obs.variable_type { - tlmdb::VariableType::Int8 => FieldSchema::Integral(IntegralFieldSchema { - converter, - field: GenericIntegralField::I8(build_telemetry_integral_field(bit_range)?), - }), - tlmdb::VariableType::Int16 => FieldSchema::Integral(IntegralFieldSchema { - converter, - field: GenericIntegralField::I16(build_telemetry_integral_field(bit_range)?), - }), - tlmdb::VariableType::Int32 => FieldSchema::Integral(IntegralFieldSchema { - converter, - field: GenericIntegralField::I32(build_telemetry_integral_field(bit_range)?), - }), - tlmdb::VariableType::Uint8 => FieldSchema::Integral(IntegralFieldSchema { - converter, - field: GenericIntegralField::U8(build_telemetry_integral_field(bit_range)?), - }), - tlmdb::VariableType::Uint16 => FieldSchema::Integral(IntegralFieldSchema { - converter, - field: GenericIntegralField::U16(build_telemetry_integral_field(bit_range)?), - }), - tlmdb::VariableType::Uint32 => FieldSchema::Integral(IntegralFieldSchema { - converter, - field: GenericIntegralField::U32(build_telemetry_integral_field(bit_range)?), - }), - tlmdb::VariableType::Float => FieldSchema::Floating(FloatingFieldSchema { - converter: as_polynomial(converter)?, - field: GenericFloatingField::F32(build_telemetry_floating_field(bit_range)?), - }), - tlmdb::VariableType::Double => FieldSchema::Floating(FloatingFieldSchema { - converter: as_polynomial(converter)?, - field: GenericFloatingField::F64(build_telemetry_floating_field(bit_range)?), - }), + FieldSchema { + metadata: FieldMetadata { + description: field.description.clone(), + }, + value: match obs.variable_type { + tlmdb::VariableType::Int8 => FieldValueSchema::Integral(IntegralFieldSchema { + converter, + field: GenericIntegralField::I8(build_telemetry_integral_field(bit_range)?), + }), + tlmdb::VariableType::Int16 => FieldValueSchema::Integral(IntegralFieldSchema { + converter, + field: GenericIntegralField::I16(build_telemetry_integral_field(bit_range)?), + }), + tlmdb::VariableType::Int32 => FieldValueSchema::Integral(IntegralFieldSchema { + converter, + field: GenericIntegralField::I32(build_telemetry_integral_field(bit_range)?), + }), + tlmdb::VariableType::Uint8 => FieldValueSchema::Integral(IntegralFieldSchema { + converter, + field: GenericIntegralField::U8(build_telemetry_integral_field(bit_range)?), + }), + tlmdb::VariableType::Uint16 => FieldValueSchema::Integral(IntegralFieldSchema { + converter, + field: GenericIntegralField::U16(build_telemetry_integral_field(bit_range)?), + }), + tlmdb::VariableType::Uint32 => FieldValueSchema::Integral(IntegralFieldSchema { + converter, + field: GenericIntegralField::U32(build_telemetry_integral_field(bit_range)?), + }), + tlmdb::VariableType::Float => FieldValueSchema::Floating(FloatingFieldSchema { + converter: as_polynomial(converter)?, + field: GenericFloatingField::F32(build_telemetry_floating_field(bit_range)?), + }), + tlmdb::VariableType::Double => FieldValueSchema::Floating(FloatingFieldSchema { + converter: as_polynomial(converter)?, + field: GenericFloatingField::F64(build_telemetry_floating_field(bit_range)?), + }), + }, }, )) } diff --git a/tmtc-c2a/proto/tmtc_generic_c2a.proto b/tmtc-c2a/proto/tmtc_generic_c2a.proto index 8b71d01..17df29d 100644 --- a/tmtc-c2a/proto/tmtc_generic_c2a.proto +++ b/tmtc-c2a/proto/tmtc_generic_c2a.proto @@ -45,6 +45,7 @@ message CommandSchema { message CommandSchemaMetadata { uint32 id = 1; + string description = 2; } message CommandParameterSchema { @@ -53,7 +54,7 @@ message CommandParameterSchema { } message CommandParameterSchemaMetadata { - // TODO: string description = 1; + string description = 1; } enum CommandParameterDataType { @@ -74,11 +75,18 @@ message TelemetrySchemaMetadata { message TelemetryFieldSchema { TelemetryFieldSchemaMetadata metadata = 1; string name = 2; - // TODO: TelemetryFieldDataType data_type = 3; +} + +enum TelemetryFieldDataType { + Integer = 0; + Double = 1; + Enum = 3; + Bytes = 4; } message TelemetryFieldSchemaMetadata { - // TODO: string description = 1; + string description = 1; + TelemetryFieldDataType data_type = 2; } message TelemetryChannelSchema { diff --git a/tmtc-c2a/src/kble_gs.rs b/tmtc-c2a/src/kble_gs.rs index 376a306..5bd2df3 100644 --- a/tmtc-c2a/src/kble_gs.rs +++ b/tmtc-c2a/src/kble_gs.rs @@ -9,7 +9,7 @@ use gaia_ccsds_c2a::{ }; use tokio::{ net::{TcpListener, ToSocketAddrs}, - sync::{broadcast, mpsc}, + sync::{broadcast, mpsc, oneshot}, }; use tracing::{error, info}; @@ -25,7 +25,7 @@ pub fn new() -> (Link, Socket) { } pub struct Socket { - cmd_rx: mpsc::Receiver>, + cmd_rx: mpsc::Receiver<(Vec, oneshot::Sender>)>, tlm_tx: broadcast::Sender>, } @@ -36,24 +36,33 @@ impl Socket { let accept_fut = listener.accept(); let leak_fut = async { loop { - self.cmd_rx.recv().await; + if let Some((_, resp_tx)) = self.cmd_rx.recv().await { + if let Err(e) = + resp_tx.send(Err(anyhow!("kble socket to satellite is not ready"))) + { + break e; + } + } } }; let (incoming, addr) = tokio::select! { - accept = accept_fut => accept?, - _ = leak_fut => unreachable!(), + accept = accept_fut => accept.map_err(|_| anyhow!("response receiver has gone"))?, + resp_res = leak_fut => return resp_res, }; info!("accept kble connection from {addr}"); let wss = tokio_tungstenite::accept_async(incoming).await?; let (mut sink, mut stream) = kble_socket::from_tungstenite(wss); let uplink = async { loop { - let cmd_bytes = self + let (cmd_bytes, resp_tx) = self .cmd_rx .recv() .await .ok_or_else(|| anyhow!("command sender has gone"))?; - sink.send(cmd_bytes.into()).await?; + let res = sink.send(cmd_bytes.into()).await; + resp_tx + .send(res) + .map_err(|_| anyhow!("response receiver has gone"))?; } }; let downlink = async { @@ -78,7 +87,7 @@ impl Socket { } pub struct Link { - cmd_tx: mpsc::Sender>, + cmd_tx: mpsc::Sender<(Vec, oneshot::Sender>)>, tlm_tx: broadcast::Sender>, } @@ -123,7 +132,7 @@ impl aos::SyncAndChannelCoding for Downlink { #[derive(Debug, Clone)] pub struct Uplink { - cmd_tx: mpsc::Sender>, + cmd_tx: mpsc::Sender<(Vec, oneshot::Sender>)>, } #[async_trait::async_trait] @@ -137,7 +146,9 @@ impl tc::SyncAndChannelCoding for Uplink { data: &[u8], ) -> Result<()> { let tf_bytes = build_tf(scid, vcid, frame_type, sequence_number, data)?; - self.cmd_tx.send(tf_bytes).await?; + let (resp_tx, resp_rx) = oneshot::channel(); + self.cmd_tx.send((tf_bytes, resp_tx)).await?; + resp_rx.await??; Ok(()) } } diff --git a/tmtc-c2a/src/registry/cmd.rs b/tmtc-c2a/src/registry/cmd.rs index a2bc194..53e42ec 100644 --- a/tmtc-c2a/src/registry/cmd.rs +++ b/tmtc-c2a/src/registry/cmd.rs @@ -5,7 +5,7 @@ use std::{ }; use anyhow::{anyhow, Result}; -use gaia_ccsds_c2a::access::cmd::schema::{from_tlmcmddb, CommandSchema}; +use gaia_ccsds_c2a::access::cmd::schema::{from_tlmcmddb, CommandSchema, Metadata}; use itertools::Itertools; use crate::proto::tmtc_generic_c2a as proto; @@ -62,7 +62,7 @@ struct CommandSchemaWithId { #[derive(Debug, Clone)] pub struct Registry { prefix_map: satconfig::CommandPrefixMap, - schema_map: HashMap<(String, String), CommandSchemaWithId>, + schema_map: HashMap<(String, String), (Metadata, CommandSchemaWithId)>, } impl Registry { @@ -100,16 +100,18 @@ impl Registry { self.schema_map .iter() .sorted_by_key(|&((component_name, _), _)| component_name) - .group_by(|&((component_name, _), schema_with_id)| { + .group_by(|&((component_name, _), (_, schema_with_id))| { (component_name, schema_with_id.apid) }) .into_iter() .map(|((component_name, apid), group)| { let command_schema_map = group - .map(|((_, command_name), schema_with_id)| { + .map(|((_, command_name), (metadata, schema_with_id))| { let trailer_parameter = if schema_with_id.schema.has_trailer_parameter { Some(proto::CommandParameterSchema { - metadata: Some(proto::CommandParameterSchemaMetadata {}), + metadata: Some(proto::CommandParameterSchemaMetadata { + description: metadata.description.clone(), + }), data_type: proto::CommandParameterDataType::CmdParameterBytes .into(), }) @@ -121,7 +123,7 @@ impl Registry { .sized_parameters .iter() .map(|param| { - let data_type = match param { + let data_type = match param.value { structpack::NumericField::Integral(_) => { proto::CommandParameterDataType::CmdParameterInteger } @@ -130,7 +132,9 @@ impl Registry { } }; proto::CommandParameterSchema { - metadata: Some(proto::CommandParameterSchemaMetadata {}), + metadata: Some(proto::CommandParameterSchemaMetadata { + description: param.description.clone(), + }), data_type: data_type.into(), } }) @@ -140,6 +144,7 @@ impl Registry { let command_schema = proto::CommandSchema { metadata: Some(proto::CommandSchemaMetadata { id: schema_with_id.command_id as u32, + description: metadata.description.clone(), }), parameters, }; @@ -166,11 +171,14 @@ impl Registry { destination_type, execution_type, } = self.prefix_map.get(&prefix)?.get(&component)?; - let CommandSchemaWithId { - apid, - command_id, - schema, - } = self.schema_map.get(&(component, command))?; + let ( + _, + CommandSchemaWithId { + apid, + command_id, + schema, + }, + ) = self.schema_map.get(&(component, command))?; Some(FatCommandSchema { apid: *apid, command_id: *command_id, @@ -190,8 +198,8 @@ impl Registry { .flatten() .map(|schema| { let (metadata, schema) = schema?; - let component = metadata.component_name; - let cmddb_name = metadata.command_name; + let component = metadata.component_name.clone(); + let cmddb_name = metadata.command_name.clone(); let apid = *apid_map .get(&component) .ok_or_else(|| anyhow!("APID is not defined for {component}"))?; @@ -200,7 +208,7 @@ impl Registry { command_id: metadata.cmd_id, schema, }; - Ok(((component, cmddb_name), schema_with_id)) + Ok(((component, cmddb_name), (metadata, schema_with_id))) }) .collect::>()?; Ok(Self { diff --git a/tmtc-c2a/src/registry/tlm.rs b/tmtc-c2a/src/registry/tlm.rs index 0a9ef52..58a2559 100644 --- a/tmtc-c2a/src/registry/tlm.rs +++ b/tmtc-c2a/src/registry/tlm.rs @@ -5,7 +5,7 @@ use std::{ use anyhow::{anyhow, Result}; use gaia_ccsds_c2a::access::tlm::schema::{ - from_tlmcmddb, FieldSchema, FloatingFieldSchema, IntegralFieldSchema, + from_tlmcmddb, FieldSchema, FieldValueSchema, FloatingFieldSchema, IntegralFieldSchema, }; use itertools::Itertools; @@ -55,6 +55,16 @@ pub struct FieldMetadata { original_name: String, pub converted_name: String, pub raw_name: String, + pub description: String, + pub data_type: DataType, +} + +#[derive(Debug, Clone)] +pub enum DataType { + Integer, + Double, + Enum, + Bytes, } #[derive(Debug, Clone)] @@ -95,7 +105,15 @@ impl Registry { .chain(fat_tlm_schema.schema.floating_fields.iter().map(|(m, _)| m)) .sorted_by_key(|m| m.order) .map(|m| proto::TelemetryFieldSchema { - metadata: Some(proto::TelemetryFieldSchemaMetadata {}), + metadata: Some(proto::TelemetryFieldSchemaMetadata { + description: m.description.clone(), + data_type: match m.data_type { + DataType::Integer => proto::TelemetryFieldDataType::Integer as i32, + DataType::Double => proto::TelemetryFieldDataType::Double as i32, + DataType::Enum => proto::TelemetryFieldDataType::Enum as i32, + DataType::Bytes => proto::TelemetryFieldDataType::Bytes as i32, + }, + }), name: m.original_name.to_string(), }) .collect(); @@ -203,12 +221,27 @@ fn build_telemetry_schema<'a>( }; for (order, pair) in iter.enumerate() { let (field_name, field_schema) = pair?; - let name_pair = build_field_metadata(order, field_name); - match field_schema { - FieldSchema::Integral(field_schema) => { + let data_type = match &field_schema.value { + FieldValueSchema::Integral(schema) => match schema.converter { + Some(gaia_ccsds_c2a::access::tlm::converter::Integral::Polynomial(_)) => { + DataType::Double + } + Some(gaia_ccsds_c2a::access::tlm::converter::Integral::Status(_)) => DataType::Enum, + None => DataType::Integer, + }, + FieldValueSchema::Floating(_) => DataType::Double, + }; + let name_pair = build_field_metadata( + order, + field_name, + &field_schema.metadata.description, + data_type, + ); + match field_schema.value { + FieldValueSchema::Integral(field_schema) => { schema.integral_fields.push((name_pair, field_schema)); } - FieldSchema::Floating(field_schema) => { + FieldValueSchema::Floating(field_schema) => { schema.floating_fields.push((name_pair, field_schema)); } } @@ -216,11 +249,18 @@ fn build_telemetry_schema<'a>( Ok(schema) } -fn build_field_metadata(order: usize, tlmdb_name: &str) -> FieldMetadata { +fn build_field_metadata( + order: usize, + tlmdb_name: &str, + description: &str, + data_type: DataType, +) -> FieldMetadata { FieldMetadata { order, original_name: tlmdb_name.to_string(), converted_name: tlmdb_name.to_string(), raw_name: format!("{tlmdb_name}@RAW"), + description: description.to_string(), + data_type, } }