
Resource usage limits via configuration - Tasks crashing due to high load #307

Open
mkherlakian opened this issue Aug 13, 2022 · 0 comments

I'm trying to sink some pretty large topics from Kafka (5 topics with about 250 million events each) into BigQuery via a separate, rather large Kafka Connect cluster (3 nodes, 8 CPU / 32 GB RAM each). It starts up fine, but after about 2 minutes the Connect instances' CPUs are pegged at 100% and the nodes start disconnecting; ultimately the whole process restarts with little progress on getting any data into BigQuery.

I tried the same configuration in a replica of our environment with far fewer events (500,000) and it works fine.

Are there any configuration options that can throttle the processing of events to keep the CPU in check? I tried tuning queueSize and threadPoolSize, as well as max.queue.size and max.batch.size, to no avail.

Any hint/help would be very much appreciated!
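
One more thing I'm looking at is throttling at the consumer level via Connect's per-connector client overrides (as far as I understand, this requires connector.client.config.override.policy=All, or a similarly permissive policy, on the workers). A rough sketch with guessed values — I haven't verified that these help:

{
  "consumer.override.max.poll.records": "200",
  "consumer.override.fetch.max.bytes": "10485760",
  "consumer.override.max.partition.fetch.bytes": "1048576"
}

The idea being that smaller polls hand each task smaller batches per put() call, trading throughput for lower CPU.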

Here's our config for reference:

{
  "name": "hd-sink-bq",
  "tasks.max": "3",

  "queueSize": 20000,
  "threadPoolSize": 2,

  "topics": "topic1,topic2,topic3,topic4,topic5",
  "sanitizeTopics": "true",

  "autoCreateTables": "true",

  "timestampPartitionFieldName": "created_at",

  "max.queue.size": "81290",
  "max.batch.size": "20480",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "<REGISTRY_URL>",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<USER:PASS>",

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "<REGISTRY_URL>",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<USER:PASS>",

  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "defaultDataset": "data_lake",
  "allowNewBigQueryFields": "true",
  "bigQueryPartitionDecorator": "false",
  "project": "<PROJECT>",
  "keySource": "JSON",
  "keyfile": "<JSON_STRINGIFIED_KEY>",
  "timePartitioningType": "DAY",
  "upsertEnabled": true,
  "kafkaKeyFieldName": "_kid",

  "transforms": "removeEventRequestData,removeResponseData",

  "transforms.removeEventRequestData.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.removeEventRequestData.blacklist": "headers,body,path,query",
  "transforms.removeEventRequestData.predicate": "isEventRequest",

  "transforms.removeResponseData.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.removeResponseData.blacklist": "body",
  "transforms.removeResponseData.predicate": "isAttemptResponse",

  "predicates": "isEventRequest,isAttemptResponse",
  "predicates.isEventRequest.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isEventRequest.pattern": "topic1",

  "predicates.isAttemptResponse.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isAttemptResponse.pattern": "topic2",

  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.log.include.messages": "true",
  "errors.tolerance": "all",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.name": "connect.bq-sink.deadletter"
}
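Side note on the config above: since upsertEnabled is true, the connector buffers rows into intermediate tables and periodically merge-flushes them, which I suspect contributes to the load. If I'm reading the connector's config docs right, the merge cadence can be tuned with mergeIntervalMs and mergeRecordsThreshold; a sketch with guessed, untested values:

{
  "mergeIntervalMs": "120000",
  "mergeRecordsThreshold": "-1"
}

(A longer interval should mean fewer, larger merges; -1 should leave the record-count trigger disabled so flushes happen only on the interval.)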