
ClassCastException from GenericData$Record after upgrade to org.apache.parquet:parquet-avro:1.14.1 #2975

Open
NathanEckert opened this issue Jul 29, 2024 · 0 comments

Describe the bug, including details regarding any error messages, version, and platform.

When upgrading from 1.13.1 to 1.14.1, reading the file fails with:

java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to org.myorg.parquet.TestUpgradeParquet$CustomParquetRecord
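
For context only (not claimed to be a fix): the record class the default Avro materializer produces depends on the Avro data model that parquet-avro resolves through its AvroDataSupplier, and the stack trace above shows that with 1.14.1 a plain GenericData$Record is handed back instead of something assignable to the custom record interface used below. A minimal sketch of how the supplier can be pinned on the read configuration, assuming the file written by the reproducer at /tmp/directoryForTest/source.parquet:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.SpecificDataSupplier;
import org.apache.parquet.hadoop.ParquetReader;

// Pin the Avro data model supplier on the read configuration (illustration only;
// not claimed to avoid the ClassCastException reported above).
Configuration conf = new Configuration();
AvroReadSupport.setAvroDataSupplier(conf, SpecificDataSupplier.class);
try (ParquetReader<GenericRecord> reader = AvroParquetReader
    .<GenericRecord>builder(new org.apache.hadoop.fs.Path("/tmp/directoryForTest/source.parquet"))
    .withConf(conf)
    .build()) {
  GenericRecord first = reader.read();
}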

I get a different error when upgrading only to 1.14.0:

java.lang.RuntimeException: shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default
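
For reference, that message is standard Jackson behaviour when the Jdk8Module is not registered on the ObjectMapper; here the failing mapper is the copy shaded inside parquet (shaded.parquet.com.fasterxml.jackson...), so it is not something the calling code can configure. A minimal sketch of what the message refers to, using the non-shaded jackson-datatype-jdk8 artifact:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

// With a non-shaded Jackson, java.util.Optional support is enabled by registering
// the Jdk8Module; without it, handling Optional fields fails with the
// InvalidDefinitionException quoted above.
ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());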

I managed to reproduce it with the following code:

package org.myorg.parquet;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.Cleanup;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificData.SchemaConstructable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Test upgrade parquet.
 * <p>Works with org.apache.parquet:parquet-avro:1.13.1 (the simplified example still fails, but with a different stack trace than the other versions, as it fails later)</p>
 * <p>Fails with org.apache.parquet:parquet-avro:1.14.0 with shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default: add Module ...</p>
 * <p>Fails with org.apache.parquet:parquet-avro:1.14.1 with java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.myorg.parquet.TestUpgradeParquet$CustomParquetRecord</p>
 */
class TestUpgradeParquet {

  private static final Path TEMP_FOLDER = Paths.get(System.getProperty("java.io.tmpdir"), "directoryForTest");
  private static final Path TEMP_FILE = Paths.get(TEMP_FOLDER.toString(), "source.parquet");

  private Path parquetFilePath;



  @BeforeEach
  public void createParquet() throws Exception {

    // Cleanup of previous test run (the folder may not exist on the first run)
    if (Files.exists(TEMP_FOLDER)) {
      try (var previousFiles = Files.list(TEMP_FOLDER)) {
        previousFiles.forEach(path -> {
          try {
            Files.delete(path);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
      }
    }
    Files.deleteIfExists(TEMP_FOLDER);
    Thread.sleep(Duration.ofSeconds(1));

    // Actually create the folder and file for this test
    Files.createDirectory(TEMP_FOLDER);
    this.parquetFilePath = Files.createFile(TEMP_FILE);


    final Schema schema =
        SchemaBuilder.record("simpleSchema")
            .fields()
            .name("Key")
            .type()
            .intType()
            .noDefault()
            .name("FieldA")
            .type()
            .doubleType()
            .noDefault()
            .endRecord();
    final Record record1 = new Record(schema);
    record1.put(0, 1);
    record1.put(1, 2.0);
    final Record record2 = new Record(schema);
    record2.put(0, 2);
    record2.put(1, 5.0);
    final Collection<Record> recordsToWrite = List.of(record1, record2);
    writeParquetFile(
        this.parquetFilePath,
        schema,
        recordsToWrite,
        new Configuration(),
        CompressionCodecName.UNCOMPRESSED);
  }

  @Test
  void testUpgradeParquet() throws Exception {

    final org.apache.hadoop.fs.Path filepath = new org.apache.hadoop.fs.Path(this.parquetFilePath.toString());
    final Configuration configuration = new Configuration();


    @Cleanup
    final ParquetReader<CustomParquetRecord> parquetReader = ParquetReader
        .builder(new CustomReadSupport(), filepath)
        .withConf(configuration)
        .build();

    final var pivotRecord = parquetReader.read(); // Failure here
    assertThat(pivotRecord).isNotNull();


  }

  private interface CustomParquetRecord extends GenericRecord, SchemaConstructable {
  }

  private static class CustomReadSupport extends AvroReadSupport<CustomParquetRecord> {
    @Override
    public RecordMaterializer<CustomParquetRecord> prepareForRead(
        Configuration configuration,
        Map<String, String> keyValueMetaData,
        MessageType fileSchema,
        ReadContext readContext) {
      final RecordMaterializer<CustomParquetRecord> rMaterializer =
          super.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);

      return new RecordMaterializer<>() {

        @Override
        public CustomParquetRecord getCurrentRecord() {
          return rMaterializer.getCurrentRecord();
        }

        @Override
        public GroupConverter getRootConverter() {
          // Difference from the actual code: there we return a custom root converter.
          // It is not included in this reproducer as it would add far too much code.
          return null;
        }
      };
    }
  }

  /**
   * Creates a parquet file with contents.
   *
   * @param filePath filePath where the file is created
   * @param schema schema of the data
   * @param recordsToWrite records to write
   * @param configuration the hadoop configuration to use for the writer
   * @param compressionCodec the codec for the compression scheme to use for the parquet file
   */
  private static void writeParquetFile(
      final Path filePath,
      final Schema schema,
      final Collection<Record> recordsToWrite,
      final Configuration configuration,
      final CompressionCodecName compressionCodec)
      throws IOException, InterruptedException {
    try {
      Files.deleteIfExists(filePath);
    } catch (final NoSuchFileException x) {
      throw new RuntimeException(
          String.format("%s: no such file or directory.", filePath), x);
    } catch (final DirectoryNotEmptyException x) {
      throw new RuntimeException(String.format("%s not empty.", filePath), x);
    } catch (final IOException x) {
      throw new RuntimeException(x);
    }

    final org.apache.hadoop.fs.Path path =
        new org.apache.hadoop.fs.Path(filePath.toAbsolutePath().toString());

    RecordWriter<Void, Record> writer = null;
    try {
      writer =
          new ParquetOutputFormat<Record>(
              new AvroWriteSupport<>(
                  new AvroSchemaConverter(configuration).convert(schema),
                  schema,
                  SpecificData.get()))
              .getRecordWriter(configuration, path, compressionCodec, Mode.CREATE);
      for (final Record record : recordsToWrite) {
        writer.write(null, record);
      }
    } finally {
      if (writer != null) {
        writer.close(null);
      }
    }
  }


}

N.B.: This code also throws when using 1.13.1, but that is expected; I did not want to complicate the reproducer with additional code.
You will get this error:

org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/directoryForTest/source.parquet
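
For context, the simplified reproducer returns null from getRootConverter (the actual code installs a custom converter there), which is presumably what triggers the 1.13.1 decoding error above. A hedged sketch of that override inside the anonymous RecordMaterializer in CustomReadSupport, delegating to the wrapped materializer so that record assembly has a real converter tree (an assumption for illustration only, not the reporter's converter):

        @Override
        public GroupConverter getRootConverter() {
          // Assumption, for illustration only (not the actual custom converter):
          // delegate to the wrapped materializer so record assembly has a real
          // converter tree instead of a null root converter.
          return rMaterializer.getRootConverter();
        }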

Component(s)

No response
