Skip to content

Commit

Permalink
feat_: phase-1 of use single content-topic for all community chats (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Jan 20, 2025
1 parent e6738e5 commit 40b4ae4
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 5 deletions.
2 changes: 1 addition & 1 deletion protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes

hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.ContentTopic)
if err != nil {
return nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type RawMessage struct {
Ephemeral bool
BeforeDispatch func(*RawMessage) error
HashRatchetGroupID []byte
ContentTopic string
PubsubTopic string
ResendType ResendType
ResendMethod ResendMethod
Expand Down
7 changes: 7 additions & 0 deletions protocol/communities/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,13 @@ func (o *Community) setPrivateKey(pk *ecdsa.PrivateKey) {
}
}

func (o *Community) UniversalChatID() string {
// Using Member updates channelID as chatID to act as a universal content-topic for all chats in the community as explained here https://forum.vac.dev/t/status-communities-review-and-proposed-usage-of-waku-content-topics/335
// This is to match filter criteria of community with the content-topic usage.
// This specific topic is chosen as existing users before the change are already subscribed to this and will not get affected by it.
return o.MemberUpdateChannelID()
}

func (o *Community) SetResendAccountsClock(clock uint64) {
o.config.CommunityDescription.ResendAccountsClock = clock
}
Expand Down
14 changes: 14 additions & 0 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4279,6 +4279,20 @@ func (m *Manager) GetOwnedCommunitiesChatIDs() (map[string]bool, error) {
return chatIDs, nil
}

func (m *Manager) GetOwnedCommunitiesUniversalChatIDs() (map[string]bool, error) {
ownedCommunities, err := m.Controlled()
if err != nil {
return nil, err
}
chatIDs := make(map[string]bool)
for _, c := range ownedCommunities {
if c.Joined() {
chatIDs[c.UniversalChatID()] = true
}
}
return chatIDs, nil
}

func (m *Manager) StoreWakuMessage(message *wakutypes.Message) error {
return m.persistence.SaveWakuMessage(message)
}
Expand Down
4 changes: 4 additions & 0 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community,
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
topics = append(topics, m.transport.FilterByChatID(community.UniversalChatID()).ContentTopic)

ts := time.Now().Unix()
to := time.Unix(ts, 0)
Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,7 +2154,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := wakutypes.BytesToTopic(transport.ToTopic(chat.ID))
topics := []wakutypes.TopicType{topic}
communityCommonTopic := wakutypes.BytesToTopic(transport.ToTopic(community.UniversalChatID()))
topics := []wakutypes.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
Expand Down
20 changes: 18 additions & 2 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -1960,6 +1960,10 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec co
}

func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMessage) (common.RawMessage, error) {
if rawMessage.ContentTopic == "" {
rawMessage.ContentTopic = rawMessage.LocalChatID
}

var err error
var id []byte
logger := m.logger.With(zap.String("site", "dispatchMessage"), zap.String("chatID", rawMessage.LocalChatID))
Expand Down Expand Up @@ -1994,7 +1998,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe

case ChatTypePublic, ChatTypeProfile:
logger.Debug("sending public message", zap.String("chatName", chat.Name))
id, err = m.sender.SendPublic(ctx, chat.ID, rawMessage)
id, err = m.sender.SendPublic(ctx, rawMessage.ContentTopic, rawMessage)
if err != nil {
return rawMessage, err
}
Expand All @@ -2004,6 +2008,9 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
if err != nil {
return rawMessage, err
}
// Use a single content-topic for all community chats.
// Reasoning: https://github.com/status-im/status-go/pull/5864
rawMessage.ContentTopic = community.UniversalChatID()
rawMessage.PubsubTopic = community.PubsubTopic()

canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), rawMessage.MessageType)
Expand Down Expand Up @@ -2031,7 +2038,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
}
isEncrypted := isCommunityEncrypted || isChannelEncrypted
if !isEncrypted {
id, err = m.sender.SendPublic(ctx, chat.ID, rawMessage)
id, err = m.sender.SendPublic(ctx, rawMessage.ContentTopic, rawMessage)
if err != nil {
return rawMessage, err
}
Expand Down Expand Up @@ -3341,6 +3348,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Info("failed to retrieve admin communities", zap.Error(err))
}

// fetch universal chatIDs as well.
controlledCommunitiesUniversalChatIDs, err := m.communitiesManager.GetOwnedCommunitiesUniversalChatIDs()
if err != nil {
logger.Info("failed to retrieve controlled communities", zap.Error(err))
}
for chatID, flag := range controlledCommunitiesUniversalChatIDs {
controlledCommunitiesChatIDs[chatID] = flag
}

iterator := m.retrievedMessagesIteratorFactory(chatWithMessages)
for iterator.HasNext() {
filter, messages := iterator.Next()
Expand Down
9 changes: 8 additions & 1 deletion protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -2717,7 +2717,12 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))

publicFiltersToInit = append(publicFiltersToInit, defaultFilters...)

for _, filter := range defaultFilters {
_, err := m.transport.RemoveFilterByChatID(filter.ChatID)
if err != nil {
return err
}
}
for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID
_, err := m.transport.RemoveFilterByChatID(communityChatID)
Expand Down Expand Up @@ -3954,6 +3959,8 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
topics = append(topics, filter.ContentTopic)
}

filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID()))

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
// possibly missed since then
Expand Down

0 comments on commit 40b4ae4

Please sign in to comment.