Skip to content

Commit

Permalink
Merge pull request #1625 from GetStream/release/6.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
xsahil03x authored Jun 19, 2023
2 parents caeadfe + 1bfeff9 commit 7457df8
Show file tree
Hide file tree
Showing 51 changed files with 1,842 additions and 560 deletions.
9 changes: 9 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 6.4.0

🐞 Fixed

- [[#1293]](https://github.com/GetStream/stream-chat-flutter/issues/1293) Fixed wrong message order when sending
messages quickly.
- [[#1612]](https://github.com/GetStream/stream-chat-flutter/issues/1612) Fixed `Channel.isMutedStream` does not emit
when channel mute expires.

## 6.3.0

🐞 Fixed
Expand Down
147 changes: 103 additions & 44 deletions packages/stream_chat/lib/src/client/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/client/retry_queue.dart';
import 'package:stream_chat/src/core/util/utils.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:synchronized/synchronized.dart';

/// The maximum time the incoming [Event.typingStart] event is valid before a
/// [Event.typingStop] event is emitted automatically.
Expand Down Expand Up @@ -563,6 +564,8 @@ class Channel {
});
}

final _sendMessageLock = Lock();

/// Send a [message] to this channel.
///
/// If [skipPush] is true the message will not send a push notification.
Expand All @@ -586,7 +589,7 @@ class Channel {
);
// ignore: parameter_assignments
message = message.copyWith(
createdAt: message.createdAt,
localCreatedAt: DateTime.now(),
user: _client.state.currentUser,
quotedMessage: quotedMessage,
status: MessageSendingStatus.sending,
Expand Down Expand Up @@ -615,14 +618,21 @@ class Channel {
message = await attachmentsUploadCompleter.future;
}

final response = await _client.sendMessage(
message,
id!,
type,
skipPush: skipPush,
skipEnrichUrl: skipEnrichUrl,
// Wait for the previous sendMessage call to finish. Otherwise, the order
// of messages will not be maintained.
final response = await _sendMessageLock.synchronized(
() => _client.sendMessage(
message,
id!,
type,
skipPush: skipPush,
skipEnrichUrl: skipEnrichUrl,
),
);
state!.updateMessage(response.message);

final sentMessage = response.message.syncWith(message);

state!.updateMessage(sentMessage);
if (cooldown > 0) cooldownStartedAt = DateTime.now();
return response;
} catch (e) {
Expand All @@ -633,6 +643,8 @@ class Channel {
}
}

final _updateMessageLock = Lock();

/// Updates the [message] in this channel.
///
/// Waits for a [_messageAttachmentsUploadCompleter] to complete
Expand All @@ -652,7 +664,7 @@ class Channel {
// ignore: parameter_assignments
message = message.copyWith(
status: MessageSendingStatus.updating,
updatedAt: message.updatedAt,
localUpdatedAt: DateTime.now(),
attachments: message.attachments.map(
(it) {
if (it.uploadState.isSuccess) return it;
Expand All @@ -678,16 +690,20 @@ class Channel {
message = await attachmentsUploadCompleter.future;
}

final response = await _client.updateMessage(
message,
skipEnrichUrl: skipEnrichUrl,
// Wait for the previous update call to finish. Otherwise, the order of
// messages will not be maintained.
final response = await _updateMessageLock.synchronized(
() => _client.updateMessage(
message,
skipEnrichUrl: skipEnrichUrl,
),
);

final m = response.message.copyWith(
ownReactions: message.ownReactions,
);
final updatedMessage = response.message
.syncWith(message)
.copyWith(ownReactions: message.ownReactions);

state?.updateMessage(m);
state?.updateMessage(updatedMessage);

return response;
} catch (e) {
Expand All @@ -714,16 +730,20 @@ class Channel {
bool skipEnrichUrl = false,
}) async {
try {
final response = await _client.partialUpdateMessage(
message.id,
set: set,
unset: unset,
skipEnrichUrl: skipEnrichUrl,
// Wait for the previous update call to finish. Otherwise, the order of
// messages will not be maintained.
final response = await _updateMessageLock.synchronized(
() => _client.partialUpdateMessage(
message.id,
set: set,
unset: unset,
skipEnrichUrl: skipEnrichUrl,
),
);

final updatedMessage = response.message.copyWith(
ownReactions: message.ownReactions,
);
final updatedMessage = response.message
.syncWith(message)
.copyWith(ownReactions: message.ownReactions);

state?.updateMessage(updatedMessage);

Expand All @@ -736,6 +756,8 @@ class Channel {
}
}

final _deleteMessageLock = Lock();

/// Deletes the [message] from the channel.
Future<EmptyResponse> deleteMessage(Message message, {bool? hard}) async {
final hardDelete = hard ?? false;
Expand All @@ -746,7 +768,7 @@ class Channel {
state!.deleteMessage(
message.copyWith(
type: 'deleted',
deletedAt: message.deletedAt ?? DateTime.now(),
localDeletedAt: DateTime.now(),
status: MessageSendingStatus.sent,
),
hardDelete: hardDelete,
Expand All @@ -770,13 +792,18 @@ class Channel {

state?.deleteMessage(message, hardDelete: hardDelete);

final response = await _client.deleteMessage(message.id, hard: hard);
// Wait for the previous delete call to finish. Otherwise, the order of
// messages will not be maintained.
final response = await _deleteMessageLock.synchronized(
() => _client.deleteMessage(message.id, hard: hard),
);

state?.deleteMessage(
message.copyWith(status: MessageSendingStatus.sent),
hardDelete: hardDelete,
final deletedMessage = message.copyWith(
status: MessageSendingStatus.sent,
);

state?.deleteMessage(deletedMessage, hardDelete: hardDelete);

return response;
} catch (e) {
if (e is StreamChatNetworkError && e.isRetriable) {
Expand Down Expand Up @@ -1420,15 +1447,32 @@ class Channel {
);
}

// Timer to keep track of mute expiration. This is used to update the channel
// state when the mute expires.
Timer? _muteExpirationTimer;

/// Mutes the channel.
Future<EmptyResponse> mute({Duration? expiration}) {
_checkInitialized();

// If there is a expiration set, we will set a timer to automatically unmute
// the channel when the mute expires.
if (expiration != null) {
_muteExpirationTimer?.cancel();
_muteExpirationTimer = Timer(expiration, unmute);
}

return _client.muteChannel(cid!, expiration: expiration);
}

/// Unmute the channel.
Future<EmptyResponse> unmute() {
_checkInitialized();

// Cancel the mute expiration timer if it is set.
_muteExpirationTimer?.cancel();
_muteExpirationTimer = null;

return _client.unmuteChannel(cid!);
}

Expand Down Expand Up @@ -1558,6 +1602,7 @@ class Channel {
void dispose() {
client.state.removeChannel('$cid');
state?.dispose();
_muteExpirationTimer?.cancel();
_keyStrokeHandler.cancel();
}

Expand Down Expand Up @@ -1942,8 +1987,9 @@ class ChannelClientState {
)
.listen((event) {
final message = event.message!;
if (isUpToDate ||
(message.parentId != null && message.showInChannel != true)) {
final showInChannel =
message.parentId != null && message.showInChannel != true;
if (isUpToDate || showInChannel) {
updateMessage(message);
}

Expand All @@ -1960,10 +2006,10 @@ class ChannelClientState {
var newMessages = [...messages];
final oldIndex = newMessages.indexWhere((m) => m.id == message.id);
if (oldIndex != -1) {
var updatedMessage = message;
final oldMessage = newMessages[oldIndex];
var updatedMessage = message.syncWith(oldMessage);
// Add quoted message to the message if it is not present.
if (message.quotedMessageId != null && message.quotedMessage == null) {
final oldMessage = newMessages[oldIndex];
updatedMessage = updatedMessage.copyWith(
quotedMessage: oldMessage.quotedMessage,
);
Expand All @@ -1980,7 +2026,7 @@ class ChannelClientState {
return it.copyWith(
quotedMessage: updatedMessage.copyWith(
type: 'deleted',
deletedAt: updatedMessage.deletedAt ?? DateTime.now(),
deletedAt: DateTime.now(),
),
);
}).toList();
Expand All @@ -2004,7 +2050,7 @@ class ChannelClientState {
}

_channelState = _channelState.copyWith(
messages: newMessages..sort(_sortByCreatedAt),
messages: newMessages.sorted(_sortByCreatedAt),
pinnedMessages: newPinnedMessages,
channel: _channelState.channel?.copyWith(
lastMessageAt: message.createdAt,
Expand Down Expand Up @@ -2226,7 +2272,7 @@ class ChannelClientState {
...newThreads[parentId]!.where(
(newMessage) => !messages.any((m) => m.id == newMessage.id),
),
]..sort(_sortByCreatedAt);
].sorted(_sortByCreatedAt);
} else {
newThreads[parentId] = messages;
}
Expand All @@ -2245,15 +2291,10 @@ class ChannelClientState {

/// Update channelState with updated information.
void updateChannelState(ChannelState updatedState) {
final _existingStateMessages = _channelState.messages ?? [];
final _updatedStateMessages = updatedState.messages ?? [];
final _existingStateMessages = [...messages];
final newMessages = <Message>[
..._updatedStateMessages,
..._existingStateMessages
.where((m) =>
!_updatedStateMessages.any((newMessage) => newMessage.id == m.id))
.toList(),
]..sort(_sortByCreatedAt);
..._existingStateMessages.merge(updatedState.messages),
].sorted(_sortByCreatedAt);

final _existingStateWatchers = _channelState.watchers ?? [];
final _updatedStateWatchers = updatedState.watchers ?? [];
Expand Down Expand Up @@ -2471,3 +2512,21 @@ bool _pinIsValid(Message message) {
final now = DateTime.now();
return message.pinExpires!.isAfter(now);
}

extension on Iterable<Message> {
Iterable<Message> merge(Iterable<Message>? other) {
if (other == null) return this;

final messageMap = {for (final message in this) message.id: message};

for (final message in other) {
messageMap.update(
message.id,
message.syncWith,
ifAbsent: () => message,
);
}

return messageMap.values;
}
}
7 changes: 5 additions & 2 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import 'package:stream_chat/src/event_type.dart';
import 'package:stream_chat/src/ws/connection_status.dart';
import 'package:stream_chat/src/ws/websocket.dart';
import 'package:stream_chat/version.dart';
import 'package:synchronized/extension.dart';
import 'package:synchronized/synchronized.dart';

/// Handler function used for logging records. Function requires a single
/// [LogRecord] as the only parameter.
Expand Down Expand Up @@ -526,10 +526,13 @@ class StreamChatClient {
event.type == eventType4);
}

// Lock to make sure only one sync process is running at a time.
final _syncLock = Lock();

/// Get the events missed while offline to sync the offline storage
/// Will automatically fetch [cids] and [lastSyncedAt] if [persistenceEnabled]
Future<void> sync({List<String>? cids, DateTime? lastSyncAt}) {
return synchronized(() async {
return _syncLock.synchronized(() async {
final channels = cids ?? await chatPersistenceClient?.getChannelCids();
if (channels == null || channels.isEmpty) {
return;
Expand Down
Loading

0 comments on commit 7457df8

Please sign in to comment.