Commit c2420b9b authored by Patrick Monette's avatar Patrick Monette Committed by Commit Bot

[PM] Migrate MaxVoteAggregator to VoteObserver

Bug: 971272
Change-Id: Ie8ab8512d9fc00ca0111b3f3bed1fc779593cad3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2522195
Commit-Queue: Patrick Monette <pmonette@chromium.org>
Reviewed-by: default avatarChris Hamilton <chrisha@chromium.org>
Cr-Commit-Position: refs/heads/master@{#828007}
parent 00808efe
......@@ -10,20 +10,13 @@
namespace performance_manager {
namespace execution_context_priority {
// static
MaxVoteAggregator::StampedVote*
MaxVoteAggregator::StampedVote::FromAcceptedVote(AcceptedVote* accepted_vote) {
static_assert(offsetof(StampedVote, vote) == 0,
"AcceptedVote is expected to be at offset 0 of StampedVote");
return reinterpret_cast<StampedVote*>(accepted_vote);
}
MaxVoteAggregator::MaxVoteAggregator() : factory_(this) {}
MaxVoteAggregator::MaxVoteAggregator()
: vote_consumer_default_impl_(this), next_vote_id_(0) {}
MaxVoteAggregator::~MaxVoteAggregator() = default;
VotingChannel MaxVoteAggregator::GetVotingChannel() {
return factory_.BuildVotingChannel();
return vote_consumer_default_impl_.BuildVotingChannel();
}
void MaxVoteAggregator::SetUpstreamVotingChannel(VotingChannel&& channel) {
......@@ -33,52 +26,36 @@ void MaxVoteAggregator::SetUpstreamVotingChannel(VotingChannel&& channel) {
channel_ = std::move(channel);
}
VoteReceipt MaxVoteAggregator::SubmitVote(
util::PassKey<VotingChannel>,
voting::VoterId<Vote> voter_id,
void MaxVoteAggregator::OnVoteSubmitted(
VoterId voter_id,
const ExecutionContext* execution_context,
const Vote& vote) {
DCHECK(vote.IsValid());
DCHECK(channel_.IsValid());
// NOTE: We don't currently explicitly worry about having multiple votes for
// the same execution context from a single voter, although such logic could
// be added.
// Add the new vote.
VoteData& vote_data = vote_data_map_[execution_context];
auto accepted_vote = AcceptedVote(this, voter_id, execution_context, vote);
auto receipt = accepted_vote.IssueReceipt();
if (vote_data.AddVote(std::move(accepted_vote), next_vote_id_++))
vote_data.UpstreamVote(&channel_);
// Finally, return a vote receipt to our voter for the received vote.
return receipt;
if (vote_data.AddVote(voter_id, vote, next_vote_id_++))
vote_data.UpstreamVote(execution_context, &channel_);
}
void MaxVoteAggregator::ChangeVote(util::PassKey<AcceptedVote>,
AcceptedVote* old_vote,
void MaxVoteAggregator::OnVoteChanged(VoterId voter_id,
const ExecutionContext* execution_context,
const Vote& new_vote) {
DCHECK(old_vote->IsValid());
VoteData& vote_data = GetVoteData(old_vote)->second;
size_t index = vote_data.GetVoteIndex(old_vote);
// Update the vote directly, then repair the heap.
old_vote->UpdateVote(new_vote);
if (vote_data.UpdateVote(index, next_vote_id_++))
vote_data.UpstreamVote(&channel_);
VoteData& vote_data = GetVoteData(execution_context)->second;
if (vote_data.UpdateVote(voter_id, new_vote))
vote_data.UpstreamVote(execution_context, &channel_);
}
void MaxVoteAggregator::VoteInvalidated(util::PassKey<AcceptedVote>,
AcceptedVote* vote) {
DCHECK(!vote->IsValid());
auto it = GetVoteData(vote);
void MaxVoteAggregator::OnVoteInvalidated(
VoterId voter_id,
const ExecutionContext* execution_context) {
auto it = GetVoteData(execution_context);
VoteData& vote_data = it->second;
size_t index = vote_data.GetVoteIndex(vote);
// Remove the vote, and upstream if necessary.
if (vote_data.RemoveVote(index))
vote_data.UpstreamVote(&channel_);
if (vote_data.RemoveVote(voter_id))
vote_data.UpstreamVote(execution_context, &channel_);
// If all the votes for this execution context have disappeared then remove
// the entry entirely. This will automatically cancel our upstream vote.
......@@ -87,22 +64,14 @@ void MaxVoteAggregator::VoteInvalidated(util::PassKey<AcceptedVote>,
}
MaxVoteAggregator::StampedVote::StampedVote() = default;
MaxVoteAggregator::StampedVote::StampedVote(AcceptedVote&& vote,
uint32_t vote_id)
: vote(std::move(vote)), vote_id(vote_id) {}
MaxVoteAggregator::StampedVote::StampedVote(const Vote& vote, uint32_t vote_id)
: vote_(vote), vote_id_(vote_id) {}
MaxVoteAggregator::StampedVote::StampedVote(StampedVote&&) = default;
MaxVoteAggregator::StampedVote::~StampedVote() = default;
MaxVoteAggregator::VoteDataMap::iterator MaxVoteAggregator::GetVoteData(
AcceptedVote* vote) {
// The vote being retrieved should have us as its consumer, and we should
// already have been setup to receive votes before this is called.
DCHECK(vote);
DCHECK_EQ(this, vote->consumer());
DCHECK(channel_.IsValid());
// Find the votes associated with this execution context.
auto it = vote_data_map_.find(vote->context());
const ExecutionContext* execution_context) {
auto it = vote_data_map_.find(execution_context);
DCHECK(it != vote_data_map_.end());
return it;
}
......@@ -116,56 +85,71 @@ MaxVoteAggregator::VoteData& MaxVoteAggregator::VoteData::operator=(
MaxVoteAggregator::VoteData::~VoteData() = default;
bool MaxVoteAggregator::VoteData::AddVote(AcceptedVote&& vote,
bool MaxVoteAggregator::VoteData::AddVote(VoterId voter_id,
const Vote& vote,
uint32_t vote_id) {
DCHECK(vote.IsValid());
votes_.emplace(std::move(vote), vote_id);
// Remember the upstream vote as it may change. There could be none.
base::Optional<Vote> old_root;
if (!votes_.empty())
old_root = votes_.top().vote();
auto it = votes_.emplace(vote, vote_id);
bool inserted = heap_handles_.emplace(voter_id, it->handle()).second;
DCHECK(inserted);
// There was no previous root. This vote must be upstreamed.
if (!old_root)
return true;
}
bool MaxVoteAggregator::VoteData::UpdateVote(size_t index, uint32_t vote_id) {
DCHECK_LE(0u, index);
DCHECK_LT(index, votes_.size());
DCHECK(votes_[index].vote.IsValid());
// The vote always needs to be upstreamed if the root vote changed.
const Vote& new_root = votes_.top().vote();
return old_root.value() != new_root;
}
bool MaxVoteAggregator::VoteData::UpdateVote(VoterId voter_id,
const Vote& new_vote) {
// Remember the upstream vote as it may change.
const Vote old_root = votes_.top().vote.vote();
const Vote old_root = votes_.top().vote();
auto it = heap_handles_.find(voter_id);
DCHECK(it != heap_handles_.end());
base::HeapHandle* heap_handle = it->second;
// The AcceptedVote has actually already been changed in place. Remove the
// vote, finish the update, and reinsert it into the heap.
votes_.Update(index);
votes_.Modify(*heap_handle, [&new_vote](StampedVote& element) {
element.SetVote(new_vote);
});
// The vote always needs to be upstreamed if the changed vote was at the root.
// Otherwise, we only need to upstream if the root vote was observed to change
// as part of the heap repair.
const Vote& new_root = votes_.top().vote.vote();
return index == 0 || old_root != new_root;
// The vote always needs to be upstreamed if the root vote changed.
const Vote& new_root = votes_.top().vote();
return old_root != new_root;
}
bool MaxVoteAggregator::VoteData::RemoveVote(size_t index) {
DCHECK_LE(0u, index);
DCHECK_LT(index, votes_.size());
DCHECK(!votes_[index].vote.IsValid());
bool MaxVoteAggregator::VoteData::RemoveVote(VoterId voter_id) {
// Remember the upstream vote as it may change.
const Vote old_root = votes_.top().vote();
votes_.erase(index);
auto it = heap_handles_.find(voter_id);
DCHECK(it != heap_handles_.end());
base::HeapHandle* heap_handle = it->second;
heap_handles_.erase(it);
// A new upstream vote needs to be made if the root was disturbed.
return votes_.size() && index == 0;
}
votes_.erase(*heap_handle);
size_t MaxVoteAggregator::VoteData::GetVoteIndex(AcceptedVote* vote) {
StampedVote* stamped_vote = StampedVote::FromAcceptedVote(vote);
DCHECK_NE(0u, votes_.size());
DCHECK_LE(votes_.data(), stamped_vote);
DCHECK_LT(stamped_vote, votes_.data() + votes_.size());
return stamped_vote - votes_.data();
// If |votes_| is now empty, the upstream vote needs to be invalidated instead
// of upstreaming a new vote.
if (votes_.empty())
return false;
const Vote& new_root = votes_.top().vote();
return old_root != new_root;
}
void MaxVoteAggregator::VoteData::UpstreamVote(VotingChannel* channel) {
DCHECK_NE(0u, votes_.size());
DCHECK(votes_.top().vote.IsValid());
const ExecutionContext* execution_context = votes_.top().vote.context();
const Vote& vote = votes_.top().vote.vote();
void MaxVoteAggregator::VoteData::UpstreamVote(
const ExecutionContext* execution_context,
VotingChannel* channel) {
DCHECK(!votes_.empty());
const Vote& vote = votes_.top().vote();
// Change our existing vote, or create a new one as necessary.
if (receipt_.HasVote()) {
......
......@@ -41,143 +41,10 @@ static_assert(kPriority0 < kPriority1 && kPriority1 < kPriority2,
static const char kReason0[] = "a reason";
static const char kReason1[] = "another reason";
static const char kReason2[] = "yet another reason";
size_t RandIndex(const VoteData& vote_data) {
DCHECK(!vote_data.IsEmpty());
int i = base::RandInt(0, static_cast<int>(vote_data.GetSize() - 1));
return static_cast<size_t>(i);
}
base::TaskPriority RandPriority() {
int i = base::RandInt(static_cast<int>(base::TaskPriority::LOWEST),
static_cast<int>(base::TaskPriority::HIGHEST));
return static_cast<base::TaskPriority>(i);
}
const char* RandReason() {
int i = base::RandInt(0, 2);
if (i == 0)
return kReason0;
if (i == 1)
return kReason1;
DCHECK_EQ(2, i);
return kReason2;
}
class FakeVoteConsumer : public DummyVoteConsumer {
public:
FakeVoteConsumer() = default;
~FakeVoteConsumer() override = default;
protected:
// Deliberately override VoteInvalidated so that this consumer silently
// ignores these notifications.
void VoteInvalidated(util::PassKey<AcceptedVote>,
AcceptedVote* vote) override {
return;
}
private:
DISALLOW_COPY_AND_ASSIGN(FakeVoteConsumer);
};
} // namespace
TEST(MaxVoteAggregatorTest, StampedVoteCast) {
StampedVote stamped_vote;
EXPECT_EQ(&stamped_vote, StampedVote::FromAcceptedVote(&stamped_vote.vote));
}
TEST(MaxVoteAggregatorTest, VoteDataHeapStressTest) {
// Build a simple consumer/voter chain so that we generate an actual
// voting::VoterId.
FakeVoteConsumer consumer;
DummyVoter voter;
voting::VoterId<Vote> voter_id;
{
auto channel = consumer.voting_channel_factory_.BuildVotingChannel();
voter_id = channel.voter_id();
voter.SetVotingChannel(std::move(channel));
}
MaxVoteAggregatorTestAccess::VoteData vd;
// Parameters controlling the test.
static constexpr int kInsert = 0;
static constexpr int kMove = 1;
static constexpr int kRemove = 2;
static constexpr size_t kInitialInserts = 100;
static constexpr size_t kNops = 10000;
uint32_t next_vote_id = 0;
for (size_t i = 0; i < kNops; ++i) {
const size_t ops_left = kNops - i;
// Determine the type of operation to perform.
int operation = base::RandInt(kInsert, kRemove);
if (vd.GetSize() == 0 || i < kInitialInserts)
operation = kInsert;
// If an insertion will make it impossible to remove all elements with the
// remaining nops, make it a move instead. This guarantees that we finish
// with zero elements in the heap, although we may actually get to that
// point before we completely run out of operations.
if (operation == kInsert && vd.GetSize() + 1 > ops_left - 1)
operation = kMove;
// If there are as many operations left as elements then do a removal.
if (ops_left == vd.GetSize())
operation = kRemove;
switch (operation) {
case kInsert: {
auto priority = RandPriority();
auto* reason = RandReason();
vd.AddVote(AcceptedVote(&consumer, voter_id, kExecutionContext0,
Vote(priority, reason)),
next_vote_id++);
} break;
case kMove: {
// Choose a vote and generate a new priority/reason for it.
size_t index = RandIndex(vd);
auto& vote = vd.GetVoteForTesting(index);
auto priority = RandPriority();
auto* reason = RandReason();
while (priority == vote.vote().value() &&
reason == vote.vote().reason()) {
priority = RandPriority();
reason = RandReason();
}
// Update the vote.
vote.UpdateVote(Vote(priority, reason));
vd.UpdateVote(index, next_vote_id++);
} break;
case kRemove: {
// Choose a vote and remove it.
size_t index = RandIndex(vd);
// Issue a receipt that is immediately destroyed, so that the vote is no
// longer valid. Then remove the vote.
vd.GetVoteForTesting(index).IssueReceipt();
vd.RemoveVote(index);
} break;
default:
NOTREACHED();
}
}
// Expect the heap to be empty at the end of the test.
EXPECT_TRUE(vd.IsEmpty());
}
TEST(MaxVoteAggregatorTest, BlackboxTest) {
TEST(MaxVoteAggregatorTest, AggregationWorks) {
// Builds the small hierarchy of voters as follows:
//
// consumer
......
......@@ -18,7 +18,7 @@ namespace execution_context_priority {
// the maximum vote for each frame. The upstream voting channel must be set
// before any votes are submitted to this aggregator. New voting channels may
// continue to be issued at any time during its lifetime, however.
class MaxVoteAggregator : public VoteConsumer {
class MaxVoteAggregator : public VoteObserver {
public:
MaxVoteAggregator();
~MaxVoteAggregator() override;
......@@ -30,55 +30,48 @@ class MaxVoteAggregator : public VoteConsumer {
void SetUpstreamVotingChannel(VotingChannel&& channel);
protected:
// VoteConsumer implementation:
VoteReceipt SubmitVote(util::PassKey<VotingChannel>,
voting::VoterId<Vote> voter_id,
// VoteObserver implementation:
void OnVoteSubmitted(VoterId voter_id,
const ExecutionContext* execution_context,
const Vote& vote) override;
void ChangeVote(util::PassKey<AcceptedVote>,
AcceptedVote* old_vote,
void OnVoteChanged(VoterId voter_id,
const ExecutionContext* execution_context,
const Vote& new_vote) override;
void VoteInvalidated(util::PassKey<AcceptedVote>,
AcceptedVote* vote) override;
void OnVoteInvalidated(VoterId voter_id,
const ExecutionContext* execution_context) override;
private:
friend class MaxVoteAggregatorTestAccess;
// A StampedVote is an AcceptedVote with a serial number that can be used to
// order votes by the order in which they were received. This ensures that
// votes upstreamed by this aggregator remain as stable as possible.
struct StampedVote {
// A StampedVote is a Vote with a serial number that can be used to order
// votes by the order in which they were received. This ensures that votes
// upstreamed by this aggregator remain as stable as possible.
class StampedVote : public base::InternalHeapHandleStorage {
public:
StampedVote();
StampedVote(AcceptedVote&& vote, uint32_t vote_id);
StampedVote(const Vote& vote, uint32_t vote_id);
StampedVote(StampedVote&&);
StampedVote(const StampedVote&) = delete;
~StampedVote();
~StampedVote() override;
StampedVote& operator=(StampedVote&&) = default;
StampedVote& operator=(const StampedVote&) = delete;
bool operator<(const StampedVote& rhs) const {
if (vote.vote().value() != rhs.vote.vote().value())
return vote.vote().value() < rhs.vote.vote().value();
if (vote_.value() != rhs.vote_.value())
return vote_.value() < rhs.vote_.value();
// Higher |vote_id| values are of lower priority.
return vote_id > rhs.vote_id;
return vote_id_ > rhs.vote_id_;
}
// Given an AcceptedVote that's embedded in a StampedVote::vote, retrieve
// the embedding StampedVote instance.
static StampedVote* FromAcceptedVote(AcceptedVote* accepted_vote);
// IntrusiveHeap contract. We actually don't need HeapHandles, as we already
// know the positions of the elements in the heap directly, as they are
// tracked with explicit back pointers.
void SetHeapHandle(base::HeapHandle) {}
void ClearHeapHandle() {}
base::HeapHandle GetHeapHandle() const {
return base::HeapHandle::Invalid();
}
const Vote& vote() const { return vote_; }
uint32_t vote_id() const { return vote_id_; }
AcceptedVote vote;
uint32_t vote_id = 0;
void SetVote(const Vote& new_vote) { vote_ = new_vote; }
private:
Vote vote_;
uint32_t vote_id_ = 0;
};
// The collection of votes for a single execution context. This is move-only
......@@ -95,21 +88,19 @@ class MaxVoteAggregator : public VoteConsumer {
~VoteData();
// Adds a vote. Returns true if a new upstream vote is needed.
bool AddVote(AcceptedVote&& vote, uint32_t vote_id);
bool AddVote(VoterId voter_id, const Vote& vote, uint32_t vote_id);
// Updates the vote from its given index to a new index. Returns true if the
// root was disturbed and a new upstream vote is needed.
bool UpdateVote(size_t index, uint32_t vote_id);
bool UpdateVote(VoterId voter_id, const Vote& new_vote);
// Removes the vote at the provided index. Returns true if the root was
// disturbed and a new upstream vote is needed.
bool RemoveVote(size_t index);
// Gets the index of the given vote.
size_t GetVoteIndex(AcceptedVote* vote);
bool RemoveVote(VoterId voter_id);
// Upstreams the vote for this vote data, using the given voting |channel|.
void UpstreamVote(VotingChannel* channel);
void UpstreamVote(const ExecutionContext* execution_context,
VotingChannel* channel);
// Returns the number of votes in this structure.
size_t GetSize() const { return votes_.size(); }
......@@ -117,28 +108,28 @@ class MaxVoteAggregator : public VoteConsumer {
// Returns true if this VoteData is empty.
bool IsEmpty() const { return votes_.empty(); }
AcceptedVote& GetVoteForTesting(size_t index) {
return const_cast<AcceptedVote&>(votes_[index].vote);
}
private:
base::IntrusiveHeap<StampedVote> votes_;
// Maps each voting channel to the HeapHandle to their associated vote in
// |votes_|.
std::map<VoterId, base::HeapHandle*> heap_handles_;
// The receipt for the vote we've upstreamed.
VoteReceipt receipt_;
};
using VoteDataMap = std::map<const ExecutionContext*, VoteData>;
// Looks up the VoteData associated with the provided |vote|. The data is
// expected to already exist (enforced by a DCHECK).
VoteDataMap::iterator GetVoteData(AcceptedVote* vote);
// Looks up the VoteData associated with the provided |execution_context|. The
// data is expected to already exist (enforced by a DCHECK).
VoteDataMap::iterator GetVoteData(const ExecutionContext* execution_context);
// Our channel for upstreaming our votes.
VotingChannel channel_;
// Our VotingChannelFactory for providing VotingChannels to our input voters.
VotingChannelFactory factory_;
// Provides VotingChannels to our input voters.
VoteConsumerDefaultImpl vote_consumer_default_impl_;
// The next StampedVote ID to use.
uint32_t next_vote_id_;
......
......@@ -448,7 +448,9 @@ class VoteConsumerDefaultImpl : public VoteConsumer<VoteImpl> {
VotingChannelFactory<VoteImpl> voting_channel_factory_;
std::map<const ContextType*, AcceptedVote<VoteImpl>> accepted_votes_;
std::map<VoterId<VoteImpl>,
std::map<const ContextType*, AcceptedVote<VoteImpl>>>
accepted_votes_by_voter_id_;
};
/////////////////////////////////////////////////////////////////////
......@@ -872,12 +874,14 @@ VoteReceipt<VoteImpl> VoteConsumerDefaultImpl<VoteImpl>::SubmitVote(
VoterId<VoteImpl> voter_id,
const ContextType* context,
const VoteImpl& vote) {
auto& accepted_votes = accepted_votes_by_voter_id_[voter_id];
AcceptedVote<VoteImpl> accepted_vote(this, voter_id, context, vote);
VoteReceipt<VoteImpl> vote_receipt = accepted_vote.IssueReceipt();
bool inserted =
accepted_votes_.emplace(context, std::move(accepted_vote)).second;
accepted_votes.emplace(context, std::move(accepted_vote)).second;
DCHECK(inserted);
vote_observer_->OnVoteSubmitted(voter_id, context, vote);
......@@ -890,8 +894,12 @@ void VoteConsumerDefaultImpl<VoteImpl>::ChangeVote(
util::PassKey<AcceptedVote<VoteImpl>>,
AcceptedVote<VoteImpl>* old_vote,
const VoteImpl& new_vote) {
auto it = accepted_votes_.find(old_vote->context());
DCHECK(it != accepted_votes_.end());
VoterId<VoteImpl> voter_id = old_vote->voter_id();
auto& accepted_votes = accepted_votes_by_voter_id_[voter_id];
auto it = accepted_votes.find(old_vote->context());
DCHECK(it != accepted_votes.end());
auto* accepted_vote = &it->second;
DCHECK_EQ(accepted_vote, old_vote);
......@@ -905,15 +913,24 @@ template <class VoteImpl>
void VoteConsumerDefaultImpl<VoteImpl>::VoteInvalidated(
util::PassKey<AcceptedVote<VoteImpl>>,
AcceptedVote<VoteImpl>* vote) {
auto it = accepted_votes_.find(vote->context());
DCHECK(it != accepted_votes_.end());
VoterId<VoteImpl> voter_id = vote->voter_id();
auto& accepted_votes = accepted_votes_by_voter_id_[voter_id];
auto it = accepted_votes.find(vote->context());
DCHECK(it != accepted_votes.end());
auto* accepted_vote = &it->second;
DCHECK_EQ(accepted_vote, vote);
vote_observer_->OnVoteInvalidated(accepted_vote->voter_id(),
accepted_vote->context());
accepted_votes_.erase(it);
accepted_votes.erase(it);
if (accepted_votes.empty()) {
size_t removed = accepted_votes_by_voter_id_.erase(voter_id);
DCHECK_EQ(removed, 1u);
}
}
} // namespace voting
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment