Commit 46f9dfec authored by Zach Trudo's avatar Zach Trudo Committed by Chromium LUCI CQ

Handle firstFailedUploadedRecord

firstFailedUploadedRecord indicates that a record was
unprocessable on the server, at this time there is nothing
that the device can do to fix any errors the server might
send. So reupload the SequencingInformation as a gap record,
and note the data loss.

Bug: chromium:169883262
Change-Id: I9de35c24c82dbc33b4360ee8afa73d1d4662ebd5
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2583506
Commit-Queue: Zach Trudo <zatrudo@google.com>
Reviewed-by: default avatarLeonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#836659}
parent e20e37c9
......@@ -14,6 +14,7 @@
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
......@@ -52,11 +53,11 @@ class RecordHandlerImpl::ReportUploader
void OnStart() override;
void StartUpload(bool need_encryption_key,
const EncryptedRecord& encrypted_record);
void StartUpload(const EncryptedRecord& encrypted_record);
void OnUploadComplete(base::Optional<base::Value> response);
void HandleFailedUpload();
void HandleSuccessfulUpload();
void Complete(DmServerUploadService::CompletionResponse result);
// Populates upload request. Returns JSON request base::Value or nullopt,
// if an error was detected.
......@@ -64,9 +65,23 @@ class RecordHandlerImpl::ReportUploader
bool need_encryption_key,
const EncryptedRecord& encrypted_record);
void Complete(DmServerUploadService::CompletionResponse result);
const bool need_encryption_key_;
// Returns a gap record if it is necessary. Expects the contents of the
// failedUploadedRecord field in the response:
// {
// "sequencingId": 1234
// "generationId": 4321
// "priority": 3
// }
base::Optional<EncryptedRecord> HandleFailedUploadedSequencingInformation(
const base::Value& sequencing_information);
// Helper function for converting a base::Value representation of
// SequencingInformation into a proto. Will return an INVALID_ARGUMENT error
// if the base::Value is not convertable.
StatusOr<SequencingInformation> SequencingInformationValueToProto(
const base::Value& value);
bool need_encryption_key_;
std::unique_ptr<std::vector<EncryptedRecord>> records_;
policy::CloudPolicyClient* client_;
......@@ -80,6 +95,10 @@ class RecordHandlerImpl::ReportUploader
// any attempt to retry calling the client, otherwise it will be overwritten.
base::Value last_response_;
// When a record fails to be processed on the server, |ReportUploader| creates
// a gap record to upload in its place.
EncryptedRecord gap_record_;
// Set for the highest record being uploaded.
base::Optional<SequencingInformation> highest_sequencing_information_;
};
......@@ -128,18 +147,17 @@ void RecordHandlerImpl::ReportUploader::OnStart() {
// We'll be popping records off the back.
std::reverse(records_->begin(), records_->end());
StartUpload(need_encryption_key_, records_->back());
StartUpload(records_->back());
}
void RecordHandlerImpl::ReportUploader::StartUpload(
bool need_encryption_key,
const EncryptedRecord& encrypted_record) {
auto response_cb =
base::BindOnce(&RecordHandlerImpl::ReportUploader::OnUploadComplete,
base::Unretained(this));
auto request_result =
UploadEncryptedReportingRequestBuilder(need_encryption_key)
UploadEncryptedReportingRequestBuilder(need_encryption_key_)
.AddRecord(encrypted_record)
.Build();
if (!request_result.has_value()) {
......@@ -200,31 +218,19 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
// "encryptionSettings": ... // EncryptionSettings proto
// }
// TODO(b/169883262): Factor out the decoding into a separate class.
const base::Value* last_succeed_uploaded_record =
last_response_.FindDictKey("lastSucceedUploadedRecord");
if (last_succeed_uploaded_record != nullptr) {
// Note: Fields below are 'int', should be converted into 'uint64_t'.
const std::string* sequencing_id_str =
last_succeed_uploaded_record->FindStringKey("sequencingId");
const std::string* generation_id_str =
last_succeed_uploaded_record->FindStringKey("generationId");
const auto priority = last_succeed_uploaded_record->FindIntKey("priority");
uint64_t sequencing_id = 0;
uint64_t generation_id = 0;
if (sequencing_id_str &&
base::StringToUint64(*sequencing_id_str, &sequencing_id) &&
generation_id_str &&
base::StringToUint64(*generation_id_str, &generation_id) &&
priority.has_value() && Priority_IsValid(priority.value())) {
SequencingInformation seq_info;
seq_info.set_sequencing_id(sequencing_id);
seq_info.set_generation_id(generation_id);
seq_info.set_priority(Priority(priority.value()));
highest_sequencing_information_ = std::move(seq_info);
auto seq_info_result =
SequencingInformationValueToProto(*last_succeed_uploaded_record);
if (seq_info_result.ok()) {
highest_sequencing_information_ = std::move(seq_info_result.ValueOrDie());
} else {
LOG(ERROR) << "Server responded with an invalid SequencingInformation "
"for lastSucceedUploadedRecord:"
<< *last_succeed_uploaded_record;
}
}
// TODO(b/169883262): Decode and handle failure information.
// Handle the encryption settings.
// Note: server can attach it to response regardless of whether
......@@ -254,6 +260,29 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
signed_encryption_key.set_public_key_id(public_key_id_result.value());
signed_encryption_key.set_signature(public_key_signature);
encryption_key_attached_cb_.Run(signed_encryption_key);
need_encryption_key_ = false;
}
}
// Check if the previous record was unprocessable on the server.
const base::Value* failed_uploaded_record = last_response_.FindDictPath(
"firstFailedUploadedRecord.failedUploadedRecord");
if (failed_uploaded_record != nullptr) {
// The record we uploaded previously was unprocessable by the server, if the
// record was after the current |highest_sequencing_information_| we should
// return a gap record. A gap record consists of an EncryptedRecord with
// just SequencingInformation. The server will report success for the gap
// record and |highest_sequencing_information_| will be updated in the next
// response. In the future there may be recoverable |failureStatus|, but
// for now all the device can do is delete the record.
auto gap_record_result =
HandleFailedUploadedSequencingInformation(*failed_uploaded_record);
if (gap_record_result.has_value()) {
gap_record_ = std::move(gap_record_result.value());
LOG(ERROR) << "Data Loss. Record was unprocessable by the server: "
<< *failed_uploaded_record;
StartUpload(gap_record_);
return;
}
}
......@@ -266,7 +295,42 @@ void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
}
// Upload the next record but do not request encryption key again.
StartUpload(/*need_encryption_key=*/false, records_->back());
StartUpload(records_->back());
}
base::Optional<EncryptedRecord>
RecordHandlerImpl::ReportUploader::HandleFailedUploadedSequencingInformation(
const base::Value& sequencing_information) {
if (!highest_sequencing_information_.has_value()) {
LOG(ERROR) << "highest_sequencing_information_ has no value.";
return base::nullopt;
}
auto seq_info_result =
SequencingInformationValueToProto(sequencing_information);
if (!seq_info_result.ok()) {
LOG(ERROR) << "Server responded with an invalid SequencingInformation for "
"firstFailedUploadedRecord.failedUploadedRecord:"
<< sequencing_information;
return base::nullopt;
}
SequencingInformation& seq_info = seq_info_result.ValueOrDie();
// |seq_info| should be of the same generation and priority as
// highest_sequencing_information_, and have the next sequencing_id.
if (seq_info.generation_id() !=
highest_sequencing_information_->generation_id() ||
seq_info.priority() != highest_sequencing_information_->priority() ||
seq_info.sequencing_id() !=
highest_sequencing_information_->sequencing_id() + 1) {
return base::nullopt;
}
// Build a gap record and return it.
EncryptedRecord encrypted_record;
*encrypted_record.mutable_sequencing_information() = std::move(seq_info);
return encrypted_record;
}
void RecordHandlerImpl::ReportUploader::Complete(
......@@ -275,6 +339,36 @@ void RecordHandlerImpl::ReportUploader::Complete(
completion_result);
}
StatusOr<SequencingInformation>
RecordHandlerImpl::ReportUploader::SequencingInformationValueToProto(
const base::Value& value) {
const std::string* sequencing_id = value.FindStringKey("sequencingId");
const std::string* generation_id = value.FindStringKey("generationId");
const auto priority = value.FindIntKey("priority");
// If any of the previous values don't exist, or are malformed, return error.
// TODO(chromium:1158036) Once SequencingId starts at 1 instead of 0, we
// should use 0 as an error value.
uint64_t seq_id;
uint64_t gen_id;
if (!sequencing_id || sequencing_id->empty() ||
!base::StringToUint64(*sequencing_id, &seq_id) || !generation_id ||
generation_id->empty() ||
!base::StringToUint64(*generation_id, &gen_id) || gen_id == 0 ||
!priority.has_value() || !Priority_IsValid(priority.value())) {
return Status(error::INVALID_ARGUMENT,
base::StrCat({"Provided value did not conform to a valid "
"SequencingInformation proto: ",
value.DebugString()}));
}
SequencingInformation proto;
proto.set_sequencing_id(seq_id);
proto.set_generation_id(gen_id);
proto.set_priority(Priority(priority.value()));
return proto;
}
RecordHandlerImpl::RecordHandlerImpl(policy::CloudPolicyClient* client)
: RecordHandler(client),
sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
......
......@@ -82,44 +82,97 @@ using TestCompletionResponder =
using TestEncryptionKeyAttached = MockFunction<void(SignedEncryptionInfo)>;
// Helper function composes JSON represented as base::Value from Sequencing
// information in request.
base::Value ValueFromSucceededSequencingInfo(
const base::Optional<base::Value> request) {
EXPECT_TRUE(request.has_value());
EXPECT_TRUE(request.value().is_dict());
base::Value response(base::Value::Type::DICTIONARY);
// Helper function for retrieving and processing the SequencingInformation from
// a request.
void RetrieveFinalSequencingInforamation(const base::Value& request,
base::Value& sequencing_info) {
ASSERT_TRUE(request.is_dict());
// Retrieve and process sequencing information
const base::Value* const encrypted_record_list =
request.value().FindListKey("encryptedRecord");
EXPECT_TRUE(encrypted_record_list != nullptr);
EXPECT_FALSE(encrypted_record_list->GetList().empty());
const base::Value* seq_info =
encrypted_record_list->GetList().rbegin()->FindDictKey(
"sequencingInformation");
EXPECT_TRUE(seq_info != nullptr);
response.SetPath("lastSucceedUploadedRecord", seq_info->Clone());
request.FindListKey("encryptedRecord");
ASSERT_TRUE(encrypted_record_list != nullptr);
ASSERT_FALSE(encrypted_record_list->GetList().empty());
const auto* seq_info = encrypted_record_list->GetList().rbegin()->FindDictKey(
"sequencingInformation");
ASSERT_TRUE(seq_info != nullptr);
ASSERT_TRUE(seq_info->FindStringKey("sequencingId"));
ASSERT_TRUE(seq_info->FindStringKey("generationId"));
ASSERT_TRUE(seq_info->FindIntKey("priority"));
sequencing_info.MergeDictionary(seq_info);
}
base::Optional<base::Value> BuildEncryptionSettingsFromRequest(
const base::Value& request) {
// If attach_encryption_settings it true, process that.
const auto attach_encryption_settings =
request.value().FindBoolKey("attachEncryptionSettings");
if (attach_encryption_settings.has_value() &&
attach_encryption_settings.value()) {
base::Value encryption_settings{base::Value::Type::DICTIONARY};
std::string public_key;
base::Base64Encode("PUBLIC KEY", &public_key);
encryption_settings.SetStringKey("publicKey", public_key);
encryption_settings.SetIntKey("publicKeyId", 12345);
std::string public_key_signature;
// TODO(b/170054326): Generate signature.
base::Base64Encode("PUBLIC KEY SIG", &public_key_signature);
encryption_settings.SetStringKey("publicKeySignature",
public_key_signature);
response.SetPath("encryptionSettings", std::move(encryption_settings));
request.FindBoolKey("attachEncryptionSettings");
if (!attach_encryption_settings.has_value() ||
!attach_encryption_settings.value()) {
return base::nullopt;
}
return response;
base::Value encryption_settings{base::Value::Type::DICTIONARY};
std::string public_key;
base::Base64Encode("PUBLIC KEY", &public_key);
encryption_settings.SetStringKey("publicKey", public_key);
encryption_settings.SetIntKey("publicKeyId", 12345);
std::string public_key_signature;
// TODO(b/170054326): Generate signature.
base::Base64Encode("PUBLIC KEY SIG", &public_key_signature);
encryption_settings.SetStringKey("publicKeySignature", public_key_signature);
return encryption_settings;
}
// Immitates the server response for successful record upload. Since additional
// steps and tests require the response from the server to be accurate, ASSERTS
// that the |request| must be valid, and on a valid request updates |response|.
void SucceedResponseFromRequest(const base::Value& request,
base::Value& response) {
base::Value seq_info{base::Value::Type::DICTIONARY};
RetrieveFinalSequencingInforamation(request, seq_info);
response.SetPath("lastSucceedUploadedRecord", std::move(seq_info));
// If attach_encryption_settings it true, process that.
auto encryption_settings_result = BuildEncryptionSettingsFromRequest(request);
if (encryption_settings_result.has_value()) {
response.SetPath("encryptionSettings",
std::move(encryption_settings_result.value()));
}
}
// Immitates the server response for failed record upload. Since additional
// steps and tests require the response from the server to be accurate, ASSERTS
// that the |request| must be valid, and on a valid request updates |response|.
void FailedResponseFromRequest(const base::Value& request,
base::Value& response) {
base::Value seq_info{base::Value::Type::DICTIONARY};
RetrieveFinalSequencingInforamation(request, seq_info);
// |seq_info| has been built by RetrieveFinalSequencingInforamation and is
// guaranteed to have these keys.
uint64_t sequencing_id;
ASSERT_TRUE(base::StringToUint64(*seq_info.FindStringKey("sequencingId"),
&sequencing_id));
// The lastSucceedUploadedRecord should be the record before the one
// indicated in seq_info.
response.SetStringPath("lastSucceedUploadedRecord.sequencingId",
base::NumberToString(sequencing_id - 1));
response.SetStringPath("lastSucceedUploadedRecord.generationId",
*seq_info.FindStringKey("generationId"));
response.SetIntPath("lastSucceedUploadedRecord.priority",
seq_info.FindIntKey("priority").value());
// The firstFailedUploadedRecord.failedUploadedRecord should be the one
// indicated in seq_info.
response.SetPath("firstFailedUploadedRecord.failedUploadedRecord",
std::move(seq_info));
auto encryption_settings_result = BuildEncryptionSettingsFromRequest(request);
if (encryption_settings_result.has_value()) {
response.SetPath("encryptionSettings",
std::move(encryption_settings_result.value()));
}
}
class RecordHandlerImplTest : public ::testing::TestWithParam<bool> {
......@@ -172,8 +225,9 @@ TEST_P(RecordHandlerImplTest, ForwardsRecordsToCloudPolicyClient) {
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
std::move(callback).Run(
ValueFromSucceededSequencingInfo(std::move(request)));
base::Value response{base::Value::Type::DICTIONARY};
SucceedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
......@@ -223,8 +277,9 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) {
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
std::move(callback).Run(
ValueFromSucceededSequencingInfo(std::move(request)));
base::Value response{base::Value::Type::DICTIONARY};
SucceedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
......@@ -237,23 +292,96 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) {
}
RecordHandlerImpl handler(client_.get());
StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
StrictMock<TestCompletionResponder> responder;
TestCallbackWaiter responder_waiter;
EXPECT_CALL(
responder,
Call(ValueEqualsProto(
(*test_records)[kNumSuccessfulUploads - 1].sequencing_information())))
.WillOnce(Invoke([&responder_waiter]() { responder_waiter.Signal(); }));
StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
EXPECT_CALL(encryption_key_attached, Call(_))
.Times(need_encryption_key() ? 1 : 0);
auto encryption_key_attached_callback =
base::BindRepeating(&TestEncryptionKeyAttached::Call,
base::Unretained(&encryption_key_attached));
auto responder_callback = base::BindOnce(&TestCompletionResponder::Call,
base::Unretained(&responder));
handler.HandleRecords(need_encryption_key(), std::move(test_records),
std::move(responder_callback),
encryption_key_attached_callback);
client_waiter.Wait();
responder_waiter.Wait();
}
TEST_P(RecordHandlerImplTest, UploadsGapRecordOnServerFailure) {
uint64_t kNumInitialSuccessfulUploads = 5;
uint64_t kNumTestRecords = 10;
uint64_t kNumFinalSuccessfulUploads =
kNumTestRecords - kNumInitialSuccessfulUploads;
uint64_t kGenerationId = 1234;
auto test_records = BuildTestRecordsVector(kNumTestRecords, kGenerationId);
// Wait kNumTestRecords times + 1 for the failure.
TestCallbackWaiterWithCounter client_waiter{kNumTestRecords + 1};
{
::testing::InSequence seq;
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumInitialSuccessfulUploads)
.WillRepeatedly(WithArgs<0, 2>(
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
base::Value response{base::Value::Type::DICTIONARY};
SucceedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.WillOnce(WithArgs<0, 2>(
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
base::Value response{base::Value::Type::DICTIONARY};
FailedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
EXPECT_CALL(*client_, UploadEncryptedReport(_, _, _))
.Times(kNumFinalSuccessfulUploads)
.WillRepeatedly(WithArgs<0, 2>(
Invoke([&client_waiter](
base::Value request,
policy::CloudPolicyClient::ResponseCallback callback) {
base::Value response{base::Value::Type::DICTIONARY};
SucceedResponseFromRequest(request, response);
std::move(callback).Run(std::move(response));
client_waiter.Signal();
})));
}
RecordHandlerImpl handler(client_.get());
StrictMock<TestCallbackWaiter> responder_waiter;
TestCompletionResponder responder;
EXPECT_CALL(
responder,
Call(ValueEqualsProto(
(*test_records)[kNumSuccessfulUploads - 1].sequencing_information())))
(*test_records)[kNumTestRecords - 1].sequencing_information())))
.WillOnce(Invoke([&responder_waiter]() { responder_waiter.Signal(); }));
StrictMock<TestEncryptionKeyAttached> encryption_key_attached;
EXPECT_CALL(encryption_key_attached, Call(_))
.Times(need_encryption_key() ? 1 : 0);
auto encryption_key_attached_callback =
base::BindRepeating(&TestEncryptionKeyAttached::Call,
base::Unretained(&encryption_key_attached));
auto responder_callback = base::BindOnce(&TestCompletionResponder::Call,
base::Unretained(&responder));
......@@ -268,6 +396,5 @@ TEST_P(RecordHandlerImplTest, ReportsEarlyFailure) {
INSTANTIATE_TEST_SUITE_P(NeedOrNoNeedKey,
RecordHandlerImplTest,
testing::Bool());
} // namespace
} // namespace reporting
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment