Skip to content

Commit

Permalink
Merge pull request #267 from GetStream/feat/custom-attachment-upload
Browse files Browse the repository at this point in the history
Async Attachment upload, Support for Custom Attachment Upload (CDN)
  • Loading branch information
imtoori authored Feb 18, 2021
2 parents d6a3a97 + 29ac662 commit d7d0a45
Show file tree
Hide file tree
Showing 62 changed files with 2,768 additions and 1,332 deletions.
6 changes: 6 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 1.1.0-beta

- Fixed minor bugs
- Add support for custom attachment upload [docs here](https://getstream.io/chat/docs/flutter-dart/file_uploads/?language=dart)
- Add support for asynchronous attachment upload

## 1.0.3-beta

- Fixed issue with disconnecting after connecting without awaiting the connection result
Expand Down
1 change: 1 addition & 0 deletions packages/stream_chat/analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ include: package:pedantic/analysis_options.yaml
analyzer:
exclude:
- lib/**/*.g.dart
- lib/**/*.freezed.dart
- example/*
- test/*

Expand Down
303 changes: 274 additions & 29 deletions packages/stream_chat/lib/src/api/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import 'dart:async';
import 'dart:convert';
import 'dart:math';

import 'package:pedantic/pedantic.dart' show unawaited;
import 'package:dio/dio.dart';
import 'package:logging/logging.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/api/retry_queue.dart';
import 'package:stream_chat/src/event_type.dart';
import 'package:stream_chat/src/models/attachment_file.dart';
import 'package:stream_chat/src/models/channel_state.dart';
import 'package:stream_chat/src/models/user.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:uuid/uuid.dart';

import '../client.dart';
import '../models/event.dart';
Expand Down Expand Up @@ -171,19 +172,150 @@ class Channel {
/// Call [watch] to initialize the client or instantiate it using [Channel.fromState]
Future<bool> get initialized => _initializedCompleter.future;

/// Send a message to this channel
final _cancelableAttachmentUploadRequest = <String, CancelToken>{};
final _messageAttachmentsUploadCompleter = <String, Completer>{};

/// Cancels [attachmentId] upload request. Throws exception if the request hasn't
/// even started yet, Already completed or Already cancelled.
///
/// Optionally, provide a [reason] for the cancellation.
void cancelAttachmentUpload(
String attachmentId, {
String reason,
}) {
final cancelToken = _cancelableAttachmentUploadRequest[attachmentId];
if (cancelToken == null) {
throw Exception(
"Upload request for this Attachment hasn't started yet or else Already completed",
);
}
if (cancelToken.isCancelled) throw Exception('Already cancelled');
cancelToken.cancel(reason);
}

/// Retries the failed [attachmentId] upload request.
Future<void> retryAttachmentUpload(String messageId, String attachmentId) {
return _uploadAttachments(messageId, [attachmentId]);
}

Future<void> _uploadAttachments(
String messageId,
Iterable<String> attachmentIds,
) {
var message = state.messages.firstWhere(
(it) => it.id == messageId,
orElse: () => null,
);

if (message == null) {
throw Exception('Error, Message not found');
}

final attachments = message.attachments.where((it) {
if (it.uploadState.isSuccess) return false;
return attachmentIds.contains(it.id);
});

if (attachments.isEmpty) {
client.logger.info('No attachments available to upload');
if (message.attachments.every((it) => it.uploadState.isSuccess)) {
_messageAttachmentsUploadCompleter.remove(messageId)?.complete(message);
}
return Future.value();
}

client.logger.info('Found ${attachments.length} attachments');
return Future.wait(attachments.map((it) {
client.logger.info('Uploading ${it.id} attachment...');

void updateAttachment(Attachment attachment) {
final index = message.attachments.indexWhere((it) {
return it.id == attachment.id;
});
if (index != -1) {
message.attachments[index] = attachment;
state?.addMessage(message);
}
}

void onSendProgress(int sent, int total) {
updateAttachment(it.copyWith(
uploadState: UploadState.inProgress(uploaded: sent, total: total),
));
}

final isImage = it.type == 'image';
final cancelToken = CancelToken();
Future<String> future;
if (isImage) {
future = sendImage(
it.file,
onSendProgress: onSendProgress,
cancelToken: cancelToken,
).then((it) => it.file);
} else {
future = sendFile(
it.file,
onSendProgress: onSendProgress,
cancelToken: cancelToken,
).then((it) => it.file);
}
_cancelableAttachmentUploadRequest[it.id] = cancelToken;
return future.then((url) {
client.logger.info('Attachment ${it.id} uploaded successfully...');
if (isImage) {
updateAttachment(
it.copyWith(imageUrl: url, uploadState: UploadState.success()),
);
} else {
updateAttachment(
it.copyWith(assetUrl: url, uploadState: UploadState.success()),
);
}
}).catchError((e, stk) {
updateAttachment(
it.copyWith(uploadState: UploadState.failed(error: e.toString())),
);
}).whenComplete(() {
_cancelableAttachmentUploadRequest.remove(it.id);
});
})).whenComplete(() {
if (message.attachments.every((it) => it.uploadState.isSuccess)) {
_messageAttachmentsUploadCompleter.remove(messageId)?.complete(message);
}
});
}

/// Send a [message] to this channel.
/// Waits for a [_messageAttachmentsUploadCompleter] to complete
/// before actually sending the message.
Future<SendMessageResponse> sendMessage(Message message) async {
final messageId = message.id ?? Uuid().v4();
// Cancelling previous completer in case it's called again in the process
// Eg. Updating the message while the previous call is in progress.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError('Message Cancelled');

final quotedMessage = state?.messages?.firstWhere(
(m) => m.id == message?.quotedMessageId,
orElse: () => null,
);
final newMessage = message.copyWith(
message = message.copyWith(
createdAt: message.createdAt ?? DateTime.now(),
user: _client.state.user,
id: messageId,
quotedMessage: quotedMessage,
status: MessageSendingStatus.sending,
attachments: message.attachments?.map(
(it) {
if (it.uploadState.isSuccess) return it;
return it.copyWith(
uploadState: UploadState.inProgress(
uploaded: 0,
total: it.file?.size ?? it.extraData['file_size'],
),
);
},
)?.toList(),
);

if (message.parentId != null && message.id == null) {
Expand All @@ -195,49 +327,156 @@ class Channel {
));
}

state?.addMessage(newMessage);
state?.addMessage(message);

try {
final response = await _client.post(
'$_channelURL/message',
data: {
'message': message
.copyWith(
id: messageId,
)
.toJson()
if (message.attachments?.isNotEmpty == true) {
final attachmentsUploadCompleter = Completer<Message>();
_messageAttachmentsUploadCompleter[message.id] =
attachmentsUploadCompleter;

unawaited(_uploadAttachments(
message.id,
message.attachments.map((it) => it.id),
));

message = await attachmentsUploadCompleter.future;
}

final response = await _client.sendMessage(message, id, type);
state?.addMessage(response.message);
return response;
} catch (error) {
if (error is DioError && error.type != DioErrorType.RESPONSE) {
state?.retryQueue?.add([message]);
}
rethrow;
}
}

/// Updates the [message] in this channel.
/// Waits for a [_messageAttachmentsUploadCompleter] to complete
/// before actually updating the message.
Future<UpdateMessageResponse> updateMessage(Message message) async {
// Cancelling previous completer in case it's called again in the process
// Eg. Updating the message while the previous call is in progress.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError('Message Cancelled');

message = message.copyWith(
status: MessageSendingStatus.updating,
updatedAt: message.updatedAt ?? DateTime.now(),
attachments: message.attachments?.map(
(it) {
if (it.uploadState.isSuccess) return it;
return it.copyWith(
uploadState: UploadState.inProgress(
uploaded: 0,
total: it.file?.size ?? it.extraData['file_size'],
),
);
},
)?.toList(),
);

state?.addMessage(message);

try {
if (message.attachments?.isNotEmpty == true) {
final attachmentsUploadCompleter = Completer<Message>();
_messageAttachmentsUploadCompleter[message.id] =
attachmentsUploadCompleter;

unawaited(_uploadAttachments(
message.id,
message.attachments.map((it) => it.id),
));

message = await attachmentsUploadCompleter.future;
}

final response = await _client.updateMessage(message);
state?.addMessage(response?.message?.copyWith(
ownReactions: message.ownReactions,
));
return response;
} catch (error) {
if (error is DioError && error.type != DioErrorType.RESPONSE) {
state?.retryQueue?.add([message]);
}
rethrow;
}
}

/// Deletes the [message] from the channel.
Future<EmptyResponse> deleteMessage(Message message) async {
// Directly deleting the local messages which are not yet sent to server
if (message.status == MessageSendingStatus.sending ||
message.status == MessageSendingStatus.failed) {
state.addMessage(message.copyWith(
type: 'deleted',
status: MessageSendingStatus.sent,
));

// Removing the attachments upload completer to stop the `sendMessage`
// waiting for attachments to complete.
_messageAttachmentsUploadCompleter
.remove(message.id)
?.completeError(Exception('Message deleted'));
return EmptyResponse();
}

try {
message = message.copyWith(
type: 'deleted',
status: MessageSendingStatus.deleting,
deletedAt: message.deletedAt ?? DateTime.now(),
);

final res = _client.decode(response.data, SendMessageResponse.fromJson);
state?.addMessage(message);

final response = await _client.deleteMessage(message);

state?.addMessage(res.message);
state?.addMessage(message.copyWith(status: MessageSendingStatus.sent));

return res;
return response;
} catch (error) {
if (error is DioError && error.type != DioErrorType.RESPONSE) {
state?.retryQueue?.add([newMessage]);
state?.retryQueue?.add([message]);
}
rethrow;
}
}

/// Send a file to this channel
Future<SendFileResponse> sendFile(MultipartFile file) async {
final response = await _client.post(
'$_channelURL/file',
data: FormData.fromMap({'file': file}),
Future<SendFileResponse> sendFile(
AttachmentFile file, {
ProgressCallback onSendProgress,
CancelToken cancelToken,
}) {
return _client.sendFile(
file,
id,
type,
onSendProgress: onSendProgress,
cancelToken: cancelToken,
);
return _client.decode(response.data, SendFileResponse.fromJson);
}

/// Send an image to this channel
Future<SendImageResponse> sendImage(MultipartFile file) async {
final response = await _client.post(
'$_channelURL/image',
data: FormData.fromMap({'file': file}),
Future<SendImageResponse> sendImage(
AttachmentFile file, {
ProgressCallback onSendProgress,
CancelToken cancelToken,
}) {
return _client.sendImage(
file,
id,
type,
onSendProgress: onSendProgress,
cancelToken: cancelToken,
);
return _client.decode(response.data, SendImageResponse.fromJson);
}

/// Delete a file from this channel
Expand Down Expand Up @@ -967,7 +1206,13 @@ class ChannelClientState {
?.getChannelThreads(_channel.cid)
?.then((threads) {
_threads = threads;
retryFailedMessages();
})?.then((_) {
_channel._client.chatPersistenceClient
?.getChannelStateByCid(_channel.cid)
?.then((state) {
updateChannelState(state);
retryFailedMessages();
});
});
}

Expand Down
Loading

0 comments on commit d7d0a45

Please sign in to comment.