-
Notifications
You must be signed in to change notification settings - Fork 68
[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
KinesisAsyncClient get(); | ||
|
||
/** Closes any resources held by this provider. */ | ||
void close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the interface extend Closeable? Doing so allows for try-with-resources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Updated the PR
streamArn, | ||
kinesisClientProperties, | ||
states, | ||
null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have an anonymous function implementation of client provider here which calls buildClient()
instead of providing null and handling null in the other constructor method? This will allow for cleaner code in the other constructor by removing the null handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up refactoring the way client/clientprovider are injected and passed between SinkWriter and Sink which should simplify this much further. PTAL
… for AWS Kinesis Stream sink
6d937c3
to
f6e0587
Compare
try { | ||
kinesisClientProvider.close(); | ||
} catch (IOException e) { | ||
throw new RuntimeException("Failed to close the kinesisClientProvider", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a specific or even generic connector exception which extends RuntimeException that we can use here?
summary.getCount(), | ||
summary.getExampleMessage()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These can be implemented as ErrorSummary.toString(). Then, instead of using StringBuilder to construct string, calling toString() on the hashmap should produce a readable string. This way, we'd have less code to maintain.
|
||
// Using a single WARN log with aggregated information provides operational | ||
// visibility into errors without flooding logs in high-throughput scenarios | ||
LOG.warn("KDS Sink failed to write, " + errorSummary.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use full class name here for searchability/debug-ability. e.g. KinesisStreamsSinkWriter failed to write records: ...
Purpose of the change
[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink
Verifying this change
This change added tests and can be verified as follows:
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving)
)