Commit fc7a29d9 authored by Leonid Baraz's avatar Leonid Baraz Committed by Commit Bot

Another fix for the issue that made tests flaky.

Bug: 1133962
Bug: b:169427520
Change-Id: I899a35654fbbc91763b46e54350a447d17834581
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2446451
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Reviewed-by: default avatarZach Trudo <zatrudo@google.com>
Cr-Commit-Position: refs/heads/master@{#813435}
parent c61d14b4
...@@ -54,6 +54,40 @@ namespace reporting { ...@@ -54,6 +54,40 @@ namespace reporting {
using DmServerUploader = DmServerUploadService::DmServerUploader; using DmServerUploader = DmServerUploadService::DmServerUploader;
using ::policy::CloudPolicyClient; using ::policy::CloudPolicyClient;
namespace {
// Thread-safe helper callback class: calls callback once |Decrement|
// is invoked |count| times and then self-destructs. |Increment| can be
// called at any time, provided that the counter has not dropped to 0 yet.
class CollectorCallback {
public:
CollectorCallback(size_t count, base::OnceClosure done_cb)
: count_(count), done_cb_(std::move(done_cb)) {
DCHECK_GT(count, 0u);
}
CollectorCallback(CollectorCallback& other) = delete;
CollectorCallback& operator=(CollectorCallback& other) = delete;
~CollectorCallback() { std::move(done_cb_).Run(); }
void Decrement() {
size_t old_count = count_.fetch_sub(1);
DCHECK_GT(old_count, 0u);
if (old_count > 1) {
return;
}
delete this;
}
void Increment() {
size_t old_count = count_.fetch_add(1);
DCHECK_GT(old_count, 0u) << "Cannot increment if already 0";
}
private:
std::atomic<size_t> count_;
base::OnceClosure done_cb_;
};
} // namespace
DmServerUploadService::RecordHandler::RecordHandler(CloudPolicyClient* client) DmServerUploadService::RecordHandler::RecordHandler(CloudPolicyClient* client)
: client_(client) {} : client_(client) {}
...@@ -122,14 +156,24 @@ void DmServerUploader::HandleRecords() { ...@@ -122,14 +156,24 @@ void DmServerUploader::HandleRecords() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Status handle_status = Status::StatusOK(); Status handle_status = Status::StatusOK();
// Set collector to count 1; execution_cb will increment it
// every time |execution_cb| posts a task, and then callback
// will decrement it. Self-destructs when dropping to 0.
CollectorCallback* const collector = new CollectorCallback(
/*count=*/1, base::BindOnce(&DmServerUploader::OnRecordsHandled,
base::Unretained(this)));
// TODO(chromium:1078512) Cannot verify client state on this thread. Find a // TODO(chromium:1078512) Cannot verify client state on this thread. Find a
// way to do that and restructure this loop to handle it. // way to do that and restructure this loop to handle it.
// Passing raw |record_infos_| pointer is safe since record_infos will not die // Passing raw |record_infos_| pointer is safe since record_infos will not die
// until after handlers_ does. // until after handlers_ does.
auto done_cb = base::BindRepeating(&CollectorCallback::Decrement,
base::Unretained(collector));
auto execution_cb = base::BindRepeating( auto execution_cb = base::BindRepeating(
[](std::vector<RecordInfo>* record_infos, [](std::vector<RecordInfo>* record_infos,
base::RepeatingCallback<void(const SequencingInformation&)> base::RepeatingCallback<void(const SequencingInformation&)>
add_successfull_upload_cb, add_successfull_upload_cb,
CollectorCallback* collector,
std::unique_ptr<RecordHandler>& record_handler) { std::unique_ptr<RecordHandler>& record_handler) {
for (auto record_info_it = record_infos->begin(); for (auto record_info_it = record_infos->begin();
record_info_it != record_infos->end();) { record_info_it != record_infos->end();) {
...@@ -139,6 +183,7 @@ void DmServerUploader::HandleRecords() { ...@@ -139,6 +183,7 @@ void DmServerUploader::HandleRecords() {
// Record was successfully handled - mark it as such and move on to // Record was successfully handled - mark it as such and move on to
// the next record. // the next record.
if (handle_status.ok()) { if (handle_status.ok()) {
collector->Increment();
add_successfull_upload_cb.Run( add_successfull_upload_cb.Run(
record_info_it->sequencing_information); record_info_it->sequencing_information);
...@@ -151,7 +196,8 @@ void DmServerUploader::HandleRecords() { ...@@ -151,7 +196,8 @@ void DmServerUploader::HandleRecords() {
}, },
&record_infos_, &record_infos_,
base::BindRepeating(&DmServerUploader::AddSuccessfulUpload, base::BindRepeating(&DmServerUploader::AddSuccessfulUpload,
base::Unretained(this))); base::Unretained(this), done_cb),
base::Unretained(collector));
auto predicate_cb = base::BindRepeating( auto predicate_cb = base::BindRepeating(
[](std::vector<RecordInfo>* record_infos, [](std::vector<RecordInfo>* record_infos,
...@@ -160,11 +206,10 @@ void DmServerUploader::HandleRecords() { ...@@ -160,11 +206,10 @@ void DmServerUploader::HandleRecords() {
}, },
&record_infos_); &record_infos_);
handlers_->ExecuteOnEachElement( handlers_->ExecuteOnEachElement(std::move(execution_cb),
std::move(execution_cb), base::BindOnce(&CollectorCallback::Decrement,
base::BindOnce(&DmServerUploader::OnRecordsHandled, base::Unretained(collector)),
base::Unretained(this)), std::move(predicate_cb));
std::move(predicate_cb));
} }
void DmServerUploader::OnRecordsHandled() { void DmServerUploader::OnRecordsHandled() {
...@@ -211,15 +256,28 @@ Status DmServerUploader::IsRecordValid( ...@@ -211,15 +256,28 @@ Status DmServerUploader::IsRecordValid(
} }
void DmServerUploader::AddSuccessfulUpload( void DmServerUploader::AddSuccessfulUpload(
base::RepeatingClosure done_cb,
const SequencingInformation& sequencing_information) { const SequencingInformation& sequencing_information) {
Schedule(&DmServerUploader::ProcessSuccessfulUploadAddition, Schedule(&DmServerUploader::ProcessSuccessfulUploadAddition,
base::Unretained(this), sequencing_information); base::Unretained(this), std::move(done_cb), sequencing_information);
} }
void DmServerUploader::ProcessSuccessfulUploadAddition( void DmServerUploader::ProcessSuccessfulUploadAddition(
base::RepeatingClosure done_cb,
SequencingInformation sequencing_information) { SequencingInformation sequencing_information) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Auto-call done_cb when returning.
class CleanUp {
public:
explicit CleanUp(base::RepeatingClosure done_cb)
: done_cb_(std::move(done_cb)) {}
~CleanUp() { std::move(done_cb_).Run(); }
private:
base::RepeatingClosure done_cb_;
} clean_up(std::move(done_cb));
// If this is the first successful record - set highest to this record. // If this is the first successful record - set highest to this record.
if (!highest_successful_sequence_.has_value()) { if (!highest_successful_sequence_.has_value()) {
highest_successful_sequence_ = sequencing_information; highest_successful_sequence_ = sequencing_information;
...@@ -317,33 +375,6 @@ Status DmServerUploadService::EnqueueUpload( ...@@ -317,33 +375,6 @@ Status DmServerUploadService::EnqueueUpload(
return Status::StatusOK(); return Status::StatusOK();
} }
namespace {
class CollectorCallback {
public:
CollectorCallback(size_t count, base::OnceClosure done_cb)
: count_(count), done_cb_(std::move(done_cb)) {
DCHECK_GT(count, 0u);
}
CollectorCallback(CollectorCallback& other) = delete;
CollectorCallback& operator=(CollectorCallback& other) = delete;
~CollectorCallback() { std::move(done_cb_).Run(); }
void Decrement() {
size_t old_count = count_.fetch_sub(1);
DCHECK_GT(old_count, 0u);
if (old_count > 1) {
return;
}
delete this;
}
private:
std::atomic<size_t> count_;
base::OnceClosure done_cb_;
};
} // namespace
void DmServerUploadService::InitRecordHandlers( void DmServerUploadService::InitRecordHandlers(
std::unique_ptr<DmServerUploadService> uploader, std::unique_ptr<DmServerUploadService> uploader,
#ifdef OS_CHROMEOS #ifdef OS_CHROMEOS
......
...@@ -116,10 +116,12 @@ class DmServerUploadService { ...@@ -116,10 +116,12 @@ class DmServerUploadService {
// Helper function for tracking the highest sequencing information per // Helper function for tracking the highest sequencing information per
// generation id. Schedules ProcessSuccessfulUploadAddition. // generation id. Schedules ProcessSuccessfulUploadAddition.
void AddSuccessfulUpload( void AddSuccessfulUpload(
base::RepeatingClosure done_cb,
const SequencingInformation& sequencing_information); const SequencingInformation& sequencing_information);
// Processes successful uploads on sequence. // Processes successful uploads on sequence.
void ProcessSuccessfulUploadAddition( void ProcessSuccessfulUploadAddition(
base::RepeatingClosure done_cb,
SequencingInformation sequencing_information); SequencingInformation sequencing_information);
std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records_; std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records_;
......
...@@ -72,53 +72,46 @@ TEST(DmServerUploadServiceTest, DeniesNullptrProfile) { ...@@ -72,53 +72,46 @@ TEST(DmServerUploadServiceTest, DeniesNullptrProfile) {
class TestCallbackWaiter { class TestCallbackWaiter {
public: public:
TestCallbackWaiter() TestCallbackWaiter() : run_loop_(std::make_unique<base::RunLoop>()) {}
: completed_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {}
void CompleteExpectSuccess( void CompleteExpectSuccess(
DmServerUploadService::CompletionResponse response) { DmServerUploadService::CompletionResponse response) {
DCHECK(!completed_.IsSignaled());
EXPECT_TRUE(response.ok()); EXPECT_TRUE(response.ok());
completed_.Signal(); run_loop_->Quit();
} }
void CompleteExpectUnimplemented( void CompleteExpectUnimplemented(
DmServerUploadService::CompletionResponse response) { DmServerUploadService::CompletionResponse response) {
DCHECK(!completed_.IsSignaled());
EXPECT_FALSE(response.ok()); EXPECT_FALSE(response.ok());
EXPECT_EQ(response.status().error_code(), error::UNIMPLEMENTED); EXPECT_EQ(response.status().error_code(), error::UNIMPLEMENTED);
completed_.Signal(); run_loop_->Quit();
} }
void CompleteExpectInvalidArgument( void CompleteExpectInvalidArgument(
DmServerUploadService::CompletionResponse response) { DmServerUploadService::CompletionResponse response) {
DCHECK(!completed_.IsSignaled());
EXPECT_FALSE(response.ok()); EXPECT_FALSE(response.ok());
EXPECT_EQ(response.status().error_code(), error::INVALID_ARGUMENT); EXPECT_EQ(response.status().error_code(), error::INVALID_ARGUMENT);
completed_.Signal(); run_loop_->Quit();
} }
void CompleteExpectFailedPrecondition( void CompleteExpectFailedPrecondition(
DmServerUploadService::CompletionResponse response) { DmServerUploadService::CompletionResponse response) {
DCHECK(!completed_.IsSignaled());
EXPECT_FALSE(response.ok()); EXPECT_FALSE(response.ok());
EXPECT_EQ(response.status().error_code(), error::FAILED_PRECONDITION); EXPECT_EQ(response.status().error_code(), error::FAILED_PRECONDITION);
completed_.Signal(); run_loop_->Quit();
} }
void CompleteExpectDeadlineExceeded( void CompleteExpectDeadlineExceeded(
DmServerUploadService::CompletionResponse response) { DmServerUploadService::CompletionResponse response) {
DCHECK(!completed_.IsSignaled());
EXPECT_FALSE(response.ok()); EXPECT_FALSE(response.ok());
EXPECT_EQ(response.status().error_code(), error::DEADLINE_EXCEEDED); EXPECT_EQ(response.status().error_code(), error::DEADLINE_EXCEEDED);
completed_.Signal(); run_loop_->Quit();
} }
void Wait() { completed_.Wait(); } void Wait() { run_loop_->Run(); }
private: private:
base::WaitableEvent completed_; std::unique_ptr<base::RunLoop> run_loop_;
}; };
class TestRecordHandler : public DmServerUploadService::RecordHandler { class TestRecordHandler : public DmServerUploadService::RecordHandler {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment