Skip to content

Commit 6ed2244

Browse files
committed
message: Consider unsubscribed/unknown channels in reconcileMessages
This fixes the "fourth buggy behavior" in zulip#1798: zulip#1798 (comment) Fixes-partly: zulip#1798
1 parent 7c59991 commit 6ed2244

File tree

3 files changed

+251
-26
lines changed

3 files changed

+251
-26
lines changed

lib/model/message.dart

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -409,20 +409,75 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
409409
// from the event queue, so there's inherently a race.
410410
//
411411
// If the fetched message reflects changes we haven't yet heard from the
412-
// event queue, then it doesn't much matter which version we use: we'll
413-
// soon get the corresponding events and apply the changes anyway.
414-
// But if it lacks changes we've already heard from the event queue, then
415-
// we won't hear those events again; the only way to wind up with an
412+
// event queue, then normally (see [1]) it doesn't much matter which version
413+
// we use: we'll soon get the corresponding events and apply the changes
414+
// anyway. But if it lacks changes we've already heard from the event queue,
415+
// then we won't hear those events again; the only way to wind up with an
416416
// updated message is to use the version we have, that already reflects
417-
// those events' changes. So we always stick with the version we have.
417+
// those events' changes. So we always stick with the version we have. [1]
418+
//
419+
// [1] With one exception: if the version we have was in an unsubscribed
420+
// channel when we got it or sometime since, we take the fetched version
421+
// instead. That's because our version might be stale; we don't expect
422+
// update events for messages in unsubscribed channels.
418423
for (int i = 0; i < messages.length; i++) {
419424
final message = messages[i];
420-
messages[i] = this.messages.putIfAbsent(message.id, () {
421-
message.matchContent = null;
422-
message.matchTopic = null;
423-
return message;
424-
});
425+
426+
messages[i] = this.messages.update(message.id,
427+
(current) => _reconcileWhenPresent(current, message),
428+
ifAbsent: () => _reconcileWhenAbsent(message));
429+
}
430+
}
431+
432+
Message _reconcileWhenPresent(Message current, Message incoming) {
433+
bool currentIsMaybeStale = false;
434+
if (incoming is StreamMessage) {
435+
if (subscriptions[incoming.streamId] != null) {
436+
// The message is in a subscribed channel. Remove it from
437+
// _maybeStaleChannelMessages if it was there.
438+
currentIsMaybeStale = _maybeStaleChannelMessages.remove(incoming.id);
439+
} else {
440+
currentIsMaybeStale = _maybeStaleChannelMessages.contains(incoming.id);
441+
}
442+
}
443+
444+
return currentIsMaybeStale
445+
? _stripMatchFields(incoming)
446+
: current;
447+
}
448+
449+
Message _reconcileWhenAbsent(Message incoming) {
450+
if (
451+
incoming is StreamMessage
452+
&& subscriptions[incoming.streamId] == null
453+
) {
454+
// The message is in an unsubscribed (or unknown) channel.
455+
// Add it to _maybeStaleChannelMessages.
456+
_maybeStaleChannelMessages.add(incoming.id);
425457
}
458+
return _stripMatchFields(incoming);
459+
}
460+
461+
/// Messages in [messages] whose data stream is or was presumably broken
462+
/// by the message being in an unsubscribed channel.
463+
///
464+
/// This is the subset of [messages] where the message was
465+
/// in an unsubscribed channel when we added it or sometime since.
466+
///
467+
/// We don't expect update events for messages in unsubscribed channels,
468+
/// so if some of these maybe-stale messages appear in a fetch,
469+
/// we'll always clobber our stored version with the fetched version.
470+
/// See implementation comment in [reconcileMessages].
471+
///
472+
/// (We have seen a few such events, actually --
473+
/// maybe because the channel only recently became unsubscribed? --
474+
/// but not consistently, and we're not supposed to rely on them.)
475+
final Set<int> _maybeStaleChannelMessages = {};
476+
477+
Message _stripMatchFields(Message message) {
478+
message.matchContent = null;
479+
message.matchTopic = null;
480+
return message;
426481
}
427482

428483
@override
@@ -489,6 +544,29 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
489544
);
490545
}
491546

547+
void handleChannelDeleteEvent(ChannelDeleteEvent event) {
548+
final channelIds = event.streams.map((channel) => channel.streamId);
549+
_handleSubscriptionsRemoved(channelIds);
550+
}
551+
552+
void handleSubscriptionRemoveEvent(SubscriptionRemoveEvent event) {
553+
_handleSubscriptionsRemoved(event.streamIds);
554+
}
555+
556+
void _handleSubscriptionsRemoved(Iterable<int> channelIds) {
557+
if (channelIds.length > 1) {
558+
assert(channelIds is! Set);
559+
channelIds = Set.from(channelIds); // optimization
560+
}
561+
562+
// Linear in [messages].
563+
final affectedKnownMessageIds = messages.values
564+
.where((message) => message is StreamMessage && channelIds.contains(message.streamId))
565+
.map((message) => message.id);
566+
567+
_maybeStaleChannelMessages.addAll(affectedKnownMessageIds);
568+
}
569+
492570
void handleUserTopicEvent(UserTopicEvent event) {
493571
for (final view in _messageListViews) {
494572
view.handleUserTopicEvent(event);
@@ -502,10 +580,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
502580
}
503581

504582
void handleMessageEvent(MessageEvent event) {
583+
final message = event.message;
584+
505585
// If the message is one we already know about (from a fetch),
506586
// clobber it with the one from the event system.
507587
// See [reconcileMessages] for reasoning.
508-
messages[event.message.id] = event.message;
588+
messages[event.message.id] = message;
589+
590+
if (message is StreamMessage && subscriptions[message.streamId] == null) {
591+
// We didn't expect this event, because the channel is unsubscribed. But
592+
// that doesn't mean we should expect future events about this message.
593+
_maybeStaleChannelMessages.add(message.id);
594+
}
509595

510596
_handleMessageEventOutbox(event);
511597

@@ -594,6 +680,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
594680
// See [StreamConversation.displayRecipient] on why the invalidation is
595681
// needed.
596682
message.conversation.displayRecipient = null;
683+
684+
if (subscriptions[newStreamId] == null) {
685+
// The message was moved into an unsubscribed (or unknown) channel,
686+
// which means we expect our data on it to get stale.
687+
_maybeStaleChannelMessages.add(messageId);
688+
}
597689
}
598690

599691
if (newTopic != origTopic) {
@@ -616,6 +708,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
616708
void handleDeleteMessageEvent(DeleteMessageEvent event) {
617709
for (final messageId in event.messageIds) {
618710
messages.remove(messageId);
711+
_maybeStaleChannelMessages.remove(messageId);
619712
_editMessageRequests.remove(messageId);
620713
}
621714
for (final view in _messageListViews) {

lib/model/store.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,11 +821,17 @@ class PerAccountStore extends PerAccountStoreBase with
821821

822822
case ChannelEvent():
823823
assert(debugLog("server event: stream/${event.op}"));
824+
if (event is ChannelDeleteEvent) {
825+
_messages.handleChannelDeleteEvent(event);
826+
}
824827
_channels.handleChannelEvent(event);
825828
notifyListeners();
826829

827830
case SubscriptionEvent():
828831
assert(debugLog("server event: subscription/${event.op}"));
832+
if (event is SubscriptionRemoveEvent) {
833+
_messages.handleSubscriptionRemoveEvent(event);
834+
}
829835
_channels.handleSubscriptionEvent(event);
830836
notifyListeners();
831837

test/model/message_test.dart

Lines changed: 141 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void main() {
3636

3737
// These "late" variables are the common state operated on by each test.
3838
// Each test case calls [prepare] to initialize them.
39-
late Subscription subscription;
39+
late Subscription? subscription;
4040
late PerAccountStore store;
4141
late FakeApiConnection connection;
4242
// [messageList] is here only for the sake of checking when it notifies.
@@ -54,15 +54,18 @@ void main() {
5454
/// Initialize [store] and the rest of the test state.
5555
Future<void> prepare({
5656
ZulipStream? stream,
57+
bool isChannelSubscribed = true,
5758
int? zulipFeatureLevel,
5859
}) async {
5960
stream ??= eg.stream(streamId: eg.defaultStreamMessageStreamId);
60-
subscription = eg.subscription(stream);
6161
final selfAccount = eg.selfAccount.copyWith(zulipFeatureLevel: zulipFeatureLevel);
6262
store = eg.store(account: selfAccount,
6363
initialSnapshot: eg.initialSnapshot(zulipFeatureLevel: zulipFeatureLevel));
6464
await store.addStream(stream);
65-
await store.addSubscription(subscription);
65+
if (isChannelSubscribed) {
66+
subscription = eg.subscription(stream);
67+
await store.addSubscription(subscription!);
68+
}
6669
connection = store.connection as FakeApiConnection;
6770
notifiedCount = 0;
6871
messageList = MessageListView.init(store: store,
@@ -533,18 +536,141 @@ void main() {
533536
});
534537
});
535538

536-
test('on ID collision, new message does not clobber old in store.messages', () async {
537-
await prepare();
538-
final message = eg.streamMessage(id: 1, content: '<p>foo</p>');
539-
await addMessages([message]);
540-
check(store.messages).deepEquals({1: message});
541-
final newMessage = eg.streamMessage(id: 1, content: '<p>bar</p>');
542-
final messages = [newMessage];
543-
store.reconcileMessages(messages);
544-
check(messages).deepEquals(
545-
// (We'll check more messages in an upcoming commit.)
546-
[message].map(conditionIdentical));
547-
check(store.messages).deepEquals({1: message});
539+
group('fetched message with ID already in store.messages', () {
540+
late Message messageCopy;
541+
542+
/// Makes a copy of the single message in [MessageStore.messages]
543+
/// by round-tripping through [Message.fromJson] and [Message.toJson].
544+
///
545+
/// If that message's [StreamMessage.conversation.displayRecipient]
546+
/// is null, callers must provide a non-null [displayRecipient]
547+
/// to allow [StreamConversation.fromJson] to complete without throwing.
548+
Message copyStoredMessage({String? displayRecipient}) {
549+
final message = store.messages.values.single;
550+
551+
Map<String, dynamic> json = message.toJson();
552+
if (
553+
message is StreamMessage
554+
&& message.conversation.displayRecipient == null
555+
) {
556+
if (displayRecipient == null) throw ArgumentError();
557+
json['display_recipient'] = displayRecipient;
558+
}
559+
560+
return Message.fromJson(json);
561+
}
562+
563+
/// Checks if the single message in [MessageStore.messages]
564+
/// is identical to [message].
565+
void checkStoredMessageIdenticalTo(Message message) {
566+
check(store.messages)
567+
.deepEquals({message.id: conditionIdentical(message)});
568+
}
569+
570+
test('DM', () async {
571+
await prepare();
572+
final message = eg.dmMessage(id: 1, from: eg.otherUser, to: [eg.selfUser]);
573+
574+
store.reconcileMessages([message]);
575+
checkStoredMessageIdenticalTo(message);
576+
store.reconcileMessages([copyStoredMessage()]);
577+
// Not clobbering, because the first call didn't mark stale.
578+
checkStoredMessageIdenticalTo(message);
579+
});
580+
581+
group('channel message; chooses correctly whether to clobber the stored version', () {
582+
// Exercise the ways we move the message in and out of the "maybe stale"
583+
// state. These include reconcileMessage itself, so sometimes we test
584+
// repeated calls to that with nothing else happening in between.
585+
586+
test('various conditions', () async {
587+
final channel = eg.stream();
588+
await prepare(stream: channel, isChannelSubscribed: true);
589+
final message = eg.streamMessage(id: 1, stream: channel);
590+
591+
final otherChannel = eg.stream();
592+
await store.addStream(otherChannel);
593+
594+
store.reconcileMessages([message]);
595+
checkStoredMessageIdenticalTo(message);
596+
store.reconcileMessages([copyStoredMessage()]);
597+
// Not clobbering, because the first call didn't mark stale,
598+
// because the message was in a subscribed channel.
599+
checkStoredMessageIdenticalTo(message);
600+
601+
await store.removeSubscription(channel.streamId);
602+
messageCopy = copyStoredMessage();
603+
store.reconcileMessages([messageCopy]);
604+
// Clobbering because the unsubscribe event marked the message stale.
605+
checkStoredMessageIdenticalTo(messageCopy);
606+
messageCopy = copyStoredMessage();
607+
store.reconcileMessages([messageCopy]);
608+
// (Check that reconcileMessage itself didn't unmark as stale.)
609+
checkStoredMessageIdenticalTo(messageCopy);
610+
611+
await store.addSubscription(eg.subscription(channel));
612+
messageCopy = copyStoredMessage();
613+
store.reconcileMessages([messageCopy]);
614+
// The channel became subscribed,
615+
// but the message's data hasn't been refreshed, so clobber…
616+
checkStoredMessageIdenticalTo(messageCopy);
617+
618+
store.reconcileMessages([copyStoredMessage()]);
619+
// …Now it's been refreshed, by reconcileMessages, so don't clobber.
620+
checkStoredMessageIdenticalTo(messageCopy);
621+
622+
check(store.subscriptions[otherChannel.streamId]).isNull();
623+
await store.handleEvent(
624+
eg.updateMessageEventMoveFrom(origMessages: [message],
625+
newStreamId: otherChannel.streamId));
626+
messageCopy = copyStoredMessage(displayRecipient: otherChannel.name);
627+
store.reconcileMessages([messageCopy]);
628+
// Message was moved to an unsubscribed channel, so clobber.
629+
checkStoredMessageIdenticalTo(messageCopy);
630+
messageCopy = copyStoredMessage();
631+
store.reconcileMessages([messageCopy]);
632+
// (Check that reconcileMessage itself didn't unmark as stale.)
633+
checkStoredMessageIdenticalTo(messageCopy);
634+
635+
// Subscribe, to mark message as not-stale, setting up another check…
636+
await store.addSubscription(eg.subscription(otherChannel));
637+
store.reconcileMessages([copyStoredMessage()]);
638+
await store.handleEvent(ChannelDeleteEvent(id: 1, streams: [otherChannel]));
639+
messageCopy = copyStoredMessage();
640+
store.reconcileMessages([messageCopy]);
641+
// Message was in a channel that became unknown, so clobber.
642+
checkStoredMessageIdenticalTo(messageCopy);
643+
});
644+
645+
test('in unsubscribed channel on first call', () async {
646+
await prepare(isChannelSubscribed: false);
647+
final message = eg.streamMessage(id: 1);
648+
649+
store.reconcileMessages([message]);
650+
checkStoredMessageIdenticalTo(message);
651+
652+
messageCopy = copyStoredMessage();
653+
store.reconcileMessages([messageCopy]);
654+
checkStoredMessageIdenticalTo(messageCopy);
655+
messageCopy = copyStoredMessage();
656+
store.reconcileMessages([messageCopy]);
657+
checkStoredMessageIdenticalTo(messageCopy);
658+
});
659+
660+
test('new-message event when in unsubscribed channel', () async {
661+
await prepare(isChannelSubscribed: false);
662+
final message = eg.streamMessage(id: 1);
663+
664+
await store.handleEvent(eg.messageEvent(message));
665+
666+
messageCopy = copyStoredMessage();
667+
store.reconcileMessages([messageCopy]);
668+
checkStoredMessageIdenticalTo(messageCopy);
669+
messageCopy = copyStoredMessage();
670+
store.reconcileMessages([messageCopy]);
671+
checkStoredMessageIdenticalTo(messageCopy);
672+
});
673+
});
548674
});
549675

550676
test('matchContent and matchTopic are removed', () async {

0 commit comments

Comments
 (0)