
Spark filter pushdown resolves columns against current schema during time-travel, not snapshot schema #16510

@amenck

Description


Apache Iceberg version

1.11.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

Spark resolves filter predicate column names against the current table schema instead of the snapshot's schema during time-travel reads. This causes ValidationException: Cannot find field '<old_name>' when filtering on a column that was renamed after the target snapshot.

The repro below shows that a plain SELECT uses the snapshot's schema and succeeds. Once a filter is added, however, the column is resolved against the current schema and the read fails.

```python
import os
import urllib.request

from pyspark.sql import SparkSession, functions as F

ICEBERG_VERSION = "1.11.0"
JAR_NAME = f"iceberg-spark-runtime-3.5_2.12-{ICEBERG_VERSION}.jar"
JAR_PATH = os.path.join("/tmp", JAR_NAME)
if not os.path.exists(JAR_PATH):
    urllib.request.urlretrieve(
        f"https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/{ICEBERG_VERSION}/{JAR_NAME}",
        JAR_PATH,
    )

ICEBERG_FORMAT = "org.apache.iceberg.spark.source.IcebergSource"

spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.jars", JAR_PATH)
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

TABLE = "local.db.repro"
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.db")
spark.sql(f"DROP TABLE IF EXISTS {TABLE}")

# Create table with column "col" and insert data
spark.sql(f"CREATE TABLE {TABLE} (id BIGINT, col DOUBLE) USING iceberg")
spark.sql(f"INSERT INTO {TABLE} VALUES (1, 100.0), (2, 200.0), (3, 0.0)")

# Capture snapshot
snapshot_v1 = spark.sql(f"SELECT snapshot_id FROM {TABLE}.snapshots").collect()[-1].snapshot_id

# Rename column and insert more data
spark.sql(f"ALTER TABLE {TABLE} RENAME COLUMN col TO value")
spark.sql(f"INSERT INTO {TABLE} VALUES (4, 400.0)")

# Time-travel read via IcebergSource
df = spark.read.format(ICEBERG_FORMAT).option("snapshot-id", snapshot_v1).load(TABLE)

df.columns       # => ['id', 'col']  -- correct, uses snapshot schema
df.select("col") # => works fine
df.count()       # => 3, works fine

# BUG: filter resolves "col" against current schema (which has "value", not "col")
df.filter(F.col("col") > 0).count()
# => ValidationException: Cannot find field 'col' in struct:
#      struct<1: id: optional long, 2: value: optional double>
```

Expected behavior

df.filter(F.col("col") > 0) should resolve col against the snapshot's schema (where the column is named col) and succeed, consistent with how df.select("col") and df.columns already work.
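A minimal sketch of what the expected behavior implies, using a hypothetical dict-based model rather than Iceberg's actual classes. The names `TABLE_METADATA`, `SNAPSHOT_ROWS`, `scan_schema`, and `count_where_greater` are illustrative, and the metadata layout is only loosely based on the table metadata's `current-schema-id`, `schemas`, and per-snapshot `schema-id` fields. For a time-travel read, the filter's column name is bound against the snapshot's schema, and rows are then matched by field ID. With the repro's data at `snapshot_v1` (100.0, 200.0, 0.0), `col > 0` should count 2 rows.

```python
# Hypothetical, simplified model of Iceberg table metadata (not the real API).
# Real metadata stores schemas as a list and uses string JSON keys.
TABLE_METADATA = {
    "current-schema-id": 1,
    "schemas": {
        0: {1: "id", 2: "col"},    # schema when snapshot_v1 was committed
        1: {1: "id", 2: "value"},  # schema after RENAME COLUMN col TO value
    },
    "snapshots": {101: {"schema-id": 0}},  # 101 stands in for snapshot_v1
}

# In this model rows are keyed by field ID, so they are valid under either schema.
SNAPSHOT_ROWS = {101: [{1: 1, 2: 100.0}, {1: 2, 2: 200.0}, {1: 3, 2: 0.0}]}


def scan_schema(metadata, snapshot_id=None):
    """Pick the snapshot's schema for time travel, otherwise the current schema."""
    if snapshot_id is None:
        schema_id = metadata["current-schema-id"]
    else:
        schema_id = metadata["snapshots"][snapshot_id]["schema-id"]
    return metadata["schemas"][schema_id]


def count_where_greater(schema, rows, name, threshold):
    """Bind `name` to a field ID in `schema`, then count rows with value > threshold."""
    matches = [fid for fid, fname in schema.items() if fname == name]
    if not matches:
        raise KeyError(f"Cannot find field '{name}'")
    return sum(1 for row in rows if row[matches[0]] > threshold)


# Expected behavior: the old name "col" resolves at snapshot_v1 and the filter runs.
schema = scan_schema(TABLE_METADATA, snapshot_id=101)
print(count_where_greater(schema, SNAPSHOT_ROWS[101], "col", 0))  # 2
```

In this model, binding `col` against the current schema (calling `scan_schema(TABLE_METADATA)` with no snapshot ID) raises `KeyError`, which mirrors the reported failure.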

Actual behavior

```text
org.apache.iceberg.exceptions.ValidationException: Cannot find field 'col' in struct:
  struct<1: id: optional long, 2: value: optional double>
```

The error message shows the current schema (with value), not the snapshot's schema (with col).
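The failure can be illustrated with a toy model. This is a hypothetical sketch, not Iceberg's code: schemas are plain dicts from field ID to name, and `bind_column` and the local `ValidationException` class are made-up stand-ins. It assumes, consistent with the error above, that the filter path binds the column name against the current schema while the projection uses the snapshot's schema. Because the rename keeps field ID 2 and only changes its name, the old name is simply absent from the current schema.

```python
# Toy model (hypothetical, not Iceberg's API): each schema maps field IDs to names.
# A rename changes the name but keeps the field ID.


class ValidationException(Exception):
    """Stand-in for org.apache.iceberg.exceptions.ValidationException."""


SNAPSHOT_SCHEMA = {1: "id", 2: "col"}    # schema in effect for snapshot_v1
CURRENT_SCHEMA = {1: "id", 2: "value"}   # schema after RENAME COLUMN col TO value


def bind_column(name, schema):
    """Resolve a column name to its field ID, or fail with a simplified message."""
    for field_id, field_name in schema.items():
        if field_name == name:
            return field_id
    fields = ", ".join(f"{fid}: {fname}" for fid, fname in schema.items())
    raise ValidationException(f"Cannot find field '{name}' in struct: struct<{fields}>")


# Projection path: names come from the snapshot schema, so "col" resolves.
print(bind_column("col", SNAPSHOT_SCHEMA))  # 2

# Filter path as reported: the same name is bound against the current schema.
try:
    bind_column("col", CURRENT_SCHEMA)
except ValidationException as exc:
    print(exc)  # Cannot find field 'col' in struct: struct<1: id, 2: value>
```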

Environment

  • Iceberg: 1.11.0
  • Spark: 3.5.5
  • Catalog: Hadoop

I initially hit this on EMR with an AWS Glue catalog and was able to reduce it to the minimal repro above.


Metadata



Labels: bug (Something isn't working)
