-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Metrics serializer + LoggingMetricsReporter
- Loading branch information
1 parent
977ac3a
commit b6c3b0f
Showing
4 changed files
with
300 additions
and
0 deletions.
There are no files selected for viewing
137 changes: 137 additions & 0 deletions
137
...l/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.internal.metrics; | ||
|
||
import com.fasterxml.jackson.core.JsonGenerator; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.SerializerProvider; | ||
import com.fasterxml.jackson.databind.module.SimpleModule; | ||
import com.fasterxml.jackson.databind.ser.std.StdSerializer; | ||
import io.delta.kernel.metrics.SnapshotMetricsResult; | ||
import io.delta.kernel.metrics.SnapshotReport; | ||
import java.io.IOException; | ||
import java.util.Optional; | ||
|
||
/** Defines JSON serializers for {@link MetricsReport} types */ | ||
public final class MetricsReportSerializers { | ||
|
||
///////////////// | ||
// Public APIs // | ||
///////////////// | ||
|
||
/** | ||
* Serializes a {@link SnapshotReport} to a JSON string | ||
* | ||
* @throws JsonProcessingException | ||
*/ | ||
public static String serializeSnapshotReport(SnapshotReport snapshotReport) | ||
throws JsonProcessingException { | ||
return OBJECT_MAPPER.writeValueAsString(snapshotReport); | ||
} | ||
|
||
///////////////////////////////// | ||
// Private fields and methods // | ||
//////////////////////////////// | ||
|
||
private static final ObjectMapper OBJECT_MAPPER = | ||
new ObjectMapper() | ||
.registerModule( | ||
new SimpleModule() | ||
.addSerializer(SnapshotReport.class, new SnapshotReportSerializer())); | ||
|
||
private MetricsReportSerializers() {} | ||
|
||
///////////////// | ||
// Serializers // | ||
//////////////// | ||
|
||
static class SnapshotReportSerializer extends StdSerializer<SnapshotReport> { | ||
|
||
SnapshotReportSerializer() { | ||
super(SnapshotReport.class); | ||
} | ||
|
||
@Override | ||
public void serialize( | ||
SnapshotReport snapshotReport, JsonGenerator gen, SerializerProvider provider) | ||
throws IOException { | ||
gen.writeStartObject(); | ||
gen.writeStringField("tablePath", snapshotReport.tablePath()); | ||
gen.writeStringField("operationType", snapshotReport.operationType()); | ||
gen.writeStringField("reportUUID", snapshotReport.reportUUID().toString()); | ||
writeOptionalField( | ||
"version", snapshotReport.version(), item -> gen.writeNumberField("version", item), gen); | ||
writeOptionalField( | ||
"providedTimestamp", | ||
snapshotReport.providedTimestamp(), | ||
item -> gen.writeNumberField("providedTimestamp", item), | ||
gen); | ||
gen.writeFieldName("snapshotMetrics"); | ||
writeSnapshotMetrics(snapshotReport.snapshotMetrics(), gen); | ||
writeOptionalField( | ||
"exception", | ||
snapshotReport.exception(), | ||
item -> gen.writeStringField("exception", item.toString()), | ||
gen); | ||
gen.writeEndObject(); | ||
} | ||
|
||
private void writeSnapshotMetrics(SnapshotMetricsResult snapshotMetrics, JsonGenerator gen) | ||
throws IOException { | ||
gen.writeStartObject(); | ||
writeOptionalField( | ||
"timestampToVersionResolutionDuration", | ||
snapshotMetrics.timestampToVersionResolutionDuration(), | ||
item -> gen.writeNumberField("timestampToVersionResolutionDuration", item), | ||
gen); | ||
gen.writeNumberField( | ||
"loadProtocolAndMetadataDuration", snapshotMetrics.loadInitialDeltaActionsDuration()); | ||
gen.writeEndObject(); | ||
} | ||
} | ||
|
||
////////////////////////////////// | ||
// Helper fx for serialization // | ||
///////////////////////////////// | ||
|
||
/** | ||
* For an optional item - If it is empty, writes out a null value - If it is non-empty, writes the | ||
* items value using the provided nonNullConsumer | ||
* | ||
* @param fieldName name of the field to write | ||
* @param item optional item | ||
* @param nonNullConsumer consumes an items non-null value | ||
* @throws IOException | ||
*/ | ||
private static <T> void writeOptionalField( | ||
String fieldName, | ||
Optional<T> item, | ||
ConsumerThrowsIOException<T> nonNullConsumer, | ||
JsonGenerator gen) | ||
throws IOException { | ||
if (item.isPresent()) { | ||
nonNullConsumer.accept(item.get()); | ||
} else { | ||
gen.writeNullField(fieldName); | ||
} | ||
} | ||
|
||
// Need to create custom consumer so we can propagate the IOException without wrapping it | ||
private interface ConsumerThrowsIOException<T> { | ||
void accept(T t) throws IOException; | ||
} | ||
} |
105 changes: 105 additions & 0 deletions
105
...el-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.internal.metrics | ||
|
||
import java.util.Optional | ||
|
||
import io.delta.kernel.metrics.SnapshotReport | ||
import org.scalatest.funsuite.AnyFunSuite | ||
|
||
class MetricsReportSerializerSuite extends AnyFunSuite { | ||
|
||
private def optionToString[T](option: Optional[T]): String = { | ||
if (option.isPresent) { | ||
if (option.get().isInstanceOf[String]) { | ||
s""""${option.get()}"""" // For string objects wrap with quotes | ||
} else { | ||
option.get().toString | ||
} | ||
} else { | ||
"null" | ||
} | ||
} | ||
|
||
def testSnapshotReport(snapshotReport: SnapshotReport): Unit = { | ||
val timestampToVersionResolutionDuration = optionToString( | ||
snapshotReport.snapshotMetrics().timestampToVersionResolutionDuration()) | ||
val loadProtocolAndMetadataDuration = | ||
snapshotReport.snapshotMetrics().loadInitialDeltaActionsDuration() | ||
val exception: Optional[String] = snapshotReport.exception().map(_.toString) | ||
val expectedJson = | ||
s""" | ||
|{"tablePath":"${snapshotReport.tablePath()}", | ||
|"operationType":"Snapshot", | ||
|"reportUUID":"${snapshotReport.reportUUID()}", | ||
|"version":${optionToString(snapshotReport.version())}, | ||
|"providedTimestamp":${optionToString(snapshotReport.providedTimestamp())}, | ||
|"snapshotMetrics":{ | ||
|"timestampToVersionResolutionDuration":${timestampToVersionResolutionDuration}, | ||
|"loadProtocolAndMetadataDuration":${loadProtocolAndMetadataDuration} | ||
|}, | ||
|"exception":${optionToString(exception)} | ||
|} | ||
|""".stripMargin.replaceAll("\n", "") | ||
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport)) | ||
} | ||
|
||
test("SnapshotReport serializer") { | ||
val snapshotMetrics1 = new SnapshotMetrics() | ||
snapshotMetrics1.timestampToVersionResolutionDuration.record(10) | ||
snapshotMetrics1.loadProtocolAndMetadataDuration.record(1000) | ||
val exception = new RuntimeException("something something failed") | ||
|
||
val snapshotReport1 = new SnapshotReportImpl( | ||
"/table/path", | ||
Optional.of(1), | ||
Optional.of(0), | ||
snapshotMetrics1, | ||
Optional.of(exception) | ||
) | ||
|
||
// Manually check expected JSON | ||
val expectedJson = | ||
s""" | ||
|{"tablePath":"/table/path", | ||
|"operationType":"Snapshot", | ||
|"reportUUID":"${snapshotReport1.reportUUID()}", | ||
|"version":1, | ||
|"providedTimestamp":0, | ||
|"snapshotMetrics":{ | ||
|"timestampToVersionResolutionDuration":10, | ||
|"loadProtocolAndMetadataDuration":1000 | ||
|}, | ||
|"exception":"$exception" | ||
|} | ||
|""".stripMargin.replaceAll("\n", "") | ||
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport1)) | ||
|
||
// Check with test function | ||
testSnapshotReport(snapshotReport1) | ||
|
||
// Empty options for all possible fields | ||
val snapshotMetrics2 = new SnapshotMetrics() | ||
val snapshotReport2 = new SnapshotReportImpl( | ||
"/table/path", | ||
Optional.empty(), | ||
Optional.empty(), | ||
snapshotMetrics2, | ||
Optional.empty() | ||
) | ||
testSnapshotReport(snapshotReport2) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
51 changes: 51 additions & 0 deletions
51
...kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.defaults.engine; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import io.delta.kernel.engine.MetricsReporter; | ||
import io.delta.kernel.internal.metrics.MetricsReportSerializers; | ||
import io.delta.kernel.internal.snapshot.SnapshotManager; | ||
import io.delta.kernel.metrics.MetricsReport; | ||
import io.delta.kernel.metrics.SnapshotReport; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* An implementation of {@link MetricsReporter} that logs the reports (as JSON) to Log4J at the info | ||
* level. | ||
*/ | ||
public class LoggingMetricsReporter implements MetricsReporter { | ||
|
||
private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); | ||
|
||
@Override | ||
public void report(MetricsReport report) { | ||
try { | ||
if (report instanceof SnapshotReport) { | ||
logger.info( | ||
"SnapshotReport = %s", | ||
MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report)); | ||
} else { | ||
logger.info( | ||
"%s = [%s does not support serializing this type of MetricReport]", | ||
report.getClass(), this.getClass()); | ||
} | ||
} catch (JsonProcessingException e) { | ||
logger.info("Encountered exception while serializing report %s: %s", report, e); | ||
} | ||
} | ||
} |