Member

@jxs jxs commented Sep 16, 2025

Description

This is a draft implementation of partial messages for gossipsub following the spec PR and based on the Go implementation. Still WIP but should give a good idea of the direction we're heading.

{
// Return err if trying to publish the same partial message state we currently have.
if existing.available_parts() == partial_message.available_parts() {
return Err(PublishError::Duplicate);

I don't think this is correct.

  • Imagine you have parts 1,2,3.
  • You tell your peers about those parts.
  • A peer comes back and says I want part 2.
  • You republish with the same parts in order to respond.
  • You get this error and fail to respond.
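The scenario above suggests deciding what to send per peer from what that peer asked for, rather than rejecting a republish because the local set of parts is unchanged. A minimal sketch, assuming parts are identified by small integers (`PartId` and `parts_to_send` are hypothetical names, not part of this PR):

```rust
use std::collections::BTreeSet;

/// Hypothetical part identifier; the real encoding is application-defined.
type PartId = u8;

/// Parts to send to one peer: the ones we hold that the peer asked for.
/// The result depends only on the request, so republishing an unchanged
/// local state still answers the peer instead of failing as a duplicate.
fn parts_to_send(local: &BTreeSet<PartId>, peer_wants: &BTreeSet<PartId>) -> Vec<PartId> {
    local.intersection(peer_wants).copied().collect()
}
```

With local parts 1, 2, 3 and a peer asking for part 2, calling this twice yields part 2 both times.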

Member Author

thanks for explaining Marco, updated as we spoke

data_transform: D,

/// Partial messages received.
partial_messages: HashMap<TopicHash, HashMap<Vec<u8>, P>>,

Do you need to store P here? I think it's better if P is owned solely by the application.

Member Author

yup, you are right, thanks Marco!

pub(crate) struct PartialData {
pub(crate) ihave: Vec<u8>,
pub(crate) iwant: Vec<u8>,
pub(crate) message: Vec<u8>,

Is it useful to store the message here? It seems like wasted space, you only use it to check if the peer is sending you a duplicate.

Might be simpler to let the application handle dupes.

Member Author

Thanks Marco, I only left `wanted` and `has`: `wanted` to avoid sending the same message, and `has` to avoid notifying the application layer of duplicates.
Thanks!

@jxs jxs force-pushed the gossipsub-partial-messages branch 5 times, most recently from cb0e925 to e6f1ae4 Compare September 26, 2025 11:35
@jxs jxs force-pushed the gossipsub-partial-messages branch from e6f1ae4 to 69c2d95 Compare September 26, 2025 14:24
jxs and others added 5 commits October 2, 2025 12:29
Noticed two methods that duplicated each other's logic exactly; kept the one with the clearer name, and merged and slightly improved the docs.

Pull-Request: libp2p#6173.
This upstreams sigp#570, which has been used by https://github.com/sigp/lighthouse/ for some weeks now:

This started with an attempt to solve libp2p#5751 using the previous internal async-channel.
After multiple ideas were discussed out of band, replacing the async-channel with a more tailored internal priority queue seemed inevitable. This priority queue allows us to implement the cancellation of in-flight IDONTWANT's very cleanly with the `remove_data_messages` function. Clearing the stale messages likewise becomes simpler, as we also make use of `remove_data_messages`.

Pull-Request: libp2p#6175.
Member

@dknopik dknopik left a comment

Some thoughts and some nitpicks.

#[derive(Debug)]
pub enum RpcOut {
/// Publish a Gossipsub message on network.`timeout` limits the duration the message
/// PublishV a Gossipsub message on network.`timeout` limits the duration the message
Member

Accidental change

Member Author

thanks, updated!


/// Returns metadata describing which parts of the message are available and which parts we want.
///
/// This metadata is application-defined and should encode information about
Member

Partial sentence

Member Author

thanks Daniel, updated

Comment on lines 717 to 718
// If partial set, filter out peers who only want partial messages for the topic.
fn get_publish_peers(&mut self, topic_hash: &TopicHash, partial: bool) -> HashSet<PeerId> {
Member

The fact that partial being true causes the filtering is confusing, as this is set to true on non-partial messages and vice-versa. I'd either call this filter_partial or invert the condition.

Member Author

yup, agreed and updated. Thanks Daniel!
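For illustration, the renamed flag could read as follows. This is a sketch only: the `Peer` struct and the free function `publish_peers` are hypothetical stand-ins and do not reflect the actual `get_publish_peers` body.

```rust
/// Hypothetical peer record; `partial_only` marks peers that only want
/// partial messages for the topic.
struct Peer {
    id: u32,
    partial_only: bool,
}

/// When `filter_partial` is true, peers that only want partial messages
/// are excluded, so the flag name states what it does.
fn publish_peers(peers: &[Peer], filter_partial: bool) -> Vec<u32> {
    peers
        .iter()
        .filter(|p| !(filter_partial && p.partial_only))
        .map(|p| p.id)
        .collect()
}
```

A full (non-partial) publish would pass `filter_partial = true`; a partial publish would pass `false`.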

/// - Optional remaining metadata if more parts are still available after this one
fn partial_message_bytes_from_metadata(
&self,
metadata: Option<impl AsRef<[u8]>>,
Member

It is unclear to me what the expected behaviour is if metadata is None. I guess usually send everything we have to the peer?

For Ethereum this is the case where we still haven't received a peer's metadata and could maybe eager push data to them. There are, at a high level, 3 options:

  • Don't return anything.
  • Return everything (probably not what we want to do)
  • Return cells we didn't have locally (cells that we got from the network, not getBlobs).
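The three options could be modelled as an application-side policy. A minimal sketch, assuming each cell is tagged with where it came from; `CellSource`, `EagerPush` and `eager_cells` are hypothetical names and not part of this PR:

```rust
/// Where a cell came from (hypothetical tagging done by the application).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CellSource {
    /// Received from the network.
    Network,
    /// Obtained locally, e.g. via getBlobs.
    Local,
}

/// What to push eagerly when the peer's metadata is still unknown.
#[derive(Clone, Copy, Debug)]
enum EagerPush {
    Nothing,
    Everything,
    NetworkOnly,
}

/// Returns the indices of the cells to push under the given policy.
fn eager_cells(cells: &[(u16, CellSource)], policy: EagerPush) -> Vec<u16> {
    cells
        .iter()
        .filter(|(_, source)| match policy {
            EagerPush::Nothing => false,
            EagerPush::Everything => true,
            EagerPush::NetworkOnly => *source == CellSource::Network,
        })
        .map(|(index, _)| *index)
        .collect()
}
```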

fn partial_message_bytes_from_metadata(
&self,
metadata: Option<impl AsRef<[u8]>>,
) -> Result<(impl AsRef<[u8]>, Option<impl AsRef<[u8]>>), PartialMessageError>;
Member

@dknopik dknopik Oct 9, 2025

If I understand correctly, if we have nothing to send to the peer, we return Ok((vec![], metadata)). This seems unintuitive and potentially inefficient to me, as we have to clone metadata and publish_partial has to compare it to the previous value. Maybe change it to allow returning Ok(None) to signal this?
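A rough sketch of the `Ok(None)` idea, assuming for illustration that metadata is a single-byte bitmap of the parts a peer already has. The error type and `parts_for_peer` below are simplified stand-ins, not the trait method from this PR:

```rust
/// Stand-in for the crate's error type (hypothetical, simplified).
#[derive(Debug, PartialEq)]
struct PartialMessageError;

/// Returns the bitmap of parts the peer is missing, or `Ok(None)` when
/// there is nothing to send. The caller can skip the peer without
/// cloning or comparing metadata.
fn parts_for_peer(local: u8, peer_metadata: &[u8]) -> Result<Option<u8>, PartialMessageError> {
    // Assumption: valid metadata is exactly one byte, one bit per part.
    let peer_has = match peer_metadata {
        [bitmap] => *bitmap,
        _ => return Err(PartialMessageError),
    };
    let missing = local & !peer_has;
    Ok(if missing == 0 { None } else { Some(missing) })
}
```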

timeout: Delay::new(self.config.publish_queue_duration()),
RpcOut::PartialMessage {
message: message_data,
metadata: partial_message.parts_metadata().as_ref().to_vec(),
Member

Maybe get partial_message.parts_metadata() before the loop to avoid useless re-encoding (in case the implementation encodes ad-hoc)?

Member Author

yeah nice catch, before each peer had its own metadata so this was not possible. Updated!
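The hoisting discussed here can be sketched in isolation. The `Message` type, its call counter, and `build_rpcs` are hypothetical and only serve to show that the metadata is encoded once regardless of the number of peers:

```rust
use std::cell::Cell;

/// Hypothetical message whose metadata is encoded on demand.
struct Message {
    parts: Vec<u8>,
    /// Counts calls to `parts_metadata`, for illustration only.
    encode_calls: Cell<usize>,
}

impl Message {
    /// Encodes the metadata; potentially expensive in a real implementation.
    fn parts_metadata(&self) -> Vec<u8> {
        self.encode_calls.set(self.encode_calls.get() + 1);
        self.parts.clone()
    }
}

/// Builds one (peer, metadata) pair per peer, encoding the metadata once
/// before the loop instead of once per peer.
fn build_rpcs(message: &Message, peers: &[u32]) -> Vec<(u32, Vec<u8>)> {
    let metadata = message.parts_metadata();
    peers.iter().map(|&peer| (peer, metadata.clone())).collect()
}
```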

///
/// Returns `Ok(())` if the data was successfully integrated, or `Err`,
/// if the data was invalid or couldn't be processed.
fn extend_from_encoded_partial_message(
Member

This is unused as far as I can tell. Yes, something like this is likely needed by the application, but as the behaviour does not need to access this, we can leave the exact interface up to the application (as maybe some external info is needed to extend the partial message).

Member Author

yup, good point. Removed it

dknopik

This comment was marked as duplicate.

self.leave(&topic_hash);
#[cfg(feature = "partial_messages")]
{
self.partial_only_topics.insert(topic_hash.clone());
Member

I think this should be `remove`, not `insert`?

Member Author

yup, thanks Daniel, updated!

pub(crate) struct PartialData {
/// The current peer partial metadata.
pub(crate) metadata: Option<Vec<u8>>,
/// The remaining heartbeats for this message to be deleted.

This comment was marked as resolved.

Member

Never mind, we can represent that we sent all data with `None`.

@MarcoPolo

My review comments as a patch: https://notes.marcopolo.io/view/d804e35e05fa79cfc1ed38e920aa01f1

Import review commit locally:

curl https://notes.marcopolo.io/raw/d804e35e05fa79cfc1ed38e920aa01f1 | git am

Feel free to respond inline with a new patch or quote relevant sections here and respond.

(I'm experimenting with alternative code-review processes, this is a rough prototype of reviews as separate commits. Feedback welcome)

Allow updating local data with remote data monotonically,
i.e. if we receive outdated partial metadata, `Metadata::update()` should not override previous data.
fn as_slice(&self) -> &[u8];
/// try to Update the `Metadata` with the remote data,
/// return true if it was updated.
fn update(&self, data: &[u8]) -> bool;
Member

@dknopik dknopik Oct 14, 2025

what do you think about -> Result<bool, PartialMessageError>?

Member Author

yup, makes sense. Thanks Daniel

Member

@jxs This also needs to take `&mut self`.
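Putting the two suggestions from this thread together (`Result<bool, PartialMessageError>` and `&mut self`), a monotonic update might look like the sketch below. It assumes metadata is a bitmap merged with bitwise OR; the `Bitmap` type and the `InvalidLength` variant are hypothetical, and the error enum is a simplified stand-in for the crate's own.

```rust
/// Simplified stand-in for the crate's error type (hypothetical variant).
#[derive(Debug, PartialEq)]
enum PartialMessageError {
    InvalidLength,
}

trait Metadata {
    fn as_slice(&self) -> &[u8];
    /// Tries to merge remote data; returns `Ok(true)` if anything changed.
    fn update(&mut self, data: &[u8]) -> Result<bool, PartialMessageError>;
}

/// Hypothetical bitmap metadata: one bit per part.
struct Bitmap(Vec<u8>);

impl Metadata for Bitmap {
    fn as_slice(&self) -> &[u8] {
        &self.0
    }

    fn update(&mut self, data: &[u8]) -> Result<bool, PartialMessageError> {
        if data.len() != self.0.len() {
            return Err(PartialMessageError::InvalidLength);
        }
        let mut changed = false;
        for (local, remote) in self.0.iter_mut().zip(data) {
            // OR-ing only ever sets bits, so outdated remote data cannot
            // clear anything already known.
            let merged = *local | *remote;
            changed |= merged != *local;
            *local = merged;
        }
        Ok(changed)
    }
}
```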

/// 5. Received partial data is integrated using `extend_from_encoded_partial_message()`
/// 6. The `group_id()` ties all parts of the same logical message together
pub trait Partial {
type Metadata: Metadata;
Member

should parts_metadata return this?

Member

@dknopik dknopik Oct 14, 2025

After discussion, I think the way to go is to remove the assoc type and keep parts_metadata the way it is for now :)

pub fn publish_partial<P: Partial>(
&mut self,
topic: impl Into<TopicHash>,
partial_message: P,
Member

&P? 🙏

@jxs jxs changed the title from "Gossipsub partial messages" to "feat: implement gossipsub partial messages extension" Oct 15, 2025

5 participants