Skip to content

Commit

Permalink
Remove most impl From<_> for RpcMessage. (linera-io#3098)
Browse files Browse the repository at this point in the history
## Motivation

For linera-io#3048 I will need
to add another RPC message, to _upload_ a blob. This means a
`BlobContent` can now be turned into both an
`RpcMessage::DownloadBlobContentResponse` and
`RpcMessage::UploadBlobContent`.

In general, it is dangerous to use `RpcMessage::from` to turn data types
into messages: It's easy to make a mistake when adding a new message
variant with a field that is idential with another message variant's
field that already has a `From` implementation, and accidentally turning
it into the unintended message.

## Proposal

Remove all `impl From<_> for RpcMessage` except for the `NodeError`.
Construct the messages explicitly.

## Test Plan

This doesn't change the logic. CI should catch regressions.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.

## Links

- [reviewer
checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
  • Loading branch information
afck authored Jan 7, 2025
1 parent e9fce0f commit 6c5519c
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 122 deletions.
2 changes: 1 addition & 1 deletion linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ where
key_pair,
vec![],
);
proposals.push(proposal.into());
proposals.push(RpcMessage::BlockProposal(Box::new(proposal)));
next_recipient = chain.chain_id;
}
proposals
Expand Down
84 changes: 0 additions & 84 deletions linera-rpc/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,92 +208,8 @@ impl TryFrom<RpcMessage> for Vec<BlobId> {
}
}

impl From<BlockProposal> for RpcMessage {
fn from(block_proposal: BlockProposal) -> Self {
RpcMessage::BlockProposal(Box::new(block_proposal))
}
}

impl From<HandleLiteCertRequest<'static>> for RpcMessage {
fn from(request: HandleLiteCertRequest<'static>) -> Self {
RpcMessage::LiteCertificate(Box::new(request))
}
}

impl From<HandleTimeoutCertificateRequest> for RpcMessage {
fn from(request: HandleTimeoutCertificateRequest) -> Self {
RpcMessage::TimeoutCertificate(Box::new(request))
}
}

impl From<HandleValidatedCertificateRequest> for RpcMessage {
fn from(request: HandleValidatedCertificateRequest) -> Self {
RpcMessage::ValidatedCertificate(Box::new(request))
}
}

impl From<HandleConfirmedCertificateRequest> for RpcMessage {
fn from(request: HandleConfirmedCertificateRequest) -> Self {
RpcMessage::ConfirmedCertificate(Box::new(request))
}
}

impl From<Vec<CryptoHash>> for RpcMessage {
fn from(hashes: Vec<CryptoHash>) -> Self {
RpcMessage::DownloadCertificates(hashes)
}
}

impl From<ChainInfoQuery> for RpcMessage {
fn from(chain_info_query: ChainInfoQuery) -> Self {
RpcMessage::ChainInfoQuery(Box::new(chain_info_query))
}
}

impl From<LiteVote> for RpcMessage {
fn from(vote: LiteVote) -> Self {
RpcMessage::Vote(Box::new(vote))
}
}

impl From<ChainInfoResponse> for RpcMessage {
fn from(chain_info_response: ChainInfoResponse) -> Self {
RpcMessage::ChainInfoResponse(Box::new(chain_info_response))
}
}

impl From<NodeError> for RpcMessage {
fn from(error: NodeError) -> Self {
RpcMessage::Error(Box::new(error))
}
}

impl From<CrossChainRequest> for RpcMessage {
fn from(cross_chain_request: CrossChainRequest) -> Self {
RpcMessage::CrossChainRequest(Box::new(cross_chain_request))
}
}

impl From<VersionInfo> for RpcMessage {
fn from(version_info: VersionInfo) -> Self {
RpcMessage::VersionInfoResponse(Box::new(version_info))
}
}

impl From<BlobContent> for RpcMessage {
fn from(blob: BlobContent) -> Self {
RpcMessage::DownloadBlobContentResponse(Box::new(blob))
}
}

impl From<ConfirmedBlock> for RpcMessage {
fn from(block: ConfirmedBlock) -> Self {
RpcMessage::DownloadConfirmedBlockResponse(Box::new(block))
}
}

impl From<Vec<ConfirmedBlockCertificate>> for RpcMessage {
fn from(certificates: Vec<ConfirmedBlockCertificate>) -> Self {
RpcMessage::DownloadCertificatesResponse(certificates)
}
}
21 changes: 13 additions & 8 deletions linera-rpc/src/simple/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ impl ValidatorNode for SimpleClient {
&self,
proposal: BlockProposal,
) -> Result<ChainInfoResponse, NodeError> {
self.query(proposal.into()).await
let request = RpcMessage::BlockProposal(Box::new(proposal));
self.query(request).await
}

/// Processes a hash certificate.
Expand All @@ -93,11 +94,11 @@ impl ValidatorNode for SimpleClient {
delivery: CrossChainMessageDelivery,
) -> Result<ChainInfoResponse, NodeError> {
let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
let request = HandleLiteCertRequest {
let request = RpcMessage::LiteCertificate(Box::new(HandleLiteCertRequest {
certificate: certificate.cloned(),
wait_for_outgoing_messages,
};
self.query(request.into()).await
}));
self.query(request).await
}

/// Processes a validated certificate.
Expand All @@ -107,7 +108,8 @@ impl ValidatorNode for SimpleClient {
blobs: Vec<Blob>,
) -> Result<ChainInfoResponse, NodeError> {
let request = HandleValidatedCertificateRequest { certificate, blobs };
self.query(request.into()).await
let request = RpcMessage::ValidatedCertificate(Box::new(request));
self.query(request).await
}

/// Processes a confirmed certificate.
Expand All @@ -123,7 +125,8 @@ impl ValidatorNode for SimpleClient {
blobs,
wait_for_outgoing_messages,
};
self.query(request.into()).await
let request = RpcMessage::ConfirmedCertificate(Box::new(request));
self.query(request).await
}

/// Processes a timeout certificate.
Expand All @@ -132,15 +135,17 @@ impl ValidatorNode for SimpleClient {
certificate: TimeoutCertificate,
) -> Result<ChainInfoResponse, NodeError> {
let request = HandleTimeoutCertificateRequest { certificate };
self.query(request.into()).await
let request = RpcMessage::TimeoutCertificate(Box::new(request));
self.query(request).await
}

/// Handles information queries for this chain.
async fn handle_chain_info_query(
&self,
query: ChainInfoQuery,
) -> Result<ChainInfoResponse, NodeError> {
self.query(query.into()).await
let request = RpcMessage::ChainInfoQuery(Box::new(query));
self.query(request).await
}

fn subscribe(
Expand Down
4 changes: 2 additions & 2 deletions linera-rpc/src/simple/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ mod tests {
message_contents: ChainInfoQuery,
trailing_bytes: Vec<u8>,
) {
let message = RpcMessage::from(message_contents);
let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
let payload = bincode::serialize(&message).expect("RpcMessage is serializable");

let mut buffer = BytesMut::with_capacity(
Expand Down Expand Up @@ -174,7 +174,7 @@ mod tests {
leading_bytes: Vec<u8>,
message_contents: ChainInfoQuery,
) {
let message = RpcMessage::from(message_contents);
let message = RpcMessage::ChainInfoQuery(Box::new(message_contents));
let serialized_message =
bincode::serialize(&message).expect("Serialization should succeed");

Expand Down
22 changes: 13 additions & 9 deletions linera-rpc/src/simple/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ where
// Cross-shard requests
self.handle_network_actions(actions);
// Response
Ok(Some(info.into()))
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
}
Err(error) => {
warn!(nickname = self.server.state.nickname(), %error, "Failed to handle block proposal");
Expand Down Expand Up @@ -234,7 +234,7 @@ where
}
}
// Response
Ok(Some(info.into()))
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
}
Err(error) => {
if let WorkerError::MissingCertificateValue = &error {
Expand All @@ -257,7 +257,7 @@ where
// Cross-shard requests
self.handle_network_actions(actions);
// Response
Ok(Some(info.into()))
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
}
Err(error) => {
error!(nickname = self.server.state.nickname(), %error, "Failed to handle timeout certificate");
Expand All @@ -276,7 +276,7 @@ where
// Cross-shard requests
self.handle_network_actions(actions);
// Response
Ok(Some(info.into()))
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
}
Err(error) => {
error!(nickname = self.server.state.nickname(), %error, "Failed to handle validated certificate");
Expand Down Expand Up @@ -304,7 +304,7 @@ where
}
}
// Response
Ok(Some(info.into()))
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
}
Err(error) => {
error!(nickname = self.server.state.nickname(), %error, "Failed to handle confirmed certificate");
Expand All @@ -318,7 +318,7 @@ where
// Cross-shard requests
self.handle_network_actions(actions);
// Response
Ok(Some(info.into()))
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
}
Err(error) => {
error!(nickname = self.server.state.nickname(), %error, "Failed to handle chain info query");
Expand All @@ -332,14 +332,17 @@ where
self.handle_network_actions(actions);
}
Err(error) => {
error!(nickname = self.server.state.nickname(), %error, "Failed to handle cross-chain request");
let nickname = self.server.state.nickname();
error!(nickname, %error, "Failed to handle cross-chain request");
}
}
// No user to respond to.
Ok(None)
}

