Commit 08193a9c authored by Zach Trudo's avatar Zach Trudo Committed by Chromium LUCI CQ

Enable bulk upload of records

Bug: chromium:1163560
Change-Id: I87dece32d57228f08bee6d11e66b7909618b0c79
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2611273
Commit-Queue: Zach Trudo <zatrudo@google.com>
Reviewed-by: default avatarLeonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#842750}
parent c76cd4b4
...@@ -74,7 +74,7 @@ class RecordHandlerImpl::ReportUploader ...@@ -74,7 +74,7 @@ class RecordHandlerImpl::ReportUploader
void OnStart() override; void OnStart() override;
void StartUpload(const EncryptedRecord& encrypted_record); void StartUpload();
void OnUploadComplete(base::Optional<base::Value> response); void OnUploadComplete(base::Optional<base::Value> response);
void HandleFailedUpload(); void HandleFailedUpload();
void HandleSuccessfulUpload(); void HandleSuccessfulUpload();
...@@ -159,26 +159,27 @@ void RecordHandlerImpl::ReportUploader::OnStart() { ...@@ -159,26 +159,27 @@ void RecordHandlerImpl::ReportUploader::OnStart() {
return; return;
} }
// We'll be popping records off the back. StartUpload();
std::reverse(records_->begin(), records_->end());
StartUpload(records_->back());
} }
void RecordHandlerImpl::ReportUploader::StartUpload( void RecordHandlerImpl::ReportUploader::StartUpload() {
const EncryptedRecord& encrypted_record) {
auto response_cb = auto response_cb =
base::BindOnce(&RecordHandlerImpl::ReportUploader::OnUploadComplete, base::BindOnce(&RecordHandlerImpl::ReportUploader::OnUploadComplete,
base::Unretained(this)); base::Unretained(this));
auto request_result = UploadEncryptedReportingRequestBuilder request_builder{need_encryption_key_};
UploadEncryptedReportingRequestBuilder(need_encryption_key_) for (const auto& record : *records_) {
.AddRecord(encrypted_record) request_builder.AddRecord(record);
.Build(); }
auto request_result = request_builder.Build();
if (!request_result.has_value()) { if (!request_result.has_value()) {
std::move(response_cb).Run(base::nullopt); std::move(response_cb).Run(base::nullopt);
return; return;
} }
// Records have been captured in the request, safe to clear the vector.
records_->clear();
base::Value request = std::move(request_result.value()); base::Value request = std::move(request_result.value());
base::PostTask( base::PostTask(
...@@ -208,18 +209,12 @@ void RecordHandlerImpl::ReportUploader::OnUploadComplete( ...@@ -208,18 +209,12 @@ void RecordHandlerImpl::ReportUploader::OnUploadComplete(
} }
void RecordHandlerImpl::ReportUploader::HandleFailedUpload() { void RecordHandlerImpl::ReportUploader::HandleFailedUpload() {
Status data_loss = Status(
error::DATA_LOSS,
base::StrCat({"Record failed uploaded: ",
records_->back().sequencing_information().DebugString()}));
LOG(ERROR) << data_loss;
if (highest_sequencing_information_.has_value()) { if (highest_sequencing_information_.has_value()) {
Complete(std::move(highest_sequencing_information_.value())); Complete(std::move(highest_sequencing_information_.value()));
return; return;
} }
Complete(data_loss); Complete(Status(error::INTERNAL, "Unable to upload any records"));
} }
void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() { void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
...@@ -275,7 +270,7 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() { ...@@ -275,7 +270,7 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
} }
} }
// Check if the previous record was unprocessable on the server. // Check if a record was unprocessable on the server.
const base::Value* failed_uploaded_record = last_response_.FindDictPath( const base::Value* failed_uploaded_record = last_response_.FindDictPath(
"firstFailedUploadedRecord.failedUploadedRecord"); "firstFailedUploadedRecord.failedUploadedRecord");
if (failed_uploaded_record != nullptr) { if (failed_uploaded_record != nullptr) {
...@@ -289,20 +284,15 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() { ...@@ -289,20 +284,15 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
auto gap_record_result = auto gap_record_result =
HandleFailedUploadedSequencingInformation(*failed_uploaded_record); HandleFailedUploadedSequencingInformation(*failed_uploaded_record);
if (gap_record_result.has_value()) { if (gap_record_result.has_value()) {
gap_record_ = std::move(gap_record_result.value());
LOG(ERROR) << "Data Loss. Record was unprocessable by the server: " LOG(ERROR) << "Data Loss. Record was unprocessable by the server: "
<< *failed_uploaded_record; << *failed_uploaded_record;
StartUpload(gap_record_); records_->push_back(std::move(gap_record_result.value()));
return;
} }
} }
// Pop the last record that was processed.
records_->pop_back();
if (!records_->empty()) { if (!records_->empty()) {
// Upload the next record but do not request encryption key again. // Upload the next record but do not request encryption key again.
StartUpload(records_->back()); StartUpload();
return; return;
} }
......
...@@ -55,6 +55,12 @@ MATCHER_P(ValueEqualsProto, ...@@ -55,6 +55,12 @@ MATCHER_P(ValueEqualsProto,
return arg.ValueOrDie().SerializeAsString() == expected.SerializeAsString(); return arg.ValueOrDie().SerializeAsString() == expected.SerializeAsString();
} }
MATCHER_P(StatusOrErrorCodeEquals,
expected,
"Compares StatusOr<T>.status().error_code() to expected") {
return arg.status().error_code() == expected;
}
class TestCallbackWaiter { class TestCallbackWaiter {
public: public:
TestCallbackWaiter() = default; TestCallbackWaiter() = default;
...@@ -233,10 +239,9 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) { ...@@ -233,10 +239,9 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) {
constexpr uint64_t kGenerationId = 1234; constexpr uint64_t kGenerationId = 1234;
auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId); auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId);
TestCallbackWaiterWithCounter client_waiter{kNumTestRecords}; TestCallbackWaiter client_waiter;
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _)) EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumTestRecords) .WillOnce(WithArgs<0, 2>(
.WillRepeatedly(WithArgs<0, 2>(
Invoke([&client_waiter]( Invoke([&client_waiter](
base::Value request, base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) { policy::CloudPolicyClient::ResponseCallback callback) {
...@@ -246,8 +251,6 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) { ...@@ -246,8 +251,6 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) {
client_waiter.Signal(); client_waiter.Signal();
}))); })));
RecordHandlerImpl handler(client_.get());
StrictMock<TestEncryptionKeyAttached> encryption_key_attached; StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
StrictMock<TestCompletionResponder> responder; StrictMock<TestCompletionResponder> responder;
TestCallbackWaiter responder_waiter; TestCallbackWaiter responder_waiter;
...@@ -272,6 +275,7 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) { ...@@ -272,6 +275,7 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) {
auto responder_callback = base::BindOnce(&TestCompletionResponder::Call, auto responder_callback = base::BindOnce(&TestCompletionResponder::Call,
base::Unretained(&responder)); base::Unretained(&responder));
RecordHandlerImpl handler(client_.get());
handler.HandleRecords(need_encryption_key(), std::move(test_records), handler.HandleRecords(need_encryption_key(), std::move(test_records),
std::move(responder_callback), std::move(responder_callback),
encryption_key_attached_callback); encryption_key_attached_callback);
...@@ -280,54 +284,27 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) { ...@@ -280,54 +284,27 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) {
responder_waiter.Wait(); responder_waiter.Wait();
} }
TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) { TEST_P(RecordHandlerImplTest, ReportsUploadFailure) {
uint64_t kNumSuccessfulUploads = 5;
uint64_t kNumTestRecords = 10; uint64_t kNumTestRecords = 10;
uint64_t kGenerationId = 1234; uint64_t kGenerationId = 1234;
auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId); auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId);
// Wait kNumSuccessfulUploads times + 1 for the failure. TestCallbackWaiter client_waiter;
TestCallbackWaiterWithCounter client_waiter{kNumSuccessfulUploads + 1}; EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.WillOnce(WithArgs<2>(Invoke(
{ [&client_waiter](
::testing::InSequence seq; base::OnceCallback<void(base::Optional<base::Value>)> callback) {
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _)) std::move(callback).Run(base::nullopt);
.Times(kNumSuccessfulUploads) client_waiter.Signal();
.WillRepeatedly(WithArgs<0, 2>( })));
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
base::Value response{base::Value::Type::DICTIONARY};
SucceedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.WillOnce(WithArgs<2>(Invoke(
[&client_waiter](base::OnceCallback<void(
base::Optional<base::Value>)> callback) {
std::move(callback).Run(base::nullopt);
client_waiter.Signal();
})));
}
RecordHandlerImpl handler(client_.get());
StrictMock<TestCompletionResponder> responder; StrictMock<TestCompletionResponder> responder;
TestCallbackWaiter responder_waiter; TestCallbackWaiter responder_waiter;
EXPECT_CALL( EXPECT_CALL(responder, Call(StatusOrErrorCodeEquals(error::INTERNAL)))
responder,
Call(ValueEqualsProto(
(*test_records)[kNumSuccessfulUploads - 1].sequencing_information())))
.WillOnce(Invoke([&responder_waiter]() { responder_waiter.Signal(); })); .WillOnce(Invoke([&responder_waiter]() { responder_waiter.Signal(); }));
StrictMock<TestEncryptionKeyAttached> encryption_key_attached; StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
EXPECT_CALL( EXPECT_CALL(encryption_key_attached, Call(_)).Times(0);
encryption_key_attached,
Call(AllOf(Property(&SignedEncryptionInfo::public_asymmetric_key,
Not(IsEmpty())),
Property(&SignedEncryptionInfo::public_key_id, Gt(0)),
Property(&SignedEncryptionInfo::signature, Not(IsEmpty())))))
.Times(need_encryption_key() ? 1 : 0);
auto encryption_key_attached_callback = auto encryption_key_attached_callback =
base::BindRepeating(&TestEncryptionKeyAttached::Call, base::BindRepeating(&TestEncryptionKeyAttached::Call,
...@@ -336,6 +313,7 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) { ...@@ -336,6 +313,7 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) {
auto responder_callback = base::BindOnce(&TestCompletionResponder::Call, auto responder_callback = base::BindOnce(&TestCompletionResponder::Call,
base::Unretained(&responder)); base::Unretained(&responder));
RecordHandlerImpl handler(client_.get());
handler.HandleRecords(need_encryption_key(), std::move(test_records), handler.HandleRecords(need_encryption_key(), std::move(test_records),
std::move(responder_callback), std::move(responder_callback),
encryption_key_attached_callback); encryption_key_attached_callback);
...@@ -345,29 +323,14 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) { ...@@ -345,29 +323,14 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) {
} }
TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) { TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) {
uint64_t kNumInitialSuccessfulUploads = 5;
uint64_t kNumTestRecords = 10; uint64_t kNumTestRecords = 10;
uint64_t kNumFinalSuccessfulUploads =
kNumTestRecords - kNumInitialSuccessfulUploads;
uint64_t kGenerationId = 1234; uint64_t kGenerationId = 1234;
auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId); auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId);
// Wait kNumTestRecords times + 1 for the failure. // Once for failure, and once for gap.
TestCallbackWaiterWithCounter client_waiter{kNumTestRecords + 1}; TestCallbackWaiterWithCounter client_waiter{2};
{ {
::testing::InSequence seq; ::testing::InSequence seq;
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumInitialSuccessfulUploads)
.WillRepeatedly(WithArgs<0, 2>(
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
base::Value response{base::Value::Type::DICTIONARY};
SucceedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _)) EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.WillOnce(WithArgs<0, 2>( .WillOnce(WithArgs<0, 2>(
Invoke([&client_waiter]( Invoke([&client_waiter](
...@@ -379,8 +342,7 @@ TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) { ...@@ -379,8 +342,7 @@ TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) {
client_waiter.Signal(); client_waiter.Signal();
}))); })));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _)) EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumFinalSuccessfulUploads) .WillOnce(WithArgs<0, 2>(
.WillRepeatedly(WithArgs<0, 2>(
Invoke([&client_waiter]( Invoke([&client_waiter](
base::Value request, base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) { policy::CloudPolicyClient::ResponseCallback callback) {
...@@ -391,8 +353,6 @@ TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) { ...@@ -391,8 +353,6 @@ TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) {
}))); })));
} }
RecordHandlerImpl handler(client_.get());
StrictMock<TestCallbackWaiter> responder_waiter; StrictMock<TestCallbackWaiter> responder_waiter;
TestCompletionResponder responder; TestCompletionResponder responder;
EXPECT_CALL( EXPECT_CALL(
...@@ -415,6 +375,7 @@ TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) { ...@@ -415,6 +375,7 @@ TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) {
auto responder_callback = base::BindOnce(&TestCompletionResponder::Call, auto responder_callback = base::BindOnce(&TestCompletionResponder::Call,
base::Unretained(&responder)); base::Unretained(&responder));
RecordHandlerImpl handler(client_.get());
handler.HandleRecords(need_encryption_key(), std::move(test_records), handler.HandleRecords(need_encryption_key(), std::move(test_records),
std::move(responder_callback), std::move(responder_callback),
encryption_key_attached_callback); encryption_key_attached_callback);
...@@ -431,18 +392,15 @@ TEST_P(RecordHandlerImplTest, HandleUnknownResponseFromServer) { ...@@ -431,18 +392,15 @@ TEST_P(RecordHandlerImplTest, HandleUnknownResponseFromServer) {
constexpr uint64_t kGenerationId = 1234; constexpr uint64_t kGenerationId = 1234;
auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId); auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId);
TestCallbackWaiterWithCounter client_waiter{kNumTestRecords}; TestCallbackWaiter client_waiter;
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _)) EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumTestRecords) .WillOnce(WithArgs<2>(
.WillRepeatedly(WithArgs<2>(
Invoke([&client_waiter]( Invoke([&client_waiter](
policy::CloudPolicyClient::ResponseCallback callback) { policy::CloudPolicyClient::ResponseCallback callback) {
std::move(callback).Run(base::Value{base::Value::Type::DICTIONARY}); std::move(callback).Run(base::Value{base::Value::Type::DICTIONARY});
client_waiter.Signal(); client_waiter.Signal();
}))); })));
RecordHandlerImpl handler(client_.get());
StrictMock<TestEncryptionKeyAttached> encryption_key_attached; StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
StrictMock<TestCompletionResponder> responder; StrictMock<TestCompletionResponder> responder;
TestCallbackWaiter responder_waiter; TestCallbackWaiter responder_waiter;
...@@ -462,6 +420,7 @@ TEST_P(RecordHandlerImplTest, HandleUnknownResponseFromServer) { ...@@ -462,6 +420,7 @@ TEST_P(RecordHandlerImplTest, HandleUnknownResponseFromServer) {
auto responder_callback = base::BindOnce(&TestCompletionResponder::Call, auto responder_callback = base::BindOnce(&TestCompletionResponder::Call,
base::Unretained(&responder)); base::Unretained(&responder));
RecordHandlerImpl handler(client_.get());
handler.HandleRecords(need_encryption_key(), std::move(test_records), handler.HandleRecords(need_encryption_key(), std::move(test_records),
std::move(responder_callback), std::move(responder_callback),
encryption_key_attached_callback); encryption_key_attached_callback);
......
...@@ -66,6 +66,8 @@ class UploadEncryptedReportingRequestBuilder { ...@@ -66,6 +66,8 @@ class UploadEncryptedReportingRequestBuilder {
bool attach_encryption_settings = false); bool attach_encryption_settings = false);
~UploadEncryptedReportingRequestBuilder(); ~UploadEncryptedReportingRequestBuilder();
// TODO(chromium:1165908) Have AddRecord take ownership of the record that is
// passed in.
UploadEncryptedReportingRequestBuilder& AddRecord( UploadEncryptedReportingRequestBuilder& AddRecord(
const EncryptedRecord& record); const EncryptedRecord& record);
......
...@@ -242,7 +242,6 @@ TEST_P(UploadClientTest, CreateUploadClientAndUploadRecords) { ...@@ -242,7 +242,6 @@ TEST_P(UploadClientTest, CreateUploadClientAndUploadRecords) {
records->push_back(encrypted_record); records->push_back(encrypted_record);
} }
TestCallbackWaiterWithCounter waiter(kExpectedCallTimes);
StrictMock<TestEncryptionKeyAttached> encryption_key_attached; StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
EXPECT_CALL( EXPECT_CALL(
...@@ -260,15 +259,16 @@ TEST_P(UploadClientTest, CreateUploadClientAndUploadRecords) { ...@@ -260,15 +259,16 @@ TEST_P(UploadClientTest, CreateUploadClientAndUploadRecords) {
client->SetDMToken( client->SetDMToken(
policy::DMToken::CreateValidTokenForTesting("FAKE_DM_TOKEN").value()); policy::DMToken::CreateValidTokenForTesting("FAKE_DM_TOKEN").value());
TestCallbackWaiter waiter;
EXPECT_CALL(*client, UploadEncryptedReport(_, _, _)) EXPECT_CALL(*client, UploadEncryptedReport(_, _, _))
.WillRepeatedly(WithArgs<0, 2>(Invoke( .WillOnce(WithArgs<0, 2>(Invoke(
[&waiter](base::Value request, [&waiter](base::Value request,
policy::CloudPolicyClient::ResponseCallback response_cb) { policy::CloudPolicyClient::ResponseCallback response_cb) {
std::move(response_cb) std::move(response_cb)
.Run(ValueFromSucceededSequencingInfo(std::move(request))); .Run(ValueFromSucceededSequencingInfo(std::move(request)));
base::ThreadPool::PostTask( base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT}, FROM_HERE, {base::TaskPriority::BEST_EFFORT},
base::BindOnce(&TestCallbackWaiterWithCounter::Signal, base::BindOnce(&TestCallbackWaiter::Signal,
base::Unretained(&waiter))); base::Unretained(&waiter)));
}))); })));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment