
[Bug]: java.lang.IllegalArgumentException: Row arity: 2, but serializer arity: 4 #3003

Open
1 of 2 tasks
hezhi3f opened this issue Jul 4, 2024 · 0 comments
Labels
module:mixed-flink Flink module for Mixed Format type:bug Something isn't working


hezhi3f commented Jul 4, 2024

What happened?

CREATE CATALOG amoro_catalog WITH (
  'type' = 'arctic',
  'metastore.url' = 'thrift://ip:1260/amoro_catalog'
);

CREATE TABLE ODS_TB_A (
  PK_ID VARCHAR,
  XMMC VARCHAR,
  DWDM VARCHAR,
  XSSX INTEGER,
  FLAG VARCHAR,
  DWQYMC VARCHAR,
  PRIMARY KEY (PK_ID) NOT ENFORCED
);

CREATE TABLE ODS_TB_B (
  PK_ID VARCHAR,
  DDRBBH VARCHAR,
  PRIMARY KEY (PK_ID) NOT ENFORCED
);

CREATE TABLE ODS_TB_C (
  PK_ID VARCHAR,
  DWDM VARCHAR,
  XMDM INTEGER,
  VAL DOUBLE,
  PRIMARY KEY (PK_ID) NOT ENFORCED
);

INSERT INTO ODS_TB_A (PK_ID, XMMC, DWDM, XSSX, FLAG, DWQYMC) VALUES
('A10001','上海','1001',10,'0','华东'),
('A10002','北京','1002',20,'0','华北'),
('A10003','广州','1003',30,'0','华南');
INSERT INTO ODS_TB_A (PK_ID, XMMC, DWDM, XSSX, FLAG) VALUES
('A10004','华北','AAAA',40,'1'),
('A10005','华东','BBBB',50,'1'),
('A10006','华南','CCCC',60,'1');

INSERT INTO ODS_TB_B (PK_ID, DDRBBH) VALUES
('B10001', '1001-20240110'),
('B10002', '1002-20240110'),
('B10003', '1002-20240110'),
('B10004', '1003-20240612'),
('B10005', '1003-20240612'),
('B10006', '1003-20240612');

INSERT INTO ODS_TB_C (PK_ID,DWDM,XMDM,VAL) VALUES
('C10001','1001',1024,0.4),
('C10001','1001',1025,0.5),
('C10001','1002',1024,0.6),
('C10001','1003',1025,0.7),
('C10001','1003',1024,0.8);


SELECT
  UUID(),
  T2.DWMC AS COL1,
  DWDM AS COL2,
  T3.JHL AS COL3,
  RQ AS COL4
FROM(
  SELECT
    T1.XMMC AS DWMC,
    SUBSTR(T.DDRBBH,1,4) AS DWDM,
    SUBSTR(T.DDRBBH,6,8) AS RQ,
    T1.XSSX
  FROM ODS_TB_B T
  LEFT JOIN ODS_TB_A T1
    ON SUBSTR(T.DDRBBH,1,4) = T1.DWDM
  WHERE T1.FLAG = '1'
  GROUP BY T1.XMMC,SUBSTR(T.DDRBBH,1,4),SUBSTR(T.DDRBBH,6,8),T1.XSSX
) T2
LEFT JOIN(
  SELECT
    T1.DWQYMC AS DWMC,
    SUM(T.VAL) AS JHL
  FROM ODS_TB_C T
  LEFT JOIN ODS_TB_A T1
    ON T.DWDM = T1.DWDM
  WHERE T.XMDM = 1024
  GROUP BY T1.DWQYMC
)T3 ON TRIM(T2.DWMC) = TRIM(T3.DWMC);

I found an issue using Mixed-Iceberg.
Each subquery in the SQL above runs fine when executed on its own, but the full join fails with the exception below. Both subqueries read the same table, ODS_TB_A.
If I make a copy of that table and point one of the subqueries at ODS_TB_A_COPY instead, the problem does not occur.
The problem occurs whether or not LogStore is used.
Have you encountered a similar problem, and is there any solution?

There was no error when running the same query with regular Flink tables, or with Trino on Amoro.
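What the two exceptions below suggest (this is my reading, not a confirmed diagnosis): the second subquery's scan of ODS_TB_A projects only a few columns, yet the filter `FLAG = '1'` pushed down from the first subquery is bound against that narrower schema ("Cannot find field 'FLAG' in struct"), and the 2-field rows it produces then hit a serializer built for the 4-field projection. A minimal sketch of that arity check, with illustrative names only (this is not Flink's actual API):

```python
# Hypothetical mimic of the check in Flink's RowDataSerializer.copy:
# the incoming row must have exactly as many fields as the serializer
# was configured for, otherwise it throws.
def copy_row(row, serializer_arity):
    """Copy a row, enforcing that its field count matches the serializer."""
    if len(row) != serializer_arity:
        raise ValueError(
            f"Row arity: {len(row)}, but serializer arity: {serializer_arity}")
    return list(row)

# The failing scan appears to read 2 fields per row, while the serializer
# was built for the other subquery's 4-field projection of ODS_TB_A:
narrow_row = ("华东", "1001")  # 2 fields actually read
try:
    copy_row(narrow_row, serializer_arity=4)
except ValueError as e:
    print(e)  # Row arity: 2, but serializer arity: 4
```

This would also explain why duplicating the table as ODS_TB_A_COPY avoids the error: each subquery then gets its own scan with its own projection and filter, so nothing is shared or mixed up.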

2024-07-04 15:21:12
java.lang.IllegalArgumentException: Row arity: 2, but serializer arity: 4
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
	at org.apache.amoro.flink.read.hybrid.reader.MixedFormatRecordEmitter.emitRecord(MixedFormatRecordEmitter.java:56)
	at org.apache.amoro.flink.read.hybrid.reader.MixedFormatRecordEmitter.emitRecord(MixedFormatRecordEmitter.java:36)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
	at org.apache.amoro.flink.read.hybrid.reader.MixedFormatSourceReader.pollNext(MixedFormatSourceReader.java:125)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
	at org.apache.amoro.flink.read.hybrid.reader.MixedFormatSourceReader.pollNext(MixedFormatSourceReader.java:125)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.amoro.shade.org.apache.iceberg.exceptions.ValidationException: Cannot find field 'FLAG' in struct: struct<3: DWDM: optional string, 6: DWQYMC: optional string, 1: PK_ID: required string>
	at org.apache.amoro.shade.org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.NamedReference.bind(NamedReference.java:45)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.NamedReference.bind(NamedReference.java:26)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:111)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.Binder$BindVisitor.predicate(Binder.java:159)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.Binder$BindVisitor.predicate(Binder.java:118)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:347)
	at org.apache.amoro.shade.org.apache.iceberg.expressions.Binder.bind(Binder.java:60)
	at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetMetricsRowGroupFilter.<init>(AdaptHiveParquetMetricsRowGroupFilter.java:67)
	at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.<init>(AdaptHiveReadConf.java:109)
	at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.init(AdaptHiveParquetReader.java:77)
	at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:98)
	at org.apache.amoro.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:42)
	at org.apache.amoro.shade.org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
	at org.apache.amoro.shade.org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
	at org.apache.amoro.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:39)
	at org.apache.amoro.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:27)
	at org.apache.amoro.shade.org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
	at org.apache.amoro.shade.org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
	at org.apache.amoro.table.TableMetaStore.call(TableMetaStore.java:234)
	at org.apache.amoro.table.TableMetaStore.lambda$doAs$0(TableMetaStore.java:209)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:360)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
	at org.apache.amoro.table.TableMetaStore.doAs(TableMetaStore.java:209)
	at org.apache.amoro.io.AuthenticatedHadoopFileIO.doAs(AuthenticatedHadoopFileIO.java:202)
	at org.apache.amoro.flink.read.source.FlinkUnkyedDataReader.open(FlinkUnkyedDataReader.java:126)
	at org.apache.amoro.flink.read.source.DataIterator.openTaskIterator(DataIterator.java:154)
	at org.apache.amoro.flink.read.source.DataIterator.updateCurrentIterator(DataIterator.java:144)
	at org.apache.amoro.flink.read.source.DataIterator.seek(DataIterator.java:103)
	at org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:54)
	at org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:30)
	at org.apache.amoro.flink.read.hybrid.reader.HybridSplitReader.fetch(HybridSplitReader.java:68)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more

Affects Versions

Amoro 0.6.1, Flink 1.17.2

What table formats are you seeing the problem on?

Mixed-Iceberg

What engines are you seeing the problem on?

Flink

How to reproduce

No response

Relevant log output

No response

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
@hezhi3f hezhi3f added the type:bug Something isn't working label Jul 4, 2024
@czy006 czy006 added the module:mixed-flink Flink module for Mixed Format label Jul 5, 2024