
Commit 0fca059

Redshift batch inserts using COPY FROM operation
1 parent 3da3772 commit 0fca059

13 files changed: +956 -12 lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
@@ -737,6 +737,7 @@ jobs:
          REDSHIFT_VPC_SECURITY_GROUP_IDS: ${{ vars.REDSHIFT_VPC_SECURITY_GROUP_IDS }}
          REDSHIFT_S3_TPCH_TABLES_ROOT: ${{ vars.REDSHIFT_S3_TPCH_TABLES_ROOT }}
          REDSHIFT_S3_UNLOAD_ROOT: ${{ vars.REDSHIFT_S3_UNLOAD_ROOT }}
+         REDSHIFT_S3_COPY_ROOT: ${{ vars.REDSHIFT_S3_COPY_ROOT }}
        if: >-
          contains(matrix.modules, 'trino-redshift') &&
          (contains(matrix.profile, 'cloud-tests') || contains(matrix.profile, 'fte-tests')) &&
@@ -750,6 +751,7 @@ jobs:
            -Dtest.redshift.jdbc.endpoint="${REDSHIFT_ENDPOINT}:${REDSHIFT_PORT}/" \
            -Dtest.redshift.s3.tpch.tables.root="${REDSHIFT_S3_TPCH_TABLES_ROOT}" \
            -Dtest.redshift.s3.unload.root="${REDSHIFT_S3_UNLOAD_ROOT}" \
+           -Dtest.redshift.s3.copy.root="${REDSHIFT_S3_COPY_ROOT}" \
            -Dtest.redshift.iam.role="${REDSHIFT_IAM_ROLES}" \
            -Dtest.redshift.aws.region="${AWS_REGION}" \
            -Dtest.redshift.aws.access-key="${AWS_ACCESS_KEY_ID}" \

docs/src/main/sphinx/connector/redshift.md

Lines changed: 32 additions & 0 deletions
@@ -262,3 +262,35 @@ potentially re-activate it again afterward.
 Additionally, define further required [S3 configuration such as IAM key, role,
 or region](/object-storage/file-system-s3), except `fs.native-s3.enabled`,
 
+### Batch inserts from S3 using Redshift `COPY FROM`
+
+The connector supports the Redshift `COPY FROM` command to
+efficiently write large batches of data to Redshift by staging them as
+Parquet files on Amazon S3. This method significantly improves sink
+performance compared to the default JDBC batch inserts for Redshift.
+
+To enable this feature, configure a writeable S3 location with the
+following configuration properties:
+
+:::{list-table} Parallel write configuration properties
+:widths: 30, 60
+:header-rows: 1
+
+* - Property
+  - Description
+* - `redshift.batched-inserts-copy-location`
+  - A writeable location in Amazon S3 in the same AWS region as the
+    Redshift cluster. Used for temporary Parquet staging files during
+    insert operations. These files are automatically cleaned up after the load.
+* - `redshift.batched-inserts-copy-iam-role`
+  - Fully specified ARN of the IAM role to use for the `COPY FROM`
+    command. This role must have read access to the S3 bucket.
+
+:::
+
+Use the `batched_inserts_copy_enabled` [catalog session property](/sql/set-session) to
+deactivate batch inserts using `COPY FROM` for a specific query, and
+potentially re-activate them afterward.
+
+Additionally, define further required [S3 configuration such as IAM key, role,
+or region](/object-storage/file-system-s3).
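
For orientation, a catalog configuration enabling this write path might look like the following sketch; the bucket, prefix, role ARN, and catalog name are placeholders for illustration, not values taken from this commit:

    redshift.batched-inserts-copy-location=s3://example-bucket/trino-copy-staging/
    redshift.batched-inserts-copy-iam-role=arn:aws:iam::123456789012:role/ExampleRedshiftCopyRole

The feature can then be toggled per query with the catalog session property documented above, for example `SET SESSION example_redshift.batched_inserts_copy_enabled = false` before an insert, and set back to `true` afterward.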

plugin/trino-redshift/pom.xml

Lines changed: 15 additions & 0 deletions
@@ -74,6 +74,10 @@
        <groupId>io.trino</groupId>
        <artifactId>trino-filesystem-s3</artifactId>
    </dependency>
+   <dependency>
+       <groupId>io.trino</groupId>
+       <artifactId>trino-hive</artifactId>
+   </dependency>

    <dependency>
        <groupId>io.trino</groupId>
@@ -110,6 +114,17 @@
        <artifactId>parquet-column</artifactId>
    </dependency>

+   <dependency>
+       <groupId>org.apache.parquet</groupId>
+       <artifactId>parquet-format-structures</artifactId>
+       <exclusions>
+           <exclusion>
+               <groupId>javax.annotation</groupId>
+               <artifactId>javax.annotation-api</artifactId>
+           </exclusion>
+       </exclusions>
+   </dependency>
+
    <dependency>
        <groupId>org.jdbi</groupId>
        <artifactId>jdbi3-core</artifactId>

plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftBatchedInsertsCopyPageSink.java

Lines changed: 225 additions & 0 deletions

@@ -0,0 +1,225 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.redshift;

import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.parquet.writer.ParquetSchemaConverter;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import org.apache.parquet.format.CompressionCodec;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class RedshiftBatchedInsertsCopyPageSink
        implements ConnectorPageSink
{
    private static final Logger log = Logger.get(RedshiftBatchedInsertsCopyPageSink.class);

    private static final int MAX_ROWS_PER_FILE = 100_000;

    private final TrinoFileSystem fileSystem;
    private final TypeOperators typeOperators;
    private final Location copyLocation;
    private final List<Type> columnTypes;
    private final String trinoVersion;
    private final ConnectorPageSinkId pageSinkId;
    private ParquetFileWriter parquetWriter;
    private long rowsInCurrentFile;
    private int filePartNumber;

    public RedshiftBatchedInsertsCopyPageSink(
            ConnectorSession session,
            ConnectorPageSinkId pageSinkId,
            TrinoFileSystemFactory fileSystemFactory,
            TypeOperators typeOperators,
            Location copyLocationWithPrefix,
            List<String> columnNames,
            List<Type> columnTypes,
            String trinoVersion)
    {
        this.pageSinkId = requireNonNull(pageSinkId, "pageSinkId is null");
        this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session);
        this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
        this.copyLocation = copyLocationWithPrefix;
        this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
        this.trinoVersion = trinoVersion;
        checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must have the same size");

        startNewPart();
    }

    @Override
    public CompletableFuture<?> appendPage(Page page)
    {
        long positionCount = page.getPositionCount();
        if (positionCount == 0) {
            return NOT_BLOCKED;
        }

        parquetWriter.appendRows(page);
        rowsInCurrentFile += positionCount;

        if (rowsInCurrentFile >= MAX_ROWS_PER_FILE) {
            flushCurrentFile();
            startNewPart();
            rowsInCurrentFile = 0;
        }

        return NOT_BLOCKED;
    }

    private void startNewPart()
    {
        Location objectKey = copyLocation.appendPath(Long.toHexString(pageSinkId.getId())).appendPath(format("part-%d.parquet", filePartNumber++));
        this.parquetWriter = createParquetFileWriter(objectKey);
    }

    @Override
    public CompletableFuture<Collection<Slice>> finish()
    {
        if (rowsInCurrentFile > 0) {
            flushCurrentFile();
        }

        return CompletableFuture.completedFuture(List.of());
    }

    @Override
    public void abort()
    {
        cleanupFiles();
    }

    public void cleanupFiles()
    {
        try {
            fileSystem.deleteDirectory(copyLocation);
        }
        catch (IOException e) {
            log.warn("Unable to clean up location %s: %s", copyLocation, e.getMessage());
            // We don't want to rethrow here, as the query has already completed successfully
        }
    }

    private ParquetFileWriter createParquetFileWriter(Location path)
    {
        log.debug("Creating parquet file at location: %s", path.toString());
        ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder().build();
        CompressionCodec compressionCodec = CompressionCodec.SNAPPY;

        try {
            Closeable rollbackAction = this::abort;

            // According to Redshift docs, COPY inserts columns in the same order as the columns in the parquet. We
            // don't need the column names to match. We will instead create arbitrary column names to avoid any of the
            // parquet restrictions on column names.
            // See: https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-copy-from-columnar.html
            List<String> columnNames = new ArrayList<>();
            for (int i = 0; i < columnTypes.size(); i++) {
                columnNames.add(String.format("col%d", i));
            }

            ParquetSchemaConverter converter = new ParquetSchemaConverter(
                    columnTypes,
                    columnNames,
                    false,
                    false);

            List<Type> parquetTypes = columnTypes.stream()
                    .map(type -> RedshiftParquetTypes.toParquetType(typeOperators, type))
                    .collect(toImmutableList());

            // We use identity column mapping; the input page already contains only data columns per
            // DataLagePageSink.getDataPage()
            int[] identityMapping = new int[columnTypes.size()];
            for (int i = 0; i < identityMapping.length; ++i) {
                identityMapping[i] = i;
            }

            return new ParquetFileWriter(
                    fileSystem.newOutputFile(path),
                    rollbackAction,
                    parquetTypes,
                    columnNames,
                    converter.getMessageType(),
                    converter.getPrimitiveTypes(),
                    parquetWriterOptions,
                    identityMapping,
                    compressionCodec,
                    trinoVersion,
                    Optional.empty(),
                    Optional.empty());
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void flushCurrentFile()
    {
        parquetWriter.commit();
    }

    public static final class RedshiftParquetTypes
    {
        public static Type toParquetType(TypeOperators typeOperators, Type type)
        {
            if (type instanceof TimestampWithTimeZoneType timestamp) {
                verify(timestamp.getPrecision() == 3, "Unsupported type: %s", type);
                return TIMESTAMP_MILLIS;
            }
            if (type instanceof ArrayType arrayType) {
                return new ArrayType(toParquetType(typeOperators, arrayType.getElementType()));
            }
            if (type instanceof MapType mapType) {
                return new MapType(toParquetType(typeOperators, mapType.getKeyType()), toParquetType(typeOperators, mapType.getValueType()), typeOperators);
            }
            if (type instanceof RowType rowType) {
                return RowType.from(rowType.getFields().stream()
                        .map(field -> RowType.field(field.getName().orElseThrow(), toParquetType(typeOperators, field.getType())))
                        .collect(toImmutableList()));
            }
            return type;
        }
    }
}
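
The staged Parquet parts written by this sink are subsequently loaded into Redshift with a `COPY` statement; that statement is built elsewhere in this commit and is not shown in this section. As a rough sketch of such a load, following the AWS documentation linked in the code comment above, with hypothetical table, bucket, prefix, and role values:

    COPY example_schema.example_table
    FROM 's3://example-bucket/trino-copy-staging/1a2b3c/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/ExampleRedshiftCopyRole'
    FORMAT AS PARQUET;

Because `COPY` matches Parquet columns by position rather than by name, the arbitrary `col0`, `col1`, ... names written by the sink are sufficient.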
