Commit cbcb5ba6 authored by Bailey Berro's avatar Bailey Berro Committed by Commit Bot

Add abort functionality to SmbTaskQueue

This change adds Abort functionality to SmbTaskQueue. The unrun tasks for
a given operation can be aborted via SmbTaskQueue::AbortOperation().

Test=unittest

Bug: chromium:757625
Change-Id: I317cc95556e4952a2c7049ad2cdf358d3058147b
Reviewed-on: https://chromium-review.googlesource.com/956260
Commit-Queue: Bailey Berro <baileyberro@chromium.org>
Reviewed-by: default avatarZentaro Kavanagh <zentaro@chromium.org>
Cr-Commit-Position: refs/heads/master@{#542915}
parent 73a8a52f
......@@ -13,8 +13,18 @@ namespace smb_client {
SmbTaskQueue::SmbTaskQueue(size_t max_pending) : max_pending_(max_pending) {}
SmbTaskQueue::~SmbTaskQueue() {}
void SmbTaskQueue::AddTask(SmbTask task) {
operations_.push(std::move(task));
OperationId SmbTaskQueue::GetNextOperationId() {
return next_operation_id++;
}
void SmbTaskQueue::AddTask(SmbTask task, OperationId operation_id) {
DCHECK(IsValidOperationId(operation_id));
if (!operation_map_.count(operation_id)) {
operations_.push(operation_id);
}
operation_map_[operation_id].push(std::move(task));
RunTaskIfNeccessary();
}
......@@ -24,7 +34,14 @@ void SmbTaskQueue::TaskFinished() {
RunTaskIfNeccessary();
}
void SmbTaskQueue::AbortOperation(OperationId operation_id) {
DCHECK(IsValidOperationId(operation_id));
operation_map_.erase(operation_id);
}
void SmbTaskQueue::RunTaskIfNeccessary() {
PruneOperationQueue();
if (IsCapacityToRunTask() && IsTaskToRun()) {
RunNextTask();
......@@ -37,9 +54,19 @@ void SmbTaskQueue::RunTaskIfNeccessary() {
SmbTask SmbTaskQueue::GetNextTask() {
DCHECK(IsTaskToRun());
const OperationId operation_id = operations_.front();
DCHECK(operation_map_.count(operation_id));
auto& queue = operation_map_.find(operation_id)->second;
SmbTask next_task = std::move(queue.front());
queue.pop();
if (queue.empty()) {
operation_map_.erase(operation_id);
operations_.pop();
}
SmbTask next_task = std::move(operations_.front());
operations_.pop();
return next_task;
}
......@@ -50,7 +77,18 @@ void SmbTaskQueue::RunNextTask() {
GetNextTask().Run();
}
void SmbTaskQueue::PruneOperationQueue() {
while (!IsPruned()) {
operations_.pop();
}
}
bool SmbTaskQueue::IsPruned() const {
return (operations_.empty() || operation_map_.count(operations_.front()));
}
bool SmbTaskQueue::IsTaskToRun() const {
DCHECK(IsPruned());
return !operations_.empty();
}
......@@ -58,5 +96,9 @@ bool SmbTaskQueue::IsCapacityToRunTask() const {
return num_pending_ < max_pending_;
}
bool SmbTaskQueue::IsValidOperationId(OperationId operation_id) const {
return operation_id < next_operation_id;
}
} // namespace smb_client
} // namespace chromeos
......@@ -15,6 +15,7 @@ namespace smb_client {
// An SmbTask is a call to SmbProviderClient with a bound SmbFileSystem callback
// that runs when SmbProviderClient receives a D-Bus message response.
using SmbTask = base::OnceClosure;
using OperationId = uint32_t;
// SmbTaskQueue handles the queuing of SmbTasks. Tasks are 'pending' while
// SmbProviderClient awaits a D-Bus Message Response. Tasks are added to
......@@ -24,8 +25,8 @@ using SmbTask = base::OnceClosure;
//
// Example:
//
// void CreateDirectory(FilePath directory_path, bool recursive,
// StatusCallback callback) {
// AbortCallback CreateDirectory(FilePath directory_path, bool recursive,
// StatusCallback callback) {
// auto reply = base::BindOnce(&SmbFileSystem::HandleStatusCallback,
// AsWeakPtr(), callback);
//
......@@ -35,7 +36,10 @@ using SmbTask = base::OnceClosure;
// directory_path,
// recursive,
// std::move(reply));
// OperationId operation_id = 1;
// tq_.AddTask(std::move(task));
//
// return CreateAbortCallbackForOperationId(operation_id);
// }
//
// void HandleStatusCallback(StatusCallback callback, ErrorType error) {
......@@ -44,10 +48,20 @@ using SmbTask = base::OnceClosure;
// }
class SmbTaskQueue {
public:
// Adds |task| to the task queue. If fewer that max_pending_ tasks are
// outstanding, |task| will run immediately. Otherwise, it will be added to
// operations_ and run in FIFO order.
void AddTask(SmbTask task);
// Provides the caller with a new OperationId to associate new tasks with.
OperationId GetNextOperationId();
// Adds the sub-task |task| for the Operation |operation_id| to the task
// queue. If fewer that max_pending_ tasks are outstanding, |task| will run
// immediately. Otherwise, it will be added to operations_ and run in FIFO
// order.
void AddTask(SmbTask task, OperationId operation_id);
// Attempts to abort any outstanding tasks associated with the operation
// |operation_id|. Any subtasks that have not been sent over to D-Bus to the
// Smb Daemon will be cancelled, and OperationId will be removed from
// operation_map_.
void AbortOperation(OperationId operation_id);
// Must be called by the owner of this class to indicate that a response to a
// task was received.
......@@ -57,6 +71,8 @@ class SmbTaskQueue {
~SmbTaskQueue();
private:
using TaskList = base::queue<SmbTask>;
// This runs the next task in operations_ if there is capacity to run an
// additional task, and a task remaing to run.
void RunTaskIfNeccessary();
......@@ -67,16 +83,30 @@ class SmbTaskQueue {
// Helper method that runs the next task.
void RunNextTask();
// Prunes operations_ by removing OperationIds from the front of the queue if
// there are no tasks associated with them.
void PruneOperationQueue();
// Helper method that returns whether operations_ has been pruned.
bool IsPruned() const;
// Helper method that returns whether there are tasks in operations_ to run.
// operations_ must be pruned (i.e. the top Operation in operations_ must have
// atleast 1 task associated with it).
bool IsTaskToRun() const;
// Helper method that returns whether there are fewer than max_pending tasks
// outstanding.
bool IsCapacityToRunTask() const;
// Helper method that returns whether |operation_id| is valid.
bool IsValidOperationId(OperationId operation_id) const;
const size_t max_pending_;
size_t num_pending_ = 0;
base::queue<SmbTask> operations_;
OperationId next_operation_id = 0;
base::queue<OperationId> operations_;
std::map<OperationId, TaskList> operation_map_;
DISALLOW_COPY_AND_ASSIGN(SmbTaskQueue);
};
......
......@@ -28,15 +28,26 @@ class SmbTaskQueueTest : public testing::Test {
~SmbTaskQueueTest() override = default;
protected:
// Creates and adds the task with |task_id| to task_queue_.
// Calls SmbTaskQueue::GetNextOperationId().
OperationId GetOperationId() { return task_queue_.GetNextOperationId(); }
// Creates and adds a task with |task_id| and a default operation
// id. |task_id| is used for testing to manually control when tasks finish.
void CreateAndAddTask(uint32_t task_id) {
const OperationId operation_id = task_queue_.GetNextOperationId();
CreateAndAddTask(task_id, operation_id);
}
// Creates and adds a task with |task_id| for the corresponding
// |operation_id|.
void CreateAndAddTask(uint32_t task_id, OperationId operation_id) {
base::OnceClosure reply =
base::BindOnce(&SmbTaskQueueTest::OnReply, base::Unretained(this));
SmbTask task =
base::BindOnce(&SmbTaskQueueTest::Start, base::Unretained(this),
task_id, std::move(reply));
task_queue_.AddTask(std::move(task));
task_queue_.AddTask(std::move(task), operation_id);
}
// Checks whether the task |task_id| is pending.
......@@ -55,6 +66,11 @@ class SmbTaskQueueTest : public testing::Test {
// Returns the number of pending tasks.
size_t PendingCount() const { return pending_.size(); }
// Calls AbortOperation with |operation_id| on task_queue_.
void Abort(OperationId operation_id) {
task_queue_.AbortOperation(operation_id);
}
private:
void OnReply() { task_queue_.TaskFinished(); }
......@@ -131,5 +147,101 @@ TEST_F(SmbTaskQueueTest, TaskQueueDoesNotRunAdditionalTestsWhenFull) {
EXPECT_TRUE(IsPending(task_id_4));
}
// AbortOperation removes all the tasks corresponding to an operation that has
// not begin to run yet.
TEST_F(SmbTaskQueueTest, AbortOperationRemovesAllTasksOfUnrunOperation) {
// Saturate the SmbTaskQueue with tasks.
const uint32_t filler_task_1 = 1;
const uint32_t filler_task_2 = 2;
const uint32_t filler_task_3 = 3;
CreateAndAddTask(filler_task_1);
CreateAndAddTask(filler_task_2);
CreateAndAddTask(filler_task_3);
// Create some tasks coresponding to a specific operation_id.
const OperationId operation_id = GetOperationId();
const uint32_t task_to_cancel_1 = 101;
const uint32_t task_to_cancel_2 = 102;
const uint32_t task_to_cancel_3 = 103;
CreateAndAddTask(task_to_cancel_1, operation_id);
CreateAndAddTask(task_to_cancel_2, operation_id);
CreateAndAddTask(task_to_cancel_3, operation_id);
// The filler tasks should be pending, the tasks to cancel should not be.
EXPECT_EQ(kTaskQueueCapacity, PendingCount());
EXPECT_TRUE(IsPending(filler_task_1));
EXPECT_TRUE(IsPending(filler_task_2));
EXPECT_TRUE(IsPending(filler_task_3));
EXPECT_FALSE(IsPending(task_to_cancel_1));
EXPECT_FALSE(IsPending(task_to_cancel_2));
EXPECT_FALSE(IsPending(task_to_cancel_3));
// Aborting operation_id and completeing the filler tasks should not run
// the task_to_cancel's.
Abort(operation_id);
EXPECT_EQ(kTaskQueueCapacity, PendingCount());
CompleteTask(filler_task_1);
EXPECT_EQ(2u, PendingCount());
CompleteTask(filler_task_2);
EXPECT_EQ(1u, PendingCount());
CompleteTask(filler_task_3);
EXPECT_EQ(0u, PendingCount());
EXPECT_FALSE(IsPending(task_to_cancel_1));
EXPECT_FALSE(IsPending(task_to_cancel_2));
EXPECT_FALSE(IsPending(task_to_cancel_3));
}
// AbortOperation aborts all the tasks correspoding to an operation that has
// some running tasks.
TEST_F(SmbTaskQueueTest, AbortOperationRemovesUnrunTasksOfRunningOperation) {
const uint32_t filler_task_1 = 1;
const uint32_t filler_task_2 = 2;
CreateAndAddTask(filler_task_1);
CreateAndAddTask(filler_task_2);
// Create some tasks coresponding to a specific operation_id.
const OperationId operation_id = GetOperationId();
const uint32_t task_to_cancel_1 = 101;
const uint32_t task_to_cancel_2 = 102;
const uint32_t task_to_cancel_3 = 103;
CreateAndAddTask(task_to_cancel_1, operation_id);
CreateAndAddTask(task_to_cancel_2, operation_id);
CreateAndAddTask(task_to_cancel_3, operation_id);
// The task queue should be running the maximum number of pending tasks,
// including the first task for operation_id. The remaining tasks for
// operation_id are not yet running.
EXPECT_EQ(kTaskQueueCapacity, PendingCount());
EXPECT_TRUE(IsPending(task_to_cancel_1));
EXPECT_FALSE(IsPending(task_to_cancel_2));
EXPECT_FALSE(IsPending(task_to_cancel_3));
// Aborting operation_id should not effect the status of task_to_cancel_1
// since it was already pending.
Abort(operation_id);
EXPECT_TRUE(IsPending(task_to_cancel_1));
// task_to_cancel_2 and task_to_cancel_3 should not run when the task queue
// has space capacity for them.
CompleteTask(filler_task_1);
CompleteTask(filler_task_2);
CompleteTask(task_to_cancel_1);
EXPECT_LT(PendingCount(), kTaskQueueCapacity);
EXPECT_FALSE(IsPending(task_to_cancel_2));
EXPECT_FALSE(IsPending(task_to_cancel_3));
}
} // namespace smb_client
} // namespace chromeos
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment