From ac1da32bf47e0d919c3d8977277c564949e3b8c8 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 6 Mar 2024 09:29:42 +0200 Subject: [PATCH 1/5] feat: async writer --- .../sink/async/AsyncBigQuerySink.java | 65 ++++++ .../sink/async/AsyncBigQuerySinkBuilder.java | 79 +++++++ .../sink/async/AsyncBigQuerySinkWriter.java | 198 ++++++++++++++++++ .../sink/async/AsyncClientProvider.java | 79 +++++++ .../sink/async/ProtoElementConverter.java | 27 +++ .../bigquery/sink/async/StreamRequest.java | 43 ++++ .../sink/async/StreamRequestSerializer.java | 44 ++++ 7 files changed, 535 insertions(+) create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java new file mode 100644 index 0000000..1506db2 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java @@ -0,0 +1,65 @@ +package com.vinted.flink.bigquery.sink.async; + +import com.vinted.flink.bigquery.model.Rows; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +public class AsyncBigQuerySink extends AsyncSinkBase, StreamRequest> { + private final AsyncClientProvider provider; + private final RateLimitingStrategy strategy; + + public static AsyncBigQuerySinkBuilder builder() { + return new AsyncBigQuerySinkBuilder<>(); + } + + protected AsyncBigQuerySink(AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) { + super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes); + this.provider = provider; + this.strategy = rateLimitingStrategy; + } + + @Override + public StatefulSinkWriter, BufferedRequestState> createWriter(InitContext initContext) throws IOException { + return new AsyncBigQuerySinkWriter<>(provider, this.getElementConverter(), initContext, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(getMaxBatchSize()) + .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) + .setMaxInFlightRequests(getMaxInFlightRequests()) + .setMaxBufferedRequests(getMaxBufferedRequests()) + .setMaxTimeInBufferMS(getMaxTimeInBufferMS()) + .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) + 
.setRateLimitingStrategy(strategy) + .build(), + List.of() + ); + } + + @Override + public StatefulSinkWriter, BufferedRequestState> restoreWriter(InitContext initContext, Collection> collection) throws IOException { + return new AsyncBigQuerySinkWriter<>(provider, this.getElementConverter(), initContext, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(getMaxBatchSize()) + .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) + .setMaxInFlightRequests(getMaxInFlightRequests()) + .setMaxBufferedRequests(getMaxBufferedRequests()) + .setMaxTimeInBufferMS(getMaxTimeInBufferMS()) + .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) + .setRateLimitingStrategy(strategy) + .build(), + collection + ); + } + + @Override + public SimpleVersionedSerializer> getWriterStateSerializer() { + return new StreamRequestSerializer(); + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java new file mode 100644 index 0000000..2b76685 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java @@ -0,0 +1,79 @@ +package com.vinted.flink.bigquery.sink.async; + +import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; + +import java.time.Duration; + +public class AsyncBigQuerySinkBuilder extends AsyncSinkBaseBuilder, StreamRequest, AsyncBigQuerySinkBuilder> { + private static final int DEFAULT_MAX_BATCH_SIZE = 1; + private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1; + private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB + + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis(); + + private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000; + private AsyncClientProvider provider; + + private RowValueSerializer serializer; + + private RateLimitingStrategy strategy = null; + + public AsyncBigQuerySinkBuilder setClientProvider(AsyncClientProvider provider) { + this.provider = provider; + return this; + } + + public AsyncBigQuerySinkBuilder setRowSerializer(RowValueSerializer serializer) { + this.serializer = serializer; + return this; + } + + public AsyncBigQuerySinkBuilder setRateLimitStrategy(RateLimitingStrategy strategy) { + this.strategy = strategy; + return this; + } + + @Override + public AsyncSinkBase, StreamRequest> build() { + if (getMaxBatchSize() == null) { + setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE); + } + + if (getMaxInFlightRequests() == null) { + setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS); + } + + if (getMaxBufferedRequests() == null) { + setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS); + } + + if (getMaxBatchSizeInBytes() == null) { + setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES); + } + + if (getMaxTimeInBufferMS() == null) { + setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS); + } + + if (getMaxRecordSizeInBytes() == null) { + setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES); + } + + return new AsyncBigQuerySink<>( + this.provider, + this.strategy, + new ProtoElementConverter<>(this.serializer), + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + 
getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes() + ); + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java new file mode 100644 index 0000000..682c284 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -0,0 +1,198 @@ +package com.vinted.flink.bigquery.sink.async; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.*; +import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics; +import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.sink.AppendException; +import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSinkWriter; +import io.grpc.Status; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamRequest> { + private static final Logger logger = LoggerFactory.getLogger(AsyncSinkWriter.class); + private final AsyncClientProvider clientProvider; + + private final Executor appendExecutor; + + protected transient Map streamMap = new ConcurrentHashMap<>(); + + protected BigQueryWriteClient client; + + public AsyncBigQuerySinkWriter(AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) { + super(elementConverter, context, configuration, bufferedRequestStates); + appendExecutor = Executors.newFixedThreadPool(4); + this.clientProvider = clientProvider; + } + + protected final StreamWriter streamWriter(String traceId, String streamName, TableId table) { + return streamMap.computeIfAbsent(streamName, name -> { + logger.trace("Trace-id {} Stream not found {}. 
Creating new stream", traceId, streamName); + // Stream name can't contain index + return this.clientProvider.getWriter(streamName, table); + }); + } + + protected final void recreateStreamWriter(String traceId, String streamName, String writerId, TableId table) { + logger.info("Trace-id {} Closing all writers for {}", traceId, streamName); + try { + flush(true); + streamMap.replaceAll((key, writer) -> { + var newWriter = writer; + if (writer.getWriterId().equals(writerId)) { + try { + writer.close(); + } catch (Exception e) { + logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); + } + newWriter = this.clientProvider.getWriter(streamName, table); + } + return newWriter; + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void submitRequestEntries(List list, Consumer> consumer) { + var traceId = UUID.randomUUID().toString(); + var parent = this; + + CompletableFuture.runAsync(() -> { + var counter = new CountDownLatch(list.size()); + var result = new ConcurrentLinkedDeque(); + list.forEach(request -> { + var writer = streamWriter(traceId, request.getStream(), request.getTable()); + if (writer.isClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, request.getStream()); + recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); + writer = streamWriter(traceId, request.getStream(), request.getTable()); + } + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, request.getStream(), writer.getStreamName(), writer.getWriterId()); + try { + var apiFuture = writer.append(request.getData()); + ApiFutures.addCallback(apiFuture, new AppendCallBack<>(parent, writer.getWriterId(), traceId, request, result, counter), appendExecutor); + } catch (Throwable t) { + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + counter.countDown(); + getFatalExceptionCons().accept(new RuntimeException(t)); + } + }); + try { + counter.await(); + var finalResult = new ArrayList<>(result); + consumer.accept(finalResult); + } catch (InterruptedException e) { + getFatalExceptionCons().accept(new RuntimeException(e)); + } + }, appendExecutor); + } + + @Override + protected long getSizeInBytes(StreamRequest StreamRequest) { + return StreamRequest.getData().getSerializedSize(); + } + + + static class AppendCallBack implements ApiFutureCallback { + private final AsyncBigQuerySinkWriter parent; + private final StreamRequest request; + + private final String writerId; + private final String traceId; + + private final ConcurrentLinkedDeque out; + + private final CountDownLatch counter; + + public AppendCallBack(AsyncBigQuerySinkWriter parent, String writerId, String traceId, StreamRequest request, ConcurrentLinkedDeque out, CountDownLatch counter) { + this.parent = parent; + this.writerId = writerId; + this.traceId = traceId; + this.request = request; + this.out = out; + this.counter = counter; + } + + @Override + public void onSuccess(AppendRowsResponse result) { +// this.retries.accept(List.of()); + counter.countDown(); + } + + + @Override + public void onFailure(Throwable t) { + var status = Status.fromThrowable(t); + switch (status.getCode()) { + case INTERNAL: + case CANCELLED: + case FAILED_PRECONDITION: + case DEADLINE_EXCEEDED: + out.add(request); + break; + case ABORTED: + case UNAVAILABLE: { + this.parent.recreateStreamWriter(traceId, request.getStream(), writerId, 
request.getTable()); + out.add(request); + break; + } + case INVALID_ARGUMENT: + if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { +// Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); + logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId); + var data = request.getData().getSerializedRowsList(); + var first = data.subList(0, data.size() / 2); + var second = data.subList(data.size() / 2, data.size()); + try { + var retryRequest = List.of( + new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build()), + new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build()) + ); + out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build())); + out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build())); + } catch (Throwable e) { + this.parent.getFatalExceptionCons().accept(new RuntimeException(e)); + } + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + this.parent.getFatalExceptionCons().accept(new RuntimeException("failed")); + } + break; + case UNKNOWN: + if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + this.parent.recreateStreamWriter(traceId, request.getStream(), writerId, request.getTable()); + out.add(request); + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); +// this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); + this.parent.getFatalExceptionCons().accept(new RuntimeException("failed")); + } + break; + default: + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); +// this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); + this.parent.getFatalExceptionCons().accept(new RuntimeException("failed")); + } + + counter.countDown(); + } + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java new file mode 100644 index 0000000..bcb8943 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java @@ -0,0 +1,79 @@ +package com.vinted.flink.bigquery.sink.async; + +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.*; +import com.google.protobuf.Descriptors; +import com.vinted.flink.bigquery.model.config.Credentials; +import com.vinted.flink.bigquery.model.config.WriterSettings; +import com.vinted.flink.bigquery.schema.SchemaTransformer; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.Executors; + +public class AsyncClientProvider implements Serializable { + private final Credentials credentials; + private final WriterSettings writerSettings; + + private transient BigQueryWriteClient bigQueryWriteClient; + + public AsyncClientProvider(Credentials credentials, 
WriterSettings writerSettings) { + this.credentials = credentials; + this.writerSettings = writerSettings; + } + + public BigQueryWriteClient getClient() { + if (this.bigQueryWriteClient == null) { + try { + bigQueryWriteClient = BigQueryWriteClient + .create(this.writerSettings.toBqWriteSettings(credentials)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return bigQueryWriteClient; + } + + public StreamWriter getWriter(String streamName, TableId table) { + try { + var descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(getTableSchema(table)); + var protoSchema = ProtoSchemaConverter.convert(descriptor); + var executorProvider = this.writerSettings.getWriterThreads() > 1 ? + FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) : + BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); + return StreamWriter + .newBuilder(streamName, getClient()) + .setMaxInflightRequests(this.writerSettings.getMaxInflightRequests()) + .setMaxInflightBytes(this.writerSettings.getMaxInflightBytes()) + .setMaxRetryDuration(this.writerSettings.getMaxRetryDuration()) + .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) + .setExecutorProvider(executorProvider) + .setLocation(table.getProject()) + .setWriterSchema(protoSchema) + .build(); + + } catch (IOException | Descriptors.DescriptorValidationException e) { + throw new RuntimeException(e); + } + } + + public WriterSettings writeSettings() { + return this.writerSettings; + } + + TableSchema getTableSchema(TableId table) { + var schema = BigQueryOptions + .newBuilder() + .setProjectId(table.getProject()) + .setCredentials(credentials.getCredentials()) + .build() + .getService() + .getTable(table.getDataset(), table.getTable()) + .getDefinition() + .getSchema(); + + return SchemaTransformer.convertTableSchema(schema); + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java new file mode 100644 index 0000000..7f0c148 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java @@ -0,0 +1,27 @@ +package com.vinted.flink.bigquery.sink.async; + +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.protobuf.ByteString; +import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import java.util.stream.Collectors; + +public class ProtoElementConverter implements ElementConverter, StreamRequest> { + private final RowValueSerializer serializer; + + public ProtoElementConverter(RowValueSerializer serializer) { + this.serializer = serializer; + } + + @Override + public StreamRequest apply(Rows rows, SinkWriter.Context context) { + var prows = ProtoRows + .newBuilder() + .addAllSerializedRows(rows.getData().stream().map(r -> ByteString.copyFrom(serializer.serialize(r))).collect(Collectors.toList())) + .build(); + return new StreamRequest(rows.getStream(), rows.getTable(), prows); + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java new file mode 100644 index 0000000..dc1dd49 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java @@ -0,0 +1,43 @@ 
+package com.vinted.flink.bigquery.sink.async; + +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.ProtoRows; + +import java.io.Serializable; + +public class StreamRequest implements Serializable { + private String stream; + private TableId table; + + private ProtoRows data; + + public StreamRequest(String stream, TableId table, ProtoRows data) { + this.stream = stream; + this.table = table; + this.data = data; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public TableId getTable() { + return table; + } + + public void setTable(TableId table) { + this.table = table; + } + + public ProtoRows getData() { + return data; + } + + public void setData(ProtoRows data) { + this.data = data; + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java new file mode 100644 index 0000000..847d15b --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java @@ -0,0 +1,44 @@ +package com.vinted.flink.bigquery.sink.async; + +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.vinted.flink.bigquery.sink.buffered.BigQueryCommittable; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; + +import java.io.*; + +public class StreamRequestSerializer extends AsyncSinkWriterStateSerializer { + @Override + protected void serializeRequestToStream(StreamRequest request, DataOutputStream dataOutputStream) throws IOException { + dataOutputStream.writeUTF(request.getStream()); + dataOutputStream.writeUTF(request.getTable().getProject()); + dataOutputStream.writeUTF(request.getTable().getDataset()); + dataOutputStream.writeUTF(request.getTable().getTable()); + var bytes = request.getData().toByteArray(); + dataOutputStream.writeInt(bytes.length); + dataOutputStream.write(bytes); + dataOutputStream.flush(); + } + + @Override + protected StreamRequest deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException { + var name = dataInputStream.readUTF(); + var project = dataInputStream.readUTF(); + var dataset = dataInputStream.readUTF(); + var table = dataInputStream.readUTF(); + var dataLength = dataInputStream.readInt(); + var data = dataInputStream.readNBytes(dataLength); + var tableId = TableId.of(project, dataset, table); + try { + return new StreamRequest(name, tableId, ProtoRows.parseFrom(data)); + } catch (Exception e) { + throw new RuntimeException("name=" + name + " project=" + project + " dataset=" + dataset + " table=" + table + " " + e.getMessage()); + } + + } + + @Override + public int getVersion() { + return 1; + } +} From 458ba65f91c95cde8935650aab0e99fc1dc808d8 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 13 Mar 2024 12:17:40 +0200 Subject: [PATCH 2/5] feat: add async writer --- .../sink/async/AsyncBigQuerySinkBuilder.java | 2 +- .../sink/async/AsyncBigQuerySinkWriter.java | 138 ++++++++------ .../sink/async/AsyncClientProvider.java | 27 ++- .../sink/async/AsyncWriterException.java | 14 ++ .../sink/async/ProtoElementConverter.java | 7 +- .../bigquery/sink/async/StreamRequest.java | 14 +- .../sink/async/StreamRequestSerializer.java | 4 +- .../flink/bigquery/AsyncBigQuerySinkTest.java | 135 ++++++++++++++ .../bigquery/BigQueryDefaultSinkTest.java | 66 ------- 
.../vinted/flink/bigquery/util/FlinkTest.java | 9 +- .../util/MockAsyncProtoClientProvider.java | 171 ++++++++++++++++++ 11 files changed, 460 insertions(+), 127 deletions(-) create mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/AsyncWriterException.java create mode 100644 src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java create mode 100644 src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java index 2b76685..4346582 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java @@ -67,7 +67,7 @@ public AsyncSinkBase, StreamRequest> build() { return new AsyncBigQuerySink<>( this.provider, this.strategy, - new ProtoElementConverter<>(this.serializer), + new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()), getMaxBatchSize(), getMaxInFlightRequests(), getMaxBufferedRequests(), diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index 682c284..4e895a8 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -5,6 +5,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.*; +import com.vinted.flink.bigquery.client.BigQueryStreamWriter; import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics; import com.vinted.flink.bigquery.model.Rows; import com.vinted.flink.bigquery.sink.AppendException; @@ -15,6 +16,8 @@ import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +31,14 @@ public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamR private static final Logger logger = LoggerFactory.getLogger(AsyncSinkWriter.class); private final AsyncClientProvider clientProvider; + private final SinkWriterMetricGroup metricGroup; + + private final transient Map metrics = new HashMap<>(); + private final Executor appendExecutor; + private final Executor waitExecutor = Executors.newSingleThreadExecutor(); + protected transient Map streamMap = new ConcurrentHashMap<>(); protected BigQueryWriteClient client; @@ -37,36 +46,67 @@ public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamR public AsyncBigQuerySinkWriter(AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) { super(elementConverter, context, configuration, bufferedRequestStates); appendExecutor = Executors.newFixedThreadPool(4); + this.metricGroup = context.metricGroup(); this.clientProvider = clientProvider; } + private void registerInflightMetric(StreamWriter writer) { + var group = metricGroup + 
.addGroup("stream", writer.getStreamName()) + .addGroup("writer_id", writer.getWriterId()); + + group.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds); + } + + private void registerAppendMetrics(StreamRequest request) { + metrics.computeIfAbsent(request.getStream(), s -> { + var metric = new BigQueryStreamMetrics(request.getStream()); + var group = metricGroup + .addGroup("table", request.getTable().getTable()) + .addGroup("stream", request.getStream()); + group.gauge("stream_offset", (Gauge) metric::getOffset); + group.gauge("batch_count", metric::getBatchCount); + group.gauge("batch_size_mb", metric::getBatchSizeInMb); + group.gauge("split_batch_count", metric::getSplitBatchCount); + group.gauge("callback_timeouts", metric::getTimeoutCount); + + return metric; + }); + } + protected final StreamWriter streamWriter(String traceId, String streamName, TableId table) { - return streamMap.computeIfAbsent(streamName, name -> { + var writer = streamMap.computeIfAbsent(streamName, name -> { logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamName); - // Stream name can't contain index - return this.clientProvider.getWriter(streamName, table); + + var newWriter = this.clientProvider.getWriter(streamName, table); + registerInflightMetric(newWriter); + return newWriter; }); + + if (writer.isClosed() || writer.isUserClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, streamName); + recreateStreamWriter(traceId, streamName, writer.getWriterId(), table); + return streamMap.get(streamName); + } + + return writer; } protected final void recreateStreamWriter(String traceId, String streamName, String writerId, TableId table) { logger.info("Trace-id {} Closing all writers for {}", traceId, streamName); - try { - flush(true); - streamMap.replaceAll((key, writer) -> { - var newWriter = writer; - if (writer.getWriterId().equals(writerId)) { - try { - writer.close(); - } catch (Exception e) { - logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); - } - newWriter = this.clientProvider.getWriter(streamName, table); + streamMap.replaceAll((key, writer) -> { + var newWriter = writer; + if (writer.getWriterId().equals(writerId)) { + try { + writer.close(); + } catch (Exception e) { + logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); } - return newWriter; - }); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + newWriter = this.clientProvider.getWriter(streamName, table); + registerInflightMetric(newWriter); + } + return newWriter; + }); } @Override @@ -74,16 +114,12 @@ protected void submitRequestEntries(List list, Consumer { + CompletableFuture.runAsync(() -> { var counter = new CountDownLatch(list.size()); var result = new ConcurrentLinkedDeque(); list.forEach(request -> { + registerAppendMetrics(request); var writer = streamWriter(traceId, request.getStream(), request.getTable()); - if (writer.isClosed()) { - logger.warn("Trace-id {}, StreamWrite is closed. 
Recreating stream for {}", traceId, request.getStream()); - recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); - writer = streamWriter(traceId, request.getStream(), request.getTable()); - } logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, request.getStream(), writer.getStreamName(), writer.getWriterId()); try { var apiFuture = writer.append(request.getData()); @@ -91,7 +127,7 @@ protected void submitRequestEntries(List list, Consumer list, Consumer(result); consumer.accept(finalResult); } catch (InterruptedException e) { - getFatalExceptionCons().accept(new RuntimeException(e)); + getFatalExceptionCons().accept(new AsyncWriterException(traceId, Status.Code.INTERNAL, e)); } - }, appendExecutor); + }, waitExecutor); } @Override @@ -132,7 +168,6 @@ public AppendCallBack(AsyncBigQuerySinkWriter parent, String writerId, String @Override public void onSuccess(AppendRowsResponse result) { -// this.retries.accept(List.of()); counter.countDown(); } @@ -141,58 +176,59 @@ public void onSuccess(AppendRowsResponse result) { public void onFailure(Throwable t) { var status = Status.fromThrowable(t); switch (status.getCode()) { - case INTERNAL: - case CANCELLED: - case FAILED_PRECONDITION: - case DEADLINE_EXCEEDED: - out.add(request); - break; - case ABORTED: case UNAVAILABLE: { this.parent.recreateStreamWriter(traceId, request.getStream(), writerId, request.getTable()); - out.add(request); + retry(t, traceId, request); break; } case INVALID_ARGUMENT: if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { -// Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); + Optional.ofNullable(this.parent.metrics.get(request.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); logger.warn("Trace-id {} MessageSize is too large. 
Splitting batch", traceId); var data = request.getData().getSerializedRowsList(); var first = data.subList(0, data.size() / 2); var second = data.subList(data.size() / 2, data.size()); try { - var retryRequest = List.of( - new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build()), - new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build()) - ); - out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build())); - out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build())); + out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build(), request.getRetries() - 1)); + out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build(), request.getRetries() - 1)); } catch (Throwable e) { - this.parent.getFatalExceptionCons().accept(new RuntimeException(e)); + this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), e)); } } else { logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.getFatalExceptionCons().accept(new RuntimeException("failed")); + this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); } break; case UNKNOWN: if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + Optional.ofNullable(this.parent.metrics.get(request.getStream())) + .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); this.parent.recreateStreamWriter(traceId, request.getStream(), writerId, request.getTable()); - out.add(request); + retry(t, traceId, request); } else { logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); -// this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); - this.parent.getFatalExceptionCons().accept(new RuntimeException("failed")); + this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); } break; default: logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); -// this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); - this.parent.getFatalExceptionCons().accept(new RuntimeException("failed")); + this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); } counter.countDown(); } + + private void retry(Throwable t, String traceId, StreamRequest request) { + var status = Status.fromThrowable(t); + request.setRetries(request.getRetries() - 1); + if (request.getRetries() > 0) { + logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), request.getRetries()); + out.add(request); + } else { + logger.error("Trace-id {} Recoverable error {}. 
No more retries left", traceId, status.getCode(), t); + this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); + } + } } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java index bcb8943..e773bca 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncClientProvider.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.sink.async; import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.*; @@ -8,6 +9,7 @@ import com.vinted.flink.bigquery.model.config.Credentials; import com.vinted.flink.bigquery.model.config.WriterSettings; import com.vinted.flink.bigquery.schema.SchemaTransformer; +import org.threeten.bp.Duration; import java.io.IOException; import java.io.Serializable; @@ -43,17 +45,36 @@ public StreamWriter getWriter(String streamName, TableId table) { var executorProvider = this.writerSettings.getWriterThreads() > 1 ? FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) : BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); - return StreamWriter + + + + var streamWriterBuilder = StreamWriter .newBuilder(streamName, getClient()) .setMaxInflightRequests(this.writerSettings.getMaxInflightRequests()) .setMaxInflightBytes(this.writerSettings.getMaxInflightBytes()) .setMaxRetryDuration(this.writerSettings.getMaxRetryDuration()) .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) + .setChannelProvider(BigQueryWriteSettings.defaultTransportChannelProvider()) .setExecutorProvider(executorProvider) .setLocation(table.getProject()) - .setWriterSchema(protoSchema) - .build(); + .setWriterSchema(protoSchema); + + if (writerSettings.getRetrySettings() != null) { + var settings = writerSettings.getRetrySettings(); + var retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis())) + .setRetryDelayMultiplier(settings.getRetryDelayMultiplier()) + .setMaxAttempts(settings.getMaxRetryAttempts()) + .setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis())) + .build(); + + streamWriterBuilder.setRetrySettings(retrySettings); + } + + StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); + return streamWriterBuilder.build(); } catch (IOException | Descriptors.DescriptorValidationException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncWriterException.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncWriterException.java new file mode 100644 index 0000000..8977c1a --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncWriterException.java @@ -0,0 +1,14 @@ +package com.vinted.flink.bigquery.sink.async; + +import io.grpc.Status; + +public class AsyncWriterException extends RuntimeException { + + public AsyncWriterException(String traceId, Status.Code code, Throwable cause) { + super( + String.format("Trace-id %s Received error %s with status %s", traceId, cause.getMessage(), + code.toString()), + cause + ); + } +} diff --git 
a/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java index 7f0c148..1efdc18 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/ProtoElementConverter.java @@ -12,8 +12,11 @@ public class ProtoElementConverter implements ElementConverter, StreamRequest> { private final RowValueSerializer serializer; - public ProtoElementConverter(RowValueSerializer serializer) { + private final int retries; + + public ProtoElementConverter(RowValueSerializer serializer, int retries) { this.serializer = serializer; + this.retries = retries; } @Override @@ -22,6 +25,6 @@ public StreamRequest apply(Rows rows, SinkWriter.Context context) { .newBuilder() .addAllSerializedRows(rows.getData().stream().map(r -> ByteString.copyFrom(serializer.serialize(r))).collect(Collectors.toList())) .build(); - return new StreamRequest(rows.getStream(), rows.getTable(), prows); + return new StreamRequest(rows.getStream(), rows.getTable(), prows, this.retries); } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java index dc1dd49..8e80fc1 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequest.java @@ -8,13 +8,15 @@ public class StreamRequest implements Serializable { private String stream; private TableId table; - private ProtoRows data; - public StreamRequest(String stream, TableId table, ProtoRows data) { + private int retries; + + public StreamRequest(String stream, TableId table, ProtoRows data, int retries) { this.stream = stream; this.table = table; this.data = data; + this.retries = retries; } public String getStream() { @@ -40,4 +42,12 @@ public ProtoRows getData() { public void setData(ProtoRows data) { this.data = data; } + + public int getRetries() { + return retries; + } + + public void setRetries(int retries) { + this.retries = retries; + } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java index 847d15b..2821cea 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/StreamRequestSerializer.java @@ -14,6 +14,7 @@ protected void serializeRequestToStream(StreamRequest request, DataOutputStream dataOutputStream.writeUTF(request.getTable().getProject()); dataOutputStream.writeUTF(request.getTable().getDataset()); dataOutputStream.writeUTF(request.getTable().getTable()); + dataOutputStream.writeInt(request.getRetries()); var bytes = request.getData().toByteArray(); dataOutputStream.writeInt(bytes.length); dataOutputStream.write(bytes); @@ -26,11 +27,12 @@ protected StreamRequest deserializeRequestFromStream(long requestSize, DataInput var project = dataInputStream.readUTF(); var dataset = dataInputStream.readUTF(); var table = dataInputStream.readUTF(); + var retries = dataInputStream.readInt(); var dataLength = dataInputStream.readInt(); var data = dataInputStream.readNBytes(dataLength); var tableId = TableId.of(project, dataset, table); try { - return new StreamRequest(name, tableId, ProtoRows.parseFrom(data)); + return new StreamRequest(name, tableId, ProtoRows.parseFrom(data), retries); } catch 
(Exception e) { throw new RuntimeException("name=" + name + " project=" + project + " dataset=" + dataset + " table=" + table + " " + e.getMessage()); } diff --git a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java new file mode 100644 index 0000000..e6a694d --- /dev/null +++ b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java @@ -0,0 +1,135 @@ +package com.vinted.flink.bigquery; + +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import com.vinted.flink.bigquery.sink.async.AsyncBigQuerySink; +import com.vinted.flink.bigquery.util.FlinkTest; +import com.vinted.flink.bigquery.util.MockAsyncProtoClientProvider; +import com.vinted.flink.bigquery.util.MockJsonClientProvider; +import io.grpc.Status; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.*; + + +@ExtendWith(FlinkTest.class) +public class AsyncBigQuerySinkTest { + TableId testTable = TableId.of("test-project", "test-dataset", "test-table"); + String stream = "projects/test/datasets/test/tables/test/streams/stream1"; + + @Test + public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenSuccessfulAppend(); + + runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(1) + )))); + + verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any()); + } + + @Test + public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { + var cause = new Exceptions.MaximumRequestCallbackWaitTimeExceededException(Duration.ofMinutes(6), "id", Duration.ofMinutes(5)); + mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN.withCause(cause)); + mockClientProvider.givenRetryCount(2); + + + assertThatThrownBy(() -> { + runner + .withRetryCount(0) + .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(1) + )))); + }).isInstanceOf(JobExecutionException.class); + + + verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any()); + assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3); + } + + @Test + public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenStreamIsFinalized(stream); + + assertThatThrownBy(() -> { + runner + .withRetryCount(0) + 
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(1) + )))); + }).isInstanceOf(JobExecutionException.class); + + + verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any()); + } + + @Test + public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE); + mockClientProvider.givenRetryCount(2); + + assertThatThrownBy(() -> { + runner + .withRetryCount(0) + .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(1) + )))); + }).isInstanceOf(JobExecutionException.class); + + verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any()); + assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3); + } + + @Test + public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenAppendingTooLargeBatch(); + + runner + .withRetryCount(0) + .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(6) + )))); + + + verify(mockClientProvider.getMockProtoWriter(), times(3)).append(any()); + } + + private Rows givenRow(int count) { + var data = new ArrayList(count); + IntStream.rangeClosed(1, count) + .forEach(i -> data.add("{\"value\": " + i + "}")); + + return new Rows<>(data, -1, stream, testTable); + } + + private Function>> pipeline(List> data) { + return env -> env.fromCollection(data); + } + + private Function>> withBigQuerySink(MockAsyncProtoClientProvider mockClientProvider, Function>> pipeline) { + var sink = AsyncBigQuerySink.builder() + .setClientProvider(mockClientProvider) + .setMaxBatchSize(1) + .setMaxBufferedRequests(2) + .setRowSerializer((RowValueSerializer) String::getBytes) + .build(); + + return pipeline.andThen(s -> s.sinkTo(sink)); + } +} diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java index ed1ee4d..94872ed 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java @@ -43,23 +43,6 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); } - @Test - @Disabled("Retry logic causes locking") - public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL); - - assertThatThrownBy(() -> { - runner - .withRetryCount(0) - .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRow(1) - )))); - }).isInstanceOf(JobExecutionException.class); - - - verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any()); - } - @Test @Disabled("Retry logic causes locking") public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { @@ -81,39 +64,6 @@ public void 
shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTim assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3); } - @Test - @Disabled("Retry causes out of order exception in committer and later in writer") - public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenTimeoutForAppend(); - - assertThatThrownBy(() -> { - runner - .withRetryCount(0) - .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRow(1) - )))); - }).isInstanceOf(JobExecutionException.class); - - - verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any()); - } - - @Test - public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS); - - assertThatThrownBy(() -> { - runner - .withRetryCount(0) - .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRow(1) - )))); - }).isInstanceOf(JobExecutionException.class); - - - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); - } - @Test public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT); @@ -130,22 +80,6 @@ public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTe verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); } - @Test - public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenStreamIsFinalized(stream); - - assertThatThrownBy(() -> { - runner - .withRetryCount(0) - .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRow(1) - )))); - }).isInstanceOf(JobExecutionException.class); - - - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); - } - @Test @Disabled("Retry logic causes locking") public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { diff --git a/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java b/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java index 132d62f..526a898 100644 --- a/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java @@ -65,6 +65,8 @@ public void afterEach(ExtensionContext context) throws Exception { flinkCluster.after(); MockClock.reset(); MockJsonClientProvider.reset(); + MockProtoClientProvider.reset(); + MockAsyncProtoClientProvider.reset(); ProcessFunctionWithError.clear(); TestSink.clear(); } @@ -75,6 +77,7 @@ public void beforeEach(ExtensionContext context) throws Exception { MockClock.reset(); MockJsonClientProvider.reset(); MockProtoClientProvider.reset(); + MockAsyncProtoClientProvider.reset(); ProcessFunctionWithError.clear(); TestSink.clear(); } @@ -91,13 +94,17 @@ public Object 
resolveParameter(ParameterContext parameterContext, ExtensionConte return new MockClock(); } if (MockJsonClientProvider.class.equals(type)) { - return new MockJsonClientProvider(); + return new MockJsonClientProvider<>(); } if (MockProtoClientProvider.class.equals(type)) { return new MockProtoClientProvider(); } + if (MockAsyncProtoClientProvider.class.equals(type)) { + return new MockAsyncProtoClientProvider(); + } + if (PipelineRunner.class.equals(type)) { return new PipelineRunner(); } diff --git a/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java b/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java new file mode 100644 index 0000000..2619bd6 --- /dev/null +++ b/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java @@ -0,0 +1,171 @@ +package com.vinted.flink.bigquery.util; + +import com.google.api.core.SettableApiFuture; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.*; +import com.google.protobuf.Descriptors; +import com.vinted.flink.bigquery.model.config.Credentials; +import com.vinted.flink.bigquery.model.config.WriterSettings; +import com.vinted.flink.bigquery.sink.async.AsyncClientProvider; +import io.grpc.Status; +import io.grpc.StatusException; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +public class MockAsyncProtoClientProvider extends AsyncClientProvider { + private static BigQueryWriteClient mockClient = Mockito.mock(BigQueryWriteClient.class); + private static StreamWriter protoWriter = Mockito.mock(StreamWriter.class); + + private static AtomicInteger numOfCreatedWriters = new AtomicInteger(0); + private int retryCount = 5; + + public MockAsyncProtoClientProvider() { + super(null, null); + } + + @Override + public StreamWriter getWriter(String streamName, TableId table) { + numOfCreatedWriters.incrementAndGet(); + Mockito.when(MockAsyncProtoClientProvider.protoWriter.getWriterId()).thenReturn(UUID.randomUUID().toString()); + return MockAsyncProtoClientProvider.protoWriter; + } + + @Override + public BigQueryWriteClient getClient() { + return MockAsyncProtoClientProvider.mockClient; + } + + @Override + public WriterSettings writeSettings() { + return WriterSettings.newBuilder().withRetryCount(retryCount).build(); + } + + public int getNumOfCreatedWriter() { + return numOfCreatedWriters.get(); + } + + public void givenRetryCount(int count) { + this.retryCount = count; + } + + public void givenStreamDoesNotExist(String streamName) { + Mockito.doThrow(new RuntimeException(new StatusException(Status.NOT_FOUND))) + .when(MockAsyncProtoClientProvider.mockClient).getWriteStream(streamName); + } + + public void givenStreamIsFinalized(String streamName) throws Descriptors.DescriptorValidationException, IOException { + var exception = createFinalizedStreamException(); + var ex = new RuntimeException(exception); + Mockito.when(MockAsyncProtoClientProvider.mockClient.getWriteStream(streamName)) + .thenThrow(ex); + + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any(), Mockito.anyLong())) + .thenReturn(createAppendRowsResponseError(exception)); + } + + public void givenGettingStreamFails(String streamName) { + Mockito.doThrow(new RuntimeException(new StatusException(Status.INTERNAL))) + .when(MockAsyncProtoClientProvider.mockClient).getWriteStream(streamName); + } + + public void givenCreateStream(String... 
streamNames) { + var list = new ArrayList(); + for (String i : streamNames) { + list.add(i); + } + + var mock = Mockito.when(MockAsyncProtoClientProvider.mockClient.createWriteStream(Mockito.nullable(CreateWriteStreamRequest.class))) + .thenReturn(WriteStream + .newBuilder() + .setName(list.get(0)) + .buildPartial() + ); + list.subList(1, list.size()).forEach(name -> { + mock.thenReturn(WriteStream + .newBuilder() + .setName(name) + .buildPartial()); + }); + } + + public void givenSuccessfulAppend() throws Descriptors.DescriptorValidationException, IOException { + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) + .thenReturn(createAppendRowsResponse()); + } + + public void givenFailingAppendWithStatus(Status status) throws Descriptors.DescriptorValidationException, IOException { + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) + .thenReturn(createAppendRowsResponseError(new StatusException(status))); + } + + public void givenTimeoutForAppend() throws Descriptors.DescriptorValidationException, IOException { + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) + .thenReturn(createTimeoutAppendRowsResponse()); + } + + + public void givenAppendingExistingOffset(int expected, int actual, String streamName) throws Descriptors.DescriptorValidationException, IOException { + var offsetMock = createOffsetAlreadyExistsException(expected, actual, streamName); + + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) + .thenReturn(createAppendRowsResponseError(offsetMock)) + .thenReturn(createAppendRowsResponse()); + } + + public void givenAppendingTooLargeBatch() throws Descriptors.DescriptorValidationException, IOException { + var ex = new StatusException(Status.INVALID_ARGUMENT + .augmentDescription("MessageSize is too large. 
Max allow: 10000000 Actual: 12040940 status: INVALID_ARGUMENT stream: project")); + + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) + .thenReturn(createAppendRowsResponseError(ex)) + .thenReturn(createAppendRowsResponse()); + } + + public static void reset() { + Mockito.reset(MockAsyncProtoClientProvider.mockClient); + Mockito.reset(MockAsyncProtoClientProvider.protoWriter); + MockAsyncProtoClientProvider.numOfCreatedWriters.set(0); + } + + private static Exceptions.StreamFinalizedException createFinalizedStreamException() { + var exception = Mockito.mock(Exceptions.StreamFinalizedException.class); + Mockito.when(exception.getStatus()).thenReturn(Status.INVALID_ARGUMENT); + Mockito.when(exception.getCause()).thenReturn(new RuntimeException()); + return exception; + } + + private static Exceptions.OffsetAlreadyExists createOffsetAlreadyExistsException(long expected, long actual, String streamName) { + var offsetMock = Mockito.mock(Exceptions.OffsetAlreadyExists.class); + Mockito.when(offsetMock.getStatus()).thenReturn(Status.ALREADY_EXISTS); + Mockito.when(offsetMock.getStreamName()).thenReturn(streamName); + Mockito.when(offsetMock.getExpectedOffset()).thenReturn(expected); + Mockito.when(offsetMock.getActualOffset()).thenReturn(actual); + Mockito.when(offsetMock.getCause()).thenReturn(new RuntimeException()); + return offsetMock; + } + + private static SettableApiFuture createAppendRowsResponse() { + SettableApiFuture result = SettableApiFuture.create(); + result.set(AppendRowsResponse.newBuilder().buildPartial()); + return result; + } + + private static SettableApiFuture createTimeoutAppendRowsResponse() { + SettableApiFuture result = SettableApiFuture.create(); + return result; + } + + private static SettableApiFuture createAppendRowsResponseError(Throwable exception) { + SettableApiFuture result = SettableApiFuture.create(); + result.setException(exception); + return result; + } + + public StreamWriter getMockProtoWriter() { + return MockAsyncProtoClientProvider.protoWriter; + } +} From 09c63be9f9622c95fc8f0f4efd48942b84f88667 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 13 Mar 2024 15:40:44 +0200 Subject: [PATCH 3/5] refactor: remove counter --- .../sink/async/AsyncBigQuerySinkWriter.java | 181 +++++++----------- 1 file changed, 74 insertions(+), 107 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index 4e895a8..1d1b6ac 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -26,6 +26,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Collectors; public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamRequest> { private static final Logger logger = LoggerFactory.getLogger(AsyncSinkWriter.class); @@ -112,32 +113,83 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str @Override protected void submitRequestEntries(List list, Consumer> consumer) { var traceId = UUID.randomUUID().toString(); - var parent = this; - - CompletableFuture.runAsync(() -> { - var counter = new CountDownLatch(list.size()); - var result = new ConcurrentLinkedDeque(); - list.forEach(request -> { - registerAppendMetrics(request); - var writer = streamWriter(traceId, 
request.getStream(), request.getTable()); - logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, request.getStream(), writer.getStreamName(), writer.getWriterId()); + var requests = list.stream().map(request -> { + registerAppendMetrics(request); + var writer = streamWriter(traceId, request.getStream(), request.getTable()); + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, request.getStream(), writer.getStreamName(), writer.getWriterId()); + return CompletableFuture.>supplyAsync(() ->{ try { - var apiFuture = writer.append(request.getData()); - ApiFutures.addCallback(apiFuture, new AppendCallBack<>(parent, writer.getWriterId(), traceId, request, result, counter), appendExecutor); + writer.append(request.getData()).get(); + return List.of(); } catch (Throwable t) { logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); - counter.countDown(); - getFatalExceptionCons().accept(new AsyncWriterException(traceId, Status.Code.INTERNAL, t)); + var status = Status.fromThrowable(t); + switch (status.getCode()) { + case UNAVAILABLE: { + this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); + return retry(t, traceId, request); + } + case INVALID_ARGUMENT: + if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { + Optional.ofNullable(this.metrics.get(request.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); + logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId); + var data = request.getData().getSerializedRowsList(); + var first = data.subList(0, data.size() / 2); + var second = data.subList(data.size() / 2, data.size()); + try { + return List.of( + new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build(), request.getRetries() - 1), + new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build(), request.getRetries() - 1) + ); + } catch (Throwable e) { + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), e)); + return List.of(); + } + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); + return List.of(); + } + case UNKNOWN: + if (status.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + Optional.ofNullable(this.metrics.get(request.getStream())) + .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); + this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); + return retry(t, traceId, request); + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); + return List.of(); + } + default: + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); + return List.of(); + } + } - }); - try { - counter.await(); - var finalResult = new ArrayList<>(result); - consumer.accept(finalResult); - } catch (InterruptedException e) { - 
getFatalExceptionCons().accept(new AsyncWriterException(traceId, Status.Code.INTERNAL, e)); - } - }, waitExecutor); + }, appendExecutor); + }).collect(Collectors.toList()); + + CompletableFuture + .allOf(requests.toArray(new CompletableFuture[0])) + .thenApplyAsync(v -> requests.stream().flatMap(s -> s.join().stream()).collect(Collectors.toList()), appendExecutor) + .thenAcceptAsync(consumer, appendExecutor); + + } + + private List retry(Throwable t, String traceId, StreamRequest request) { + var status = Status.fromThrowable(t); + request.setRetries(request.getRetries() - 1); + if (request.getRetries() > 0) { + logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), request.getRetries()); + return List.of(request); + } else { + logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); + return List.of(); + } } @Override @@ -146,89 +198,4 @@ protected long getSizeInBytes(StreamRequest StreamRequest) { } - static class AppendCallBack implements ApiFutureCallback { - private final AsyncBigQuerySinkWriter parent; - private final StreamRequest request; - - private final String writerId; - private final String traceId; - - private final ConcurrentLinkedDeque out; - - private final CountDownLatch counter; - - public AppendCallBack(AsyncBigQuerySinkWriter parent, String writerId, String traceId, StreamRequest request, ConcurrentLinkedDeque out, CountDownLatch counter) { - this.parent = parent; - this.writerId = writerId; - this.traceId = traceId; - this.request = request; - this.out = out; - this.counter = counter; - } - - @Override - public void onSuccess(AppendRowsResponse result) { - counter.countDown(); - } - - - @Override - public void onFailure(Throwable t) { - var status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNAVAILABLE: { - this.parent.recreateStreamWriter(traceId, request.getStream(), writerId, request.getTable()); - retry(t, traceId, request); - break; - } - case INVALID_ARGUMENT: - if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { - Optional.ofNullable(this.parent.metrics.get(request.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); - logger.warn("Trace-id {} MessageSize is too large. 
Splitting batch", traceId); - var data = request.getData().getSerializedRowsList(); - var first = data.subList(0, data.size() / 2); - var second = data.subList(data.size() / 2, data.size()); - try { - out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build(), request.getRetries() - 1)); - out.add(new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build(), request.getRetries() - 1)); - } catch (Throwable e) { - this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), e)); - } - } else { - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - } - break; - case UNKNOWN: - if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { - logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); - Optional.ofNullable(this.parent.metrics.get(request.getStream())) - .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); - this.parent.recreateStreamWriter(traceId, request.getStream(), writerId, request.getTable()); - retry(t, traceId, request); - } else { - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - } - break; - default: - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - } - - counter.countDown(); - } - - private void retry(Throwable t, String traceId, StreamRequest request) { - var status = Status.fromThrowable(t); - request.setRetries(request.getRetries() - 1); - if (request.getRetries() > 0) { - logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), request.getRetries()); - out.add(request); - } else { - logger.error("Trace-id {} Recoverable error {}. 
No more retries left", traceId, status.getCode(), t); - this.parent.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - } - } - } } From 01117f0a05d4aa70a42ae70e1ba489845fca1a2f Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 13 Mar 2024 17:04:30 +0200 Subject: [PATCH 4/5] feat: add missing metrics --- .../flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index 1d1b6ac..b6649a5 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -119,6 +119,10 @@ protected void submitRequestEntries(List list, Consumer>supplyAsync(() ->{ try { + Optional.ofNullable(metrics.get(request.getStream())).ifPresent(s -> { + s.updateSize(request.getData().getSerializedSize()); + s.setBatchCount(request.getData().getSerializedRowsCount()); + }); writer.append(request.getData()).get(); return List.of(); } catch (Throwable t) { From 638fb2875025123b507eb0f3805436efe3974d9e Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 14 Mar 2024 09:34:43 +0200 Subject: [PATCH 5/5] refactor: add executor provider --- .../metric/AsyncBigQueryStreamMetrics.java | 55 ++++++ .../sink/async/AsyncBigQuerySink.java | 92 ++++++++- .../sink/async/AsyncBigQuerySinkBuilder.java | 79 -------- .../sink/async/AsyncBigQuerySinkWriter.java | 179 ++++++++++-------- .../flink/bigquery/AsyncBigQuerySinkTest.java | 2 +- .../vinted/flink/bigquery/util/FlinkTest.java | 26 +-- 6 files changed, 257 insertions(+), 176 deletions(-) create mode 100644 src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java delete mode 100644 src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java diff --git a/src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java b/src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java new file mode 100644 index 0000000..f77a856 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java @@ -0,0 +1,55 @@ +package com.vinted.flink.bigquery.metric; + +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncBigQueryStreamMetrics { + + private long batchCount = 0; + private double batchSizeInMb = 0.0; + private long splitBatchCount = 0; + private int timeoutCount = 0; + private final AtomicInteger inflightRequests = new AtomicInteger(0); + + public void incSplitCount() { + splitBatchCount += 1; + } + public void updateSize(long sizeInBytes) { + batchSizeInMb = sizeInBytes / 1000000.0; + } + + public long getBatchCount() { + return batchCount; + } + + public void setBatchCount(long batchCount) { + this.batchCount = batchCount; + } + + public double getBatchSizeInMb() { + return batchSizeInMb; + } + + public long getSplitBatchCount() { + return splitBatchCount; + } + + public int getTimeoutCount() { + return timeoutCount; + } + + public void incrementTimeoutCount() { + this.timeoutCount++; + } + + public int getInflightRequests() { + return inflightRequests.get(); + } + + public void incrementInflightRequests() { + this.inflightRequests.incrementAndGet(); + } + + public void decrementInflightRequests() { + this.inflightRequests.decrementAndGet(); + } +} diff --git 
a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java index 1506db2..2d1413b 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java @@ -1,7 +1,10 @@ package com.vinted.flink.bigquery.sink.async; import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import com.vinted.flink.bigquery.sink.ExecutorProvider; import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; @@ -9,26 +12,31 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; +import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.concurrent.Executors; public class AsyncBigQuerySink extends AsyncSinkBase, StreamRequest> { private final AsyncClientProvider provider; private final RateLimitingStrategy strategy; + private final ExecutorProvider executorProvider; + public static AsyncBigQuerySinkBuilder builder() { return new AsyncBigQuerySinkBuilder<>(); } - protected AsyncBigQuerySink(AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) { + protected AsyncBigQuerySink(ExecutorProvider executorProvider, AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) { super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes); + this.executorProvider = executorProvider; this.provider = provider; this.strategy = rateLimitingStrategy; } @Override public StatefulSinkWriter, BufferedRequestState> createWriter(InitContext initContext) throws IOException { - return new AsyncBigQuerySinkWriter<>(provider, this.getElementConverter(), initContext, + return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext, AsyncSinkWriterConfiguration.builder() .setMaxBatchSize(getMaxBatchSize()) .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) @@ -44,7 +52,7 @@ public StatefulSinkWriter, BufferedRequestState> createWr @Override public StatefulSinkWriter, BufferedRequestState> restoreWriter(InitContext initContext, Collection> collection) throws IOException { - return new AsyncBigQuerySinkWriter<>(provider, this.getElementConverter(), initContext, + return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext, AsyncSinkWriterConfiguration.builder() .setMaxBatchSize(getMaxBatchSize()) .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) @@ -62,4 +70,82 @@ public StatefulSinkWriter, BufferedRequestState> restoreW public SimpleVersionedSerializer> getWriterStateSerializer() { return new StreamRequestSerializer(); } + + static public class 
AsyncBigQuerySinkBuilder extends AsyncSinkBaseBuilder, StreamRequest, AsyncBigQuerySinkBuilder> { + private static final int DEFAULT_MAX_BATCH_SIZE = 1; + private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1; + private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB + + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis(); + + private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000; + private AsyncClientProvider provider; + + private RowValueSerializer serializer; + + private RateLimitingStrategy strategy = null; + + private ExecutorProvider executorProvider = () -> Executors.newFixedThreadPool(4); + + public AsyncBigQuerySinkBuilder setClientProvider(AsyncClientProvider provider) { + this.provider = provider; + return this; + } + + public AsyncBigQuerySinkBuilder setRowSerializer(RowValueSerializer serializer) { + this.serializer = serializer; + return this; + } + + public AsyncBigQuerySinkBuilder setRateLimitStrategy(RateLimitingStrategy strategy) { + this.strategy = strategy; + return this; + } + + public AsyncBigQuerySinkBuilder setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = executorProvider; + return this; + } + + @Override + public AsyncSinkBase, StreamRequest> build() { + if (getMaxBatchSize() == null) { + setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE); + } + + if (getMaxInFlightRequests() == null) { + setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS); + } + + if (getMaxBufferedRequests() == null) { + setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS); + } + + if (getMaxBatchSizeInBytes() == null) { + setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES); + } + + if (getMaxTimeInBufferMS() == null) { + setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS); + } + + if (getMaxRecordSizeInBytes() == null) { + setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES); + } + + return new AsyncBigQuerySink<>( + this.executorProvider, + this.provider, + this.strategy, + new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()), + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes() + ); + } + } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java deleted file mode 100644 index 4346582..0000000 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.vinted.flink.bigquery.sink.async; - -import com.vinted.flink.bigquery.model.Rows; -import com.vinted.flink.bigquery.serializer.RowValueSerializer; -import org.apache.flink.connector.base.sink.AsyncSinkBase; -import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; -import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; - -import java.time.Duration; - -public class AsyncBigQuerySinkBuilder extends AsyncSinkBaseBuilder, StreamRequest, AsyncBigQuerySinkBuilder> { - private static final int DEFAULT_MAX_BATCH_SIZE = 1; - private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4; - private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1; - private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB - - private static final long 
DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis(); - - private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000; - private AsyncClientProvider provider; - - private RowValueSerializer serializer; - - private RateLimitingStrategy strategy = null; - - public AsyncBigQuerySinkBuilder setClientProvider(AsyncClientProvider provider) { - this.provider = provider; - return this; - } - - public AsyncBigQuerySinkBuilder setRowSerializer(RowValueSerializer serializer) { - this.serializer = serializer; - return this; - } - - public AsyncBigQuerySinkBuilder setRateLimitStrategy(RateLimitingStrategy strategy) { - this.strategy = strategy; - return this; - } - - @Override - public AsyncSinkBase, StreamRequest> build() { - if (getMaxBatchSize() == null) { - setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE); - } - - if (getMaxInFlightRequests() == null) { - setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS); - } - - if (getMaxBufferedRequests() == null) { - setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS); - } - - if (getMaxBatchSizeInBytes() == null) { - setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES); - } - - if (getMaxTimeInBufferMS() == null) { - setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS); - } - - if (getMaxRecordSizeInBytes() == null) { - setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES); - } - - return new AsyncBigQuerySink<>( - this.provider, - this.strategy, - new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()), - getMaxBatchSize(), - getMaxInFlightRequests(), - getMaxBufferedRequests(), - getMaxBatchSizeInBytes(), - getMaxTimeInBufferMS(), - getMaxRecordSizeInBytes() - ); - } -} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index b6649a5..fd07caf 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -1,30 +1,27 @@ package com.vinted.flink.bigquery.sink.async; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.storage.v1.*; -import com.vinted.flink.bigquery.client.BigQueryStreamWriter; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.vinted.flink.bigquery.metric.AsyncBigQueryStreamMetrics; import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics; import com.vinted.flink.bigquery.model.Rows; -import com.vinted.flink.bigquery.sink.AppendException; -import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSinkWriter; +import com.vinted.flink.bigquery.sink.ExecutorProvider; import io.grpc.Status; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; -import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import 
java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -34,19 +31,15 @@ public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamR private final SinkWriterMetricGroup metricGroup; - private final transient Map metrics = new HashMap<>(); + private final transient Map metrics = new HashMap<>(); private final Executor appendExecutor; - private final Executor waitExecutor = Executors.newSingleThreadExecutor(); - protected transient Map streamMap = new ConcurrentHashMap<>(); - protected BigQueryWriteClient client; - - public AsyncBigQuerySinkWriter(AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) { + public AsyncBigQuerySinkWriter(ExecutorProvider executorProvider, AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) { super(elementConverter, context, configuration, bufferedRequestStates); - appendExecutor = Executors.newFixedThreadPool(4); + appendExecutor = executorProvider.create(); this.metricGroup = context.metricGroup(); this.clientProvider = clientProvider; } @@ -61,15 +54,15 @@ private void registerInflightMetric(StreamWriter writer) { private void registerAppendMetrics(StreamRequest request) { metrics.computeIfAbsent(request.getStream(), s -> { - var metric = new BigQueryStreamMetrics(request.getStream()); + var metric = new AsyncBigQueryStreamMetrics(); var group = metricGroup .addGroup("table", request.getTable().getTable()) .addGroup("stream", request.getStream()); - group.gauge("stream_offset", (Gauge) metric::getOffset); group.gauge("batch_count", metric::getBatchCount); group.gauge("batch_size_mb", metric::getBatchSizeInMb); group.gauge("split_batch_count", metric::getSplitBatchCount); group.gauge("callback_timeouts", metric::getTimeoutCount); + group.gauge("inflight_requests", metric::getInflightRequests); return metric; }); @@ -117,69 +110,87 @@ protected void submitRequestEntries(List list, Consumer>supplyAsync(() ->{ - try { - Optional.ofNullable(metrics.get(request.getStream())).ifPresent(s -> { - s.updateSize(request.getData().getSerializedSize()); - s.setBatchCount(request.getData().getSerializedRowsCount()); - }); - writer.append(request.getData()).get(); - return List.of(); - } catch (Throwable t) { - logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); - var status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNAVAILABLE: { - this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); - return retry(t, traceId, request); - } - case INVALID_ARGUMENT: - if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { - Optional.ofNullable(this.metrics.get(request.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); - logger.warn("Trace-id {} MessageSize is too large. 
Splitting batch", traceId); - var data = request.getData().getSerializedRowsList(); - var first = data.subList(0, data.size() / 2); - var second = data.subList(data.size() / 2, data.size()); - try { - return List.of( - new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build(), request.getRetries() - 1), - new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build(), request.getRetries() - 1) - ); - } catch (Throwable e) { - this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), e)); - return List.of(); + return CompletableFuture.>supplyAsync(() -> { + try { + Optional.ofNullable(metrics.get(request.getStream())).ifPresent(s -> { + s.updateSize(request.getData().getSerializedSize()); + s.setBatchCount(request.getData().getSerializedRowsCount()); + s.incrementInflightRequests(); + }); + writer.append(request.getData()).get(); + return List.of(); + } catch (Throwable t) { + Optional.ofNullable(metrics.get(request.getStream())).ifPresent(s -> { + s.updateSize(request.getData().getSerializedSize()); + s.setBatchCount(request.getData().getSerializedRowsCount()); + s.decrementInflightRequests(); + }); + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + var status = Status.fromThrowable(t); + switch (status.getCode()) { + case UNAVAILABLE: { + this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); + return retry(t, traceId, request); } - } else { - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - return List.of(); + case INVALID_ARGUMENT: + if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { + Optional.ofNullable(this.metrics.get(request.getStream())).ifPresent(AsyncBigQueryStreamMetrics::incSplitCount); + logger.warn("Trace-id {} MessageSize is too large. 
Splitting batch", traceId); + var data = request.getData().getSerializedRowsList(); + var first = data.subList(0, data.size() / 2); + var second = data.subList(data.size() / 2, data.size()); + try { + return List.of( + new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(first).build(), request.getRetries() - 1), + new StreamRequest(request.getStream(), request.getTable(), ProtoRows.newBuilder().addAllSerializedRows(second).build(), request.getRetries() - 1) + ); + } catch (Throwable e) { + throw new AsyncWriterException(traceId, status.getCode(), e); + } + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + throw new AsyncWriterException(traceId, status.getCode(), t); + } + case UNKNOWN: + if (status.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + Optional.ofNullable(this.metrics.get(request.getStream())) + .ifPresent(AsyncBigQueryStreamMetrics::incrementTimeoutCount); + this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); + return retry(t, traceId, request); + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + throw new AsyncWriterException(traceId, status.getCode(), t); + } + default: + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + throw new AsyncWriterException(traceId, status.getCode(), t); } - case UNKNOWN: - if (status.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { - logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); - Optional.ofNullable(this.metrics.get(request.getStream())) - .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); - this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable()); - return retry(t, traceId, request); - } else { - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - return List.of(); - } - default: - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); - return List.of(); - } - } - }, appendExecutor); + } + }, appendExecutor) + .exceptionally(t -> { + var status = Status.fromThrowable(t); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), t)); + return List.of(request); + }) + .whenCompleteAsync((v, e) -> { + Optional.ofNullable(this.metrics.get(request.getStream())) + .ifPresent(AsyncBigQueryStreamMetrics::decrementInflightRequests); + }, appendExecutor); }).collect(Collectors.toList()); CompletableFuture .allOf(requests.toArray(new CompletableFuture[0])) .thenApplyAsync(v -> requests.stream().flatMap(s -> s.join().stream()).collect(Collectors.toList()), appendExecutor) - .thenAcceptAsync(consumer, appendExecutor); + .whenComplete((result, e) -> { + if (e != null) { + var status = Status.fromThrowable(e); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, status.getCode(), e)); + } else { + consumer.accept(result); + } + }); } @@ -201,5 +212,23 @@ protected long getSizeInBytes(StreamRequest StreamRequest) 
{ return StreamRequest.getData().getSerializedSize(); } + @Override + public void close() { + logger.info("Closing BigQuery write stream"); + try { + flush(true); + streamMap.values().forEach(stream -> { + try { + stream.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + } + } diff --git a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java index e6a694d..ffc84de 100644 --- a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java @@ -76,7 +76,7 @@ public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any()); + verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any()); } @Test diff --git a/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java b/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java index 526a898..0e6cfde 100644 --- a/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/util/FlinkTest.java @@ -6,6 +6,7 @@ import com.vinted.flink.bigquery.process.StreamState; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; @@ -142,23 +143,7 @@ public PipelineRunner withErrorAfter(int records) { public List run(Function> execute) throws Exception { - var env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(defaultParallelism); - env.enableCheckpointing(10); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10); - env.getCheckpointConfig().enableUnalignedCheckpoints(); - env.getCheckpointConfig().setCheckpointStorage(tempDir.toUri()); - env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - env.configure(config); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart( - retryCount, // number of restart attempts - Time.of(5, TimeUnit.MILLISECONDS) // delay - )); - - env.getConfig().registerKryoType(StreamState.class); - env.getConfig().registerKryoType(Rows.class); - + var env = getExecutionEnvironment(); var testSink = new TestSink(); var stream = error ? execute.apply(env).process(new ProcessFunctionWithError<>(errorAfter)) : execute.apply(env); stream.addSink(testSink); @@ -166,7 +151,7 @@ public List run(Function> execu return testSink.getResults(result); } - public void runWithCustomSink(Function> execute) throws Exception { + private StreamExecutionEnvironment getExecutionEnvironment() { var env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(defaultParallelism); env.enableCheckpointing(10); @@ -183,6 +168,11 @@ public void runWithCustomSink(Function void runWithCustomSink(Function> execute) throws Exception { + var env = getExecutionEnvironment(); execute.apply(env); env.execute(); }
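After the five patches above, the public entry point is AsyncBigQuerySink.builder() together with the nested AsyncBigQuerySinkBuilder. The sketch below is illustrative only and is not part of the patch series: the element type (String), the clientProvider and rowSerializer instances, and the upstream DataStream are assumptions, and the generic signature of builder() is inferred from the diff. Only the builder methods (setClientProvider, setRowSerializer, setExecutorProvider, setRateLimitStrategy plus the inherited AsyncSinkBaseBuilder setters) and the defaults they fall back to (batch size 1, 4 in-flight requests, 500 MB max batch bytes, 10 s buffer time) are taken from the patches.

// Illustrative wiring sketch -- not part of the diffs above.
// The element type String, and the provider/serializer instances passed in,
// are placeholders an application would supply; the builder API is from the patches.
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import com.vinted.flink.bigquery.sink.async.AsyncBigQuerySink;
import com.vinted.flink.bigquery.sink.async.AsyncClientProvider;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.concurrent.Executors;

public class AsyncSinkWiringExample {

    static void attachSink(DataStream<Rows<String>> rows,
                           AsyncClientProvider clientProvider,
                           RowValueSerializer<String> rowSerializer) {
        var sink = AsyncBigQuerySink.<String>builder()
                .setClientProvider(clientProvider)
                .setRowSerializer(rowSerializer)
                // Optional: replace the default executor (a fixed pool of 4 append threads).
                .setExecutorProvider(() -> Executors.newFixedThreadPool(2))
                // Anything left unset falls back to the builder defaults
                // (max batch size 1, 4 in-flight requests, 500 MB batch bytes, 10 s in buffer).
                .build();

        // AsyncSinkBase is a stateful Sink, so it attaches directly to the stream.
        rows.sinkTo(sink);
    }
}

Making the executor injectable (patch 5) also lets a job or test size the append pool to match its setMaxInFlightRequests value instead of the four hard-coded threads the writer created before.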