Keep sync tasks alive as long as it's not finished
After this change, sync tasks are erased only when it completes or SyncClient gives up.
This change is preparation for implementing a mechanism to allow CopyOperation to wait for a specific sync task to complete.
In EntryUpdatePerformer:
Add a new error code FILE_ERROR_PARENT_NEEDS_TO_BE_SYNCED.
Return the new error code when the sync task cannot be completed because of the lack of the parent resource ID.
In SyncClient:
Add a new task state SUSPENDED to allow tasks to be in an inactive state.
Add a new member |context| to SyncTask to make it possible to change the context without erasing the task.
OnTaskComplete() takes almost all responsibilities to handle errors. OnFetchFileComplete() and OnUpdateCompleter() are now almost empty or removed.
Handle the new error code FILE_ERROR_PARENT_NEEDS_TO_BE_SYNCED in OnTaskComplete().
BUG=384213
TEST=unit_tests
Review URL: https://codereview.chromium.org/391343002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@284051 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/chrome/browser/chromeos/drive/sync_client.cc b/chrome/browser/chromeos/drive/sync_client.cc
index a7852012..52049d52 100644
--- a/chrome/browser/chromeos/drive/sync_client.cc
+++ b/chrome/browser/chromeos/drive/sync_client.cc
@@ -121,15 +121,21 @@
DCHECK(!it->HasError());
}
-// Runs the task and returns a dummy cancel closure.
-base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
- task.Run();
- return base::Closure();
+// Gets the parent entry of the entry specified by the ID.
+FileError GetParentResourceEntry(ResourceMetadata* metadata,
+ const std::string& local_id,
+ ResourceEntry* parent) {
+ ResourceEntry entry;
+ FileError error = metadata->GetResourceEntryById(local_id, &entry);
+ if (error != FILE_ERROR_OK)
+ return error;
+ return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
}
} // namespace
-SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
+SyncClient::SyncTask::SyncTask()
+ : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
SyncClient::SyncTask::~SyncTask() {}
SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
@@ -209,8 +215,9 @@
SyncTask* task = &it->second;
switch (task->state) {
+ case SUSPENDED:
case PENDING:
- tasks_.erase(it);
+ OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
break;
case RUNNING:
if (!task->cancel_closure.is_null())
@@ -225,37 +232,56 @@
AddUpdateTaskInternal(context, local_id, delay_);
}
-void SyncClient::AddFetchTaskInternal(const std::string& local_id,
- const base::TimeDelta& delay) {
+base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
+ const ClientContext& context) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
- SyncTask task;
- task.task = base::Bind(
- &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
- base::Unretained(download_operation_.get()),
+ return download_operation_->EnsureFileDownloadedByLocalId(
local_id,
- ClientContext(BACKGROUND),
+ context,
GetFileContentInitializedCallback(),
google_apis::GetContentCallback(),
base::Bind(&SyncClient::OnFetchFileComplete,
weak_ptr_factory_.GetWeakPtr(),
local_id));
+}
+
+void SyncClient::AddFetchTaskInternal(const std::string& local_id,
+ const base::TimeDelta& delay) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+
+ SyncTask task;
+ task.state = PENDING;
+ task.context = ClientContext(BACKGROUND);
+ task.task = base::Bind(&SyncClient::PerformFetchTask,
+ base::Unretained(this),
+ local_id);
AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
}
+base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
+ const ClientContext& context) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+ entry_update_performer_->UpdateEntry(
+ local_id,
+ context,
+ base::Bind(&SyncClient::OnTaskComplete,
+ weak_ptr_factory_.GetWeakPtr(),
+ UPDATE,
+ local_id));
+ return base::Closure();
+}
+
void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
const std::string& local_id,
const base::TimeDelta& delay) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
+
SyncTask task;
- task.task = base::Bind(
- &RunTaskAndReturnDummyCancelClosure,
- base::Bind(&EntryUpdatePerformer::UpdateEntry,
- base::Unretained(entry_update_performer_.get()),
- local_id,
- context,
- base::Bind(&SyncClient::OnUpdateComplete,
- weak_ptr_factory_.GetWeakPtr(),
- local_id)));
+ task.state = PENDING;
+ task.context = context;
+ task.task = base::Bind(&SyncClient::PerformUpdateTask,
+ base::Unretained(this),
+ local_id);
AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
}
@@ -267,20 +293,22 @@
SyncTasks::iterator it = tasks_.find(key);
if (it != tasks_.end()) {
switch (it->second.state) {
+ case SUSPENDED:
+ // Activate the task.
+ it->second.state = PENDING;
+ break;
case PENDING:
// The same task will run, do nothing.
- break;
+ return;
case RUNNING:
// Something has changed since the task started. Schedule rerun.
it->second.should_run_again = true;
- break;
+ return;
}
- return;
+ } else {
+ tasks_[key] = task;
}
-
DCHECK_EQ(PENDING, task.state);
- tasks_[key] = task;
-
base::MessageLoopProxy::current()->PostDelayedTask(
FROM_HERE,
base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
@@ -288,19 +316,62 @@
}
void SyncClient::StartTask(const SyncTasks::key_type& key) {
+ ResourceEntry* parent = new ResourceEntry;
+ base::PostTaskAndReplyWithResult(
+ blocking_task_runner_.get(),
+ FROM_HERE,
+ base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
+ base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
+ weak_ptr_factory_.GetWeakPtr(),
+ key,
+ base::Owned(parent)));
+}
+
+void SyncClient::StartTaskAfterGetParentResourceEntry(
+ const SyncTasks::key_type& key,
+ const ResourceEntry* parent,
+ FileError error) {
+ const SyncType type = key.first;
+ const std::string& local_id = key.second;
SyncTasks::iterator it = tasks_.find(key);
if (it == tasks_.end())
return;
SyncTask* task = &it->second;
switch (task->state) {
+ case SUSPENDED:
case PENDING:
- task->state = RUNNING;
- task->cancel_closure = task->task.Run();
break;
case RUNNING: // Do nothing.
- break;
+ return;
}
+
+ if (error != FILE_ERROR_OK) {
+ OnTaskComplete(type, local_id, error);
+ return;
+ }
+
+ if (type == UPDATE &&
+ parent->resource_id().empty() &&
+ parent->local_id() != util::kDriveTrashDirLocalId) {
+ // Parent entry needs to be synced to get a resource ID.
+ // Suspend the task and register it as a dependent task of the parent.
+ const SyncTasks::key_type key_parent(type, parent->local_id());
+ SyncTasks::iterator it_parent = tasks_.find(key_parent);
+ if (it_parent == tasks_.end()) {
+ OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
+ LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
+ << local_id << ", parent_id = " << parent->local_id();
+ return;
+ }
+ task->state = SUSPENDED;
+ it_parent->second.dependent_tasks.push_back(key);
+ return;
+ }
+
+ // Run the task.
+ task->state = RUNNING;
+ task->cancel_closure = task->task.Run(task->context);
}
void SyncClient::OnGetLocalIdsOfBacklog(
@@ -330,22 +401,57 @@
AddFetchTask((*local_ids)[i]);
}
-bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
+void SyncClient::OnTaskComplete(SyncType type,
+ const std::string& local_id,
+ FileError error) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
const SyncTasks::key_type key(type, local_id);
SyncTasks::iterator it = tasks_.find(key);
DCHECK(it != tasks_.end());
- if (it->second.should_run_again) {
- DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
- it->second.should_run_again = false;
- it->second.task.Run();
- return false;
+ base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
+
+ switch (error) {
+ case FILE_ERROR_OK:
+ DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
+ break;
+ case FILE_ERROR_ABORT:
+ // Ignore it because this is caused by user's cancel operations.
+ break;
+ case FILE_ERROR_NO_CONNECTION:
+ // Run the task again so that we'll retry once the connection is back.
+ it->second.should_run_again = true;
+ it->second.context = ClientContext(BACKGROUND);
+ break;
+ case FILE_ERROR_SERVICE_UNAVAILABLE:
+ // Run the task again so that we'll retry once the service is back.
+ it->second.should_run_again = true;
+ it->second.context = ClientContext(BACKGROUND);
+ retry_delay = long_delay_;
+ operation_observer_->OnDriveSyncError(
+ file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
+ break;
+ default:
+ operation_observer_->OnDriveSyncError(
+ file_system::DRIVE_SYNC_ERROR_MISC, local_id);
+ LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
+ << ": " << FileErrorToString(error);
}
- tasks_.erase(it);
- return true;
+ if (it->second.should_run_again) {
+ DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
+ it->second.state = PENDING;
+ it->second.should_run_again = false;
+ base::MessageLoopProxy::current()->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
+ retry_delay);
+ } else {
+ for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
+ StartTask(it->second.dependent_tasks[i]);
+ tasks_.erase(it);
+ }
}
void SyncClient::OnFetchFileComplete(const std::string& local_id,
@@ -353,102 +459,15 @@
const base::FilePath& local_path,
scoped_ptr<ResourceEntry> entry) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
- if (!OnTaskComplete(FETCH, local_id))
- return;
-
- if (error == FILE_ERROR_OK) {
- DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
- } else {
- switch (error) {
- case FILE_ERROR_ABORT:
- // If user cancels download, unpin the file so that we do not sync the
- // file again.
- base::PostTaskAndReplyWithResult(
- blocking_task_runner_,
- FROM_HERE,
- base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
- base::Bind(&util::EmptyFileOperationCallback));
- break;
- case FILE_ERROR_NO_CONNECTION:
- // Add the task again so that we'll retry once the connection is back.
- AddFetchTaskInternal(local_id, delay_);
- break;
- case FILE_ERROR_SERVICE_UNAVAILABLE:
- // Add the task again so that we'll retry once the service is back.
- AddFetchTaskInternal(local_id, long_delay_);
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
- local_id);
- break;
- default:
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_MISC,
- local_id);
- LOG(WARNING) << "Failed to fetch " << local_id
- << ": " << FileErrorToString(error);
- }
- }
-}
-
-void SyncClient::OnUpdateComplete(const std::string& local_id,
- FileError error) {
- DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
-
- if (!OnTaskComplete(UPDATE, local_id))
- return;
-
- if (error == FILE_ERROR_OK) {
- DVLOG(1) << "Updated " << local_id;
-
- // Add update tasks for child entries which may be waiting for the parent to
- // be updated.
- ResourceEntryVector* entries = new ResourceEntryVector;
+ OnTaskComplete(FETCH, local_id, error);
+ if (error == FILE_ERROR_ABORT) {
+ // If user cancels download, unpin the file so that we do not sync the file
+ // again.
base::PostTaskAndReplyWithResult(
- blocking_task_runner_.get(),
+ blocking_task_runner_,
FROM_HERE,
- base::Bind(&ResourceMetadata::ReadDirectoryById,
- base::Unretained(metadata_), local_id, entries),
- base::Bind(&SyncClient::AddChildUpdateTasks,
- weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
- } else {
- switch (error) {
- case FILE_ERROR_ABORT:
- // Ignore it because this is caused by user's cancel operations.
- break;
- case FILE_ERROR_NO_CONNECTION:
- // Add the task again so that we'll retry once the connection is back.
- AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
- base::TimeDelta::FromSeconds(0));
- break;
- case FILE_ERROR_SERVICE_UNAVAILABLE:
- // Add the task again so that we'll retry once the service is back.
- AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
- local_id);
- break;
- default:
- operation_observer_->OnDriveSyncError(
- file_system::DRIVE_SYNC_ERROR_MISC,
- local_id);
- LOG(WARNING) << "Failed to update " << local_id << ": "
- << FileErrorToString(error);
- }
- }
-}
-
-void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
- FileError error) {
- if (error != FILE_ERROR_OK)
- return;
-
- for (size_t i = 0; i < entries->size(); ++i) {
- const ResourceEntry& entry = (*entries)[i];
- if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
- AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
- base::TimeDelta::FromSeconds(0));
- }
+ base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
+ base::Bind(&util::EmptyFileOperationCallback));
}
}
diff --git a/chrome/browser/chromeos/drive/sync_client.h b/chrome/browser/chromeos/drive/sync_client.h
index a8f6d1b..0efccd6 100644
--- a/chrome/browser/chromeos/drive/sync_client.h
+++ b/chrome/browser/chromeos/drive/sync_client.h
@@ -14,6 +14,7 @@
#include "base/memory/weak_ptr.h"
#include "base/time/time.h"
#include "chrome/browser/chromeos/drive/file_errors.h"
+#include "chrome/browser/chromeos/drive/job_scheduler.h"
#include "chrome/browser/chromeos/drive/resource_metadata.h"
namespace base {
@@ -93,25 +94,38 @@
// States of sync tasks.
enum SyncState {
- PENDING,
- RUNNING,
+ SUSPENDED, // Task is currently inactive.
+ PENDING, // Task is going to run.
+ RUNNING, // Task is running.
};
+ typedef std::pair<SyncType, std::string> SyncTaskKey;
+
struct SyncTask {
SyncTask();
~SyncTask();
SyncState state;
- base::Callback<base::Closure()> task;
+ ClientContext context;
+ base::Callback<base::Closure(const ClientContext& context)> task;
bool should_run_again;
base::Closure cancel_closure;
+ std::vector<SyncTaskKey> dependent_tasks;
};
- typedef std::map<std::pair<SyncType, std::string>, SyncTask> SyncTasks;
+ typedef std::map<SyncTaskKey, SyncTask> SyncTasks;
+
+ // Performs a FETCH task.
+ base::Closure PerformFetchTask(const std::string& local_id,
+ const ClientContext& context);
// Adds a FETCH task.
void AddFetchTaskInternal(const std::string& local_id,
const base::TimeDelta& delay);
+ // Performs a UPDATE task.
+ base::Closure PerformUpdateTask(const std::string& local_id,
+ const ClientContext& context);
+
// Adds a UPDATE task.
void AddUpdateTaskInternal(const ClientContext& context,
const std::string& local_id,
@@ -124,6 +138,9 @@
// Called when a task is ready to start.
void StartTask(const SyncTasks::key_type& key);
+ void StartTaskAfterGetParentResourceEntry(const SyncTasks::key_type& key,
+ const ResourceEntry* parent,
+ FileError error);
// Called when the local IDs of files in the backlog are obtained.
void OnGetLocalIdsOfBacklog(const std::vector<std::string>* to_fetch,
@@ -132,8 +149,10 @@
// Adds fetch tasks.
void AddFetchTasks(const std::vector<std::string>* local_ids);
- // Erases the task and returns true if task is completed.
- bool OnTaskComplete(SyncType type, const std::string& local_id);
+ // Called when a task is completed.
+ void OnTaskComplete(SyncType type,
+ const std::string& local_id,
+ FileError error);
// Called when the file for |local_id| is fetched.
void OnFetchFileComplete(const std::string& local_id,
@@ -141,13 +160,6 @@
const base::FilePath& local_path,
scoped_ptr<ResourceEntry> entry);
- // Called when the entry is updated.
- void OnUpdateComplete(const std::string& local_id, FileError error);
-
- // Adds update tasks for |entries|.
- void AddChildUpdateTasks(const ResourceEntryVector* entries,
- FileError error);
-
scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_;
file_system::OperationObserver* operation_observer_;
ResourceMetadata* metadata_;
diff --git a/chrome/browser/chromeos/drive/sync_client_unittest.cc b/chrome/browser/chromeos/drive/sync_client_unittest.cc
index 911b18c..cb56dd28 100644
--- a/chrome/browser/chromeos/drive/sync_client_unittest.cc
+++ b/chrome/browser/chromeos/drive/sync_client_unittest.cc
@@ -478,7 +478,6 @@
// Start syncing the child first.
sync_client_->AddUpdateTask(ClientContext(USER_INITIATED), local_id2);
- base::RunLoop().RunUntilIdle();
// Start syncing the parent later.
sync_client_->AddUpdateTask(ClientContext(USER_INITIATED), local_id1);
base::RunLoop().RunUntilIdle();