Commit 08d153c2 authored by sorin's avatar sorin Committed by Commit bot

Fix task concurrency in components/update_client

1. Only dequeue pending tasks when no other tasks are running.
2. Use the running tasks list to accept or reject a foreground task.

BUG=549305

Review URL: https://codereview.chromium.org/1419473005

Cr-Commit-Position: refs/heads/master@{#356992}
parent 19fe1a8d
...@@ -5,6 +5,9 @@ ...@@ -5,6 +5,9 @@
#ifndef COMPONENTS_UPDATE_CLIENT_TASK_H_ #ifndef COMPONENTS_UPDATE_CLIENT_TASK_H_
#define COMPONENTS_UPDATE_CLIENT_TASK_H_ #define COMPONENTS_UPDATE_CLIENT_TASK_H_
#include <string>
#include <vector>
#include "base/callback.h" #include "base/callback.h"
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "components/update_client/update_client.h" #include "components/update_client/update_client.h"
...@@ -22,6 +25,9 @@ class Task { ...@@ -22,6 +25,9 @@ class Task {
virtual ~Task() {} virtual ~Task() {}
virtual void Run() = 0; virtual void Run() = 0;
// Returns the ids corresponding to the CRXs associated with this update task.
virtual std::vector<std::string> GetIds() const = 0;
}; };
} // namespace update_client } // namespace update_client
......
...@@ -41,6 +41,10 @@ void TaskUpdate::Run() { ...@@ -41,6 +41,10 @@ void TaskUpdate::Run() {
base::Bind(&TaskUpdate::RunComplete, base::Unretained(this))); base::Bind(&TaskUpdate::RunComplete, base::Unretained(this)));
} }
std::vector<std::string> TaskUpdate::GetIds() const {
return ids_;
}
void TaskUpdate::RunComplete(int error) { void TaskUpdate::RunComplete(int error) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
......
...@@ -40,6 +40,8 @@ class TaskUpdate : public Task { ...@@ -40,6 +40,8 @@ class TaskUpdate : public Task {
void Run() override; void Run() override;
std::vector<std::string> GetIds() const override;
private: private:
// Called when the Run function associated with this task has completed. // Called when the Run function associated with this task has completed.
void RunComplete(int error); void RunComplete(int error);
......
...@@ -89,7 +89,7 @@ void UpdateClientImpl::Install(const std::string& id, ...@@ -89,7 +89,7 @@ void UpdateClientImpl::Install(const std::string& id,
const CompletionCallback& completion_callback) { const CompletionCallback& completion_callback) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
if (update_engine_->IsUpdating(id)) { if (IsUpdating(id)) {
completion_callback.Run(Error::ERROR_UPDATE_IN_PROGRESS); completion_callback.Run(Error::ERROR_UPDATE_IN_PROGRESS);
return; return;
} }
...@@ -104,6 +104,7 @@ void UpdateClientImpl::Install(const std::string& id, ...@@ -104,6 +104,7 @@ void UpdateClientImpl::Install(const std::string& id,
scoped_ptr<TaskUpdate> task(new TaskUpdate(update_engine_.get(), true, ids, scoped_ptr<TaskUpdate> task(new TaskUpdate(update_engine_.get(), true, ids,
crx_data_callback, callback)); crx_data_callback, callback));
// Install tasks are run concurrently and never queued up.
auto it = tasks_.insert(task.release()).first; auto it = tasks_.insert(task.release()).first;
RunTask(*it); RunTask(*it);
} }
...@@ -118,6 +119,8 @@ void UpdateClientImpl::Update(const std::vector<std::string>& ids, ...@@ -118,6 +119,8 @@ void UpdateClientImpl::Update(const std::vector<std::string>& ids,
scoped_ptr<TaskUpdate> task(new TaskUpdate(update_engine_.get(), false, ids, scoped_ptr<TaskUpdate> task(new TaskUpdate(update_engine_.get(), false, ids,
crx_data_callback, callback)); crx_data_callback, callback));
// If no other tasks are running at the moment, run this update task.
// Otherwise, queue the task up.
if (tasks_.empty()) { if (tasks_.empty()) {
auto it = tasks_.insert(task.release()).first; auto it = tasks_.insert(task.release()).first;
RunTask(*it); RunTask(*it);
...@@ -145,7 +148,9 @@ void UpdateClientImpl::OnTaskComplete( ...@@ -145,7 +148,9 @@ void UpdateClientImpl::OnTaskComplete(
tasks_.erase(task); tasks_.erase(task);
delete task; delete task;
if (!task_queue_.empty()) { // Pick up a task from the queue if the queue has pending tasks and no other
// task is running.
if (tasks_.empty() && !task_queue_.empty()) {
RunTask(task_queue_.front()); RunTask(task_queue_.front());
task_queue_.pop(); task_queue_.pop();
} }
...@@ -173,7 +178,16 @@ bool UpdateClientImpl::GetCrxUpdateState(const std::string& id, ...@@ -173,7 +178,16 @@ bool UpdateClientImpl::GetCrxUpdateState(const std::string& id,
} }
bool UpdateClientImpl::IsUpdating(const std::string& id) const { bool UpdateClientImpl::IsUpdating(const std::string& id) const {
return update_engine_->IsUpdating(id); DCHECK(thread_checker_.CalledOnValidThread());
for (const auto& task : tasks_) {
const auto ids(task->GetIds());
if (std::find(std::begin(ids), std::end(ids), id) != std::end(ids)) {
return true;
}
}
return false;
} }
scoped_refptr<UpdateClient> UpdateClientFactory( scoped_refptr<UpdateClient> UpdateClientFactory(
......
...@@ -262,18 +262,27 @@ class UpdateClient : public base::RefCounted<UpdateClient> { ...@@ -262,18 +262,27 @@ class UpdateClient : public base::RefCounted<UpdateClient> {
// the observers are being notified. // the observers are being notified.
virtual void RemoveObserver(Observer* observer) = 0; virtual void RemoveObserver(Observer* observer) = 0;
// Installs the specified CRX. Calls back after the install has been handled. // Installs the specified CRX. Calls back on |completion_callback| after the
// Calls back on |completion_callback| after the update has been handled. The // update has been handled. The |error| parameter of the |completion_callback|
// |error| parameter of the |completion_callback| contains an error code in // contains an error code in the case of a run-time error, or 0 if the
// the case of a run-time error, or 0 if the Install has been handled // install has been handled successfully. Overlapping calls of this function
// successfully. // are executed concurrently, as long as the id parameter is different,
// meaning that installs of different components are parallelized.
// The |Install| function is intended to be used for foreground installs of
// one CRX. These cases are usually associated with on-demand install
// scenarios, which are triggered by user actions. Installs are never
// queued up.
virtual void Install(const std::string& id, virtual void Install(const std::string& id,
const CrxDataCallback& crx_data_callback, const CrxDataCallback& crx_data_callback,
const CompletionCallback& completion_callback) = 0; const CompletionCallback& completion_callback) = 0;
// Updates the specified CRXs. Calls back on |crx_data_callback| before the // Updates the specified CRXs. Calls back on |crx_data_callback| before the
// update is attempted to give the caller the opportunity to provide the // update is attempted to give the caller the opportunity to provide the
// instances of CrxComponent to be used for this update. // instances of CrxComponent to be used for this update. The |Update| function
// is intended to be used for background updates of several CRXs. Overlapping
// calls to this function result in a queuing behavior, and the execution
// of each call is serialized. In addition, updates are always queued up when
// installs are running.
virtual void Update(const std::vector<std::string>& ids, virtual void Update(const std::vector<std::string>& ids,
const CrxDataCallback& crx_data_callback, const CrxDataCallback& crx_data_callback,
const CompletionCallback& completion_callback) = 0; const CompletionCallback& completion_callback) = 0;
...@@ -284,6 +293,7 @@ class UpdateClient : public base::RefCounted<UpdateClient> { ...@@ -284,6 +293,7 @@ class UpdateClient : public base::RefCounted<UpdateClient> {
virtual bool GetCrxUpdateState(const std::string& id, virtual bool GetCrxUpdateState(const std::string& id,
CrxUpdateItem* update_item) const = 0; CrxUpdateItem* update_item) const = 0;
// Returns true if the |id| is found in any running task.
virtual bool IsUpdating(const std::string& id) const = 0; virtual bool IsUpdating(const std::string& id) const = 0;
protected: protected:
......
...@@ -65,10 +65,17 @@ class UpdateClientImpl : public UpdateClient { ...@@ -65,10 +65,17 @@ class UpdateClientImpl : public UpdateClient {
scoped_refptr<Configurator> config_; scoped_refptr<Configurator> config_;
// Contains the tasks that are queued up. // Contains the tasks that are pending. In the current implementation,
// only update tasks (background tasks) are queued up. These tasks are
// pending while they are in this queue. They are not being handled for
// the moment.
std::queue<Task*> task_queue_; std::queue<Task*> task_queue_;
// Contains all tasks in progress. // Contains all tasks in progress. These are the tasks that the update engine
// is executing at one moment. Install tasks are run concurrently, update
// tasks are always serialized, and update tasks are queued up if install
// tasks are running. In addition, concurrent install tasks for the same id
// are not allowed.
std::set<Task*> tasks_; std::set<Task*> tasks_;
// TODO(sorin): try to make the ping manager an observer of the service. // TODO(sorin): try to make the ping manager an observer of the service.
......
...@@ -1974,6 +1974,120 @@ TEST_F(UpdateClientTest, OneCrxInstall) { ...@@ -1974,6 +1974,120 @@ TEST_F(UpdateClientTest, OneCrxInstall) {
StopWorkerPool(); StopWorkerPool();
} }
// Tests that overlapping installs of the same CRX result in an error.
TEST_F(UpdateClientTest, ConcurrentInstallSameCRX) {
class DataCallbackFake {
public:
static void Callback(const std::vector<std::string>& ids,
std::vector<CrxComponent>* components) {
CrxComponent crx;
crx.name = "test_jebg";
crx.pk_hash.assign(jebg_hash, jebg_hash + arraysize(jebg_hash));
crx.version = Version("0.0");
crx.installer = new TestInstaller;
components->push_back(crx);
}
};
class CompletionCallbackFake {
public:
static void Callback(const base::Closure& quit_closure, int error) {
static int num_call = 0;
++num_call;
EXPECT_LE(num_call, 2);
if (num_call == 1) {
EXPECT_EQ(Error::ERROR_UPDATE_IN_PROGRESS, error);
return;
}
if (num_call == 2) {
EXPECT_EQ(0, error);
quit_closure.Run();
}
}
};
class FakeUpdateChecker : public UpdateChecker {
public:
static scoped_ptr<UpdateChecker> Create(const Configurator& config) {
return scoped_ptr<UpdateChecker>(new FakeUpdateChecker());
}
bool CheckForUpdates(
const std::vector<CrxUpdateItem*>& items_to_check,
const std::string& additional_attributes,
const UpdateCheckCallback& update_check_callback) override {
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::Bind(update_check_callback, GURL(), 0, "",
UpdateResponse::Results()));
return true;
}
};
class FakeCrxDownloader : public CrxDownloader {
public:
static scoped_ptr<CrxDownloader> Create(
bool is_background_download,
net::URLRequestContextGetter* context_getter,
const scoped_refptr<base::SequencedTaskRunner>&
url_fetcher_task_runner) {
return scoped_ptr<CrxDownloader>(new FakeCrxDownloader());
}
private:
FakeCrxDownloader() : CrxDownloader(scoped_ptr<CrxDownloader>().Pass()) {}
~FakeCrxDownloader() override {}
void DoStartDownload(const GURL& url) override { EXPECT_TRUE(false); }
};
class FakePingManager : public FakePingManagerImpl {
public:
explicit FakePingManager(const Configurator& config)
: FakePingManagerImpl(config) {}
~FakePingManager() override { EXPECT_TRUE(items().empty()); }
};
scoped_ptr<FakePingManager> ping_manager(new FakePingManager(*config()));
scoped_refptr<UpdateClient> update_client(new UpdateClientImpl(
config(), ping_manager.Pass(), &FakeUpdateChecker::Create,
&FakeCrxDownloader::Create));
// Verify that calling Install sets ondemand.
OnDemandTester ondemand_tester(update_client, true);
MockObserver observer;
ON_CALL(observer, OnEvent(_, _))
.WillByDefault(Invoke(&ondemand_tester, &OnDemandTester::CheckOnDemand));
EXPECT_CALL(observer, OnEvent(Events::COMPONENT_CHECKING_FOR_UPDATES,
"jebgalgnebhfojomionfpkfelancnnkf"))
.Times(1);
EXPECT_CALL(observer, OnEvent(Events::COMPONENT_NOT_UPDATED,
"jebgalgnebhfojomionfpkfelancnnkf"))
.Times(1);
update_client->AddObserver(&observer);
update_client->Install(
std::string("jebgalgnebhfojomionfpkfelancnnkf"),
base::Bind(&DataCallbackFake::Callback),
base::Bind(&CompletionCallbackFake::Callback, quit_closure()));
update_client->Install(
std::string("jebgalgnebhfojomionfpkfelancnnkf"),
base::Bind(&DataCallbackFake::Callback),
base::Bind(&CompletionCallbackFake::Callback, quit_closure()));
RunThreads();
update_client->RemoveObserver(&observer);
StopWorkerPool();
}
// Make sure that we don't get any crashes when trying to update an empty list // Make sure that we don't get any crashes when trying to update an empty list
// of ids. // of ids.
TEST_F(UpdateClientTest, EmptyIdList) { TEST_F(UpdateClientTest, EmptyIdList) {
......
...@@ -60,20 +60,6 @@ UpdateEngine::~UpdateEngine() { ...@@ -60,20 +60,6 @@ UpdateEngine::~UpdateEngine() {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
} }
bool UpdateEngine::IsUpdating(const std::string& id) const {
DCHECK(thread_checker_.CalledOnValidThread());
for (const auto& context : update_contexts_) {
const auto& ids = context->ids;
const auto it = std::find_if(
ids.begin(), ids.end(),
[id](const std::string& this_id) { return id == this_id; });
if (it != ids.end()) {
return true;
}
}
return false;
}
bool UpdateEngine::GetUpdateState(const std::string& id, bool UpdateEngine::GetUpdateState(const std::string& id,
CrxUpdateItem* update_item) { CrxUpdateItem* update_item) {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
......
...@@ -53,9 +53,6 @@ class UpdateEngine { ...@@ -53,9 +53,6 @@ class UpdateEngine {
const NotifyObserversCallback& notify_observers_callback); const NotifyObserversCallback& notify_observers_callback);
~UpdateEngine(); ~UpdateEngine();
// Returns true is the CRX identified by the given |id| is being updated.
bool IsUpdating(const std::string& id) const;
bool GetUpdateState(const std::string& id, CrxUpdateItem* update_state); bool GetUpdateState(const std::string& id, CrxUpdateItem* update_state);
void Update(bool is_foreground, void Update(bool is_foreground,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment