KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory #8962

Conversation

@ableegoldman (Member) commented Jun 30, 2020

Two more edge cases I found producing extra TaskCorruptedExceptions while playing around with the failing eos-beta upgrade test (sadly these are unrelated problems, as the test still fails with these fixes in place).

  1. Need to write the checkpoint when recycling a standby: although we do preserve the changelog offsets when recycling a task, and should therefore write the offsets when the new task is itself closed, we do NOT write the checkpoint for uninitialized tasks. So if the new task is ultimately closed before it gets out of the CREATED state, the offsets will not be written and we can get a TaskCorruptedException
  2. With the change in task locking to address some Windows-related nonsense (am I remembering that correctly?), we don't delete entire task directories but just clear the inner state. With EOS, during initialization we check whether the state directory is non-empty while the checkpoint is missing, and throw a TaskCorruptedException if so. But just opening a RocksDB store creates a RocksDB base dir in the task directory, so the taskDirIsEmpty check always fails and we always throw TaskCorruptedException even if there's nothing there. We can fix this for RocksDB (and custom stores) by searching the task directory for any actual contents: we just do a BFS looking for any file that isn't itself a directory -- or, specifically in the RocksDB dir, for an .sst file (see the sketch below).

Note: fix 2 is not perfect but it helps. It's not a correctness issue, just an annoyance.
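A minimal sketch of the kind of BFS emptiness check described in fix 2 (a simplified, hypothetical helper with an assumed "rocksdb" directory name -- not the actual StateDirectory code):

import java.io.File;
import java.util.ArrayDeque;
import java.util.Deque;

class TaskDirSketch {
    // Returns true if the task directory has no "real" contents. Regular files count as
    // contents, except under the RocksDB base dir, where only .sst files count, because
    // RocksDB writes bookkeeping files (OPTIONS, LOG, CURRENT, ...) as soon as a store
    // is opened. Assumes the RocksDB base dir is named "rocksdb".
    static boolean taskDirIsEmpty(final File taskDir) {
        final Deque<File> subDirectories = new ArrayDeque<>();
        subDirectories.offer(taskDir);
        while (!subDirectories.isEmpty()) {
            final File dir = subDirectories.poll();
            final boolean sstOnly = dir.getPath().contains(File.separator + "rocksdb");
            final File[] files = dir.listFiles();
            if (files == null) {
                continue;
            }
            for (final File file : files) {
                if (file.isDirectory()) {
                    subDirectories.offer(file);
                } else if (!sstOnly || file.getName().endsWith(".sst")) {
                    return false; // found actual contents
                }
            }
        }
        return true;
    }
}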

@vvcephei (Contributor)

Test this please

for (final File file : files) {
    if (file.isDirectory()) {
        subDirectories.offer(file);
    } else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) {
Member Author:

This is admittedly quite hacky, and of course does not solve the problem for custom state stores that might write some non-data files upon open. A "better" fix would probably be to write some sentinel value in the checkpoint, à la OFFSET_UNKNOWN, so we do have an entry in there if the store was opened but does not yet have any data.

But, I wanted to keep things simple (a very relative term here, I know) and low-risk before the 2.6 release. We can discuss better solutions once we're not at the doorstep of the release (and blocking the door, I might add)

Contributor:

Thanks for this. What's the impact of the exception that we're avoiding here? It might be better to just not do anything right now than to introduce assumptions about the default persistent store implementation here. If those assumptions become false later, it could be pretty bad.

Member Author:

Well, I did add a unit test with a real RocksDB store to verify, how could any change possibly slip past us 😉

Member Author:

But yeah: the extra TaskCorruptedException shouldn't have any correctness implications, and I don't think it will have any bad side effects at all except for confusing ourselves and users. If this seems too risky then we can absolutely take it out. It's just annoying since we pretty much always hit this on startup.

For example in the EosBetaUpgradeIntegrationTest (where I noticed this), we start a second client which then gets a standby while the first client revokes the active task. The second client initializes the standby but doesn't get to process any data before the follow-up rebalance is triggered, where it receives the active task. So basically every task hits TaskCorruptedException. Of course this is an edge case where we happen to be within the acceptable recovery lag.

Member Author:

Ok, I discussed offline with Guozhang; we should go forward with the "write sentinel values for unknown offset" fix since it will also be needed for other work soon in 2.7. I'll remove this hacky emptiness check.

Contributor:

Ok, sounds good. Thanks!

@abbccdda (Contributor) commented Jul 1, 2020

Test this please

@abbccdda (Contributor) commented Jul 1, 2020

test this please

@vvcephei (Contributor) commented Jul 1, 2020

Looks like there are related failures:

org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked
org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay
org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldReportDirectoryEmpty
org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldOnlyListNonEmptyTaskDirectories
java.lang.AssertionError: expected:<[/tmp/kafka-4l8dk/applicationId/0_0, /tmp/kafka-4l8dk/applicationId/1_0, /tmp/kafka-4l8dk/applicationId/2_0]> but was:<[]>
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.failNotEquals(Assert.java:835)
	at org.junit.Assert.assertEquals(Assert.java:120)
	at org.junit.Assert.assertEquals(Assert.java:146)
	at org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked(StateDirectoryTest.java:310)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
	at java.base/java.lang.Thread.run(Thread.java:830)

@vvcephei (Contributor) commented Jul 1, 2020

Test this please

1 similar comment
@vvcephei (Contributor) commented Jul 1, 2020

Test this please

@vvcephei (Contributor) commented Jul 1, 2020

Retest this please

@vvcephei (Contributor) left a comment

Thanks @ableegoldman !

@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
    if (oldTask.isActive()) {
        final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
        newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
        cleanUpTaskProducer(oldTask, taskCloseExceptions);
    } else {
        oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already
Contributor:

This comment makes me a bit twitchy. Can we assert it instead? I mention it because I assume that active tasks also don't need to be committed because it should have happened already. Can we assert that as well?

Member Author:

Just to be clear, we do assert/enforce this, but in the StreamTask and not in the TaskManager. At this point the TaskManager is actually completely agnostic to task state* and all assertions and branching based on state are internal to the Task implementation.

*except for in handleRevocation, where I just noticed we still filter the commit based on state, which is now unnecessary

Contributor:

Your words sound good to me, but they are directly in contradiction to the code here, which skips suspending and committing active tasks because it assumes they have already happened. In other words, there is an assumption here that the active tasks are in some kind of "committed" state, while the standbys are in either "created" or "running".

If we're going to have a branch that explicitly assumes the task is already committed, then I'd like to verify it, otherwise experience says it will become false after refactoring, and we'd wind up trying to track down an IllegalStateException later on.

On the other hand, if these transitions are idempotent, we can just include them in both branches (or rather move them outside the conditional block).

Member Author:

While suspend is technically idempotent, the safer thing is actually to not also call it for active tasks here. But let me back up a bit and try to further clarify my earlier response. I realize I made this claim without any hint as to what specifically I was referring to:

By we do assert/enforce this, but in the StreamTask I meant that if we try to close a task without first suspending it, we will get an IllegalStateException in the Task#close method. The TaskManager is responsible for "managing" the operations on the task, but the task is ultimately responsible for verifying that its lifecycle makes sense and its state/transitions are valid. If we just blindly suspend everything here, it seems worse than getting an IllegalStateException, because we lose the check that the task was prepared for close (i.e. suspended + committed) earlier during handleRevocation.
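A minimal sketch of the kind of check being described (simplified states and names, not the actual StreamTask code):

class TaskCloseSketch {
    enum State { CREATED, RUNNING, SUSPENDED, CLOSED }

    private State state = State.CREATED;
    private final String id = "0_0"; // illustrative task id

    void suspend() {
        state = State.SUSPENDED;
    }

    void closeClean() {
        if (state != State.SUSPENDED) {
            // The task itself, not the TaskManager, rejects a close that skipped suspension
            throw new IllegalStateException("Task " + id + " must be suspended before closing, but is " + state);
        }
        state = State.CLOSED;
    }
}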

I'd like to verify it, otherwise experience says it will become false after refactoring, and we'd wind up trying to track down an IllegalStateException later on

On that note, I'm slightly confused by your proposal: what do you mean by "verify" if not to throw an IllegalStateException if it's not what's expected? Or do you just mean we'd have to track down an IllegalStateException slightly farther from the root cause?

Member Author:

Also, the commit case is trickier than the suspend as it's possible we actually don't commit a task before closing it, if the commit or any commit preparation fails. So we can't just assert that the task is committed.

We can assert that we attempted to commit it by keeping track of tasks we tried to commit/revoke between handleRevocation and handleAssignment, but I'm quite confident that would quickly get out of control.
We could introduce a #commitAttempted method on the Task but that also seems to invite bugs
We could leave it up to the Task to make sure everything is done safely during the close procedure. What the task currently does is verify that no commit is needed if a clean close is attempted -- if we try to close clean but a commit is still needed, it means the commit failed, and we can throw an exception to force the TM to closeDirty
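A minimal sketch of that last option (hypothetical flag and exception choice, not the actual StreamTask code):

class CleanCloseSketch {
    // Set when the task has processed records since its last successful commit
    private boolean commitNeeded = true;

    void closeClean() {
        if (commitNeeded) {
            // A clean close with a pending commit means the earlier commit (or its preparation)
            // failed, so we throw and let the TaskManager fall back to closeDirty
            throw new IllegalStateException("Tried to close the task cleanly, but a commit is still needed");
        }
        // ... proceed with the clean close ...
    }

    void closeDirty() {
        // wipe or ignore any pending state; no commit is attempted
    }
}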

Contributor:

I think @vvcephei 's confusion comes from a change where we now require all tasks to be transitioned to suspended before transitioning to closed: previously, we allowed e.g. a running task to be closed immediately, and inside the task itself the logic actually did the "running -> suspended -> closed" transitions, i.e. it was totally agnostic to the TM. In a refactoring for eos-beta we changed it. So now the responsibility is kind of split between the two: the TM needs to make sure the transition is valid, and the task verifies it. By doing this we avoided the "pre-/post-" functions.

Contributor:

Hi all, thanks for the discussion.

To start at the beginning, yes, I was advocating for throwing an IllegalStateException at the earliest possible moment when we can detect the illegal state. Right here in the code, we are making an invalid assumption. Namely, that if a task to be recycled is active, then it has already been suspended and committed, and if it is a standby, then it still needs to be suspended and committed. Why should this be true? Because some other code deep inside another set of nested conditionals thirty lines above looks like it does that right now? Experience says that this situation will not survive refactoring. We cannot test every branch, so we need to make assertions about the state being valid when it's in doubt.

We could make an argument that if this assumption becomes incorrect, then we'll throw an exception later on before any state becomes corrupted, which would be good. We could make a stronger argument that the exception we throw later on will be perfectly crystal clear about the cause and therefore we won't spend weeks poking around a flaky test or a user bug report trying to figure out what happened. But both of those arguments would depend on even further assumptions about stuff that may or may not happen elsewhere in the code base. The best thing to do at all times is validate potentially dangerous assumptions. This looks very much like a potentially dangerous assumption. I'm asking that we validate it.

@ableegoldman (Member Author) commented Jul 6, 2020

I won't hold up this PR on the point, but I just don't see how checking the state of the task here and throwing an exception if it's not in suspended at this random point in the TM code is any more clear than checking the state in the task and throwing an exception if it's not in suspended in Task#close

The latter makes it very clear what the problem is -- tasks need to be suspended before they are closed, end of story. The former seems to enforce suspension at an arbitrary point in the TM.

Why should this be true? Because some other code deep inside another set of nested conditionals thirty lines above looks like it does that right now

This makes me wonder if I'm misinterpreting you, though, at least in part. Are you saying that it's fine to make this assumption thirty lines up when dealing with tasks that are actually being closed, but it's not ok to make this assumption here specifically when we are recycling tasks? Or are you saying we should add this check in both places where we make this assumption in TM#handleAssignment, and referring to where we do the suspension by "thirty lines up" (it's technically down, so maybe I'm being too literal here)?

I guess my point is just that I don't agree with the claim that we're making a "potentially dangerous assumption". We do check the assumption very directly when closing the task.

@guozhangwang (Contributor) commented Jul 6, 2020

Hey folks, taking a step back here regarding this, here are a few different principles we can exercise regarding the class hierarchy:

  1. TM to be totally agnostic to the task's state, and the task's methods would transition through multiple states if necessary: e.g. if the TM calls close on a running task, the task would first transition to suspended, and then to closed; if the TM calls suspend on a suspended task, it would be a no-op. In this case, the TM here could just blindly call close or suspend for closing / recycling tasks, and the Task's methods would handle the state transitions.

This would be ideal, but the current issue is that we need to commit after suspending a task today (i.e. handleRevocation would commit those suspended active tasks, while handleAssignment would commit those suspended standby tasks), and hence if we blindly suspend and commit a task, we may end up unnecessarily double-committing a suspended task.

Right now to work around this issue, we choose to:

  2. Letting the TM be aware of the task's state transition rules and trying to obey them. Similarly, we can also let the TM check the task's state, and then call suspend and commit conditionally -- at the moment, only standby tasks would not be suspended, so we would effectively end up with the same logic as only suspending standby tasks. So either checking task.isActive or task.isSuspended would work the same.

Personally I think, if we want the TM to be purely agnostic to task state transitions, an alternative approach could be to make preCommit / postCommit "idempotent" as well: e.g. if the task remembers that no records have been processed since the last preCommit, then a second call to preCommit would be a no-op and return an empty map, and similarly postCommit can also be a no-op if no records have been processed since its last checkpointing. And then in the TM, we can blindly call:

task.suspend();
aggregateOffsets(task.prepareCommit());
if (!offsetsMap.isEmpty()) sendOffsets();
postOffsets(task.postCommit());

if (isActive()) {
    recycle as standby;
} else {
    recycle as active;
}

And similarly for tasksToClose we can blindly call suspend / preCommit / postCommit first. WDYT?

Anyways, I think this discussion can continue in a follow-up PR (maybe I can incorporate whatever we've agreed on in the decoupling flushing/committing PR) and we can merge this one as-is.

@vvcephei (Contributor) commented Jul 6, 2020

Thanks, all!

Thanks for adding the check, @ableegoldman . To answer your question, I do think there may be some kind of misunderstanding. Almost certainly, I failed to make myself clear.

I agree that the reason we enforce the valid transitions inside the task is so that the TM doesn't have to check every little thing. For example, that we call suspend in one place and close in another is not a big deal. As you pointed out, if we failed to call suspend, then the close call would let us know.

However, in this case, we are checking something. But what we're checking is only vaguely related to what we do next. In other words, if (! isSuspended() ) suspend() makes way more sense than if ( isStandby() ) suspend(). How are we supposed to know that standby == not suspended at line 275 of a 700-line class? At least, with the log you added, if any quadrant of the implication is false, we'll have an error log telling us where it went wrong.

Note, I think I was especially uneasy because we're now also committing the task in this block, and "committed" or "not committed" isn't a checked state, which is how we wound up with the subtle bug that you're fixing here. I think your fix is fine; I just felt the pre-existing structure of the code needed improvement.

And thanks for the alternative proposals, @guozhangwang . I agree that either one of them would resolve my concern, and that we can take care of it in a follow-on PR.

@guozhangwang (Contributor)

  • Need to write the checkpoint when recycling a standby: although we do preserve the changelog offsets when recycling a task, and should therefore write the offsets when the new task is itself closed, we do NOT write the checkpoint for uninitialized tasks. So if the new task is ultimately closed before it gets out of the CREATED state, the offsets will not be written and we can get a TaskCorruptedException

If we do not get out of the CREATED state, it means we have not called registerStateStores yet to load the offsets, and the existing checkpoint file has not been deleted, right?

@guozhangwang (Contributor)

2. But just opening a rocksdb store creates a rocksdb base dir in the task directory, so the taskDirIsEmpty check always fails and we always throw TaskCorrupted even if there's nothing there.

Hmm, but in registerStateStores we first check stateDirectory.directoryForTaskIsEmpty and only after that do we call stateMgr.registerStateStores, so the rocksDB.open should be called after the check, right?

@ableegoldman (Member Author)

  1. The problem occurs when the original standby task does reach RUNNING and deletes the checkpoint, then gets recycled into an active task. If the new active task doesn't get out of CREATED, it won't write the known offsets, but the checkpoint was deleted back when it was a standby.
  2. We only pass stateDirectory.directoryForTaskIsEmpty into initializeOffsetsFromCheckpoint, not registerStateStores -- and even then we only use it to determine whether to throw TaskCorruptedException if we're missing any offsets in the checkpoint


// Pass in a sentinel value to checkpoint when the changelog offset is not yet initialized/known
private long checkpointableOffsetFromChangelogOffset(final Long offset) {
    return offset != null ? offset : OFFSET_UNKNOWN;
}
Member Author:

This is a little awkward, and it might be cleaner to just replace the use of null with this OFFSET_UNKNOWN sentinel throughout the ProcessorStateManager/StoreChangelogReader -- but, I wanted to keep the changes as short and simple as possible for now
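A rough illustration of how the sentinel might be applied when building the checkpointable offsets (hypothetical names, not the actual ProcessorStateManager code):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

class CheckpointSketch {
    static final long OFFSET_UNKNOWN = -2L; // assumed sentinel value; this PR used -2 (see the follow-up fix referenced at the end of the page)

    // Every registered changelog gets a checkpoint entry; OFFSET_UNKNOWN stands in for
    // stores that were opened but whose changelog offset is not yet initialized/known (null)
    static Map<TopicPartition, Long> checkpointableOffsets(final Map<TopicPartition, Long> changelogOffsets) {
        final Map<TopicPartition, Long> result = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> entry : changelogOffsets.entrySet()) {
            final Long offset = entry.getValue();
            result.put(entry.getKey(), offset != null ? offset : OFFSET_UNKNOWN);
        }
        return result;
    }
}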

@@ -91,20 +94,38 @@ public void shouldSkipNegativeOffsetsDuringRead() throws IOException {
        offsets.put(new TopicPartition(topic, 0), -1L);

        writeVersion0(offsets, file);
        assertTrue(checkpoint.read().isEmpty());
Member Author:

This is kind of awkward, we forgot to actually read the checkpoint here 🤦‍♀️

@guozhangwang (Contributor)

test this please

1 similar comment
@guozhangwang (Contributor)

test this please

@guozhangwang (Contributor) left a comment

LGTM.

@guozhangwang (Contributor)

test this

@guozhangwang (Contributor)

@ableegoldman My understanding is that with this PR the integration test could still be flaky but should not fail consistently; I saw the last run still fail on all cases and hence I'm re-triggering. Otherwise the code LGTM.

@guozhangwang (Contributor)

I've executed the eos upgrade integration test on top of this branch but it still fails consistently.

@ableegoldman (Member Author)

@guozhangwang I don't think this PR has anything to do with the flaky EosBetaUpgradeIntegrationTest. It might make it worse, not sure, but it definitely won't fix it without #8963

@vvcephei (Contributor) commented Jul 6, 2020

Test this please

2 similar comments
@vvcephei (Contributor) commented Jul 6, 2020

Test this please

@vvcephei (Contributor) commented Jul 6, 2020

Test this please

@vvcephei (Contributor) commented Jul 6, 2020

Retest this please

@vvcephei (Contributor) left a comment

Thanks @ableegoldman ! LGTM.

@guozhangwang merged commit cdf68a4 into apache:trunk Jul 7, 2020
guozhangwang pushed a commit that referenced this pull request Jul 7, 2020
KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory (#8962)

Two more edge cases I found producing extra TaskCorruptedExceptions while playing around with the failing eos-beta upgrade test (sadly these are unrelated problems, as the test still fails with these fixes in place).

* Need to write the checkpoint when recycling a standby: although we do preserve the changelog offsets when recycling a task, and should therefore write the offsets when the new task is itself closed, we do NOT write the checkpoint for uninitialized tasks. So if the new task is ultimately closed before it gets out of the CREATED state, the offsets will not be written and we can get a TaskCorruptedException
* We do not write the checkpoint file if the current offset map is empty; however for eos the checkpoint file is not only used for restoration but also for clean shutdown. Although skipping a dummy checkpoint file does not actually violate correctness, since we are going to re-bootstrap from the log-start-offset anyway, it throws an unnecessary TaskCorruptedException which has an overhead itself.

Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]>
@guozhangwang (Contributor)

Cherry-picked to 2.6

@ableegoldman deleted the 10166-checkpoint-recycled-standbys-and-ignore-rocksdb-dir branch July 7, 2020 19:12
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Jul 10, 2020
* 'trunk' of github.com:apache/kafka: (24 commits)
  KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores (apache#8996)
  MINOR: Restore stream-table duality description (apache#8995)
  MINOR: Create ChannelBuilder for each connection in ConnectionStressWorker workload
  KAFKA-10179: Pass correct changelog topic to state serdes (apache#8902)
  KAFKA-10235 Fix flaky transactions_test.py (apache#8981)
  MINOR: Closing consumerGroupService resources in SaslClientsWithInvalidCredentialsTest (apache#8992)
  MINOR: Define the term tombstone, since it's used elsewhere in the docs (apache#3480)
  KAFKA-10109: Fix double AdminClient creation in AclCommand
  KAFKA-10220: Add null check for configurationKey in AdminManager.describeConfigs()
  KAFKA-10225 Increase default zk timeout for system tests (apache#8974)
  MINOR; alterReplicaLogDirs should not fail all the futures when only one call fails (apache#8985)
  KAFKA-10134: Use long poll if we do not have fetchable partitions (apache#8934)
  KAFKA-10191 fix flaky StreamsOptimizedTest (apache#8913)
  KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts (apache#8990)
  KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (apache#8989)
  KAFKA-9930: Adjust ReplicaFetcherThread logging when processing UNKNOWN_TOPIC_OR_PARTITION error (apache#8579)
  MINOR: document timestamped state stores (apache#8920)
  KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory (apache#8962)
  MINOR: prune the metadata upgrade test matrix (apache#8971)
  KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (apache#8963)
  ...
vvcephei pushed a commit that referenced this pull request Jul 28, 2020
…sets (#9066)

In PR #8962 we introduced a sentinel UNKNOWN_OFFSET to mark unknown offsets in checkpoint files. The sentinel was set to -2, which is the same value used for the sentinel LATEST_OFFSET that is used in subscriptions to signal that state stores have been used by an active task. Unfortunately, we missed skipping UNKNOWN_OFFSET when we compute the sum of the changelog offsets.

If a task had only one state store and it did not restore anything before the next rebalance, the stream thread wrote -2 (i.e., UNKNOWN_OFFSET) into the subscription as the sum of the changelog offsets. During assignment, the leader interpreted the -2 as if the stream thread had run the task as active, although it might have run it as a standby. This misinterpretation of the sentinel value resulted in unexpected task assignments.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai <[email protected]>, John Roesler <[email protected]>, Matthias J. Sax <[email protected]>
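A minimal sketch of the corrected sum described above (hypothetical helper, not the actual assignor code): UNKNOWN_OFFSET entries are skipped so an all-unknown task can no longer produce a sum of -2 that collides with the LATEST_OFFSET sentinel.

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

class OffsetSumSketch {
    static final long UNKNOWN_OFFSET = -2L; // sentinel introduced in #8962, per the commit message above

    static long sumOfChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
        long sum = 0L;
        for (final Long offset : changelogOffsets.values()) {
            if (offset != null && offset != UNKNOWN_OFFSET) { // skip unknown offsets
                sum += offset;
            }
        }
        return sum;
    }
}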
cadonna added a commit to cadonna/kafka that referenced this pull request Jul 29, 2020
…sets (apache#9066)

In PR apache#8962 we introduced a sentinel UNKNOWN_OFFSET to mark unknown offsets in checkpoint files. The sentinel was set to -2, which is the same value used for the sentinel LATEST_OFFSET that is used in subscriptions to signal that state stores have been used by an active task. Unfortunately, we missed skipping UNKNOWN_OFFSET when we compute the sum of the changelog offsets.

If a task had only one state store and it did not restore anything before the next rebalance, the stream thread wrote -2 (i.e., UNKNOWN_OFFSET) into the subscription as the sum of the changelog offsets. During assignment, the leader interpreted the -2 as if the stream thread had run the task as active, although it might have run it as a standby. This misinterpretation of the sentinel value resulted in unexpected task assignments.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai <[email protected]>, John Roesler <[email protected]>, Matthias J. Sax <[email protected]>
vvcephei pushed a commit that referenced this pull request Jul 30, 2020
…sets (#9066) (#9097)

In PR #8962 we introduced a sentinel UNKNOWN_OFFSET to mark unknown offsets in checkpoint files. The sentinel was set to -2, which is the same value used for the sentinel LATEST_OFFSET that is used in subscriptions to signal that state stores have been used by an active task. Unfortunately, we missed skipping UNKNOWN_OFFSET when we compute the sum of the changelog offsets.

If a task had only one state store and it did not restore anything before the next rebalance, the stream thread wrote -2 (i.e., UNKNOWN_OFFSET) into the subscription as the sum of the changelog offsets. During assignment, the leader interpreted the -2 as if the stream thread had run the task as active, although it might have run it as a standby. This misinterpretation of the sentinel value resulted in unexpected task assignments.

Ports: KAFKA-10287 / #9066

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai <[email protected]>, John Roesler <[email protected]>, Matthias J. Sax <[email protected]>