Keep sync tasks alive as long as it's not finished

After this change, sync tasks are erased only when it completes or SyncClient gives up.
This change is preparation for implementing a mechanism to allow CopyOperation to wait for a specific sync task to complete.

In EntryUpdatePerformer:
Add a new error code FILE_ERROR_PARENT_NEEDS_TO_BE_SYNCED.
Return the new error code when the sync task cannot be completed because of the lack of the parent resource ID.

In SyncClient:
Add a new task state SUSPENDED to allow tasks to be in an inactive state.
Add a new member |context| to SyncTask to make it possible to change the context without erasing the task.
OnTaskComplete() takes almost all responsibilities to handle errors. OnFetchFileComplete() and OnUpdateCompleter() are now almost empty or removed.
Handle the new error code FILE_ERROR_PARENT_NEEDS_TO_BE_SYNCED in OnTaskComplete().

BUG=384213
TEST=unit_tests

Review URL: https://codereview.chromium.org/391343002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@284051 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/chrome/browser/chromeos/drive/sync_client.cc b/chrome/browser/chromeos/drive/sync_client.cc
index a7852012..52049d52 100644
--- a/chrome/browser/chromeos/drive/sync_client.cc
+++ b/chrome/browser/chromeos/drive/sync_client.cc
@@ -121,15 +121,21 @@
   DCHECK(!it->HasError());
 }
 
-// Runs the task and returns a dummy cancel closure.
-base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
-  task.Run();
-  return base::Closure();
+// Gets the parent entry of the entry specified by the ID.
+FileError GetParentResourceEntry(ResourceMetadata* metadata,
+                                 const std::string& local_id,
+                                 ResourceEntry* parent) {
+  ResourceEntry entry;
+  FileError error = metadata->GetResourceEntryById(local_id, &entry);
+  if (error != FILE_ERROR_OK)
+    return error;
+  return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
 }
 
 }  // namespace
 
-SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
+SyncClient::SyncTask::SyncTask()
+    : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
 SyncClient::SyncTask::~SyncTask() {}
 
 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
@@ -209,8 +215,9 @@
 
   SyncTask* task = &it->second;
   switch (task->state) {
+    case SUSPENDED:
     case PENDING:
-      tasks_.erase(it);
+      OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
       break;
     case RUNNING:
       if (!task->cancel_closure.is_null())
@@ -225,37 +232,56 @@
   AddUpdateTaskInternal(context, local_id, delay_);
 }
 
-void SyncClient::AddFetchTaskInternal(const std::string& local_id,
-                                      const base::TimeDelta& delay) {
+base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
+                                           const ClientContext& context) {
   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
-  SyncTask task;
-  task.task = base::Bind(
-      &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
-      base::Unretained(download_operation_.get()),
+  return download_operation_->EnsureFileDownloadedByLocalId(
       local_id,
-      ClientContext(BACKGROUND),
+      context,
       GetFileContentInitializedCallback(),
       google_apis::GetContentCallback(),
       base::Bind(&SyncClient::OnFetchFileComplete,
                  weak_ptr_factory_.GetWeakPtr(),
                  local_id));
+}
+
+void SyncClient::AddFetchTaskInternal(const std::string& local_id,
+                                      const base::TimeDelta& delay) {
+  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+
+  SyncTask task;
+  task.state = PENDING;
+  task.context = ClientContext(BACKGROUND);
+  task.task = base::Bind(&SyncClient::PerformFetchTask,
+                         base::Unretained(this),
+                         local_id);
   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
 }
 
+base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
+                                            const ClientContext& context) {
+  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+  entry_update_performer_->UpdateEntry(
+      local_id,
+      context,
+      base::Bind(&SyncClient::OnTaskComplete,
+                 weak_ptr_factory_.GetWeakPtr(),
+                 UPDATE,
+                 local_id));
+  return base::Closure();
+}
+
 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
                                        const std::string& local_id,
                                        const base::TimeDelta& delay) {
+  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+
   SyncTask task;
-  task.task = base::Bind(
-      &RunTaskAndReturnDummyCancelClosure,
-      base::Bind(&EntryUpdatePerformer::UpdateEntry,
-                 base::Unretained(entry_update_performer_.get()),
-                 local_id,
-                 context,
-                 base::Bind(&SyncClient::OnUpdateComplete,
-                            weak_ptr_factory_.GetWeakPtr(),
-                            local_id)));
+  task.state = PENDING;
+  task.context = context;
+  task.task = base::Bind(&SyncClient::PerformUpdateTask,
+                         base::Unretained(this),
+                         local_id);
   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
 }
 
@@ -267,20 +293,22 @@
   SyncTasks::iterator it = tasks_.find(key);
   if (it != tasks_.end()) {
     switch (it->second.state) {
+      case SUSPENDED:
+        // Activate the task.
+        it->second.state = PENDING;
+        break;
       case PENDING:
         // The same task will run, do nothing.
-        break;
+        return;
       case RUNNING:
         // Something has changed since the task started. Schedule rerun.
         it->second.should_run_again = true;
-        break;
+        return;
     }
-    return;
+  } else {
+    tasks_[key] = task;
   }
-
   DCHECK_EQ(PENDING, task.state);
-  tasks_[key] = task;
-
   base::MessageLoopProxy::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
@@ -288,19 +316,62 @@
 }
 
 void SyncClient::StartTask(const SyncTasks::key_type& key) {
+  ResourceEntry* parent = new ResourceEntry;
+  base::PostTaskAndReplyWithResult(
+      blocking_task_runner_.get(),
+      FROM_HERE,
+      base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
+      base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
+                 weak_ptr_factory_.GetWeakPtr(),
+                 key,
+                 base::Owned(parent)));
+}
+
+void SyncClient::StartTaskAfterGetParentResourceEntry(
+    const SyncTasks::key_type& key,
+    const ResourceEntry* parent,
+    FileError error) {
+  const SyncType type = key.first;
+  const std::string& local_id = key.second;
   SyncTasks::iterator it = tasks_.find(key);
   if (it == tasks_.end())
     return;
 
   SyncTask* task = &it->second;
   switch (task->state) {
+    case SUSPENDED:
     case PENDING:
-      task->state = RUNNING;
-      task->cancel_closure = task->task.Run();
       break;
     case RUNNING:  // Do nothing.
-      break;
+      return;
   }
+
+  if (error != FILE_ERROR_OK) {
+    OnTaskComplete(type, local_id, error);
+    return;
+  }
+
+  if (type == UPDATE &&
+      parent->resource_id().empty() &&
+      parent->local_id() != util::kDriveTrashDirLocalId) {
+    // Parent entry needs to be synced to get a resource ID.
+    // Suspend the task and register it as a dependent task of the parent.
+    const SyncTasks::key_type key_parent(type, parent->local_id());
+    SyncTasks::iterator it_parent = tasks_.find(key_parent);
+    if (it_parent == tasks_.end()) {
+      OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
+      LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
+                   << local_id << ", parent_id = " << parent->local_id();
+      return;
+    }
+    task->state = SUSPENDED;
+    it_parent->second.dependent_tasks.push_back(key);
+    return;
+  }
+
+  // Run the task.
+  task->state = RUNNING;
+  task->cancel_closure = task->task.Run(task->context);
 }
 
 void SyncClient::OnGetLocalIdsOfBacklog(
@@ -330,22 +401,57 @@
     AddFetchTask((*local_ids)[i]);
 }
 
-bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
+void SyncClient::OnTaskComplete(SyncType type,
+                                const std::string& local_id,
+                                FileError error) {
   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
 
   const SyncTasks::key_type key(type, local_id);
   SyncTasks::iterator it = tasks_.find(key);
   DCHECK(it != tasks_.end());
 
-  if (it->second.should_run_again) {
-    DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
-    it->second.should_run_again = false;
-    it->second.task.Run();
-    return false;
+  base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
+
+  switch (error) {
+    case FILE_ERROR_OK:
+      DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
+      break;
+    case FILE_ERROR_ABORT:
+      // Ignore it because this is caused by user's cancel operations.
+      break;
+    case FILE_ERROR_NO_CONNECTION:
+      // Run the task again so that we'll retry once the connection is back.
+      it->second.should_run_again = true;
+      it->second.context = ClientContext(BACKGROUND);
+      break;
+    case FILE_ERROR_SERVICE_UNAVAILABLE:
+      // Run the task again so that we'll retry once the service is back.
+      it->second.should_run_again = true;
+      it->second.context = ClientContext(BACKGROUND);
+      retry_delay = long_delay_;
+      operation_observer_->OnDriveSyncError(
+          file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
+      break;
+    default:
+      operation_observer_->OnDriveSyncError(
+          file_system::DRIVE_SYNC_ERROR_MISC, local_id);
+      LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
+                   << ": " << FileErrorToString(error);
   }
 
-  tasks_.erase(it);
-  return true;
+  if (it->second.should_run_again) {
+    DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
+    it->second.state = PENDING;
+    it->second.should_run_again = false;
+    base::MessageLoopProxy::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
+        retry_delay);
+  } else {
+    for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
+      StartTask(it->second.dependent_tasks[i]);
+    tasks_.erase(it);
+  }
 }
 
 void SyncClient::OnFetchFileComplete(const std::string& local_id,
@@ -353,102 +459,15 @@
                                      const base::FilePath& local_path,
                                      scoped_ptr<ResourceEntry> entry) {
   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
-  if (!OnTaskComplete(FETCH, local_id))
-    return;
-
-  if (error == FILE_ERROR_OK) {
-    DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
-  } else {
-    switch (error) {
-      case FILE_ERROR_ABORT:
-        // If user cancels download, unpin the file so that we do not sync the
-        // file again.
-        base::PostTaskAndReplyWithResult(
-            blocking_task_runner_,
-            FROM_HERE,
-            base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
-            base::Bind(&util::EmptyFileOperationCallback));
-        break;
-      case FILE_ERROR_NO_CONNECTION:
-        // Add the task again so that we'll retry once the connection is back.
-        AddFetchTaskInternal(local_id, delay_);
-        break;
-      case FILE_ERROR_SERVICE_UNAVAILABLE:
-        // Add the task again so that we'll retry once the service is back.
-        AddFetchTaskInternal(local_id, long_delay_);
-        operation_observer_->OnDriveSyncError(
-            file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
-            local_id);
-        break;
-      default:
-        operation_observer_->OnDriveSyncError(
-            file_system::DRIVE_SYNC_ERROR_MISC,
-            local_id);
-        LOG(WARNING) << "Failed to fetch " << local_id
-                     << ": " << FileErrorToString(error);
-    }
-  }
-}
-
-void SyncClient::OnUpdateComplete(const std::string& local_id,
-                                  FileError error) {
-  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
-  if (!OnTaskComplete(UPDATE, local_id))
-    return;
-
-  if (error == FILE_ERROR_OK) {
-    DVLOG(1) << "Updated " << local_id;
-
-    // Add update tasks for child entries which may be waiting for the parent to
-    // be updated.
-    ResourceEntryVector* entries = new ResourceEntryVector;
+  OnTaskComplete(FETCH, local_id, error);
+  if (error == FILE_ERROR_ABORT) {
+    // If user cancels download, unpin the file so that we do not sync the file
+    // again.
     base::PostTaskAndReplyWithResult(
-        blocking_task_runner_.get(),
+        blocking_task_runner_,
         FROM_HERE,
-        base::Bind(&ResourceMetadata::ReadDirectoryById,
-                   base::Unretained(metadata_), local_id, entries),
-        base::Bind(&SyncClient::AddChildUpdateTasks,
-                   weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
-  } else {
-    switch (error) {
-      case FILE_ERROR_ABORT:
-        // Ignore it because this is caused by user's cancel operations.
-        break;
-      case FILE_ERROR_NO_CONNECTION:
-        // Add the task again so that we'll retry once the connection is back.
-        AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
-                              base::TimeDelta::FromSeconds(0));
-        break;
-      case FILE_ERROR_SERVICE_UNAVAILABLE:
-        // Add the task again so that we'll retry once the service is back.
-        AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
-        operation_observer_->OnDriveSyncError(
-            file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
-            local_id);
-        break;
-      default:
-        operation_observer_->OnDriveSyncError(
-            file_system::DRIVE_SYNC_ERROR_MISC,
-            local_id);
-        LOG(WARNING) << "Failed to update " << local_id << ": "
-                     << FileErrorToString(error);
-    }
-  }
-}
-
-void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
-                                     FileError error) {
-  if (error != FILE_ERROR_OK)
-    return;
-
-  for (size_t i = 0; i < entries->size(); ++i) {
-    const ResourceEntry& entry = (*entries)[i];
-    if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
-      AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
-                            base::TimeDelta::FromSeconds(0));
-    }
+        base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
+        base::Bind(&util::EmptyFileOperationCallback));
   }
 }