Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-2471: Add support for geometry logical type #1379

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet-column/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>${jts.version}</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.parquet.column.statistics;

import org.apache.parquet.column.statistics.geometry.GeometryStatistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

Expand All @@ -30,6 +32,7 @@ public class BinaryStatistics extends Statistics<Binary> {

private Binary max;
private Binary min;
private GeometryStatistics geometryStatistics = null;

/**
* @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
Expand All @@ -41,6 +44,10 @@ public BinaryStatistics() {

BinaryStatistics(PrimitiveType type) {
super(type);
LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
if (logicalType instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) {
geometryStatistics = new GeometryStatistics();
}
}

private BinaryStatistics(BinaryStatistics other) {
Expand All @@ -49,6 +56,9 @@ private BinaryStatistics(BinaryStatistics other) {
initializeStats(other.min, other.max);
}
setNumNulls(other.getNumNulls());
if (other.geometryStatistics != null) {
geometryStatistics = other.geometryStatistics.copy();
}
}

@Override
Expand All @@ -62,6 +72,9 @@ public void updateStats(Binary value) {
} else if (comparator().compare(max, value) < 0) {
max = value.copy();
}
if (geometryStatistics != null) {
geometryStatistics.update(value);
}
}

@Override
Expand All @@ -72,6 +85,9 @@ public void mergeStatisticsMinMax(Statistics stats) {
} else {
updateStats(binaryStats.getMin(), binaryStats.getMax());
}
if (geometryStatistics != null) {
geometryStatistics.merge(binaryStats.geometryStatistics);
}
}

/**
Expand Down Expand Up @@ -190,4 +206,12 @@ public void setMinMax(Binary min, Binary max) {
public BinaryStatistics copy() {
return new BinaryStatistics(this);
}

public void setGeometryStatistics(GeometryStatistics geometryStatistics) {
this.geometryStatistics = geometryStatistics;
}

public GeometryStatistics getGeometryStatistics() {
return geometryStatistics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Arrays;
import org.apache.parquet.column.UnknownColumnTypeException;
import org.apache.parquet.column.statistics.geometry.GeometryStatistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.Float16;
import org.apache.parquet.schema.LogicalTypeAnnotation;
Expand Down Expand Up @@ -64,6 +65,10 @@ public Builder withNumNulls(long numNulls) {
return this;
}

public Builder withGeometryStatistics(GeometryStatistics geometryStatistics) {
throw new UnsupportedOperationException("Please use the GeometryBuilder");
}

public Statistics<?> build() {
Statistics<?> stats = createStats(type);
if (min != null && max != null) {
Expand Down Expand Up @@ -178,6 +183,30 @@ public Statistics<?> build() {
}
}

// Builder for GEOMETRY type to handle GeometryStatistics
private static class GeometryBuilder extends Builder {

private GeometryStatistics geometryStatistics;

public GeometryBuilder(PrimitiveType type) {
super(type);
assert type.getPrimitiveTypeName() == PrimitiveTypeName.BINARY;
}

@Override
public Builder withGeometryStatistics(GeometryStatistics geometryStatistics) {
this.geometryStatistics = geometryStatistics;
return this;
}

@Override
public Statistics<?> build() {
BinaryStatistics stats = (BinaryStatistics) super.build();
stats.setGeometryStatistics(geometryStatistics);
return stats;
}
}

private final PrimitiveType type;
private final PrimitiveComparator<T> comparator;
private boolean hasNonNullValue;
Expand Down Expand Up @@ -269,6 +298,11 @@ public static Builder getBuilderForReading(PrimitiveType type) {
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation) {
return new Float16Builder(type);
}
return new Builder(type);
case BINARY:
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) {
return new GeometryBuilder(type);
}
default:
return new Builder(type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.statistics.geometry;

import org.apache.parquet.Preconditions;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;

public class BoundingBox {

private double xMin = Double.MAX_VALUE;
private double xMax = Double.MIN_VALUE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the init value of xMax, yMax, ..., please use -Double.MAX_VALUE because Double.MIN_VALUE is not guaranteed to be negative. See https://stackoverflow.com/questions/3884793/why-is-double-min-value-in-not-negative

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double.POSITIVE_INFINITY and Double.NEGATIVE_INFINITY are also valid choices for initial min/max values.

For extracting the bounds for each ordinate, org.locationtech.jts.io.twkb.BoundsExtractor in the JTS code base is also a good example.

private double yMin = Double.MAX_VALUE;
private double yMax = Double.MIN_VALUE;
private double zMin = Double.MAX_VALUE;
private double zMax = Double.MIN_VALUE;
private double mMin = Double.MAX_VALUE;
private double mMax = Double.MIN_VALUE;

public BoundingBox(
double xMin, double xMax, double yMin, double yMax, double zMin, double zMax, double mMin, double mMax) {
this.xMin = xMin;
this.xMax = xMax;
this.yMin = yMin;
this.yMax = yMax;
this.zMin = zMin;
this.zMax = zMax;
this.mMin = mMin;
this.mMax = mMax;
}

public BoundingBox() {}

public double getXMin() {
return xMin;
}

public double getXMax() {
return xMax;
}

public double getYMin() {
return yMin;
}

public double getYMax() {
return yMax;
}

public double getZMin() {
return zMin;
}

public double getZMax() {
return zMax;
}

public double getMMin() {
return mMin;
}

public double getMMax() {
return mMax;
}

void update(Geometry geom) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the correct way to update Geometry in JTS. You need to use CoordinateSequenceFilter. Sedona has many examples here: https://github.com/apache/sedona/blob/0a1db3d35fd7b5721301aa3ce5d39aa8cd828661/common/src/main/java/org/apache/sedona/common/Functions.java#L294

if (geom == null || geom.isEmpty()) {
return;
}
Coordinate[] coordinates = geom.getCoordinates();
for (Coordinate coordinate : coordinates) {
update(coordinate.getX(), coordinate.getY(), coordinate.getZ(), coordinate.getM());
}
Comment on lines +87 to +89
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised that JTS doesn't have an optimized method for this (it almost certainly has at least an internal one for building indexes internally)

}

public void update(double x, double y, double z, double m) {
xMin = Math.min(xMin, x);
xMax = Math.max(xMax, x);
yMin = Math.min(yMin, y);
yMax = Math.max(yMax, y);
zMin = Math.min(zMin, z);
zMax = Math.max(zMax, z);
mMin = Math.min(mMin, m);
mMax = Math.max(mMax, m);
}

public void merge(BoundingBox other) {
Preconditions.checkArgument(other != null, "Cannot merge with null bounding box");
xMin = Math.min(xMin, other.xMin);
xMax = Math.max(xMax, other.xMax);
yMin = Math.min(yMin, other.yMin);
yMax = Math.max(yMax, other.yMax);
zMin = Math.min(zMin, other.zMin);
zMax = Math.max(zMax, other.zMax);
mMin = Math.min(mMin, other.mMin);
mMax = Math.max(mMax, other.mMax);
}

public void reset() {
xMin = Double.MAX_VALUE;
xMax = Double.MIN_VALUE;
yMin = Double.MAX_VALUE;
yMax = Double.MIN_VALUE;
zMin = Double.MAX_VALUE;
zMax = Double.MIN_VALUE;
mMin = Double.MAX_VALUE;
mMax = Double.MIN_VALUE;
}

public void abort() {
xMin = Double.NaN;
xMax = Double.NaN;
yMin = Double.NaN;
yMax = Double.NaN;
zMin = Double.NaN;
zMax = Double.NaN;
mMin = Double.NaN;
mMax = Double.NaN;
}

public BoundingBox copy() {
return new BoundingBox(xMin, xMax, yMin, yMax, zMin, zMax, mMin, mMax);
}

@Override
public String toString() {
return "BoundingBox{" + "xMin="
+ xMin + ", xMax="
+ xMax + ", yMin="
+ yMin + ", yMax="
+ yMax + ", zMin="
+ zMin + ", zMax="
+ zMax + ", mMin="
+ mMin + ", mMax="
+ mMax + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.statistics.geometry;

import java.nio.ByteBuffer;
import org.apache.parquet.Preconditions;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBReader;

public class Covering {

protected final LogicalTypeAnnotation.Edges edges;
protected ByteBuffer geometry;

public Covering(ByteBuffer geometry, LogicalTypeAnnotation.Edges edges) {
Preconditions.checkArgument(geometry != null, "Geometry cannot be null");
Preconditions.checkArgument(edges != null, "Edges cannot be null");
this.geometry = geometry;
this.edges = edges;
}

public ByteBuffer getGeometry() {
return geometry;
}

public LogicalTypeAnnotation.Edges getEdges() {
return edges;
}

void update(Geometry geom) {
throw new UnsupportedOperationException(
"Update is not supported for " + this.getClass().getSimpleName());
}

public void merge(Covering other) {
throw new UnsupportedOperationException(
"Merge is not supported for " + this.getClass().getSimpleName());
}

public void reset() {
throw new UnsupportedOperationException(
"Reset is not supported for " + this.getClass().getSimpleName());
}

public void abort() {
throw new UnsupportedOperationException(
"Abort is not supported for " + this.getClass().getSimpleName());
}

public Covering copy() {
return new Covering(geometry.duplicate(), edges);
}

@Override
public String toString() {
String geomText;
try {
geomText = new WKBReader().read(geometry.array()).toText();
} catch (ParseException e) {
geomText = "Invalid Geometry";
}

return "Covering{" + "geometry=" + geomText + ", edges=" + edges + '}';
}
}
Loading
Loading