Commit bd49c237 authored by tzik@chromium.org's avatar tzik@chromium.org

[SyncFS] Refine SyncProcessRunner's throttling algorithm for parallel task support

This CL extends task throttling algorithm to multiple task case.
Comparing to previous algorithm, new one backs off the throttling duration by elapsed time.
So that consecutive failure doesn't throttle the next task too long time.


BUG=344769
TEST=unit_tests --gtest_filter="SyncProcessRunnerTest.*"

Review URL: https://codereview.chromium.org/386523002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@282934 0039d316-1c4b-4281-b951-d872f2087c98
parent 12507e6f
...@@ -161,8 +161,7 @@ class LocalSyncRunner : public SyncProcessRunner, ...@@ -161,8 +161,7 @@ class LocalSyncRunner : public SyncProcessRunner,
OnChangesUpdated(pending_changes); OnChangesUpdated(pending_changes);
// Kick other sync runners just in case they're not running. // Kick other sync runners just in case they're not running.
GetSyncService()->RunForEachSyncRunners( GetSyncService()->RunForEachSyncRunners(&SyncProcessRunner::Schedule);
&SyncProcessRunner::ScheduleIfNotRunning);
} }
private: private:
...@@ -211,8 +210,7 @@ class RemoteSyncRunner : public SyncProcessRunner, ...@@ -211,8 +210,7 @@ class RemoteSyncRunner : public SyncProcessRunner,
OnChangesUpdated(pending_changes); OnChangesUpdated(pending_changes);
// Kick other sync runners just in case they're not running. // Kick other sync runners just in case they're not running.
GetSyncService()->RunForEachSyncRunners( GetSyncService()->RunForEachSyncRunners(&SyncProcessRunner::Schedule);
&SyncProcessRunner::ScheduleIfNotRunning);
} }
virtual void OnRemoteServiceStateUpdated( virtual void OnRemoteServiceStateUpdated(
......
...@@ -63,20 +63,16 @@ SyncProcessRunner::SyncProcessRunner( ...@@ -63,20 +63,16 @@ SyncProcessRunner::SyncProcessRunner(
const std::string& name, const std::string& name,
Client* client, Client* client,
scoped_ptr<TimerHelper> timer_helper, scoped_ptr<TimerHelper> timer_helper,
int max_parallel_task) size_t max_parallel_task)
: name_(name), : name_(name),
client_(client), client_(client),
max_parallel_task_(max_parallel_task), max_parallel_task_(max_parallel_task),
running_tasks_(0), running_tasks_(0),
timer_helper_(timer_helper.Pass()), timer_helper_(timer_helper.Pass()),
current_delay_(0), service_state_(SYNC_SERVICE_RUNNING),
last_delay_(0),
pending_changes_(0), pending_changes_(0),
factory_(this) { factory_(this) {
DCHECK_LE(1, max_parallel_task_); DCHECK_LE(1u, max_parallel_task_);
DCHECK_EQ(1, max_parallel_task_)
<< "Parellel task execution is not yet implemented.";
if (!timer_helper_) if (!timer_helper_)
timer_helper_.reset(new BaseTimerHelper); timer_helper_.reset(new BaseTimerHelper);
} }
...@@ -84,38 +80,69 @@ SyncProcessRunner::SyncProcessRunner( ...@@ -84,38 +80,69 @@ SyncProcessRunner::SyncProcessRunner(
SyncProcessRunner::~SyncProcessRunner() {} SyncProcessRunner::~SyncProcessRunner() {}
void SyncProcessRunner::Schedule() { void SyncProcessRunner::Schedule() {
int64 delay = kSyncDelayInMilliseconds;
if (pending_changes_ == 0) { if (pending_changes_ == 0) {
ScheduleInternal(kSyncDelayMaxInMilliseconds); ScheduleInternal(kSyncDelayMaxInMilliseconds);
return; return;
} }
switch (GetServiceState()) {
SyncServiceState last_service_state = service_state_;
service_state_ = GetServiceState();
switch (service_state_) {
case SYNC_SERVICE_RUNNING: case SYNC_SERVICE_RUNNING:
ResetThrottling();
if (pending_changes_ > kPendingChangeThresholdForFastSync) if (pending_changes_ > kPendingChangeThresholdForFastSync)
delay = kSyncDelayFastInMilliseconds; ScheduleInternal(kSyncDelayFastInMilliseconds);
else else
delay = kSyncDelayInMilliseconds; ScheduleInternal(kSyncDelayInMilliseconds);
break; return;
case SYNC_SERVICE_TEMPORARY_UNAVAILABLE: case SYNC_SERVICE_TEMPORARY_UNAVAILABLE:
delay = kSyncDelaySlowInMilliseconds; if (last_service_state != service_state_)
if (last_delay_ >= kSyncDelaySlowInMilliseconds) ThrottleSync(kSyncDelaySlowInMilliseconds);
delay = last_delay_ * 2; ScheduleInternal(kSyncDelaySlowInMilliseconds);
if (delay >= kSyncDelayMaxInMilliseconds) return;
delay = kSyncDelayMaxInMilliseconds;
break;
case SYNC_SERVICE_AUTHENTICATION_REQUIRED: case SYNC_SERVICE_AUTHENTICATION_REQUIRED:
case SYNC_SERVICE_DISABLED: case SYNC_SERVICE_DISABLED:
delay = kSyncDelayMaxInMilliseconds; if (last_service_state != service_state_)
break; ThrottleSync(kSyncDelaySlowInMilliseconds);
ScheduleInternal(kSyncDelayMaxInMilliseconds);
return;
} }
ScheduleInternal(delay);
NOTREACHED();
ScheduleInternal(kSyncDelayMaxInMilliseconds);
} }
void SyncProcessRunner::ScheduleIfNotRunning() { void SyncProcessRunner::ThrottleSync(int64 base_delay) {
if (!timer_helper_->IsRunning()) base::TimeTicks now = timer_helper_->Now();
Schedule(); base::TimeDelta elapsed = std::min(now, throttle_until_) - throttle_from_;
DCHECK(base::TimeDelta() <= elapsed);
throttle_from_ = now;
// Extend throttling duration by twice the elapsed time.
// That is, if the backoff repeats in a short period, the throttling period
// doesn't grow exponentially. If the backoff happens on the end of
// throttling period, it causes another throttling period that is twice as
// long as previous.
base::TimeDelta base_delay_delta =
base::TimeDelta::FromMilliseconds(base_delay);
const base::TimeDelta max_delay =
base::TimeDelta::FromMilliseconds(kSyncDelayMaxInMilliseconds);
throttle_until_ =
std::min(now + max_delay,
std::max(now + base_delay_delta, throttle_until_ + 2 * elapsed));
}
void SyncProcessRunner::ResetOldThrottling() {
if (throttle_until_ < base::TimeTicks::Now())
ResetThrottling();
}
void SyncProcessRunner::ResetThrottling() {
throttle_from_ = base::TimeTicks();
throttle_until_ = base::TimeTicks();
} }
void SyncProcessRunner::OnChangesUpdated( void SyncProcessRunner::OnChangesUpdated(
...@@ -143,62 +170,71 @@ SyncServiceState SyncProcessRunner::GetServiceState() { ...@@ -143,62 +170,71 @@ SyncServiceState SyncProcessRunner::GetServiceState() {
void SyncProcessRunner::Finished(const base::TimeTicks& start_time, void SyncProcessRunner::Finished(const base::TimeTicks& start_time,
SyncStatusCode status) { SyncStatusCode status) {
DCHECK_LT(0, running_tasks_); DCHECK_LT(0u, running_tasks_);
DCHECK_LE(running_tasks_, max_parallel_task_); DCHECK_LE(running_tasks_, max_parallel_task_);
--running_tasks_; --running_tasks_;
util::Log(logging::LOG_VERBOSE, FROM_HERE, util::Log(logging::LOG_VERBOSE, FROM_HERE,
"[%s] * Finished (elapsed: %" PRId64 " sec)", "[%s] * Finished (elapsed: %" PRId64 " ms)", name_.c_str(),
name_.c_str(), (timer_helper_->Now() - start_time).InMilliseconds());
(timer_helper_->Now() - start_time).InSeconds());
if (status == SYNC_STATUS_NO_CHANGE_TO_SYNC || if (status == SYNC_STATUS_NO_CHANGE_TO_SYNC ||
status == SYNC_STATUS_FILE_BUSY) status == SYNC_STATUS_FILE_BUSY) {
ScheduleInternal(kSyncDelayMaxInMilliseconds); ScheduleInternal(kSyncDelayMaxInMilliseconds);
else if (!WasSuccessfulSync(status) && return;
GetServiceState() == SYNC_SERVICE_RUNNING) }
ScheduleInternal(kSyncDelayWithSyncError);
if (WasSuccessfulSync(status))
ResetOldThrottling();
else else
Schedule(); ThrottleSync(kSyncDelayWithSyncError);
Schedule();
} }
void SyncProcessRunner::Run() { void SyncProcessRunner::Run() {
if (running_tasks_ >= max_parallel_task_) if (running_tasks_ >= max_parallel_task_)
return; return;
++running_tasks_; ++running_tasks_;
last_scheduled_ = timer_helper_->Now(); base::TimeTicks now = timer_helper_->Now();
last_delay_ = current_delay_; last_run_ = now;
util::Log(logging::LOG_VERBOSE, FROM_HERE, util::Log(logging::LOG_VERBOSE, FROM_HERE,
"[%s] * Started", name_.c_str()); "[%s] * Started", name_.c_str());
StartSync(base::Bind(&SyncProcessRunner::Finished, factory_.GetWeakPtr(), StartSync(base::Bind(&SyncProcessRunner::Finished, factory_.GetWeakPtr(),
last_scheduled_)); now));
if (running_tasks_ < max_parallel_task_)
Schedule();
} }
void SyncProcessRunner::ScheduleInternal(int64 delay) { void SyncProcessRunner::ScheduleInternal(int64 delay) {
base::TimeDelta time_to_next = base::TimeDelta::FromMilliseconds(delay); base::TimeTicks now = timer_helper_->Now();
base::TimeTicks next_scheduled;
if (timer_helper_->IsRunning()) { if (timer_helper_->IsRunning()) {
if (current_delay_ == delay) next_scheduled = last_run_ + base::TimeDelta::FromMilliseconds(delay);
return; if (next_scheduled < now) {
next_scheduled =
base::TimeDelta elapsed = timer_helper_->Now() - last_scheduled_; now + base::TimeDelta::FromMilliseconds(kSyncDelayFastInMilliseconds);
if (elapsed < time_to_next) {
time_to_next = time_to_next - elapsed;
} else {
time_to_next = base::TimeDelta::FromMilliseconds(
kSyncDelayFastInMilliseconds);
} }
} else {
next_scheduled = now + base::TimeDelta::FromMilliseconds(delay);
} }
if (current_delay_ != delay) { if (next_scheduled < throttle_until_)
util::Log(logging::LOG_VERBOSE, FROM_HERE, next_scheduled = throttle_until_;
"[%s] Scheduling task in %" PRId64 " secs",
name_.c_str(), time_to_next.InSeconds()); if (timer_helper_->IsRunning() && last_scheduled_ == next_scheduled)
} return;
current_delay_ = delay;
util::Log(logging::LOG_VERBOSE, FROM_HERE,
"[%s] Scheduling task in %" PRId64 " ms",
name_.c_str(), (next_scheduled - now).InMilliseconds());
last_scheduled_ = next_scheduled;
timer_helper_->Start( timer_helper_->Start(
FROM_HERE, time_to_next, FROM_HERE, next_scheduled - now,
base::Bind(&SyncProcessRunner::Run, base::Unretained(this))); base::Bind(&SyncProcessRunner::Run, base::Unretained(this)));
} }
......
...@@ -67,7 +67,7 @@ class SyncProcessRunner { ...@@ -67,7 +67,7 @@ class SyncProcessRunner {
SyncProcessRunner(const std::string& name, SyncProcessRunner(const std::string& name,
Client* client, Client* client,
scoped_ptr<TimerHelper> timer_helper, scoped_ptr<TimerHelper> timer_helper,
int max_parallel_task); size_t max_parallel_task);
virtual ~SyncProcessRunner(); virtual ~SyncProcessRunner();
// Subclass must implement this. // Subclass must implement this.
...@@ -75,7 +75,6 @@ class SyncProcessRunner { ...@@ -75,7 +75,6 @@ class SyncProcessRunner {
// Schedules a new sync. // Schedules a new sync.
void Schedule(); void Schedule();
void ScheduleIfNotRunning();
int64 pending_changes() const { return pending_changes_; } int64 pending_changes() const { return pending_changes_; }
...@@ -92,14 +91,26 @@ class SyncProcessRunner { ...@@ -92,14 +91,26 @@ class SyncProcessRunner {
void Run(); void Run();
void ScheduleInternal(int64 delay); void ScheduleInternal(int64 delay);
// Throttles new sync for |base_delay| milliseconds for an error case.
// If new sync is already throttled, back off the duration.
void ThrottleSync(int64 base_delay);
// Clears old throttling setting that is already over.
void ResetOldThrottling();
void ResetThrottling();
std::string name_; std::string name_;
Client* client_; Client* client_;
int max_parallel_task_; size_t max_parallel_task_;
int running_tasks_; size_t running_tasks_;
scoped_ptr<TimerHelper> timer_helper_; scoped_ptr<TimerHelper> timer_helper_;
base::TimeTicks last_run_;
base::TimeTicks last_scheduled_; base::TimeTicks last_scheduled_;
int64 current_delay_; SyncServiceState service_state_;
int64 last_delay_;
base::TimeTicks throttle_from_;
base::TimeTicks throttle_until_;
int64 pending_changes_; int64 pending_changes_;
base::WeakPtrFactory<SyncProcessRunner> factory_; base::WeakPtrFactory<SyncProcessRunner> factory_;
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include "chrome/browser/sync_file_system/sync_process_runner.h" #include "chrome/browser/sync_file_system/sync_process_runner.h"
#include <queue>
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
...@@ -85,15 +87,16 @@ class FakeSyncProcessRunner : public SyncProcessRunner { ...@@ -85,15 +87,16 @@ class FakeSyncProcessRunner : public SyncProcessRunner {
public: public:
FakeSyncProcessRunner(SyncProcessRunner::Client* client, FakeSyncProcessRunner(SyncProcessRunner::Client* client,
scoped_ptr<TimerHelper> timer_helper, scoped_ptr<TimerHelper> timer_helper,
int max_parallel_task) size_t max_parallel_task)
: SyncProcessRunner("FakeSyncProcess", : SyncProcessRunner("FakeSyncProcess",
client, timer_helper.Pass(), client, timer_helper.Pass(),
max_parallel_task) { max_parallel_task),
max_parallel_task_(max_parallel_task) {
} }
virtual void StartSync(const SyncStatusCallback& callback) OVERRIDE { virtual void StartSync(const SyncStatusCallback& callback) OVERRIDE {
EXPECT_TRUE(running_task_.is_null()); EXPECT_LT(running_tasks_.size(), max_parallel_task_);
running_task_ = callback; running_tasks_.push(callback);
} }
virtual ~FakeSyncProcessRunner() { virtual ~FakeSyncProcessRunner() {
...@@ -104,18 +107,19 @@ class FakeSyncProcessRunner : public SyncProcessRunner { ...@@ -104,18 +107,19 @@ class FakeSyncProcessRunner : public SyncProcessRunner {
} }
void CompleteTask(SyncStatusCode status) { void CompleteTask(SyncStatusCode status) {
ASSERT_FALSE(running_task_.is_null()); ASSERT_FALSE(running_tasks_.empty());
SyncStatusCallback task = running_task_; SyncStatusCallback task = running_tasks_.front();
running_task_.Reset(); running_tasks_.pop();
task.Run(status); task.Run(status);
} }
bool HasRunningTask() const { bool HasRunningTask() const {
return !running_task_.is_null(); return !running_tasks_.empty();
} }
private: private:
SyncStatusCallback running_task_; size_t max_parallel_task_;
std::queue<SyncStatusCallback> running_tasks_;
DISALLOW_COPY_AND_ASSIGN(FakeSyncProcessRunner); DISALLOW_COPY_AND_ASSIGN(FakeSyncProcessRunner);
}; };
...@@ -184,6 +188,90 @@ TEST(SyncProcessRunnerTest, SingleTaskBasicTest) { ...@@ -184,6 +188,90 @@ TEST(SyncProcessRunnerTest, SingleTaskBasicTest) {
fake_runner.CompleteTask(SYNC_STATUS_OK); fake_runner.CompleteTask(SYNC_STATUS_OK);
EXPECT_EQ(SyncProcessRunner::kSyncDelayMaxInMilliseconds, EXPECT_EQ(SyncProcessRunner::kSyncDelayMaxInMilliseconds,
fake_timer->GetCurrentDelay()); fake_timer->GetCurrentDelay());
// Schedule the next with the longest delay if the client is persistently
// unavailable.
fake_client.set_service_state(SYNC_SERVICE_AUTHENTICATION_REQUIRED);
fake_runner.UpdateChanges(100);
EXPECT_EQ(SyncProcessRunner::kSyncDelayMaxInMilliseconds,
fake_timer->GetCurrentDelay());
}
TEST(SyncProcessRunnerTest, MultiTaskBasicTest) {
FakeClient fake_client;
FakeTimerHelper* fake_timer = new FakeTimerHelper();
FakeSyncProcessRunner fake_runner(
&fake_client,
scoped_ptr<SyncProcessRunner::TimerHelper>(fake_timer),
2 /* max_parallel_task */);
base::TimeTicks base_time = base::TimeTicks::Now();
fake_timer->SetCurrentTime(base_time);
EXPECT_FALSE(fake_timer->IsRunning());
fake_runner.UpdateChanges(100);
EXPECT_TRUE(fake_timer->IsRunning());
EXPECT_EQ(SyncProcessRunner::kSyncDelayFastInMilliseconds,
fake_timer->GetCurrentDelay());
// Even after a task starts running, SyncProcessRunner should schedule next
// task until the number of running task reachs the limit.
fake_timer->AdvanceToScheduledTime();
EXPECT_TRUE(fake_timer->IsRunning());
EXPECT_TRUE(fake_runner.HasRunningTask());
EXPECT_EQ(SyncProcessRunner::kSyncDelayFastInMilliseconds,
fake_timer->GetCurrentDelay());
// After the second task starts running, SyncProcessRunner should stop
// scheduling a task.
fake_timer->AdvanceToScheduledTime();
EXPECT_FALSE(fake_timer->IsRunning());
EXPECT_TRUE(fake_runner.HasRunningTask());
fake_runner.CompleteTask(SYNC_STATUS_OK);
EXPECT_TRUE(fake_timer->IsRunning());
EXPECT_TRUE(fake_runner.HasRunningTask());
fake_runner.CompleteTask(SYNC_STATUS_OK);
EXPECT_TRUE(fake_timer->IsRunning());
EXPECT_FALSE(fake_runner.HasRunningTask());
// Turn |service_state| to TEMPORARY_UNAVAILABLE and let the task fail.
// |fake_runner| should schedule following tasks with longer delay.
fake_timer->AdvanceToScheduledTime();
fake_timer->AdvanceToScheduledTime();
fake_client.set_service_state(SYNC_SERVICE_TEMPORARY_UNAVAILABLE);
fake_runner.CompleteTask(SYNC_STATUS_FAILED);
EXPECT_EQ(SyncProcessRunner::kSyncDelaySlowInMilliseconds,
fake_timer->GetCurrentDelay());
// Consecutive error reports shouldn't extend delay immediately.
fake_runner.CompleteTask(SYNC_STATUS_FAILED);
EXPECT_EQ(SyncProcessRunner::kSyncDelaySlowInMilliseconds,
fake_timer->GetCurrentDelay());
// The next task will run after throttle period is over.
// And its failure should extend the throttle period by twice.
fake_timer->AdvanceToScheduledTime();
EXPECT_EQ(SyncProcessRunner::kSyncDelaySlowInMilliseconds,
fake_timer->GetCurrentDelay());
fake_runner.CompleteTask(SYNC_STATUS_FAILED);
EXPECT_EQ(2 * SyncProcessRunner::kSyncDelaySlowInMilliseconds,
fake_timer->GetCurrentDelay());
// Next successful task should clear the throttling.
fake_timer->AdvanceToScheduledTime();
fake_client.set_service_state(SYNC_SERVICE_RUNNING);
fake_runner.CompleteTask(SYNC_STATUS_OK);
EXPECT_EQ(SyncProcessRunner::kSyncDelayFastInMilliseconds,
fake_timer->GetCurrentDelay());
// Then, following failing task should not extend throttling period.
fake_timer->AdvanceToScheduledTime();
fake_client.set_service_state(SYNC_SERVICE_TEMPORARY_UNAVAILABLE);
fake_runner.CompleteTask(SYNC_STATUS_FAILED);
EXPECT_EQ(SyncProcessRunner::kSyncDelaySlowInMilliseconds,
fake_timer->GetCurrentDelay());
} }
} // namespace sync_file_system } // namespace sync_file_system
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment