Skip to content

Commit

Permalink
Merge pull request #20 from vinted/refactor/simplify-structure
Browse files Browse the repository at this point in the history
Refactor/simplify structure
  • Loading branch information
gintarasm authored Feb 23, 2024
2 parents b1316b2 + f40c212 commit 4b02e8e
Show file tree
Hide file tree
Showing 22 changed files with 295 additions and 285 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ There are builder classes to simplify constructing a BigQuery sink. The code sni
```java
var credentials = new JsonCredentialsProvider("key");

var clientProvider = new BigQueryProtoClientProvider(credentials,
var clientProvider = new BigQueryProtoClientProvider<String>(credentials,
WriterSettings.newBuilder()
.build()
);

var bigQuerySink = BigQueryStreamSink.<String>newProto()
var bigQuerySink = BigQueryStreamSink.<String>newBuilder()
.withClientProvider(clientProvider)
.withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.withRowValueSerializer(new NoOpRowSerializer<>())
Expand Down Expand Up @@ -73,12 +73,12 @@ BigQuery supports two types of data formats: json and proto. When creating a str
- JSON

```java
var clientProvider = new BigQueryJsonClientProvider(credentials,
var clientProvider = new BigQueryJsonClientProvider<String>(credentials,
WriterSettings.newBuilder()
.build()
);

var bigQuerySink = BigQueryStreamSink.<String>newJson()
var bigQuerySink = BigQueryStreamSink.<String>newBuilder()
```

- Proto
Expand All @@ -89,7 +89,7 @@ var clientProvider = new BigQueryProtoClientProvider(credentials,
.build()
);

var bigQuerySink = BigQueryStreamSink.<String>newProto()
var bigQuerySink = BigQueryStreamSink.<String>newBuilder();
```

# Exactly once
Expand Down
18 changes: 7 additions & 11 deletions src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,36 @@
import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSink;
import org.apache.flink.connector.base.DeliveryGuarantee;

public class BigQueryStreamSink<A, StreamT> {
public class BigQueryStreamSink<A> {
private RowValueSerializer<A> rowValueSerializer = new NoOpRowSerializer<>();
private ClientProvider<StreamT> clientProvider = null;
private ClientProvider<A> clientProvider = null;

private ExecutorProvider executorProvider = MoreExecutors::directExecutor;

private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
private BigQueryStreamSink() {
}

public static <A> BigQueryStreamSink<A, StreamWriter> newProto() {
public static <A> BigQueryStreamSink<A> newBuilder() {
return new BigQueryStreamSink<>();
}

public static <A> BigQueryStreamSink<A, JsonStreamWriter> newJson() {
return new BigQueryStreamSink<>();
}

public BigQueryStreamSink<A, StreamT> withRowValueSerializer(RowValueSerializer<A> serializer) {
public BigQueryStreamSink<A> withRowValueSerializer(RowValueSerializer<A> serializer) {
this.rowValueSerializer = serializer;
return this;
}

public BigQueryStreamSink<A, StreamT> withClientProvider(ClientProvider<StreamT> clientProvider) {
public BigQueryStreamSink<A> withClientProvider(ClientProvider<A> clientProvider) {
this.clientProvider = clientProvider;
return this;
}

public BigQueryStreamSink<A, StreamT> withExecutorProvider(ExecutorProvider executorProvider) {
public BigQueryStreamSink<A> withExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return this;
}

public BigQueryStreamSink<A, StreamT> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
public BigQueryStreamSink<A> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
this.deliveryGuarantee = deliveryGuarantee;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;

public class BigQueryJsonClientProvider implements ClientProvider<JsonStreamWriter> {
public class BigQueryJsonClientProvider<A> implements ClientProvider<A> {
private Credentials credentials;
private WriterSettings writerSettings;

Expand All @@ -42,16 +43,18 @@ public BigQueryWriteClient getClient() {
}

@Override
public JsonStreamWriter getWriter(String streamName, TableId table) {
public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
try {
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
return JsonStreamWriter
var writer = JsonStreamWriter
.newBuilder(streamName, getTableSchema(table), this.getClient())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();

return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.io.IOException;
import java.util.concurrent.Executors;

public class BigQueryProtoClientProvider implements ClientProvider<StreamWriter> {
private Credentials credentials;
private WriterSettings writerSettings;
public class BigQueryProtoClientProvider<A> implements ClientProvider<A> {
private final Credentials credentials;
private final WriterSettings writerSettings;

private transient BigQueryWriteClient bigQueryWriteClient;

Expand All @@ -37,7 +38,7 @@ public BigQueryWriteClient getClient() {
}

@Override
public StreamWriter getWriter(String streamName, TableId table) {
public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
try {
var descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(getTableSchema(table));
var protoSchema = ProtoSchemaConverter.convert(descriptor);
Expand All @@ -53,7 +54,8 @@ public StreamWriter getWriter(String streamName, TableId table) {
.setExecutorProvider(executorProvider)
.setLocation(table.getProject())
.setWriterSchema(protoSchema);
return streamWriterBuilder.build();

return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.vinted.flink.bigquery.model.Rows;

/**
 * Format-agnostic handle for writing batches of rows to a single BigQuery
 * Storage Write API stream. Implementations adapt a concrete Google writer
 * (proto or JSON) and apply a {@code RowValueSerializer} to the row payload.
 *
 * @param <T> element type of the rows being written
 */
public interface BigQueryStreamWriter<T> extends AutoCloseable {
/**
 * Appends the given rows to the stream without an explicit offset.
 *
 * @param data batch of rows to append
 * @return future resolving to the append response
 */
ApiFuture<AppendRowsResponse> append(Rows<T> data);
/**
 * Appends the given rows at the specified stream offset
 * (used for offset-based, exactly-once style delivery).
 *
 * @param data batch of rows to append
 * @param offset stream offset at which to append
 * @return future resolving to the append response
 */
ApiFuture<AppendRowsResponse> append(Rows<T> data, long offset);

/** @return the fully qualified name of the underlying write stream */
String getStreamName();

/** @return identifier of the underlying writer instance */
String getWriterId();
/** @return {@code true} if the underlying writer can no longer accept appends */
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.io.Serializable;

public interface ClientProvider<A> extends Serializable {
BigQueryWriteClient getClient();

A getWriter(String streamName, TableId table);
BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer);

WriterSettings writeSettings();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.protobuf.Descriptors;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;

/**
 * {@link BigQueryStreamWriter} implementation that serializes rows to JSON and
 * delegates to Google's {@code JsonStreamWriter}.
 *
 * <p>Each row is serialized to bytes by the configured {@link RowValueSerializer},
 * decoded as UTF-8, and wrapped into a {@link JSONObject} before being appended.
 *
 * @param <A> element type of the rows being written
 */
public class JsonStreamWriter<A> implements BigQueryStreamWriter<A> {
    private final RowValueSerializer<A> rowSerializer;

    private final com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer;

    public JsonStreamWriter(RowValueSerializer<A> rowSerializer, com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer) {
        this.rowSerializer = rowSerializer;
        this.writer = writer;
    }

    /**
     * Serializes every row in the batch into a {@link JSONArray}.
     * Extracted to avoid duplicating the conversion in both append overloads.
     * Bytes are decoded as UTF-8 explicitly — relying on the platform default
     * charset would mis-decode on non-UTF-8 JVMs (pre-Java-18).
     */
    private JSONArray toJsonArray(Rows<A> data) {
        var rowArray = new JSONArray();
        data.getData().forEach(row ->
                rowArray.put(new JSONObject(new String(rowSerializer.serialize(row), java.nio.charset.StandardCharsets.UTF_8))));
        return rowArray;
    }

    /**
     * Appends the batch without an explicit offset.
     *
     * @throws RuntimeException wrapping any I/O, schema-validation, or interruption failure
     */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
        try {
            return writer.append(toJsonArray(data));
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Appends the batch at the given stream offset (offset-based delivery).
     *
     * @throws RuntimeException wrapping any I/O, schema-validation, or interruption failure
     */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
        try {
            return writer.append(toJsonArray(data), offset);
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getStreamName() {
        return writer.getStreamName();
    }

    @Override
    public String getWriterId() {
        return writer.getWriterId();
    }

    /** Closed if either the connection dropped or the user closed the writer. */
    @Override
    public boolean isClosed() {
        return writer.isClosed() || writer.isUserClosed();
    }

    @Override
    public void close() throws Exception {
        writer.close();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.ByteString;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.util.stream.Collectors;

/**
 * {@link BigQueryStreamWriter} implementation that serializes rows to protobuf
 * payloads and delegates to Google's {@link StreamWriter}.
 *
 * @param <A> element type of the rows being written
 */
public class ProtoStreamWriter<A> implements BigQueryStreamWriter<A> {
    private final RowValueSerializer<A> rowSerializer;

    private final StreamWriter writer;

    public ProtoStreamWriter(RowValueSerializer<A> rowSerializer, StreamWriter writer) {
        this.rowSerializer = rowSerializer;
        this.writer = writer;
    }

    /**
     * Serializes every row in the batch into a {@link ProtoRows} message.
     * Extracted to avoid duplicating the conversion in both append overloads.
     */
    private ProtoRows toProtoRows(Rows<A> data) {
        return ProtoRows
                .newBuilder()
                .addAllSerializedRows(data.getData().stream()
                        .map(r -> ByteString.copyFrom(rowSerializer.serialize(r)))
                        .collect(Collectors.toList()))
                .build();
    }

    /** Appends the batch without an explicit offset. */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
        return writer.append(toProtoRows(data));
    }

    /** Appends the batch at the given stream offset (offset-based delivery). */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
        return writer.append(toProtoRows(data), offset);
    }

    @Override
    public String getStreamName() {
        return writer.getStreamName();
    }

    @Override
    public String getWriterId() {
        return writer.getWriterId();
    }

    /** Closed if either the connection dropped or the user closed the writer. */
    @Override
    public boolean isClosed() {
        return writer.isClosed() || writer.isUserClosed();
    }

    @Override
    public void close() throws Exception {
        writer.close();
    }
}
Loading

0 comments on commit 4b02e8e

Please sign in to comment.