Skip to content

Commit ceb5a56

Browse files
Snapshot report and tests
1 parent e64ae0b commit ceb5a56

File tree

11 files changed

+805
-35
lines changed

11 files changed

+805
-35
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
import io.delta.kernel.internal.actions.Metadata;
2828
import io.delta.kernel.internal.actions.Protocol;
2929
import io.delta.kernel.internal.fs.Path;
30+
import io.delta.kernel.internal.metrics.SnapshotContext;
31+
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
3032
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
3133
import io.delta.kernel.internal.replay.LogReplay;
3234
import io.delta.kernel.internal.snapshot.LogSegment;
3335
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
36+
import io.delta.kernel.metrics.SnapshotReport;
3437
import io.delta.kernel.types.StructType;
3538
import java.util.Map;
3639
import java.util.Optional;
@@ -45,13 +48,15 @@ public class SnapshotImpl implements Snapshot {
4548
private final Metadata metadata;
4649
private final LogSegment logSegment;
4750
private Optional<Long> inCommitTimestampOpt;
51+
private final SnapshotReport snapshotReport;
4852

4953
public SnapshotImpl(
5054
Path dataPath,
5155
LogSegment logSegment,
5256
LogReplay logReplay,
5357
Protocol protocol,
54-
Metadata metadata) {
58+
Metadata metadata,
59+
SnapshotContext snapshotContext) {
5560
this.logPath = new Path(dataPath, "_delta_log");
5661
this.dataPath = dataPath;
5762
this.version = logSegment.version;
@@ -60,6 +65,13 @@ public SnapshotImpl(
6065
this.protocol = protocol;
6166
this.metadata = metadata;
6267
this.inCommitTimestampOpt = Optional.empty();
68+
this.snapshotReport =
69+
new SnapshotReportImpl(
70+
snapshotContext.getTablePath(),
71+
Optional.of(this.version),
72+
snapshotContext.getProvidedTimestamp(),
73+
snapshotContext.getSnapshotMetrics(),
74+
Optional.empty() /* exception */);
6375
}
6476

6577
@Override
@@ -74,6 +86,7 @@ public StructType getSchema(Engine engine) {
7486

7587
@Override
7688
public ScanBuilder getScanBuilder(Engine engine) {
89+
// TODO when we add ScanReport we will pass the SnapshotReport downstream here
7790
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine);
7891
}
7992

@@ -128,6 +141,10 @@ public Path getDataPath() {
128141
return dataPath;
129142
}
130143

144+
public SnapshotReport getSnapshotReport() {
145+
return snapshotReport;
146+
}
147+
131148
/**
132149
* Returns the timestamp of the latest commit of this snapshot. For an uninitialized snapshot,
133150
* this returns -1.

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import io.delta.kernel.exceptions.TableNotFoundException;
2727
import io.delta.kernel.internal.actions.Protocol;
2828
import io.delta.kernel.internal.fs.Path;
29+
import io.delta.kernel.internal.metrics.SnapshotContext;
30+
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
2931
import io.delta.kernel.internal.snapshot.SnapshotManager;
3032
import io.delta.kernel.internal.util.Clock;
33+
import io.delta.kernel.metrics.SnapshotReport;
3134
import io.delta.kernel.types.StructField;
3235
import io.delta.kernel.types.StructType;
3336
import io.delta.kernel.utils.CloseableIterator;
@@ -120,19 +123,38 @@ public Optional<TableIdentifier> getTableIdentifier() {
120123

121124
@Override
122125
public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
123-
return snapshotManager.buildLatestSnapshot(engine);
126+
SnapshotContext snapshotContext = SnapshotContext.forLatestSnapshot(tablePath);
127+
try {
128+
return snapshotManager.buildLatestSnapshot(engine, snapshotContext);
129+
} catch (Exception e) {
130+
recordSnapshotReport(engine, snapshotContext, e);
131+
throw e;
132+
}
124133
}
125134

126135
@Override
127136
public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
128137
throws TableNotFoundException {
129-
return snapshotManager.getSnapshotAt(engine, versionId);
138+
SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, versionId);
139+
try {
140+
return snapshotManager.getSnapshotAt(engine, versionId, snapshotContext);
141+
} catch (Exception e) {
142+
recordSnapshotReport(engine, snapshotContext, e);
143+
throw e;
144+
}
130145
}
131146

132147
@Override
133148
public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
134149
throws TableNotFoundException {
135-
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC);
150+
SnapshotContext snapshotContext =
151+
SnapshotContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
152+
try {
153+
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
154+
} catch (Exception e) {
155+
recordSnapshotReport(engine, snapshotContext, e);
156+
throw e;
157+
}
136158
}
137159

138160
@Override
@@ -353,4 +375,16 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
353375
logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema);
354376
return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
355377
}
378+
379+
/** Creates a {@link SnapshotReport} and pushes it to any {@link MetricsReporter}s. */
380+
private void recordSnapshotReport(Engine engine, SnapshotContext snapshotContext, Exception e) {
381+
SnapshotReport snapshotReport =
382+
new SnapshotReportImpl(
383+
snapshotContext.getTablePath(),
384+
snapshotContext.getVersion(),
385+
snapshotContext.getProvidedTimestamp(),
386+
snapshotContext.getSnapshotMetrics(),
387+
Optional.of(e));
388+
engine.getMetricsReporters().forEach(reporter -> reporter.report(snapshotReport));
389+
}
356390
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.delta.kernel.exceptions.TableNotFoundException;
3232
import io.delta.kernel.internal.actions.*;
3333
import io.delta.kernel.internal.fs.Path;
34+
import io.delta.kernel.internal.metrics.SnapshotContext;
35+
import io.delta.kernel.internal.metrics.SnapshotMetrics;
3436
import io.delta.kernel.internal.replay.LogReplay;
3537
import io.delta.kernel.internal.snapshot.LogSegment;
3638
import io.delta.kernel.internal.snapshot.SnapshotHint;
@@ -105,8 +107,11 @@ public Transaction build(Engine engine) {
105107
// Table doesn't exist yet. Create an initial snapshot with the new schema.
106108
Metadata metadata = getInitialMetadata();
107109
Protocol protocol = getInitialProtocol();
108-
LogReplay logReplay = getEmptyLogReplay(engine, metadata, protocol);
109-
snapshot = new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol);
110+
SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, 0);
111+
LogReplay logReplay =
112+
getEmptyLogReplay(engine, metadata, protocol, snapshotContext.getSnapshotMetrics());
113+
snapshot =
114+
new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol, snapshotContext);
110115
}
111116

112117
boolean isNewTable = snapshot.getVersion(engine) < 0;
@@ -204,8 +209,19 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
204209
}
205210

206211
private class InitialSnapshot extends SnapshotImpl {
207-
InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) {
208-
super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata);
212+
InitialSnapshot(
213+
Path dataPath,
214+
LogReplay logReplay,
215+
Metadata metadata,
216+
Protocol protocol,
217+
SnapshotContext snapshotContext) {
218+
super(
219+
dataPath,
220+
LogSegment.empty(table.getLogPath()),
221+
logReplay,
222+
protocol,
223+
metadata,
224+
snapshotContext);
209225
}
210226

211227
@Override
@@ -214,14 +230,16 @@ public long getTimestamp(Engine engine) {
214230
}
215231
}
216232

217-
private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) {
233+
private LogReplay getEmptyLogReplay(
234+
Engine engine, Metadata metadata, Protocol protocol, SnapshotMetrics snapshotMetrics) {
218235
return new LogReplay(
219236
table.getLogPath(),
220237
table.getDataPath(),
221238
-1,
222239
engine,
223240
LogSegment.empty(table.getLogPath()),
224-
Optional.empty()) {
241+
Optional.empty(),
242+
snapshotMetrics) {
225243

226244
@Override
227245
protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.metrics;
17+
18+
import io.delta.kernel.metrics.SnapshotReport;
19+
import java.util.Optional;
20+
21+
/** Stores the context for a given Snapshot query. Used to generate a {@link SnapshotReport} */
22+
public class SnapshotContext {
23+
24+
public static SnapshotContext forLatestSnapshot(String tablePath) {
25+
return new SnapshotContext(tablePath, Optional.empty(), Optional.empty());
26+
}
27+
28+
public static SnapshotContext forVersionSnapshot(String tablePath, long version) {
29+
return new SnapshotContext(tablePath, Optional.of(version), Optional.empty());
30+
}
31+
32+
public static SnapshotContext forTimestampSnapshot(String tablePath, long timestamp) {
33+
return new SnapshotContext(tablePath, Optional.empty(), Optional.of(timestamp));
34+
}
35+
36+
private Optional<Long> version;
37+
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics();
38+
private final String tablePath;
39+
private final Optional<Long> providedTimestamp;
40+
41+
private SnapshotContext(
42+
String tablePath, Optional<Long> version, Optional<Long> providedTimestamp) {
43+
this.tablePath = tablePath;
44+
this.version = version;
45+
this.providedTimestamp = providedTimestamp;
46+
}
47+
48+
public Optional<Long> getVersion() {
49+
return version;
50+
}
51+
52+
public SnapshotMetrics getSnapshotMetrics() {
53+
return snapshotMetrics;
54+
}
55+
56+
public String getTablePath() {
57+
return tablePath;
58+
}
59+
60+
public Optional<Long> getProvidedTimestamp() {
61+
return providedTimestamp;
62+
}
63+
64+
public void setVersion(long updatedVersion) {
65+
version = Optional.of(updatedVersion);
66+
}
67+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.metrics;
17+
18+
/** Stores the metrics for an ongoing snapshot creation */
19+
public class SnapshotMetrics {
20+
21+
public final Timer timestampToVersionResolutionDuration = new Timer();
22+
23+
public final Timer loadProtocolAndMetadataDuration = new Timer();
24+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.metrics;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
import io.delta.kernel.metrics.SnapshotMetricsResult;
21+
import io.delta.kernel.metrics.SnapshotReport;
22+
import java.util.Optional;
23+
import java.util.UUID;
24+
25+
/** A basic POJO implementation of {@link SnapshotReport} for creating them */
26+
public class SnapshotReportImpl implements SnapshotReport {
27+
28+
private final String tablePath;
29+
private final Optional<Long> version;
30+
private final Optional<Long> providedTimestamp;
31+
private final UUID reportUUID;
32+
private final SnapshotMetricsResult snapshotMetrics;
33+
private final Optional<Exception> exception;
34+
35+
public SnapshotReportImpl(
36+
String tablePath,
37+
Optional<Long> version,
38+
Optional<Long> providedTimestamp,
39+
SnapshotMetrics snapshotMetrics,
40+
Optional<Exception> exception) {
41+
this.tablePath = requireNonNull(tablePath);
42+
this.version = requireNonNull(version);
43+
this.providedTimestamp = requireNonNull(providedTimestamp);
44+
this.snapshotMetrics =
45+
SnapshotMetricsResult.fromSnapshotMetrics(requireNonNull(snapshotMetrics));
46+
this.exception = requireNonNull(exception);
47+
this.reportUUID = UUID.randomUUID();
48+
}
49+
50+
@Override
51+
public String tablePath() {
52+
return tablePath;
53+
}
54+
55+
@Override
56+
public String operationType() {
57+
return OPERATION_TYPE;
58+
}
59+
60+
@Override
61+
public Optional<Exception> exception() {
62+
return exception;
63+
}
64+
65+
@Override
66+
public UUID reportUUID() {
67+
return reportUUID;
68+
}
69+
70+
@Override
71+
public Optional<Long> version() {
72+
return version;
73+
}
74+
75+
@Override
76+
public Optional<Long> providedTimestamp() {
77+
return providedTimestamp;
78+
}
79+
80+
@Override
81+
public SnapshotMetricsResult snapshotMetrics() {
82+
return snapshotMetrics;
83+
}
84+
}

0 commit comments

Comments
 (0)