sync: Refactor job ownership in SyncScheduler

This change separates the tracking of what work needs to be done from
the decision of when to do it.  Prior to this change, SyncSessionJobs
were owned either by Closures posted to the sync thread's message loop,
or held temporarily in unscheduled_nudge_storage_, a member of the
SyncScheduler.  Following this change, there can be only two jobs in
existence, and they will be referenced only by the scoped_ptr members of
SyncScheduler named pending_nudge_job_ and pending_configure_job_.

This change, along with some previous changes to the way we schedule
tasks, makes it possible to simplify the job "saving" logic.  Jobs with
purpose == NUDGE are saved by assigning them to pending_nudge_job_ or
coalescing them with the existing pending_nudge_job_.  Jobs with purpose
== CONFIGURE are never coalesced, and can be saved in the
pending_configure_job_ member.  These changes allow us to make
SyncSessionJob::Clone() obsolete.

The logic in ScheduleNudgeImpl() has been updated to take advantage of
the fact that the pending job is much easier to find now.  It should now
be much better at coalescing its sources.  In other words, there will be
fewer scenarios where it can drop notification hints.  However, there are
still some cases in DecideOnJob() that may induce it to drop hints
unnecessarily.

The scheduling logic has been modified, too.  We've removed support for
running a nudge while in an exponential backoff interval.  This makes it
possible to track the next wakeup time using a single timer, since the
wakeup event will be one of:
- The end of a throttled interval
- An end-of-backoff-interval retry
- A scheduled nudge
and these scenarios are now mutually exclusive.

BUG=175024

