Commit 9168c418 authored by Julian Pastarmov's avatar Julian Pastarmov Committed by Commit Bot

[local-sync] Use ImportantFileWriter for LoopbackServer file writes

This reduces the number of individual writes to the file and sets
maximum update rate of 6 writes per minute.

BUG=1013546
TEST=components_unittests:LoopbackServerTests

Change-Id: I40a51051b449dd8f88582420f78079dd2187bd58
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1940687Reviewed-by: default avatarGreg Thompson <grt@chromium.org>
Reviewed-by: default avatarMikel Astiz <mastiz@chromium.org>
Commit-Queue: Julian Pastarmov <pastarmovj@chromium.org>
Cr-Commit-Position: refs/heads/master@{#738543}
parent badc7198
...@@ -295,8 +295,6 @@ void SyncTest::TearDown() { ...@@ -295,8 +295,6 @@ void SyncTest::TearDown() {
// Return OSCrypt to its real behaviour // Return OSCrypt to its real behaviour
OSCryptMocker::TearDown(); OSCryptMocker::TearDown();
fake_server_.reset();
} }
void SyncTest::SetUpCommandLine(base::CommandLine* cl) { void SyncTest::SetUpCommandLine(base::CommandLine* cl) {
...@@ -860,6 +858,7 @@ void SyncTest::TearDownOnMainThread() { ...@@ -860,6 +858,7 @@ void SyncTest::TearDownOnMainThread() {
observer : fake_server_invalidation_observers_) { observer : fake_server_invalidation_observers_) {
fake_server_->RemoveObserver(observer.get()); fake_server_->RemoveObserver(observer.get());
} }
fake_server_.reset();
} }
// Delete things that unsubscribe in destructor before their targets are gone. // Delete things that unsubscribe in destructor before their targets are gone.
......
...@@ -16,11 +16,14 @@ ...@@ -16,11 +16,14 @@
#include "base/metrics/histogram_macros.h" #include "base/metrics/histogram_macros.h"
#include "base/rand_util.h" #include "base/rand_util.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/sequenced_task_runner.h"
#include "base/stl_util.h" #include "base/stl_util.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "base/strings/string_split.h" #include "base/strings/string_split.h"
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/task/post_task.h"
#include "base/task/task_traits.h"
#include "components/sync/engine_impl/loopback_server/persistent_bookmark_entity.h" #include "components/sync/engine_impl/loopback_server/persistent_bookmark_entity.h"
#include "components/sync/engine_impl/loopback_server/persistent_permanent_entity.h" #include "components/sync/engine_impl/loopback_server/persistent_permanent_entity.h"
#include "components/sync/engine_impl/loopback_server/persistent_tombstone_entity.h" #include "components/sync/engine_impl/loopback_server/persistent_tombstone_entity.h"
...@@ -231,15 +234,22 @@ LoopbackServer::LoopbackServer(const base::FilePath& persistent_file) ...@@ -231,15 +234,22 @@ LoopbackServer::LoopbackServer(const base::FilePath& persistent_file)
version_(0), version_(0),
store_birthday_(0), store_birthday_(0),
persistent_file_(persistent_file), persistent_file_(persistent_file),
writer_(persistent_file_,
base::CreateSequencedTaskRunner(
{base::ThreadPool(), base::MayBlock(),
base::TaskShutdownBehavior::BLOCK_SHUTDOWN})),
observer_for_tests_(nullptr) { observer_for_tests_(nullptr) {
DCHECK(!persistent_file_.empty()); DCHECK(!persistent_file_.empty());
Init(); Init();
} }
LoopbackServer::~LoopbackServer() {} LoopbackServer::~LoopbackServer() {
if (writer_.HasPendingWrite())
writer_.DoScheduledWrite();
}
void LoopbackServer::Init() { void LoopbackServer::Init() {
if (LoadStateFromFile(persistent_file_)) if (LoadStateFromFile())
return; return;
store_birthday_ = base::Time::Now().ToJavaTime(); store_birthday_ = base::Time::Now().ToJavaTime();
...@@ -367,8 +377,7 @@ net::HttpStatusCode LoopbackServer::HandleCommand( ...@@ -367,8 +377,7 @@ net::HttpStatusCode LoopbackServer::HandleCommand(
response->set_store_birthday(GetStoreBirthday()); response->set_store_birthday(GetStoreBirthday());
// TODO(pastarmovj): This should be done asynchronously. ScheduleSaveStateToFile();
SaveStateToFile(persistent_file_);
return net::HTTP_OK; return net::HTTP_OK;
} }
...@@ -823,45 +832,52 @@ bool LoopbackServer::DeSerializeState( ...@@ -823,45 +832,52 @@ bool LoopbackServer::DeSerializeState(
return true; return true;
} }
// Saves all entities and server state to a protobuf file in |filename|. bool LoopbackServer::SerializeData(std::string* data) {
bool LoopbackServer::SaveStateToFile(const base::FilePath& filename) const { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
sync_pb::LoopbackServerProto proto; sync_pb::LoopbackServerProto proto;
SerializeState(&proto); SerializeState(&proto);
if (!proto.SerializeToString(data)) {
LOG(ERROR) << "Loopback sync proto could not be serialized";
return false;
}
UMA_HISTOGRAM_MEMORY_KB(
"Sync.Local.FileSizeKB",
base::saturated_cast<base::Histogram::Sample>(
base::ClampDiv(base::ClampAdd(data->size(), 512), 1024)));
return true;
}
std::string serialized = proto.SerializeAsString(); bool LoopbackServer::ScheduleSaveStateToFile() {
if (!base::CreateDirectory(filename.DirName())) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!base::CreateDirectory(persistent_file_.DirName())) {
LOG(ERROR) << "Loopback sync could not create the storage directory."; LOG(ERROR) << "Loopback sync could not create the storage directory.";
return false; return false;
} }
int result = base::WriteFile(filename, serialized.data(), serialized.size());
if (result == static_cast<int>(serialized.size())) writer_.ScheduleWrite(this);
UMA_HISTOGRAM_MEMORY_KB("Sync.Local.FileSizeKB", result / 1024); return true;
// TODO(pastarmovj): Add new UMA here to catch error counts. }
return result == static_cast<int>(serialized.size());
} bool LoopbackServer::LoadStateFromFile() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Loads all entities and server state from a protobuf file in |filename|. if (!base::PathExists(persistent_file_)) {
bool LoopbackServer::LoadStateFromFile(const base::FilePath& filename) { LOG(WARNING) << "Loopback sync persistent state file does not exist.";
if (base::PathExists(filename)) { return false;
std::string serialized; }
if (base::ReadFileToString(filename, &serialized)) { std::string serialized;
sync_pb::LoopbackServerProto proto; if (base::ReadFileToString(persistent_file_, &serialized)) {
if (serialized.length() > 0 && proto.ParseFromString(serialized)) { sync_pb::LoopbackServerProto proto;
return DeSerializeState(proto); if (serialized.length() > 0 && proto.ParseFromString(serialized)) {
} else { return DeSerializeState(proto);
LOG(ERROR) << "Loopback sync can not parse the persistent state file.";
return false;
}
} else {
// TODO(pastarmovj): Try to understand what is the issue e.g. file already
// open, no access rights etc. and decide if better course of action is
// available instead of giving up and wiping the global state on the next
// write.
LOG(ERROR) << "Loopback sync can not read the persistent state file.";
return false;
} }
LOG(ERROR) << "Loopback sync can not parse the persistent state file.";
return false;
} }
LOG(WARNING) << "Loopback sync persistent state file does not exist."; // TODO(pastarmovj): Try to understand what is the issue e.g. file already
// open, no access rights etc. and decide if better course of action is
// available instead of giving up and wiping the global state on the next
// write.
LOG(ERROR) << "Loopback sync can not read the persistent state file.";
return false; return false;
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "base/callback.h" #include "base/callback.h"
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/files/important_file_writer.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/values.h" #include "base/values.h"
...@@ -30,7 +31,7 @@ class FakeServer; ...@@ -30,7 +31,7 @@ class FakeServer;
namespace syncer { namespace syncer {
// A loopback version of the Sync server used for local profile serialization. // A loopback version of the Sync server used for local profile serialization.
class LoopbackServer { class LoopbackServer : public base::ImportantFileWriter::DataSerializer {
public: public:
class ObserverForTests { class ObserverForTests {
public: public:
...@@ -48,7 +49,7 @@ class LoopbackServer { ...@@ -48,7 +49,7 @@ class LoopbackServer {
}; };
explicit LoopbackServer(const base::FilePath& persistent_file); explicit LoopbackServer(const base::FilePath& persistent_file);
virtual ~LoopbackServer(); ~LoopbackServer() override;
// Handles a /command POST (with the given |message|) to the server. // Handles a /command POST (with the given |message|) to the server.
// |response| must not be null. // |response| must not be null.
...@@ -91,6 +92,9 @@ class LoopbackServer { ...@@ -91,6 +92,9 @@ class LoopbackServer {
base::RepeatingCallback<sync_pb::CommitResponse::ResponseType( base::RepeatingCallback<sync_pb::CommitResponse::ResponseType(
const LoopbackServerEntity& entity)>; const LoopbackServerEntity& entity)>;
// ImportantFileWriter::DataSerializer:
bool SerializeData(std::string* data) override;
// Gets LoopbackServer ready for syncing. // Gets LoopbackServer ready for syncing.
void Init(); void Init();
...@@ -208,11 +212,14 @@ class LoopbackServer { ...@@ -208,11 +212,14 @@ class LoopbackServer {
// Populates the server state from |proto|. Returns true iff successful. // Populates the server state from |proto|. Returns true iff successful.
bool DeSerializeState(const sync_pb::LoopbackServerProto& proto); bool DeSerializeState(const sync_pb::LoopbackServerProto& proto);
// Saves all entities and server state to a protobuf file in |filename|. // Schedules committing state to disk at some later time. Repeat calls are
bool SaveStateToFile(const base::FilePath& filename) const; // batched together. Outstanding scheduled writes are committed at shutdown.
// Returns true on success.
bool ScheduleSaveStateToFile();
// Loads all entities and server state from a protobuf file in |filename|. // Loads all entities and server state from a protobuf file. Returns true on
bool LoadStateFromFile(const base::FilePath& filename); // success.
bool LoadStateFromFile();
void set_observer_for_tests(ObserverForTests* observer) { void set_observer_for_tests(ObserverForTests* observer) {
observer_for_tests_ = observer; observer_for_tests_ = observer;
...@@ -239,6 +246,9 @@ class LoopbackServer { ...@@ -239,6 +246,9 @@ class LoopbackServer {
// The file used to store the local sync data. // The file used to store the local sync data.
base::FilePath persistent_file_; base::FilePath persistent_file_;
// Used to limit the rate of file rewrites due to updates.
base::ImportantFileWriter writer_;
// Used to verify that LoopbackServer is only used from one sequence. // Used to verify that LoopbackServer is only used from one sequence.
SEQUENCE_CHECKER(sequence_checker_); SEQUENCE_CHECKER(sequence_checker_);
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/files/file_util.h" #include "base/files/file_util.h"
#include "base/guid.h" #include "base/guid.h"
#include "base/test/task_environment.h"
#include "components/sync/engine_impl/loopback_server/loopback_connection_manager.h" #include "components/sync/engine_impl/loopback_server/loopback_connection_manager.h"
#include "components/sync/engine_impl/syncer_proto_util.h" #include "components/sync/engine_impl/syncer_proto_util.h"
#include "components/sync/protocol/sync.pb.h" #include "components/sync/protocol/sync.pb.h"
...@@ -133,6 +134,8 @@ class LoopbackServerTest : public testing::Test { ...@@ -133,6 +134,8 @@ class LoopbackServerTest : public testing::Test {
EXPECT_FALSE(response.has_commit()); EXPECT_FALSE(response.has_commit());
} }
base::test::TaskEnvironment task_environment_;
base::FilePath persistent_file_; base::FilePath persistent_file_;
std::unique_ptr<LoopbackConnectionManager> lcm_; std::unique_ptr<LoopbackConnectionManager> lcm_;
}; };
...@@ -224,8 +227,6 @@ TEST_F(LoopbackServerTest, CommitBookmarkTombstoneFailure) { ...@@ -224,8 +227,6 @@ TEST_F(LoopbackServerTest, CommitBookmarkTombstoneFailure) {
TEST_F(LoopbackServerTest, LoadSavedState) { TEST_F(LoopbackServerTest, LoadSavedState) {
std::string id = CommitVerifySuccess(NewBookmarkEntity(kUrl1, kBookmarkBar)); std::string id = CommitVerifySuccess(NewBookmarkEntity(kUrl1, kBookmarkBar));
LoopbackConnectionManager second_user(persistent_file_);
ClientToServerMessage get_updates_msg; ClientToServerMessage get_updates_msg;
SyncerProtoUtil::SetProtocolVersion(&get_updates_msg); SyncerProtoUtil::SetProtocolVersion(&get_updates_msg);
get_updates_msg.set_share("required"); get_updates_msg.set_share("required");
...@@ -234,6 +235,18 @@ TEST_F(LoopbackServerTest, LoadSavedState) { ...@@ -234,6 +235,18 @@ TEST_F(LoopbackServerTest, LoadSavedState) {
->add_from_progress_marker() ->add_from_progress_marker()
->set_data_type_id(EntitySpecifics::kBookmarkFieldNumber); ->set_data_type_id(EntitySpecifics::kBookmarkFieldNumber);
ClientToServerResponse expected_response;
EXPECT_TRUE(CallPostAndProcessHeaders(lcm_.get(), nullptr, get_updates_msg,
&expected_response));
EXPECT_EQ(SyncEnums::SUCCESS, expected_response.error_code());
ASSERT_TRUE(expected_response.has_get_updates());
ASSERT_TRUE(expected_response.has_store_birthday());
lcm_.reset();
task_environment_.RunUntilIdle();
LoopbackConnectionManager second_user(persistent_file_);
ClientToServerResponse response; ClientToServerResponse response;
EXPECT_TRUE(CallPostAndProcessHeaders(&second_user, nullptr, get_updates_msg, EXPECT_TRUE(CallPostAndProcessHeaders(&second_user, nullptr, get_updates_msg,
&response)); &response));
...@@ -243,13 +256,7 @@ TEST_F(LoopbackServerTest, LoadSavedState) { ...@@ -243,13 +256,7 @@ TEST_F(LoopbackServerTest, LoadSavedState) {
EXPECT_EQ(5, response.get_updates().entries_size()); EXPECT_EQ(5, response.get_updates().entries_size());
EXPECT_EQ(1U, ResponseToMap(response).count(id)); EXPECT_EQ(1U, ResponseToMap(response).count(id));
ClientToServerResponse response2; EXPECT_EQ(expected_response.store_birthday(), response.store_birthday());
EXPECT_TRUE(CallPostAndProcessHeaders(lcm_.get(), nullptr, get_updates_msg,
&response2));
EXPECT_EQ(SyncEnums::SUCCESS, response2.error_code());
ASSERT_TRUE(response2.has_get_updates());
ASSERT_TRUE(response2.has_store_birthday());
EXPECT_EQ(response2.store_birthday(), response.store_birthday());
} }
TEST_F(LoopbackServerTest, CommitCommandUpdate) { TEST_F(LoopbackServerTest, CommitCommandUpdate) {
......
...@@ -1178,7 +1178,7 @@ class SyncManagerTest : public testing::Test, ...@@ -1178,7 +1178,7 @@ class SyncManagerTest : public testing::Test,
private: private:
// Needed by |sync_manager_|. // Needed by |sync_manager_|.
base::test::SingleThreadTaskEnvironment task_environment_; base::test::TaskEnvironment task_environment_;
// Needed by |sync_manager_|. // Needed by |sync_manager_|.
base::ScopedTempDir temp_dir_; base::ScopedTempDir temp_dir_;
// Sync Id's for the roots of the enabled datatypes. // Sync Id's for the roots of the enabled datatypes.
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "base/android/jni_array.h" #include "base/android/jni_array.h"
#include "base/android/jni_string.h" #include "base/android/jni_string.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h" #include "base/time/time.h"
#include "components/sync/base/model_type.h" #include "components/sync/base/model_type.h"
#include "components/sync/base/time.h" #include "components/sync/base/time.h"
...@@ -55,6 +56,7 @@ void FakeServerHelperAndroid::DeleteFakeServer(JNIEnv* env, ...@@ -55,6 +56,7 @@ void FakeServerHelperAndroid::DeleteFakeServer(JNIEnv* env,
const JavaParamRef<jobject>& obj, const JavaParamRef<jobject>& obj,
jlong fake_server, jlong fake_server,
jlong profile_sync_service) { jlong profile_sync_service) {
base::ScopedAllowBlockingForTesting scoped_allow;
syncer::ProfileSyncService* sync_service = syncer::ProfileSyncService* sync_service =
reinterpret_cast<syncer::ProfileSyncService*>(profile_sync_service); reinterpret_cast<syncer::ProfileSyncService*>(profile_sync_service);
sync_service->OverrideNetworkForTest(syncer::CreateHttpPostProviderFactory()); sync_service->OverrideNetworkForTest(syncer::CreateHttpPostProviderFactory());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment