Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
package org.apache.iceberg.mr.hive.udf;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimatorProvider;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
Expand Down Expand Up @@ -52,7 +57,7 @@
value = "_FUNC_(value, bucketCount) - " +
"Returns the bucket value calculated by Iceberg bucket transform function ",
extended = "Example:\n > SELECT _FUNC_('A bucket full of ice!', 5);\n 4")
public class GenericUDFIcebergBucket extends GenericUDF {
public class GenericUDFIcebergBucket extends GenericUDF implements StatEstimatorProvider {
private final IntWritable result = new IntWritable();
private int numBuckets = -1;
private transient PrimitiveObjectInspector argumentOI;
Expand Down Expand Up @@ -209,4 +214,32 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException {
public String getDisplayString(String[] children) {
return getStandardDisplayString("iceberg_bucket", children);
}

@Override
public StatEstimator getStatEstimator() {
return new BucketStatEstimator();
}

private static class BucketStatEstimator implements StatEstimator {
@Override
public Optional<ColStatistics> estimate(List<ColStatistics> argStats) {
if (argStats.size() != 2) {
return Optional.empty();
}
ColStatistics inputStats = argStats.get(0);
ColStatistics bucketCountStats = argStats.get(1);
ColStatistics.Range bucketRange = bucketCountStats.getRange();
if (bucketRange == null || bucketRange.minValue == null) {
return Optional.empty();
}
long numBuckets = bucketRange.minValue.longValue();
if (numBuckets <= 0) {
return Optional.empty();
}
ColStatistics result = inputStats.clone();
result.setCountDistint(Math.min(inputStats.getCountDistint(), numBuckets));
result.setRange(0, numBuckets - 1);
return Optional.of(result);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.mr.hive.udf;

import java.util.Arrays;
import java.util.Optional;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
import org.junit.Assert;
import org.junit.Test;

/**
* Tests for the BucketStatEstimator in GenericUDFIcebergBucket.
* Verifies that the StatEstimator correctly narrows NDV based on bucket count.
*/
public class TestGenericUDFIcebergBucketStatEstimator {

@Test
public void testNdvNarrowedByBucketCount() {
// source NDV (100) > numBuckets (8) -> output NDV should be 8
Optional<ColStatistics> result = estimateBucket(100, 8);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(8, result.get().getCountDistint());
}

@Test
public void testNdvBelowBucketCount() {
// source NDV (3) < numBuckets (8) -> output NDV should be 3
Optional<ColStatistics> result = estimateBucket(3, 8);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(3, result.get().getCountDistint());
}

@Test
public void testNdvEqualsBucketCount() {
// source NDV (8) == numBuckets (8) -> output NDV should be 8
Optional<ColStatistics> result = estimateBucket(8, 8);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(8, result.get().getCountDistint());
}

@Test
public void testZeroBucketsReturnsEmpty() {
Optional<ColStatistics> result = estimateBucket(100, 0);
Assert.assertFalse(result.isPresent());
}

private Optional<ColStatistics> estimateBucket(long sourceNdv, long numBuckets) {
ColStatistics sourceStats = new ColStatistics("col", "int");
sourceStats.setCountDistint(sourceNdv);
ColStatistics numBucketsStats = new ColStatistics("numBuckets", "int");
numBucketsStats.setRange(numBuckets, numBuckets);

StatEstimator estimator = new GenericUDFIcebergBucket().getStatEstimator();
return estimator.estimate(Arrays.asList(sourceStats, numBucketsStats));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, '
insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12), (110, NULL, NULL);

create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc;
-- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter)
explain insert overwrite table tbl_target_identity select a, b from tbl_src;
insert overwrite table tbl_target_identity select a, b from tbl_src;
select * from tbl_target_identity order by a, ccy;

--bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc;
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc;
-- threshold = 0 (default, cost-based): bucket NDV = min(NDV(b), 3) = 3 < MAX_WRITERS -> no sort (FanoutWriter)
explain insert into table tbl_target_bucket select a, b from tbl_src;
insert into table tbl_target_bucket select a, b from tbl_src;
select * from tbl_target_bucket order by a, ccy;
Expand Down Expand Up @@ -151,4 +153,31 @@ tblproperties ('parquet.compression'='snappy','format-version'='2');

explain insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
select * from tbl_hour_timestamp order by id, date_time_timestamp;
select * from tbl_hour_timestamp order by id, date_time_timestamp;

-- threshold = -1: never sort -> FanoutWriter
set hive.optimize.sort.dynamic.partition.threshold=-1;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- threshold = 1: always sort -> ClusteredWriter
set hive.optimize.sort.dynamic.partition.threshold=1;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- threshold = 2: bucket NDV = min(NDV(b), 3) = 3 > 2 -> sort (ClusteredWriter)
set hive.optimize.sort.dynamic.partition.threshold=2;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- threshold = 100: bucket NDV = min(NDV(b), 3) = 3 <= 100 -> no sort (FanoutWriter)
set hive.optimize.sort.dynamic.partition.threshold=100;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- write.fanout.enabled=false: SerDe forces threshold=1 -> always ClusteredWriter
set hive.optimize.sort.dynamic.partition.threshold=0;
drop table if exists tbl_target_nofanout;
create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc
tblproperties ('write.fanout.enabled'='false');
explain insert into tbl_target_nofanout select a, b from tbl_src;
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table ice_parquet_int(
strcol string,
intcol integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table ice_parquet_date_transform_year(
bigintcol bigint,
intcol integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table ice_parquet_int(
strcol string,
intcol integer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- SORT_QUERY_RESULTS
set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table target_ice(a int, b string, c int) partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties ('format-version'='2');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

drop table if exists tbl_ice;
Expand Down
Loading
Loading