Skip to content

Commit

Permalink
Merge pull request #1 from ut-issl/local-test-issl
Browse files Browse the repository at this point in the history
Local test issl
  • Loading branch information
shunsuke-shimomura committed Jun 26, 2024
2 parents 2cf7e3f + 7d71484 commit 23c8818
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 73 deletions.
39 changes: 35 additions & 4 deletions gaia-ccsds-c2a/src/access/cmd/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{ensure, Result};
use funty::{Floating, Integral};
use structpack::{
FloatingField, GenericFloatingField, GenericIntegralField, IntegralField, NumericField,
SizedField,
NumericValue, SizedField,
};
use tlmcmddb::{cmd as cmddb, Component};

Expand All @@ -13,20 +13,47 @@ pub struct Metadata {
pub component_name: String,
pub command_name: String,
pub cmd_id: u16,
pub description: String,
}

#[derive(Debug, Clone)]
pub struct CommandSchema {
pub sized_parameters: Vec<NumericField>,
pub sized_parameters: Vec<ParameterField>,
pub static_size: usize,
pub has_trailer_parameter: bool,
}

#[derive(Debug, Clone)]
pub struct ParameterField {
pub value: NumericField,
pub description: String,
}

impl SizedField for ParameterField {
type Value<'a> = NumericValue;

fn read<'a>(&self, bytes: &'a [u8]) -> Result<Self::Value<'a>> {
self.value.read(bytes)
}

fn write(&self, bytes: &mut [u8], value: Self::Value<'_>) -> Result<()> {
self.value.write(bytes, value)
}

fn last_bit_exclusive(&self) -> usize {
self.value.last_bit_exclusive()
}

fn bit_len(&self) -> usize {
self.value.bit_len()
}
}

impl CommandSchema {
pub fn build_writer<'b>(
&'b self,
bytes: &'b mut [u8],
) -> Writer<'b, std::slice::Iter<'b, NumericField>> {
) -> Writer<'b, std::slice::Iter<'b, ParameterField>> {
Writer::new(
self.sized_parameters.iter(),
self.static_size,
Expand Down Expand Up @@ -76,6 +103,7 @@ impl<'a> Iterator for Iter<'a> {
component_name: self.name.to_string(),
command_name: command.name.to_string(),
cmd_id: command.code,
description: command.description.to_string(),
};
return build_schema(command)
.map(|schema| Some((metadata, schema)))
Expand All @@ -92,7 +120,10 @@ fn build_schema(db: &cmddb::Command) -> Result<CommandSchema> {
for parameter in params_iter.by_ref() {
if let Some(field) = build_numeric_field(static_size_bits, parameter) {
static_size_bits += field.bit_len();
sized_parameters.push(field);
sized_parameters.push(ParameterField {
value: field,
description: parameter.description.clone(),
});
} else {
// raw parameter is present
has_trailer_parameter = true;
Expand Down
82 changes: 48 additions & 34 deletions gaia-ccsds-c2a/src/access/tlm/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,16 @@ impl FloatingFieldSchema {
}
}

pub enum FieldSchema {
pub struct FieldSchema {
pub metadata: FieldMetadata,
pub value: FieldValueSchema,
}

pub struct FieldMetadata {
pub description: String,
}

pub enum FieldValueSchema {
Integral(IntegralFieldSchema),
Floating(FloatingFieldSchema),
}
Expand Down Expand Up @@ -135,39 +144,44 @@ fn build_field_schema(
let converter = build_integral_converter(&field.conversion_info);
Ok((
&field.name,
match obs.variable_type {
tlmdb::VariableType::Int8 => FieldSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::I8(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Int16 => FieldSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::I16(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Int32 => FieldSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::I32(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Uint8 => FieldSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::U8(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Uint16 => FieldSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::U16(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Uint32 => FieldSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::U32(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Float => FieldSchema::Floating(FloatingFieldSchema {
converter: as_polynomial(converter)?,
field: GenericFloatingField::F32(build_telemetry_floating_field(bit_range)?),
}),
tlmdb::VariableType::Double => FieldSchema::Floating(FloatingFieldSchema {
converter: as_polynomial(converter)?,
field: GenericFloatingField::F64(build_telemetry_floating_field(bit_range)?),
}),
FieldSchema {
metadata: FieldMetadata {
description: field.description.clone(),
},
value: match obs.variable_type {
tlmdb::VariableType::Int8 => FieldValueSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::I8(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Int16 => FieldValueSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::I16(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Int32 => FieldValueSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::I32(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Uint8 => FieldValueSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::U8(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Uint16 => FieldValueSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::U16(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Uint32 => FieldValueSchema::Integral(IntegralFieldSchema {
converter,
field: GenericIntegralField::U32(build_telemetry_integral_field(bit_range)?),
}),
tlmdb::VariableType::Float => FieldValueSchema::Floating(FloatingFieldSchema {
converter: as_polynomial(converter)?,
field: GenericFloatingField::F32(build_telemetry_floating_field(bit_range)?),
}),
tlmdb::VariableType::Double => FieldValueSchema::Floating(FloatingFieldSchema {
converter: as_polynomial(converter)?,
field: GenericFloatingField::F64(build_telemetry_floating_field(bit_range)?),
}),
},
},
))
}
Expand Down
14 changes: 11 additions & 3 deletions tmtc-c2a/proto/tmtc_generic_c2a.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message CommandSchema {

message CommandSchemaMetadata {
uint32 id = 1;
string description = 2;
}

message CommandParameterSchema {
Expand All @@ -53,7 +54,7 @@ message CommandParameterSchema {
}

message CommandParameterSchemaMetadata {
// TODO: string description = 1;
string description = 1;
}

enum CommandParameterDataType {
Expand All @@ -74,11 +75,18 @@ message TelemetrySchemaMetadata {
message TelemetryFieldSchema {
TelemetryFieldSchemaMetadata metadata = 1;
string name = 2;
// TODO: TelemetryFieldDataType data_type = 3;
}

enum TelemetryFieldDataType {
Integer = 0;
Double = 1;
Enum = 3;
Bytes = 4;
}

message TelemetryFieldSchemaMetadata {
// TODO: string description = 1;
string description = 1;
TelemetryFieldDataType data_type = 2;
}

message TelemetryChannelSchema {
Expand Down
31 changes: 21 additions & 10 deletions tmtc-c2a/src/kble_gs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use gaia_ccsds_c2a::{
};
use tokio::{
net::{TcpListener, ToSocketAddrs},
sync::{broadcast, mpsc},
sync::{broadcast, mpsc, oneshot},
};
use tracing::{error, info};

Expand All @@ -25,7 +25,7 @@ pub fn new() -> (Link, Socket) {
}

pub struct Socket {
cmd_rx: mpsc::Receiver<Vec<u8>>,
cmd_rx: mpsc::Receiver<(Vec<u8>, oneshot::Sender<Result<()>>)>,
tlm_tx: broadcast::Sender<Vec<u8>>,
}

Expand All @@ -36,24 +36,33 @@ impl Socket {
let accept_fut = listener.accept();
let leak_fut = async {
loop {
self.cmd_rx.recv().await;
if let Some((_, resp_tx)) = self.cmd_rx.recv().await {
if let Err(e) =
resp_tx.send(Err(anyhow!("kble socket to satellite is not ready")))
{
break e;
}
}
}
};
let (incoming, addr) = tokio::select! {
accept = accept_fut => accept?,
_ = leak_fut => unreachable!(),
accept = accept_fut => accept.map_err(|_| anyhow!("response receiver has gone"))?,
resp_res = leak_fut => return resp_res,
};
info!("accept kble connection from {addr}");
let wss = tokio_tungstenite::accept_async(incoming).await?;
let (mut sink, mut stream) = kble_socket::from_tungstenite(wss);
let uplink = async {
loop {
let cmd_bytes = self
let (cmd_bytes, resp_tx) = self
.cmd_rx
.recv()
.await
.ok_or_else(|| anyhow!("command sender has gone"))?;
sink.send(cmd_bytes.into()).await?;
let res = sink.send(cmd_bytes.into()).await;
resp_tx
.send(res)
.map_err(|_| anyhow!("response receiver has gone"))?;
}
};
let downlink = async {
Expand All @@ -78,7 +87,7 @@ impl Socket {
}

pub struct Link {
cmd_tx: mpsc::Sender<Vec<u8>>,
cmd_tx: mpsc::Sender<(Vec<u8>, oneshot::Sender<Result<()>>)>,
tlm_tx: broadcast::Sender<Vec<u8>>,
}

Expand Down Expand Up @@ -123,7 +132,7 @@ impl aos::SyncAndChannelCoding for Downlink {

#[derive(Debug, Clone)]
pub struct Uplink {
cmd_tx: mpsc::Sender<Vec<u8>>,
cmd_tx: mpsc::Sender<(Vec<u8>, oneshot::Sender<Result<()>>)>,
}

#[async_trait::async_trait]
Expand All @@ -137,7 +146,9 @@ impl tc::SyncAndChannelCoding for Uplink {
data: &[u8],
) -> Result<()> {
let tf_bytes = build_tf(scid, vcid, frame_type, sequence_number, data)?;
self.cmd_tx.send(tf_bytes).await?;
let (resp_tx, resp_rx) = oneshot::channel();
self.cmd_tx.send((tf_bytes, resp_tx)).await?;
resp_rx.await??;
Ok(())
}
}
Expand Down
Loading

0 comments on commit 23c8818

Please sign in to comment.