KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently #5694

Closed
wants to merge 5 commits into from

Contributor

@xiowu0 xiowu0 commented Sep 26, 2018

KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently

The following race condition can happen: 1) log retention sets a topic partition to the paused state; 2) topic deletion arrives, sees that the partition is already in the paused state, and proceeds; 3) topic deletion removes the paused state; 4) log retention tries to resume the same topic partition from a NONE state and throws an exception.

In order to fix this situation, we allow a topic partition to be paused multiple times. Two unit tests are added to verify the fix.
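
To make the four steps concrete, here is a minimal sketch of the pre-fix behaviour. It assumes the paused state carries no count and that pausing an already paused partition is a no-op. `OldManagerModel`, its String partition keys, and the `Old*` state names are hypothetical stand-ins for illustration, not Kafka's `LogCleanerManager`.

```scala
import scala.collection.mutable

// Simplified stand-in for the pre-fix states: "paused" carries no count.
sealed trait OldState
case object OldInProgress extends OldState
case object OldPaused extends OldState

// Hypothetical single-threaded model, not Kafka's LogCleanerManager.
class OldManagerModel {
  private val inProgress = mutable.Map.empty[String, OldState]

  def state(tp: String): Option[OldState] = inProgress.get(tp)

  // Pausing an already paused partition changes nothing, so the second pause is lost.
  def pause(tp: String): Unit = inProgress.put(tp, OldPaused)

  // Resuming is only legal from the paused state.
  def resume(tp: String): Unit = inProgress.get(tp) match {
    case Some(OldPaused) => inProgress.remove(tp)
    case other =>
      throw new IllegalStateException(s"Cannot resume $tp from state $other")
  }
}

object OldRaceDemo {
  // Replays the four steps in order and reports whether the last resume failed.
  def finalResumeFails(): Boolean = {
    val manager = new OldManagerModel
    val tp = "topic-0"
    manager.pause(tp)   // 1) log retention pauses the partition
    manager.pause(tp)   // 2) topic deletion sees it is already paused and proceeds
    manager.resume(tp)  // 3) topic deletion removes the paused state
    try {
      manager.resume(tp) // 4) log retention resumes from the empty state
      false
    } catch {
      case _: IllegalStateException => true
    }
  }

  def main(args: Array[String]): Unit =
    println(s"final resume failed: ${finalResumeFails()}")
}
```

In this model the second pause leaves no trace, so the first resume consumes the only record that the partition was paused.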

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…thread can throw illegal state exception when race against topic deletion or truncation.

The following race can happen: 1) log retention sets a topic partition to the paused state; 2) topic deletion arrives, sees that the partition is already in the paused state, and proceeds; 3) topic deletion removes the paused state; 4) log retention tries to resume the same topic partition from a NONE state and throws an exception.

In order to fix this situation, we allow a topic partition to be paused multiple times. In addition, a concurrent unit test is added to test the race condition.
@lindong28 lindong28 changed the title KAFKA-7322; This is a followup Fix to previous patch. KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently Sep 26, 2018
@lindong28
Member

@xiowu0 Thanks much for the PR. The fix makes sense overall. Since the bug arguably existed even before the fix made in KAFKA-7322, it probably makes sense to have a separate JIRA to document the problem and the solution. I opened https://issues.apache.org/jira/browse/KAFKA-7441 and also updated the PR title accordingly. Please feel free to modify the JIRA, PR title and description as appropriate.

I have one high-level comment regarding the test. If the problem description in KAFKA-7441 makes sense, then the bug can be triggered without the user changing the policy, right? Can we make the test more general, ideally just simulating the scenario described in the JIRA, without requiring a topic policy switch to trigger the bug?

@lindong28 lindong28 self-assigned this Sep 26, 2018
Member

@lindong28 lindong28 left a comment

Thanks for the PR @xiowu0. Left some minor comments below.

/**
* Check if the cleaning for a partition is paused. The caller is expected to hold lock while making the call.
*/
private def checkCleaningPaused(topicPartition: TopicPartition): Boolean = {
Member

Can we name this method isCleaningPaused(...) to be consistent with the existing method name isCleaningInState()?

Contributor Author

There is another function, checkCleaningAborted; I named this function after it.

Member

@lindong28 lindong28 Sep 26, 2018

The existing method checkCleaningAborted() is a Unit function which throws an exception if the state is not LogCleaningAborted. The method isCleaningInState(...) returns a boolean to indicate whether the state has the given value. It seems that the method we are adding here is closer to isCleaningInState(...), since it returns a boolean instead of throwing an exception.

Contributor Author

OK, I will rename the function to isCleaningInPausedState. For pattern-matching purposes, unfortunately, we can't reuse the isCleaningInState(...) function.
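
A short sketch of why a value comparison does not fit once the paused state carries a count. It assumes isCleaningInState(...) compares the current state against one expected value; the `State`, `Paused`, `isInState`, and `isInPausedState` names below are hypothetical and only model the idea.

```scala
// Hypothetical states: Paused is parameterized, the others are singletons.
sealed trait State
case object InProgress extends State
case object Aborted extends State
final case class Paused(pausedCount: Int) extends State

object PausedMatchDemo {
  // Equality-based check: the expected value must match exactly, count included.
  def isInState(current: Option[State], expected: State): Boolean =
    current.contains(expected)

  // Pattern-based check: matches Paused regardless of its count.
  def isInPausedState(current: Option[State]): Boolean = current match {
    case Some(Paused(_)) => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val current: Option[State] = Some(Paused(2))
    println(isInState(current, Paused(1)))
    println(isInPausedState(current))
  }
}
```

With an equality check the caller would have to know the exact count in advance, which is why a dedicated pattern-matching helper is a reasonable choice here.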

@@ -240,8 +244,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
case Some(state) =>
state match {
case LogCleaningPaused =>
inProgress.remove(topicPartition)
case LogCleaningPaused(count) =>
Member

Would the code look a bit more elegant if written like this?

            case Some(state) =>
              state match {
                case LogCleaningPaused(count) if count == 1 => 
                  inProgress.remove(topicPartition)
                case LogCleaningPaused(count) if count > 1 =>
                  inProgress.put(topicPartition, LogCleaningPaused(count - 1))
                case s =>
                  throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
              }
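
The counted transitions suggested above can be tried in isolation with the following sketch. `CountedManagerModel`, its String partition keys, and the `Cleaning*` names are hypothetical; the model is single-threaded and leaves out the abort path, whereas the real manager is expected to be called with its lock held.

```scala
import scala.collection.mutable

sealed trait CleaningState
case object CleaningInProgress extends CleaningState
final case class CleaningPaused(pausedCount: Int) extends CleaningState

// Hypothetical model of a pause counter per partition.
class CountedManagerModel {
  private val inProgress = mutable.Map.empty[String, CleaningState]

  def state(tp: String): Option[CleaningState] = inProgress.get(tp)

  // Each pause on an idle or already paused partition increments the count.
  def pause(tp: String): Unit = inProgress.get(tp) match {
    case None => inProgress.put(tp, CleaningPaused(1))
    case Some(CleaningPaused(n)) => inProgress.put(tp, CleaningPaused(n + 1))
    case Some(s) =>
      throw new IllegalStateException(s"Cannot pause $tp in state $s")
  }

  // Each resume decrements the count; the entry is removed at the last resume.
  def resume(tp: String): Unit = inProgress.get(tp) match {
    case Some(CleaningPaused(1)) => inProgress.remove(tp)
    case Some(CleaningPaused(n)) if n > 1 => inProgress.put(tp, CleaningPaused(n - 1))
    case other =>
      throw new IllegalStateException(s"Cannot resume $tp from state $other")
  }
}

object CountedDemo {
  def main(args: Array[String]): Unit = {
    val manager = new CountedManagerModel
    val tp = "topic-0"
    manager.pause(tp)   // log retention pauses
    manager.pause(tp)   // topic deletion pauses again
    manager.resume(tp)  // topic deletion resumes
    println(manager.state(tp))
    manager.resume(tp)  // log retention resumes
    println(manager.state(tp))
  }
}
```

Because every pause is matched by exactly one resume, the order in which the two callers resume does not matter in this model.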

@@ -37,7 +37,7 @@ import scala.collection.{Iterable, immutable, mutable}
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case object LogCleaningPaused extends LogCleaningState
private[log] case class LogCleaningPaused(times: Int) extends LogCleaningState
Member

Would it be more readable to replace times with count?

Now that we are making the state more complicated, can we add a comment to explain the transition rules for these states, similar to the comments in ReplicaStateMachine.scala and PartitionStateMachine.scala?

Contributor Author

How about pausedCount?

Contributor Author

I will add comments about transition rules.

Member

pausedCount sounds good.

Member

@lindong28 lindong28 left a comment

Thanks for the update @xiowu0. The code looks good overall. Left some comments to see whether we can improve the documentation.

pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
}
info(s"The cleaning for partition $topicPartition is aborted and paused")
}

/**
* Resume the cleaning of paused partitions.
* If the partition is paused by log retention and then paused again by
Member

The current comment, "log retention and then paused again by topic deletion, truncation or partition movement", seems to suggest that the behavior depends on the caller, which is not the case. To reduce confusion, can we avoid commenting on the behavior of the caller?

* topic deletion, and log truncation
*/
@Test
def testLogsRaceCondition(): Unit = {
Member

The test method name testLogsRaceCondition is not very informative. Can you follow the existing test names and make it a bit more self-explanatory? It may also be useful to make the comment more informative, similar to the comment for testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa. The current comment is a bit casual.

* test log retention, topic deletion, and log truncation can handle
* pause and resume cleaning on a topic partition correctly.
*/
@Test
Member

Could we simplify the test to the following version? Here are the improvements the suggested approach may provide:

  1. In general we want the test to be as efficient and concise as possible as long as it catches the bug. So it is probably preferred to implement the test without running something 100 times.

  2. The test in the current patch actually tests two scenarios, i.e. log cleanup with concurrent topic deletion, and log cleanup with concurrent log truncation. In general these are independent scenarios and it may be better (and more consistent with existing test patterns) to separate them into two tests.

  3. The current test name testLogsRetentionDeletionTruncationHandlePauseAndResumeCleaning is not very easy to understand. The new test names testConcurrentLogCleanupAndLogTruncation and testConcurrentLogCleanupAndTopicDeletion may be easier to understand.

BTW, if we use TestUtils.assertConcurrent, it is preferable to also make its comment more informative, e.g. "Should support log cleanup with concurrent topic deletion" may be better than "Concurrent log race condition test".

  @Test
  def testConcurrentLogCleanupAndLogTruncation(): Unit = {
    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
    val cleanerManager: LogCleanerManager = createCleanerManager(log)

    // LogManager.cleanup() starts
    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
    // Log truncation happens due to unclean leader election
    cleanerManager.abortAndPauseCleaning(log.topicPartition)
    cleanerManager.resumeCleaning(Seq(log.topicPartition))
    // LogManager.cleanup() finishes and pausedPartitions are resumed
    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
  }

  @Test
  def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
    val cleanerManager: LogCleanerManager = createCleanerManager(log)

    // LogManager.cleanup() starts
    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
    // Broker processes StopReplicaRequest with delete=true
    cleanerManager.abortCleaning(log.topicPartition)
    // LogManager.cleanup() finishes and pausedPartitions are resumed
    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
  }

Contributor Author

I renamed the test cases to:
testLogsWithRetentionInprogressShouldBePickedUpForLogTruncation
testLogsWithRetentionInprogressShouldBePickedUpForTopicDeletion

so they are consistent with existing test case:
testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa

Member

I was not careful enough when committing the previous patch. Now that I look at it again, the previous test name testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa seems too verbose. The phrase IneligibleForCompaction seems more accurate and more concise than the phrase ShouldNotPickedUpForCompaction. AndViceVersa seems redundant, as we typically don't have to specify all details in the test name. Can we simplify that test name to e.g. testLogsUnderCleanupIneligibleForCompaction?

Regarding the names of the tests added in this PR, testLogsWithRetentionInprogressShouldBePickedUpForLogTruncation does not seem to match what the test does. In particular, we are verifying that log cleanup and topic deletion can happen concurrently for the same partition, rather than verifying that the log which is under cleanup is also eligible for topic deletion, since we do not have explicit logic to prevent topic deletion in this case. I feel the name testConcurrentLogCleanupAndLogTruncation probably matches the test's implementation better. What do you think?

* This class manages the state of each partition being cleaned.
* LogCleaningState defines the cleaning states that a TopicPartition can be in.
* 1. None : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
* or LogCleaningPaused(1). Valid previous state are None, LogCleaningInProgress and LogCleaningPaused(1)
Member

Is None a valid previous state of None? Is there any event that will make this state change?

Contributor Author

There are cases where the log cleaner tries to pick up logs for compaction and skips them because the log is not a "compact" log. But this is not really a "None" to "None" transition. I will remove it.

…rrently

rename test cases and minor comment changes
@lindong28
Member

Thanks for the patch @xiowu0. LGTM. Merged to trunk.

@lindong28 lindong28 closed this in 7ea0655 Oct 5, 2018
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…rrently

Author: Xiongqi Wesley Wu <[email protected]>

Reviewers: Dong Lin <[email protected]>

Closes apache#5694 from xiowu0/fixrace2