
[Bug] [connector-kafka] In spark engine streaming mode, kafka source can read data, but cannot write to mysql. #8394

Aiden-Rose opened this issue Dec 27, 2024 · 2 comments
Aiden-Rose commented Dec 27, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

In Spark engine streaming mode, the Kafka source can read data, but it cannot write to the sink.

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  #job.mode = "BATCH"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
source {
  Kafka {
    schema = {
      fields {
        name = "string"
        email = "string"
      }
    }
    format = json
    content_field = "$.data"
    topic = "wkg_topic"
    bootstrap.servers = "xxx:6667,xxx:6667,xxx:6667"
    start_mode = "earliest"
    commit_on_checkpoint = false
    poll.timeout = 10000
    consumer.group = "seatunnel_4"
    kafka.config = {
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
      client.id = client_1
    }
    result_table_name = "fake1"
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://x x x x x:3306/test_ysp?allowMultiQueries=true&useOldAliasMetadataBehavior=true&useUnicode=false&tinyInt1isBit=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull"
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "root"
    query = "insert into `test_yyy`.`mysql_0201`(country,capital) values(?,?)"
    database = "test_yyy"
    table = "mysql_0201"
    source_table_name = "fake1"
    data_save_mode = "CUSTOM_PROCESSING"
    custom_sql = "TRUNCATE TABLE `test_yyy`.`mysql_0201`"
    batch_size = 1000
    generate_sink_sql = false
    parallelism = 1
    schema_save_mode = IGNORE
  }
}

Running Command

sh start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config kafka_hive_streaming.conf

Error Exception


2024-12-31 17:18:37,960 INFO  org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter - jsonValue: {"data":{"name":"2","email":"2","phone":"2","job":"2"}}
2024-12-31 17:18:37,960 INFO  org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter - data: {"name":"2","email":"2","phone":"2","job":"2"}
2024-12-31 17:18:37,960 INFO  org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter - ============================ fetch record ============================
2024-12-31 17:18:37,960 INFO  org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter - jsonValue: {"data":[{"name":"+.Colombia [Organization]","email":"[email protected]","phone":"202-662-9292","job":""}]}
2024-12-31 17:18:37,961 INFO  org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter - data: {"phone":"202-662-9292","name":"+.Colombia [Organization]","job":"","email":"[email protected]"}
2024-12-31 17:18:37,993 WARN  com.zaxxer.hikari.HikariConfig - HikariPool-1 - idleTimeout has been set but has no effect because the pool is operating as a fixed size pool.
2024-12-31 17:18:37,993 INFO  com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Starting...
2024-12-31 17:18:38,316 INFO  com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Start completed.
2024-12-31 17:18:38,324 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is:
insert into `test_yyy`.`mysql_0201`(country,capital) values(?,?)

The job hangs at this point and never writes the data to MySQL.


Questions:
![image](https://github.com/user-attachments/assets/b406b336-7872-461d-85a9-e9bc3a89e9a2)
Why is the readStream method not used here?

![image](https://github.com/user-attachments/assets/c7b67b24-92c6-480c-9212-bc3649c2a1e2)
Why is the inject() method that returns DataStreamWriter<Row> never called anywhere?
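For reference, the pattern these two questions point at looks roughly like the minimal sketch below. This is the plain Spark Structured Streaming API (not SeaTunnel internals); the broker address, topic, JDBC URL, and credentials are placeholders loosely mirroring the config above:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaStreamingSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-streaming-sketch")
                .getOrCreate();

        // readStream (rather than read) produces an unbounded streaming Dataset.
        Dataset<Row> source = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker1:6667") // placeholder
                .option("subscribe", "wkg_topic")
                .option("startingOffsets", "earliest")
                .load();

        // writeStream returns the DataStreamWriter<Row> the second question asks
        // about; foreachBatch hands each micro-batch to ordinary batch sink logic,
        // e.g. a JDBC write to MySQL.
        DataStreamWriter<Row> writer = source.writeStream()
                .foreachBatch((Dataset<Row> batch, Long batchId) ->
                        batch.write()
                                .format("jdbc")
                                .option("url", "jdbc:mysql://host:3306/test_ysp") // placeholder
                                .option("dbtable", "`test_yyy`.`mysql_0201`")
                                .option("user", "root")
                                .option("password", "root")
                                .mode("append")
                                .save());

        StreamingQuery query = writer.start();
        query.awaitTermination(); // block so the streaming job keeps running
    }
}
```

The questions above seem to suggest that SeaTunnel's Spark translation does not go through this streaming path, which would be consistent with records being read but never flushed to the sink.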


Zeta or Flink or Spark Version

spark 3.2.2

Java or Scala Version

jdk 1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Aiden-Rose added the bug label Dec 27, 2024
zhilinli123 (Contributor) commented:

Hello, has this problem been solved? @Aiden-Rose

Aiden-Rose (Author) commented:

> Hello, has this problem been solved? @Aiden-Rose

Not solved yet.

Aiden-Rose reopened this Dec 31, 2024
Aiden-Rose changed the title to "[Bug] [connector-kafka] In spark engine streaming mode, kafka source can read data, but cannot write to mysql." Dec 31, 2024