Review URL: https://chromiumcodereview.appspot.com/13422003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@192666 0039d316-1c4b-4281-b951-d872f2087c98
parent 2fecc2cb
This diff is collapsed.
......@@ -15,7 +15,7 @@
#include "base/memory/linked_ptr.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/observer_list.h"
#include "base/threading/non_thread_safe.h"
#include "base/time.h"
#include "base/timer.h"
#include "sync/base/sync_export.h"
......@@ -34,7 +34,9 @@ namespace syncer {
class BackoffDelayProvider;
class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
class SYNC_EXPORT_PRIVATE SyncSchedulerImpl
: public SyncScheduler,
public base::NonThreadSafe {
public:
// |name| is a display string to identify the syncer thread. Takes
// ownership of |syncer| and |delay_provider|.
......@@ -107,8 +109,6 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
SaveNudgeWhileTypeThrottled);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueNudge);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, DropPoll);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinuePoll);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueConfiguration);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
SaveConfigurationWhileThrottled);
......@@ -116,10 +116,7 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
SaveNudgeWhileThrottled);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
ContinueCanaryJobConfig);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
ContinueNudgeWhileExponentialBackOff);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest, TransientPollFailure);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest, GetInitialBackoffDelay);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest,
ServerConnectionChangeDuringBackoff);
FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest,
......@@ -129,9 +126,8 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
enum Mode {
// Uninitialized state, should not be set in practice.
UNKNOWN = -1,
// A wait interval whose duration has been affected by exponential
// backoff.
// EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval.
// We enter a series of increasingly longer WaitIntervals if we experience
// repeated transient failures. We retry at the end of each interval.
EXPONENTIAL_BACKOFF,
// A server-initiated throttled interval. We do not allow any syncing
// during such an interval.
......@@ -144,38 +140,28 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
static const char* GetModeString(Mode mode);
Mode mode;
// This bool is set to true if we have observed a nudge during this
// interval and mode == EXPONENTIAL_BACKOFF.
bool had_nudge;
base::TimeDelta length;
base::OneShotTimer<SyncSchedulerImpl> timer;
// Configure jobs are saved only when backing off or throttling. So we
// expose the pointer here (does not own, similar to pending_nudge).
SyncSessionJob* pending_configure_job;
};
static const char* GetModeString(Mode mode);
static const char* GetDecisionString(JobProcessDecision decision);
// Helper to cancel any existing delayed task and replace it with a new one.
// It will not post any tasks if the scheduler is in a "stopped" state.
void PostDelayedTask(const tracked_objects::Location& from_here,
const char* name,
const base::Closure& task,
base::TimeDelta delay);
// Invoke the syncer to perform a non-POLL job.
bool DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job,
JobPriority priority);
// Invoke the Syncer to perform a non-poll job.
bool DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
JobPriority priority);
// Invoke the syncer to perform a nudge job.
void DoNudgeSyncSessionJob(JobPriority priority);
// Invoke the syncer to perform a configuration job.
bool DoConfigurationSyncSessionJob(JobPriority priority);
// Returns whether or not it's safe to run a poll job at this time.
bool ShouldPoll();
// Invoke the Syncer to perform a poll job.
void DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job);
void DoPollSyncSessionJob();
// Called after the Syncer has performed the sync represented by |job|, to
// reset our state. |exited_prematurely| is true if the Syncer did not
......@@ -193,7 +179,7 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
void AdjustPolling(const SyncSessionJob* old_job);
// Helper to restart waiting with |wait_interval_|'s timer.
void RestartWaiting(scoped_ptr<SyncSessionJob> job);
void RestartWaiting();
// Helper to ScheduleNextSync in case of consecutive sync errors.
void HandleContinuationError(scoped_ptr<SyncSessionJob> old_job,
......@@ -203,10 +189,6 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
JobProcessDecision DecideOnJob(const SyncSessionJob& job,
JobPriority priority);
// If DecideOnJob decides that |job| should be SAVEd, this function will
// carry out the task of actually "saving" (or coalescing) the job.
void HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job);
// Decide on whether to CONTINUE, SAVE or DROP the job when we are in
// backoff mode.
JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job,
......@@ -235,22 +217,12 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
// Helper to signal listeners about changed retry time
void NotifyRetryTime(base::Time retry_time);
// Callback to change backoff state. |to_be_canary| in both cases is the job
// that should be granted canary privileges. Note: it is possible that the
// job that gets scheduled when this callback is scheduled is different from
// the job that will actually get executed, because other jobs may have been
// scheduled while we were waiting for the callback.
void DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary);
void Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary);
// Returns a pending job that has potential to run given the state of the
// scheduler, if it exists. Useful whenever an event occurs that may
// change conditions that permit a job to run, such as re-establishing
// network connection, auth refresh, mode changes etc. Note that the returned
// job may have been scheduled to run at a later time, or may have been
// unscheduled. In the former case, this will result in abandoning the old
// job and effectively cancelling it.
scoped_ptr<SyncSessionJob> TakePendingJobForCurrentMode();
// Looks for pending work and, if it finds any, run this work at "canary"
// priority.
void TryCanaryJob();
// Transitions out of the THROTTLED WaitInterval then calls TryCanaryJob().
void Unthrottle();
// Called when the root cause of the current connection error is fixed.
void OnServerConnectionErrorFixed();
......@@ -283,10 +255,6 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
// Used for logging.
const std::string name_;
// The message loop this object is on. Almost all methods have to
// be called on this thread.
base::MessageLoop* const sync_loop_;
// Set in Start(), unset in Stop().
bool started_;
......@@ -304,27 +272,21 @@ class SYNC_EXPORT_PRIVATE SyncSchedulerImpl : public SyncScheduler {
// The mode of operation.
Mode mode_;
// Tracks (does not own) in-flight nudges (scheduled or unscheduled),
// so we can coalesce. NULL if there is no pending nudge.
SyncSessionJob* pending_nudge_;
// There are certain situations where we want to remember a nudge, but
// there is no well defined moment in time in the future when that nudge
// should run, e.g. if it requires a mode switch or updated auth credentials.
// This member will own NUDGE jobs in those cases, until an external event
// (mode switch or fixed auth) occurs to trigger a retry. Should be treated
// as opaque / not interacted with (i.e. we could build a wrapper to
// hide the type, but that's probably overkill).
scoped_ptr<SyncSessionJob> unscheduled_nudge_storage_;
// Current wait state. Null if we're not in backoff and not throttled.
scoped_ptr<WaitInterval> wait_interval_;
scoped_ptr<BackoffDelayProvider> delay_provider_;
// We allow at most one PostedTask to be pending at one time. This is it.
// We will cancel this task before starting a new one.
base::CancelableClosure pending_wakeup_;
// The event that will wake us up.
base::OneShotTimer<SyncSchedulerImpl> pending_wakeup_timer_;
// Pending configure job storage. Note that
// (mode_ != CONFIGURATION_MODE) \implies !pending_configure_job_.
scoped_ptr<SyncSessionJob> pending_configure_job_;
// Pending nudge job storage. These jobs can exist in CONFIGURATION_MODE, but
// they will be run only in NORMAL_MODE.
scoped_ptr<SyncSessionJob> pending_nudge_job_;
// Invoked to run through the sync cycle.
scoped_ptr<Syncer> syncer_;
......
......@@ -72,6 +72,14 @@ void PumpLoop() {
RunLoop();
}
// Runs the current message loop with a time limit: a QuitLoopNow task is
// posted with a delay of |time| before RunLoop() is entered, so tasks that
// become runnable in the meantime get a chance to execute.
void PumpLoopFor(base::TimeDelta time) {
  MessageLoop* const loop = MessageLoop::current();
  // Queue the delayed quit first; RunLoop() is entered only afterwards.
  loop->PostDelayedTask(FROM_HERE, base::Bind(&QuitLoopNow), time);
  RunLoop();
}
ModelSafeRoutingInfo TypesToRoutingInfo(ModelTypeSet types) {
ModelSafeRoutingInfo routes;
for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
......@@ -975,21 +983,20 @@ TEST_F(SyncSchedulerTest, BackoffDropsJobs) {
EXPECT_EQ(GetUpdatesCallerInfo::LOCAL,
r.snapshots[0].source().updates_source);
EXPECT_CALL(*syncer(), SyncShare(_,_,_)).Times(1)
.WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed),
RecordSyncShare(&r)));
// Wait a while (10x poll interval) so a few poll jobs will be attempted.
PumpLoopFor(poll * 10);
// We schedule a nudge with enough delay (10X poll interval) that at least
// one or two polls would have taken place. The nudge should succeed.
// Try (and fail) to schedule a nudge.
scheduler()->ScheduleNudgeAsync(
poll * 10, NUDGE_SOURCE_LOCAL, types, FROM_HERE);
RunLoop();
base::TimeDelta::FromMilliseconds(1),
NUDGE_SOURCE_LOCAL,
types,
FROM_HERE);
Mock::VerifyAndClearExpectations(syncer());
Mock::VerifyAndClearExpectations(delay());
ASSERT_EQ(2U, r.snapshots.size());
EXPECT_EQ(GetUpdatesCallerInfo::LOCAL,
r.snapshots[1].source().updates_source);
ASSERT_EQ(1U, r.snapshots.size());
EXPECT_CALL(*delay(), GetDelay(_)).Times(0);
......
......@@ -88,10 +88,6 @@ class SyncSchedulerWhiteboxTest : public testing::Test {
TimeDelta::FromSeconds(1)));
}
// Test helper: overwrites the had_nudge flag on the scheduler's current wait
// interval. wait_interval_ is dereferenced without a null check, so a wait
// interval must already be installed before calling this (presumably via one
// of the SetWaitInterval* helpers -- verify against callers).
void SetWaitIntervalHadNudge(bool had_nudge) {
scheduler_->wait_interval_->had_nudge = had_nudge;
}
SyncSchedulerImpl::JobProcessDecision DecideOnJob(
const SyncSessionJob& job,
SyncSchedulerImpl::JobPriority priority) {
......@@ -233,22 +229,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileThrottled) {
EXPECT_EQ(decision, SyncSchedulerImpl::SAVE);
}
// With the scheduler set to NORMAL_MODE and the wait interval set to
// exponential backoff, the decision produced for a NUDGE job is expected to
// be CONTINUE.
TEST_F(SyncSchedulerWhiteboxTest, ContinueNudgeWhileExponentialBackOff) {
// Arrange: normal mode plus an exponential-backoff wait interval.
InitializeSyncerOnNormalMode();
SetMode(SyncScheduler::NORMAL_MODE);
SetWaitIntervalToExponentialBackoff();
// Act: obtain the scheduler's decision for a freshly created NUDGE job.
SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
SyncSessionJob::NUDGE);
EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE);
}
TEST_F(SyncSchedulerWhiteboxTest, DropNudgeWhileExponentialBackOff) {
InitializeSyncerOnNormalMode();
SetMode(SyncScheduler::NORMAL_MODE);
SetWaitIntervalToExponentialBackoff();
SetWaitIntervalHadNudge(true);
SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
SyncSessionJob::NUDGE);
......
......@@ -18,8 +18,7 @@ SyncSessionJob::SyncSessionJob(
: purpose_(purpose),
source_info_(source_info),
scheduled_start_(start),
config_params_(config_params),
finished_(NOT_FINISHED) {
config_params_(config_params) {
}
#define ENUM_CASE(x) case x: return #x; break;
......@@ -36,15 +35,9 @@ const char* SyncSessionJob::GetPurposeString(SyncSessionJob::Purpose purpose) {
#undef ENUM_CASE
bool SyncSessionJob::Finish(bool early_exit, sessions::SyncSession* session) {
DCHECK_EQ(finished_, NOT_FINISHED);
// Did we run through all SyncerSteps from start_step() to end_step()
// until the SyncSession returned !HasMoreToSync()?
// Note: if not, it's possible the scheduler hasn't started with
// SyncShare yet, it's possible there is still more to sync in the session,
// and it's also possible the job quit part way through due to a premature
// exit condition (such as shutdown).
finished_ = early_exit ? EARLY_EXIT : FINISHED;
// Did we quit part-way through due to premature exit condition, like
// shutdown? Note that this branch will not be hit for other kinds
// of early return scenarios, like certain kinds of transient errors.
if (early_exit)
return false;
......@@ -75,12 +68,6 @@ bool SyncSessionJob::Finish(bool early_exit, sessions::SyncSession* session) {
return true;
}
// Returns a newly allocated SyncSessionJob constructed from this job's
// purpose, scheduled start time, source info and configuration parameters.
// Ownership of the new job is handed to the caller through the returned
// scoped_ptr.
scoped_ptr<SyncSessionJob> SyncSessionJob::Clone() const {
  scoped_ptr<SyncSessionJob> copy(new SyncSessionJob(
      purpose_, scheduled_start_, source_info_, config_params_));
  return copy.Pass();
}
void SyncSessionJob::CoalesceSources(const sessions::SyncSourceInfo& source) {
CoalesceStates(source.types, &source_info_.types);
source_info_.updates_source = source.updates_source;
......
......@@ -37,10 +37,6 @@ class SYNC_EXPORT_PRIVATE SyncSessionJob {
const ConfigurationParams& config_params);
~SyncSessionJob();
// Returns a new clone of the job, with a cloned SyncSession ready to be
// retried / rescheduled.
scoped_ptr<SyncSessionJob> Clone() const;
// Overwrite the sync update source with the most recent and merge the
// type/state map.
void CoalesceSources(const sessions::SyncSourceInfo& source);
......@@ -75,14 +71,6 @@ class SYNC_EXPORT_PRIVATE SyncSessionJob {
ConfigurationParams config_params() const;
private:
// A SyncSessionJob can be in one of these three states, controlled by the
// Finish() function, see method comments.
enum FinishedState {
NOT_FINISHED, // Finish has not been called.
EARLY_EXIT, // Finish was called but the job was "preempted",
FINISHED // Indicates a "clean" finish operation.
};
const Purpose purpose_;
sessions::SyncSourceInfo source_info_;
......@@ -92,11 +80,6 @@ class SYNC_EXPORT_PRIVATE SyncSessionJob {
// Succeeded() behavior may be arguments to subclass in the future.
const ConfigurationParams config_params_;
// Set to true if Finish() was called, false otherwise. True implies that
// a SyncShare operation took place with |session_| and it cycled through
// all requisite steps given |purpose_| without being preempted.
FinishedState finished_;
DISALLOW_COPY_AND_ASSIGN(SyncSessionJob);
};
......
......@@ -120,64 +120,6 @@ class SyncSessionJobTest : public testing::Test {
bool config_params_callback_invoked_;
};
// Clones a finished job, then clones that clone, and checks each pair with
// ExpectClones(). The context's routing info is changed before the first
// Clone() call and restored afterwards, and the last check is made after the
// intermediate clone has been destroyed.
TEST_F(SyncSessionJobTest, Clone) {
SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(),
MakeSourceInfo(), ConfigurationParams());
// Run job1 through a simulated successful cycle and finish it normally.
scoped_ptr<SyncSession> session1 = MakeSession().Pass();
sessions::test_util::SimulateSuccess(session1.get(),
job1.start_step(),
job1.end_step());
job1.Finish(false, session1.get());
// Swap in different routing info on the context before taking the first
// clone.
ModelSafeRoutingInfo new_routes;
new_routes[AUTOFILL] = GROUP_PASSIVE;
context()->set_routing_info(new_routes);
scoped_ptr<SyncSessionJob> clone1 = job1.Clone();
ExpectClones(&job1, clone1.get());
// Restore the fixture's routing info, then run and finish the clone.
context()->set_routing_info(routes());
scoped_ptr<SyncSession> session2 = MakeSession().Pass();
sessions::test_util::SimulateSuccess(session2.get(),
clone1->start_step(),
clone1->end_step());
clone1->Finish(false, session2.get());
// A clone of a clone is compared against its direct source...
scoped_ptr<SyncSessionJob> clone2 = clone1->Clone();
ExpectClones(clone1.get(), clone2.get());
// ...and, once the intermediate clone has been destroyed, against the
// original job.
clone1.reset();
ExpectClones(&job1, clone2.get());
}
// A job whose Finish() was called with early_exit == true is cloned, and the
// pair is checked with ExpectClones().
TEST_F(SyncSessionJobTest, CloneAfterEarlyExit) {
scoped_ptr<SyncSession> session = MakeSession().Pass();
SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(),
MakeSourceInfo(), ConfigurationParams());
// First argument is |early_exit|: finish the job as an early exit.
job1.Finish(true, session.get());
scoped_ptr<SyncSessionJob> job2 = job1.Clone();
ExpectClones(&job1, job2.get());
}
// Tests interaction between Finish and sync cycle success / failure:
// Finish() with early_exit == false is expected to return true after a
// simulated successful cycle, and false after a simulated connection failure.
TEST_F(SyncSessionJobTest, Finish) {
SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(),
MakeSourceInfo(), ConfigurationParams());
// Success case: simulated successful cycle, then a non-early-exit Finish().
scoped_ptr<SyncSession> session1 = MakeSession().Pass();
sessions::test_util::SimulateSuccess(session1.get(),
job1.start_step(),
job1.end_step());
EXPECT_TRUE(job1.Finish(false /* early_exit */, session1.get()));
// Failure case: a clone of job1 is run against a fresh session with a
// simulated connection failure.
scoped_ptr<SyncSessionJob> job2 = job1.Clone();
scoped_ptr<SyncSession> session2 = MakeSession().Pass();
sessions::test_util::SimulateConnectionFailure(session2.get(),
job2->start_step(),
job2->end_step());
EXPECT_FALSE(job2->Finish(false, session2.get()));
}
TEST_F(SyncSessionJobTest, FinishCallsReadyTask) {
ConfigurationParams params;
params.ready_task = base::Bind(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment