KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close #8900
Conversation
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
         if (clean && commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                 + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");
This was a sort-of bug: because we don't close things during `handleRevocation`, we want to make sure the TaskManager will close this task as dirty during `handleAssignment`. So we throw here just to force it to call `closeDirty` -- but it wasn't necessarily a fatal exception that caused the commit to fail, so we should just throw `TaskMigrated` here instead.

That said, it doesn't really matter much, since the `ConsumerCoordinator` will save and rethrow only the first exception, which is the `handleRevocation` exception. Anything we throw in `handleAssignment` is "lost" -- but we should do the right thing anyway.
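A minimal sketch of what that change could look like, with a stand-in exception class (the real one lives in org.apache.kafka.streams.errors) and the surrounding task logic elided:

```java
// Sketch only: TaskMigratedException here is a local stand-in, not Kafka's class.
public class CloseCleanSketch {
    public static class TaskMigratedException extends RuntimeException {
        public TaskMigratedException(final String msg) { super(msg); }
    }

    boolean commitNeeded = true;

    void close(final boolean clean) {
        if (clean && commitNeeded) {
            // Not necessarily fatal: signal migration so the TaskManager
            // falls back to closeDirty during handleAssignment.
            throw new TaskMigratedException("Tried to close dirty task as clean");
        }
        // ... rest of the close logic elided ...
    }
}
```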
This comment is worthy to be a comment on the code itself :)
ack
@@ -267,7 +283,17 @@ public void close() {
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch since transaction was aborted")) {
Should we avoid passing this exception to the `ProductionExceptionHandler`, since it no longer indicates a broken send?
I think we should not add this handling for now, based on the conclusion that after one task causes `abortTxn` to be called, no other task should ever call `recordCollector#flush/send/close` anymore, right?
In the current code, we might still need to close tasks, right? If a TX is aborted, we need to "reset" all active tasks accordingly, and this would imply closing and reviving them? And while closing we would call `checkForException` and crash without this guard?

What makes me wonder is whether we should actually `checkForException` in `closeDirty()` above at all. If a TX is aborted and we `closeDirty` without calling `checkForException`, it seems we don't need this guard? (In general, this guard seems a little bit hacky, and it would be great if we could avoid it IMHO.)
Makes sense. I think we can avoid `checkForException` in `closeDirty`, and if that is sufficient we can remove the exception handling based on the error message.
That sounds right; we should never see this exception outside of `closeDirty`, since we should then close all tasks dirty if the transaction is aborted. But as for whether to check it in `closeDirty`: I think we would need to at the very least check it so we can reset the exception afterwards. Or do you think it's "safe" to just blindly reset the exception in the case of a dirty close, no matter what it was?
> I think that we should never hit this exception during closeClean, that is, we should never call closeClean on a task/record collector that was part of an aborted transaction.

Agreed.

I was hoping we don't need this hack at all. Why do we want to call `checkForException` in `closeDirty`?
Well, to still log the error message while swallowing it :)
Ok, we can do that, but we only get the first exception back anyway. -- And for this, we also don't need the "hack" of checking for a specific exception. In `closeDirty()` we just do

try {
    checkForException();
} catch (final RuntimeException logAndSwallow) {
    log.error(...);
}
If we revive a task, we don't recreate the record collector AFAICT. So there may still be a `sendException` hanging around even after we `close` the record collector. If this was a truly fatal exception, we'll check and throw it. But we shouldn't rethrow this particular non-fatal exception. Therefore, we need to check for it and reset the `sendException` iff we find this exact exception.
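A hedged sketch of that check-and-reset (the message string comes from the diff above; `KafkaException` here is a local stand-in for `org.apache.kafka.common.KafkaException`):

```java
public class RecordCollectorSketch {
    // Stand-in for org.apache.kafka.common.KafkaException.
    public static class KafkaException extends RuntimeException {
        public KafkaException(final String msg) { super(msg); }
        public KafkaException(final String msg, final Throwable cause) { super(msg, cause); }
    }

    private KafkaException sendException;

    public void recordSendError(final KafkaException e) {
        sendException = e;
    }

    public void checkForException() {
        if (sendException != null) {
            if (sendException.getCause() instanceof KafkaException
                    && "Failing batch since transaction was aborted"
                            .equals(sendException.getCause().getMessage())) {
                // Non-fatal: the batch failed only because we aborted the
                // transaction ourselves. Reset it so a revived task (which
                // keeps this record collector) does not rethrow it later.
                sendException = null;
                return;
            }
            throw sendException;
        }
    }
}
```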
What about clearing the exception in `checkForException` instead:

private void checkForException() {
    if (sendException != null) {
        final KafkaException rethrow = sendException;
        sendException = null;
        throw rethrow;
    }
}

We need to fix `sendException` to make it thread-safe, though.
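Both suggestions combined might look roughly like this (a sketch, not the actual RecordCollectorImpl): store `sendException` in an `AtomicReference` so the producer callback thread and the stream thread can race safely, and atomically clear it on read:

```java
import java.util.concurrent.atomic.AtomicReference;

public class SendExceptionHolder {
    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>(null);

    // Called from the producer's send callback: keep only the first error.
    public void onSendError(final RuntimeException e) {
        sendException.compareAndSet(null, e);
    }

    // Called from the stream thread: atomically take and clear the error.
    public void checkForException() {
        final RuntimeException rethrow = sendException.getAndSet(null);
        if (rethrow != null) {
            throw rethrow;
        }
    }
}
```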
Force-pushed from 17430e4 to a28986f
if (!filterActive(tasksToCloseDirty).isEmpty()) {
    tasksToCloseClean.removeAll(filterActive(tasksToCommit));
    tasksToCommit.removeAll(filterActive(tasksToCommit));
    tasksToCloseDirty.addAll(activeTaskIterable());
Why only add active tasks here, not standby tasks?
Standbys don't affect the TX-producer, and thus they can be closed dirty without side effects.
You mean they can be closed clean, right? In that case I'd agree :) I realized that we are still closing standbys as clean even when one of the active tasks causes all other active tasks to be closed dirty. The looping over all tasks above is a bit confusing to me.

Maybe a subjective nit suggestion: first loop over the active tasks, and then loop over the standbys, where the second loop no longer needs the one-spoils-all logic. Although it duplicates a bit of code, it would make the logic cleaner.
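The suggested restructuring could be sketched as follows (the task interface and method names here are assumed for illustration, not the actual TaskManager API):

```java
import java.util.List;

public class ShutdownSketch {
    public interface Task {
        boolean needsDirtyClose();
        void closeClean();
        void closeDirty();
    }

    public static void shutdown(final List<Task> activeTasks, final List<Task> standbyTasks) {
        // One dirty active task spoils the rest: the shared transaction was
        // aborted, so no active task's pending data can be committed cleanly.
        final boolean anyActiveDirty = activeTasks.stream().anyMatch(Task::needsDirtyClose);
        for (final Task task : activeTasks) {
            if (anyActiveDirty) {
                task.closeDirty();
            } else {
                task.closeClean();
            }
        }
        // Standbys don't touch the TX producer, so they close clean regardless.
        for (final Task task : standbyTasks) {
            task.closeClean();
        }
    }
}
```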
> You mean they can be closed clean right? In that case I'd agree :)

I guess :)
Sounds good. Will refactor a bit to clarify the active/standby handling
assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(0));
// Sometimes the store happens to have already been closed sometime during startup, so just keep track
// of where it started and make sure it doesn't happen more times from there
final int initialStoreCloseCount = CloseCountingInMemoryStore.numStoresClosed();
Saw this fail locally so just did a minor flaky test fix on the side
@@ -474,6 +492,7 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
     " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
 );
+collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
These tests used to rely on the fact that the `sendException` was never forgotten: they set it once and then asserted that multiple subsequent calls also threw it. So now we need to call `send` before each call to re-insert the exception.
For those particular tests, considering their names, it seems the tests are void now?
Maybe I misinterpreted this, but I took the `OnSubsequentCall` in the name to mean that it would throw on the next (i.e. subsequent) call after the send, not that it would continue to throw on all subsequent calls. I.e., I think it should actually be several different tests (one for each "call" that should throw) that got mashed into just one.
I guess I should just break these up into different tests then, huh. Will do
Not 100% sure how `OnSubsequentCall` is meant either. But what you say seems to make sense, and thus these should be different tests. Thanks for going the extra mile splitting them up!
Force-pushed from f58d400 to 4185c54
test this
One unrelated failure:
 // for this host then just pick the first metadata
-if (thisHost == UNKNOWN_HOST) {
+if (thisHost.equals(UNKNOWN_HOST)) {
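For illustration, a minimal stand-in for the value type being compared (the real class is Streams' HostInfo) shows why `==` misses the match while `equals()` catches it: an instance decoded from assignment metadata is equal to `UNKNOWN_HOST` but not identical to it.

```java
import java.util.Objects;

public class HostCompareSketch {
    // Minimal assumed stand-in value type, not the actual HostInfo.
    public static final class Host {
        final String host;
        final int port;
        public Host(final String host, final int port) { this.host = host; this.port = port; }
        @Override public boolean equals(final Object o) {
            if (!(o instanceof Host)) return false;
            final Host other = (Host) o;
            return port == other.port && host.equals(other.host);
        }
        @Override public int hashCode() { return Objects.hash(host, port); }
    }

    public static final Host UNKNOWN_HOST = new Host("unknown", -1);
}
```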
Should we fix this in older branches (2.5/2.4), too? (ie follow up PR)
Will do
for (final Task task : activeTaskIterable()) {
    executeAndMaybeSwallow(
        clean,
If `tasksToCloseDirty` is not empty, should we close dirty, too, i.e. pass in `clean && tasksToCloseDirty.isEmpty()`?
I think this is more in line with the general code flow elsewhere. Note that if we started out clean but became dirty and had to close some tasks as such, we would have already caught an exception somewhere. So `AtomicReference#compareAndSet` would be a no-op, and it actually doesn't matter what we pass in here.
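For reference, a sketch of what an `executeAndMaybeSwallow`-style helper with first-exception capture could look like (signature assumed for illustration; the actual utility lives in TaskManager): `compareAndSet` records only the first exception, so later calls are no-ops as noted above.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SwallowSketch {
    public static void executeAndMaybeSwallow(final boolean clean,
                                              final Runnable action,
                                              final AtomicReference<RuntimeException> firstException) {
        try {
            action.run();
        } catch (final RuntimeException e) {
            if (clean) {
                // Record only the first exception; a no-op if one is already set.
                firstException.compareAndSet(null, e);
            } else {
                // Dirty close: log and swallow.
                System.err.println("Ignoring exception during dirty close: " + e);
            }
        }
    }
}
```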
Retest this please
Thanks for the fix @ableegoldman!
LGTM.
Retest this please.
LGTM! I will merge after green jenkins.
…tion during clean close (#8900)

If there's any pending data and we haven't flushed the producer when we abort a transaction, a KafkaException is returned for the previous send. This is a bit misleading, since the situation is not an unrecoverable error and so the KafkaException is really non-fatal. For now, we should just catch and swallow this in the RecordCollector (see also: KAFKA-10169)

The reason we ended up aborting an un-flushed transaction was due to the combination of
a. always aborting the ongoing transaction when any task is closed/revoked
b. only committing (and flushing) if at least one of the revoked tasks needs to be committed (regardless of whether any non-revoked tasks have data/transaction in flight)

Given the above, we can end up with an ongoing transaction that isn't committed since none of the revoked tasks have any data in the transaction. We then abort the transaction anyway, when those tasks are closed. So in addition to the above (swallowing this exception), we should avoid unnecessarily aborting data for tasks that haven't been revoked.

We can handle this by splitting the RecordCollector's close into a dirty and clean flavor: if dirty, we need to abort the transaction since it may be dirty due to the commit attempt failing. But if clean, we can skip aborting the transaction since we know that either we just committed and thus there is no ongoing transaction to abort, or else the transaction in flight contains no data from the tasks being closed.

Note that this means we still abort the transaction any time a task is closed dirty, so we must close/reinitialize any active task with pending data (that was aborted).

In sum:
* hackily check the KafkaException message and swallow
* only abort the transaction during a dirty close
* refactor shutdown to make sure we don't closeClean a task whose data was actually aborted

Reviewers: Chia-Ping Tsai <[email protected]>, Boyang Chen <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
Merged to trunk and cherry-picked to 2.6
Merge of 'trunk' of github.com:apache/kafka:
* KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close (apache#8900)
* KAFKA-10126: Add a warning message for ConsumerPerformance (apache#8845)
* KAFKA-9439: add KafkaProducer API unit tests (apache#8174)
* MINOR: correct the doc of transaction.timeout.ms (apache#8901)
* KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager… (apache#8887)
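The clean/dirty close split described above can be sketched as follows (a toy model with assumed method names, tracking only whether the transaction gets aborted; the real RecordCollector also flushes and closes the producer):

```java
public class RecordCollectorCloseSketch {
    private boolean transactionInFlight = true;
    private boolean transactionAborted = false;

    // Dirty close: the transaction may hold uncommitted data from a failed
    // commit attempt, so it must be aborted.
    public void closeDirty() {
        if (transactionInFlight) {
            abortTransaction();
        }
    }

    // Clean close: either we just committed (nothing in flight), or the
    // in-flight transaction holds no data from the tasks being closed,
    // so there is nothing to abort.
    public void closeClean() {
        // intentionally no abort
    }

    private void abortTransaction() {
        transactionInFlight = false;
        transactionAborted = true;
    }

    public boolean aborted() {
        return transactionAborted;
    }
}
```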