Skip to content

Commit

Permalink
Merge pull request #1674 from GetStream/release/6.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
xsahil03x authored Jul 12, 2023
2 parents 3b957ee + a920b58 commit 162d42b
Show file tree
Hide file tree
Showing 46 changed files with 3,701 additions and 681 deletions.
7 changes: 7 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 6.6.0

🔄 Changed

- Deprecated `Message.status` in favor of `Message.state`.
- Deprecated `RetryPolicy.retryTimeout` in favor of `RetryPolicy.delayFactor`.

## 6.5.0

🔄 Changed
Expand Down
162 changes: 117 additions & 45 deletions packages/stream_chat/lib/src/client/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ class Channel {
// Eg. Updating the message while the previous call is in progress.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError('Message Cancelled');
?.completeError(const StreamChatError('Message cancelled'));

final quotedMessage = state!.messages.firstWhereOrNull(
(m) => m.id == message.quotedMessageId,
Expand All @@ -592,7 +592,7 @@ class Channel {
localCreatedAt: DateTime.now(),
user: _client.state.currentUser,
quotedMessage: quotedMessage,
status: MessageSendingStatus.sending,
state: MessageState.sending,
attachments: message.attachments.map(
(it) {
if (it.uploadState.isSuccess) return it;
Expand Down Expand Up @@ -630,15 +630,24 @@ class Channel {
),
);

final sentMessage = response.message.syncWith(message);
final sentMessage = response.message.syncWith(message).copyWith(
// Update the message state to sent.
state: MessageState.sent,
);

state!.updateMessage(sentMessage);
if (cooldown > 0) cooldownStartedAt = DateTime.now();
return response;
} catch (e) {
if (e is StreamChatNetworkError && e.isRetriable) {
state!._retryQueue.add([message]);
state!._retryQueue.add([
message.copyWith(
// Update the message state to failed.
state: MessageState.sendingFailed,
),
]);
}

rethrow;
}
}
Expand All @@ -653,17 +662,18 @@ class Channel {
Message message, {
bool skipEnrichUrl = false,
}) async {
_checkInitialized();
final originalMessage = message;

// Cancelling previous completer in case it's called again in the process
// Eg. Updating the message while the previous call is in progress.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError('Message Cancelled');
?.completeError(const StreamChatError('Message cancelled'));

// ignore: parameter_assignments
message = message.copyWith(
status: MessageSendingStatus.updating,
state: MessageState.updating,
localUpdatedAt: DateTime.now(),
attachments: message.attachments.map(
(it) {
Expand Down Expand Up @@ -699,19 +709,30 @@ class Channel {
),
);

final updatedMessage = response.message
.syncWith(message)
.copyWith(ownReactions: message.ownReactions);
final updateMessage = response.message.syncWith(message).copyWith(
// Update the message state to updated.
state: MessageState.updated,
ownReactions: message.ownReactions,
);

state?.updateMessage(updatedMessage);
state?.updateMessage(updateMessage);

return response;
} catch (e) {
if (e is StreamChatNetworkError) {
if (e.isRetriable) {
state!._retryQueue.add([message]);
state!._retryQueue.add([
message.copyWith(
// Update the message state to failed.
state: MessageState.updatingFailed,
),
]);
} else {
state?.updateMessage(originalMessage);
// Reset the message to original state if the update fails and is not
// retriable.
state?.updateMessage(originalMessage.copyWith(
state: MessageState.updatingFailed,
));
}
}
rethrow;
Expand All @@ -729,6 +750,23 @@ class Channel {
List<String>? unset,
bool skipEnrichUrl = false,
}) async {
_checkInitialized();
final originalMessage = message;

// Cancelling previous completer in case it's called again in the process
// Eg. Updating the message while the previous call is in progress.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError(const StreamChatError('Message cancelled'));

// ignore: parameter_assignments
message = message.copyWith(
state: MessageState.updating,
localUpdatedAt: DateTime.now(),
);

state?.updateMessage(message);

try {
// Wait for the previous update call to finish. Otherwise, the order of
// messages will not be maintained.
Expand All @@ -741,78 +779,120 @@ class Channel {
),
);

final updatedMessage = response.message
.syncWith(message)
.copyWith(ownReactions: message.ownReactions);
final updatedMessage = response.message.syncWith(message).copyWith(
// Update the message state to updated.
state: MessageState.updated,
ownReactions: message.ownReactions,
);

state?.updateMessage(updatedMessage);

return response;
} catch (e) {
if (e is StreamChatNetworkError && e.isRetriable) {
state!._retryQueue.add([message]);
if (e is StreamChatNetworkError) {
if (e.isRetriable) {
state!._retryQueue.add([
message.copyWith(
// Update the message state to failed.
state: MessageState.updatingFailed,
),
]);
} else {
// Reset the message to original state if the update fails and is not
// retriable.
state?.updateMessage(originalMessage.copyWith(
state: MessageState.updatingFailed,
));
}
}

rethrow;
}
}

final _deleteMessageLock = Lock();

/// Deletes the [message] from the channel.
Future<EmptyResponse> deleteMessage(Message message, {bool? hard}) async {
final hardDelete = hard ?? false;
Future<EmptyResponse> deleteMessage(
Message message, {
bool hard = false,
}) async {
_checkInitialized();

// Directly deleting the local messages which are not yet sent to server
if (message.status == MessageSendingStatus.sending ||
message.status == MessageSendingStatus.failed) {
// Directly deleting the local messages which are not yet sent to server.
if (message.remoteCreatedAt == null) {
state!.deleteMessage(
message.copyWith(
type: 'deleted',
localDeletedAt: DateTime.now(),
status: MessageSendingStatus.sent,
state: MessageState.deleted(hard: hard),
),
hardDelete: hardDelete,
hardDelete: hard,
);

// Removing the attachments upload completer to stop the `sendMessage`
// waiting for attachments to complete.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError(Exception('Message deleted'));
?.completeError(const StreamChatError('Message deleted'));

// Returning empty response to mark the api call as success.
return EmptyResponse();
}

try {
// ignore: parameter_assignments
message = message.copyWith(
type: 'deleted',
status: MessageSendingStatus.deleting,
deletedAt: message.deletedAt ?? DateTime.now(),
);
// ignore: parameter_assignments
message = message.copyWith(
type: 'deleted',
deletedAt: DateTime.now(),
state: MessageState.deleting(hard: hard),
);

state?.deleteMessage(message, hardDelete: hardDelete);
state?.deleteMessage(message, hardDelete: hard);

try {
// Wait for the previous delete call to finish. Otherwise, the order of
// messages will not be maintained.
final response = await _deleteMessageLock.synchronized(
() => _client.deleteMessage(message.id, hard: hard),
);

final deletedMessage = message.copyWith(
status: MessageSendingStatus.sent,
state: MessageState.deleted(hard: hard),
);

state?.deleteMessage(deletedMessage, hardDelete: hardDelete);
state?.deleteMessage(deletedMessage, hardDelete: hard);

return response;
} catch (e) {
if (e is StreamChatNetworkError && e.isRetriable) {
state!._retryQueue.add([message]);
state!._retryQueue.add([
message.copyWith(
// Update the message state to failed.
state: MessageState.deletingFailed(hard: hard),
),
]);
}
rethrow;
}
}

/// Retry the operation on the message based on the failed state.
///
/// For example, if the message failed to send, it will retry sending the
/// message and vice-versa.
Future<Object> retryMessage(Message message) async {
assert(message.state.isFailed, 'Message state is not failed');

return message.state.maybeWhen(
failed: (state, _) => state.when(
sendingFailed: () => sendMessage(message),
updatingFailed: () => updateMessage(message),
deletingFailed: (hard) => deleteMessage(message, hard: hard),
),
orElse: () => throw StateError('Message state is not failed'),
);
}

/// Pins provided message
Future<UpdateMessageResponse> pinMessage(
Message message, {
Expand Down Expand Up @@ -1895,15 +1975,7 @@ class ChannelClientState {
/// Retry failed message.
Future<void> retryFailedMessages() async {
final failedMessages = [...messages, ...threads.values.expand((v) => v)]
.where(
(message) =>
message.status != MessageSendingStatus.sent &&
message.createdAt.isBefore(
DateTime.now().subtract(const Duration(seconds: 5)),
),
)
.toList();

.where((it) => it.state.isFailed);
_retryQueue.add(failedMessages);
}

Expand Down
20 changes: 14 additions & 6 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ class StreamChatClient {

_retryPolicy = retryPolicy ??
RetryPolicy(
shouldRetry: (_, attempt, __) => attempt < 5,
retryTimeout: (_, attempt, __) => Duration(seconds: attempt),
shouldRetry: (_, __, error) {
return error is StreamChatNetworkError && error.isRetriable;
},
);

state = ClientState(this);
Expand Down Expand Up @@ -1397,12 +1398,19 @@ class StreamChatClient {
);

/// Deletes the given message
Future<EmptyResponse> deleteMessage(String messageId, {bool? hard}) async {
final response =
await _chatApi.message.deleteMessage(messageId, hard: hard);
if (hard == true) {
Future<EmptyResponse> deleteMessage(
String messageId, {
bool hard = false,
}) async {
final response = await _chatApi.message.deleteMessage(
messageId,
hard: hard,
);

if (hard) {
await chatPersistenceClient?.deleteMessageById(messageId);
}

return response;
}

Expand Down
Loading

0 comments on commit 162d42b

Please sign in to comment.