RpcMessage::VersionInfoQuery => Ok(Some(linera_version::VersionInfo::default().into())),
RpcMessage::VersionInfoQuery => {
Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
}

RpcMessage::Vote(_)
| RpcMessage::Error(_)
Expand Down Expand Up @@ -400,7 +403,8 @@ where
self.server.shard_id,
shard_id
);
if let Err(error) = self.cross_chain_sender.try_send((request.into(), shard_id)) {
let request = RpcMessage::CrossChainRequest(Box::new(request));
if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
error!(%error, "dropping cross-chain request");
break;
}
Expand Down
13 changes: 7 additions & 6 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,13 @@ impl Runnable for Job {
let messages = certificates
.iter()
.map(|certificate| {
HandleConfirmedCertificateRequest {
certificate: certificate.clone(),
blobs: vec![],
wait_for_outgoing_messages: true,
}
.into()
RpcMessage::ConfirmedCertificate(Box::new(
HandleConfirmedCertificateRequest {
certificate: certificate.clone(),
blobs: vec![],
wait_for_outgoing_messages: true,
},
))
})
.collect();
let responses = context
Expand Down
26 changes: 14 additions & 12 deletions linera-service/src/proxy/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,18 +292,19 @@ where
match message {
VersionInfoQuery => {
// We assume each shard is running the same version as the proxy
Ok(Some(linera_version::VersionInfo::default().into()))
Ok(Some(RpcMessage::VersionInfoResponse(
linera_version::VersionInfo::default().into(),
)))
}
GenesisConfigHashQuery => Ok(Some(RpcMessage::GenesisConfigHashResponse(Box::new(
self.genesis_config.hash(),
)))),
DownloadBlobContent(blob_id) => {
let content = self.storage.read_blob(*blob_id).await?.into_content();
Ok(Some(RpcMessage::DownloadBlobContentResponse(Box::new(
content,
))))
}
GenesisConfigHashQuery => Ok(Some(RpcMessage::GenesisConfigHashResponse(
self.genesis_config.hash().into(),
))),
DownloadBlobContent(blob_id) => Ok(Some(
self.storage
.read_blob(*blob_id)
.await?
.into_content()
.into(),
)),
DownloadConfirmedBlock(hash) => {
Ok(Some(RpcMessage::DownloadConfirmedBlockResponse(Box::new(
self.storage
Expand All @@ -313,7 +314,8 @@ where
))))
}
DownloadCertificates(hashes) => {
Ok(Some(self.storage.read_certificates(hashes).await?.into()))
let certificates = self.storage.read_certificates(hashes).await?;
Ok(Some(RpcMessage::DownloadCertificatesResponse(certificates)))
}
BlobLastUsedBy(blob_id) => Ok(Some(RpcMessage::BlobLastUsedByResponse(Box::new(
self.storage.read_blob_state(*blob_id).await?.last_used_by,
Expand Down

0 comments on commit 6c5519c

Please sign in to comment.