Skip to content
40 changes: 40 additions & 0 deletions lib/firestore/firestore.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,46 @@ class Firestore {

DocumentReference document(String path) => DocumentReference(_gateway, path);

/// Executes the given [TransactionHandler] and then attempts to commit the
/// changes applied within an atomic transaction.
///
/// In the [TransactionHandler], a set of reads and writes can be performed
/// atomically using the [Transaction] object passed to the [TransactionHandler].
/// After the [TransactionHandler] is run, [Firestore] will attempt to apply the
/// changes to the server. If any of the data read has been modified outside
/// of this [Transaction] since being read, then the transaction will be
/// retried by executing the provided [TransactionHandler] again. If the transaction still
/// fails after the specified [maxAttempts] retries, then the transaction will fail.
///
/// The [TransactionHandler] may be executed multiple times, it should be able
/// to handle multiple executions.
///
/// Data accessed with the transaction will not reflect local changes that
/// have not been committed. For this reason, it is required that all
/// reads are performed before any writes. Transactions must be performed
/// with an internet connection. Otherwise, reads will fail, and the final commit will fail.
///
/// By default transactions will retry 5 times. You can change the number of attempts
/// with [maxAttempts]. Attempts should be at least 1.
///
/// ```dart
/// await firestore.runTransaction(
/// (transaction) async {
/// final doc = await transaction.get('myCollection/documentId');
/// final value = doc.map['key'];
/// final newValue = value + 1;
/// transaction.update('myCollection/documentId', {'key': newValue});
/// },
/// );
/// ```
Future<T> runTransaction<T>(
TransactionHandler<T> handler, {
int maxAttempts = 5,
}) {
assert(maxAttempts >= 1, 'maxAttempts must be at least 1.');
return _gateway.runTransaction(handler, maxAttempts: maxAttempts);
}

void close() {
_gateway.close();
}
Expand Down
75 changes: 64 additions & 11 deletions lib/firestore/firestore_gateway.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ typedef RequestAuthenticator = Future<void>? Function(
class FirestoreGateway {
final RequestAuthenticator? _authenticator;

final String database;
late final String database;

late final String documentDatabase;

final Map<String, _ListenStreamWrapper> _listenStreamCache;

Expand All @@ -28,9 +30,9 @@ class FirestoreGateway {
RequestAuthenticator? authenticator,
Emulator? emulator,
}) : _authenticator = authenticator,
database =
'projects/$projectId/databases/${databaseId ?? '(default)'}/documents',
_listenStreamCache = <String, _ListenStreamWrapper>{} {
database = 'projects/$projectId/databases/${databaseId ?? '(default)'}';
documentDatabase = '$database/documents';
_setupClient(emulator: emulator);
}

Expand Down Expand Up @@ -61,14 +63,14 @@ class FirestoreGateway {
..structuredQuery = query;
final target = Target()..query = queryTarget;
final request = ListenRequest()
..database = database
..database = documentDatabase
..addTarget = target;

_listenStreamCache[path] = _ListenStreamWrapper.create(
request,
(requestStream) => _client.listen(requestStream,
options: CallOptions(
metadata: {'google-cloud-resource-prefix': database})),
metadata: {'google-cloud-resource-prefix': documentDatabase})),
onDone: () => _listenStreamCache.remove(path));

return _mapCollectionStream(_listenStreamCache[path]!);
Expand All @@ -91,10 +93,18 @@ class FirestoreGateway {
return Document(this, response);
}

Future<Document> getDocument(path) async {
var rawDocument = await _client
.getDocument(GetDocumentRequest()..name = path)
.catchError(_handleError);
Future<Document> getDocument(
String path, {
List<int>? transaction,
}) async {
var getDocumentRequest = GetDocumentRequest()..name = path;

if (transaction != null) {
getDocumentRequest.transaction = transaction;
}

var rawDocument =
await _client.getDocument(getDocumentRequest).catchError(_handleError);
return Document(this, rawDocument);
}

Expand Down Expand Up @@ -125,14 +135,14 @@ class FirestoreGateway {
final documentsTarget = Target_DocumentsTarget()..documents.add(path);
final target = Target()..documents = documentsTarget;
final request = ListenRequest()
..database = database
..database = documentDatabase
..addTarget = target;

_listenStreamCache[path] = _ListenStreamWrapper.create(
request,
(requestStream) => _client.listen(requestStream,
options: CallOptions(
metadata: {'google-cloud-resource-prefix': database})),
metadata: {'google-cloud-resource-prefix': documentDatabase})),
onDone: () => _listenStreamCache.remove(path));

return _mapDocumentStream(_listenStreamCache[path]!.stream);
Expand All @@ -150,6 +160,49 @@ class FirestoreGateway {
.toList();
}

Future<T> runTransaction<T>(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate recursiveness as much as anyone but here it feels a bit unwarranted, especially since it exposes attempt and maxAttemps in a "public" method (the class isn't supposed to be called directly but still).

Since you have the transaction itself being run in a private method _runTransaction, would it make sense to wrap the retries in a loop instead of having the function calling itself?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your arguments make a lot of sense. @jsgalarraga , could you review this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @cachapa for the review. It's been some months since I implemented the solution, so I don't remember why I didn't use a loop instead of recursion.
From a quick review at the code, I believe that the catching the aborted error in the loop and continue with it would be the hard part. However, I will think about it and try to refactor to use a loop or come up with strong reasons on why recursion might be a better solution.

Unfortunately I won't be able to get into this next week, but I will work on it as soon as I can.

PS. @evandrobubiak feel free to also contribute with code. I appreciate you reviving this PR, but I would kindly ask to avoid posting a message a couple hours after the review just to put pressure on getting the feature out.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsgalarraga thank you for getting back to me, and once again sorry for allowing this to go unreviewed for so long.

I appreciate you trying out a loop-based solution but if you think that doesn't work out I'm happy accepting the patch as-is.

TransactionHandler<T> transactionHandler, {
int maxAttempts = 5,
int attempt = 1,
}) async {
try {
return await _runTransaction(transactionHandler);
} on GrpcError catch (e) {
if (e.code == StatusCode.aborted && attempt < maxAttempts) {
return await runTransaction(
transactionHandler,
attempt: attempt++,
maxAttempts: maxAttempts,
);
} else {
rethrow;
}
}
}

Future<T> _runTransaction<T>(
TransactionHandler<T> transactionHandler,
) async {
final transactionRequest = BeginTransactionRequest(database: database);
var transactionResponse = await _client
.beginTransaction(transactionRequest)
.catchError(_handleError);

var transactionId = transactionResponse.transaction;
var transaction = Transaction(this, transactionId);

final result = await transactionHandler(transaction);

var commitRequest = CommitRequest(
database: database,
transaction: transactionId,
writes: transaction.mutations,
);
await _client.commit(commitRequest).catchError(_handleError);

return result;
}

void close() {
_listenStreamCache.forEach((_, stream) => stream.close());
_listenStreamCache.clear();
Expand Down
87 changes: 84 additions & 3 deletions lib/firestore/models.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import 'dart:collection';

import 'package:firedart/generated/google/firestore/v1/common.pb.dart';
import 'package:firedart/generated/google/firestore/v1/document.pb.dart' as fs;
import 'package:firedart/generated/google/firestore/v1/query.pb.dart';
import 'package:firedart/generated/google/firestore/v1/write.pb.dart';
import 'package:firedart/generated/google/protobuf/wrappers.pb.dart';
import 'package:firedart/generated/google/type/latlng.pb.dart';
import 'package:grpc/grpc.dart';
Expand All @@ -15,11 +17,11 @@ abstract class Reference {

String get id => path.substring(path.lastIndexOf('/') + 1);

String get fullPath => '${_gateway.database}/$path';
String get fullPath => '${_gateway.documentDatabase}/$path';

Reference(this._gateway, String path)
: path = _trimSlashes(path.startsWith(_gateway.database)
? path.substring(_gateway.database.length + 1)
: path = _trimSlashes(path.startsWith(_gateway.documentDatabase)
? path.substring(_gateway.documentDatabase.length + 1)
: path);

factory Reference.create(FirestoreGateway gateway, String path) {
Expand Down Expand Up @@ -373,3 +375,82 @@ class QueryReference extends Reference {
..compositeFilter = compositeFilter;
}
}

/// Signature of a transaction callback.
typedef TransactionHandler<T> = Future<T> Function(Transaction transaction);

/// Transaction class which is created from a call to [runTransaction()].
class Transaction {
final FirestoreGateway _gateway;
final List<int> _transaction;

Transaction(this._gateway, this._transaction);

final List<Write> _mutations = <Write>[];

/// An immutable list of the [Write]s that have been added to this transaction.
UnmodifiableListView<Write> get mutations => UnmodifiableListView(_mutations);

/// Reads the document referenced by the provided [path].
///
/// If the document does not exist, the operation throws a [GrpcError] with
/// [StatusCode.notFound].
Future<Document> get(String path) async {
return _gateway.getDocument(
_fullPath(path),
transaction: _transaction,
);
}

/// Deletes the document referred by the provided [path].
///
/// If the document does not exist, the operation does nothing and returns
/// normally.
void delete(String path) {
_mutations.add(
Write(delete: _fullPath(path)),
);
}

/// Updates fields provided in [data] for the document referred to by [path].
///
/// Only the fields specified in [data] will be updated. Fields that
/// are not specified in [data] will not be changed.
///
/// If the document does not yet exist, it will be created.
void update(String path, Map<String, dynamic> data) {
_mutations.add(
Write(
updateMask: DocumentMask(fieldPaths: data.keys),
update: fs.Document(
name: _fullPath(path),
fields: _encodeMap(data),
),
),
);
}

/// Sets fields provided in [data] for the document referred to by [path].
///
/// All fields will be overwritten with the provided [data]. This means
/// that all fields that are not specified in [data] will be deleted.
///
/// If the document does not yet exist, it will be created.
void set(String path, Map<String, dynamic> data) {
_mutations.add(
Write(
updateMask: null,
update: fs.Document(
name: _fullPath(path),
fields: _encodeMap(data),
),
),
);
}

String _fullPath(String path) => '${_gateway.documentDatabase}/$path';

Map<String, fs.Value> _encodeMap(Map<String, dynamic> map) {
return map.map((key, value) => MapEntry(key, TypeUtil.encode(value)));
}
}