Timestamp conversion issue #346

Open
CHarnel opened this issue Aug 1, 2023 · 0 comments

CHarnel commented Aug 1, 2023

I have an issue similar to wepay#267.

I've installed the BigQuery sink connector on a managed Confluent Cloud cluster.
The setup is simple, with Input Kafka record value format = JSON. I had to define the table schema in BigQuery myself, because the connector wouldn't work otherwise.
For some reason, the connector alters the value of my data: the timestamp field in the original message has the value 1690825888386, but the error says the value it tried to insert is 1690825888385999872.
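The two numbers are related in a specific way, which may help narrow down where the conversion happens. The sketch below is only a check of the arithmetic: it assumes (this is not confirmed by the connector's source) that the millisecond value is scaled by 10^6 and passes through a 64-bit floating-point number somewhere along the way. Under that assumption, the result matches the rejected value exactly. The variable names are illustrative only.

```python
from datetime import datetime, timedelta, timezone

original_ms = 1690825888386      # "timestamp" field in the original message
rejected = 1690825888385999872   # value quoted in the BigQuery error

# Scale by 10**6 using exact integer arithmetic.
exact = original_ms * 10**6

# Round-trip through a 64-bit float (assumed, not confirmed, to mirror
# a double-based conversion somewhere in the pipeline).
via_double = int(float(exact))

print(exact)                   # 1690825888386000000
print(via_double)              # 1690825888385999872
print(via_double == rejected)  # True

# The instant the original value denotes when read as epoch milliseconds (UTC).
intended = datetime.fromtimestamp(original_ms // 1000, tz=timezone.utc) + timedelta(
    milliseconds=original_ms % 1000
)
print(intended.isoformat(timespec="milliseconds"))  # 2023-07-31T17:51:28.386+00:00
```

Doubles near 1.69e18 are spaced 256 apart, so 1690825888386000000 cannot be represented exactly and rounds down by 128. That would explain the odd trailing digits, but not by itself why the value is scaled by 10^6.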

Here's my advanced config (mostly defaults):

| Field | Value |
| --- | --- |
| Input Kafka record key format | STRING |
| Max poll interval(ms) | 300000 |
| Partitioning type | INGESTION_TIME |
| Max poll records | 500 |
| Auto create tables | false |
| Auto update schemas | false |
| Sanitize topics | true |
| Sanitize field names | false |
| Time partitioning type | DAY |
| Allow schema unionization | false |
| All bq fields nullable | false |
| Convert double special values | false |

Here is the exception:

connect.errors.exception.message:
table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=test_dataset, tableId=analytics}} insertion failed for the following rows:
[row index 0] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872
[row index 1] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872

connect.errors.exception.stacktrace:
com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=test_dataset, tableId=analytics}} insertion failed for the following rows:
[row index 0] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872
[row index 1] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872
at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.filterAndSendRecordsToDLQ(BigQueryWriter.java:248)
at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:130)
at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:93)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Original message:
{
  "eventType": "testEvent",
  "deviceId": "26d810f9-966a-4ea9-b17d-6a2123ad1695",
  "playerId": "b83984a5-8cc1-4beb-904f-36719b2a7291",
  "sessionId": "ac9591fe-1604-4105-9bb1-6ba83f1a4a47",
  "clientVersion": "v1.2",
  "timestamp": 1690825888386,
  "eventParams": null
}

Full headers from the DLQ:
[
  {
    "key": "__connect.errors.topic",
    "value": "analytics"
  },
  {
    "key": "__connect.errors.partition",
    "value": "2"
  },
  {
    "key": "__connect.errors.offset",
    "value": "18"
  },
  {
    "key": "__connect.errors.connector.name",
    "value": "lcc-dgzo3y"
  },
  {
    "key": "__connect.errors.task.id",
    "value": "0"
  },
  {
    "key": "__connect.errors.stage",
    "value": "TASK_PUT"
  },
  {
    "key": "__connect.errors.class.name",
    "value": "org.apache.kafka.connect.sink.SinkTask"
  },
  {
    "key": "__connect.errors.exception.class.name",
    "value": "com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException"
  },
  {
    "key": "__connect.errors.exception.message",
    "value": "table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=test_dataset, tableId=analytics}} insertion failed for the following rows:\n\t[row index 0] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872\n\t[row index 1] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872"
  },
  {
    "key": "__connect.errors.exception.stacktrace",
    "value": "com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=test_dataset, tableId=analytics}} insertion failed for the following rows:\n\t[row index 0] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872\n\t[row index 1] (location timestamp, reason: invalid): Timestamp field value is out of range: 1690825888385999872\n\tat com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.filterAndSendRecordsToDLQ(BigQueryWriter.java:248)\n\tat com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:130)\n\tat com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:93)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"
  }
]