Commit 3221440c authored by Kramer Ge, committed by Commit Bot

Add Atomic Memcpy functions for users of device::OneWriterSeqLock

For SeqLocks, it is recommended that data accesses protected by the
SeqLock be atomic. Hence, we are adding atomic memcpy functions to
save users the inconvenience of having to implement their own atomic
accessors.

Use std::atomic in seqlocks instead of atomicops.h.

unittest:
Use atomic memcpy functions instead.
Test max_retries feature of ReadBegin.
Fix: Use std::atomic in tests

See:
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1478r1.html
https://www.hpl.hp.com/techreports/2012/HPL-2012-68.pdf

Bug: 1132012
Change-Id: I0dad5d63cbc9e30ec44535b35fce62fe7c66b2be
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1572369
Reviewed-by: Reilly Grant <reillyg@chromium.org>
Commit-Queue: Kramer Ge <fangzhoug@chromium.org>
Cr-Commit-Position: refs/heads/master@{#810755}
parent 341bf50b
...@@ -8,10 +8,35 @@ namespace device { ...@@ -8,10 +8,35 @@ namespace device {
// The sequence counter starts even (0): no write is in progress.
OneWriterSeqLock::OneWriterSeqLock() : sequence_(0) {}
base::subtle::Atomic32 OneWriterSeqLock::ReadBegin(uint32_t max_retries) const { void OneWriterSeqLock::AtomicWriterMemcpy(void* dest,
base::subtle::Atomic32 version; const void* src,
size_t size) {
DCHECK(!(reinterpret_cast<std::uintptr_t>(dest) % 4));
DCHECK(!(reinterpret_cast<std::uintptr_t>(src) % 4));
DCHECK(size % 4 == 0);
for (size_t i = 0; i < size / 4; ++i) {
reinterpret_cast<std::atomic<int32_t>*>(dest)[i].store(
reinterpret_cast<const int32_t*>(src)[i], std::memory_order_relaxed);
}
}
// Copies |size| bytes from |src| to |dest| as a sequence of relaxed atomic
// 32-bit loads. This is the reader-side counterpart of AtomicWriterMemcpy().
// Both pointers must be 4-byte aligned and |size| a multiple of 4.
void OneWriterSeqLock::AtomicReaderMemcpy(void* dest,
                                          const void* src,
                                          size_t size) {
  DCHECK(!(reinterpret_cast<std::uintptr_t>(dest) % 4));
  DCHECK(!(reinterpret_cast<std::uintptr_t>(src) % 4));
  DCHECK(size % 4 == 0);

  const auto* from = reinterpret_cast<const std::atomic<int32_t>*>(src);
  auto* to = reinterpret_cast<int32_t*>(dest);
  const size_t words = size / 4;
  for (size_t i = 0; i < words; ++i)
    to[i] = from[i].load(std::memory_order_relaxed);
}
int32_t OneWriterSeqLock::ReadBegin(uint32_t max_retries) const {
int32_t version;
for (uint32_t i = 0; i <= max_retries; ++i) { for (uint32_t i = 0; i <= max_retries; ++i) {
version = base::subtle::Acquire_Load(&sequence_); version = sequence_.load(std::memory_order_acquire);
// If the counter is even, then the associated data might be in a // If the counter is even, then the associated data might be in a
// consistent state, so we can try to read. // consistent state, so we can try to read.
...@@ -27,16 +52,19 @@ base::subtle::Atomic32 OneWriterSeqLock::ReadBegin(uint32_t max_retries) const { ...@@ -27,16 +52,19 @@ base::subtle::Atomic32 OneWriterSeqLock::ReadBegin(uint32_t max_retries) const {
return version; return version;
} }
bool OneWriterSeqLock::ReadRetry(base::subtle::Atomic32 version) const { bool OneWriterSeqLock::ReadRetry(int32_t version) const {
// If the sequence number was updated then a read should be re-attempted. // If the sequence number was updated then a read should be re-attempted.
// -- Load fence, read membarrier // -- Load fence, read membarrier
return base::subtle::Release_Load(&sequence_) != version; atomic_thread_fence(std::memory_order_acquire);
return sequence_.load(std::memory_order_relaxed) != version;
} }
void OneWriterSeqLock::WriteBegin() { void OneWriterSeqLock::WriteBegin() {
// Increment the sequence number to odd to indicate the beginning of a write // Increment the sequence number to odd to indicate the beginning of a write
// update. // update.
base::subtle::Barrier_AtomicIncrement(&sequence_, 1); int32_t version = sequence_.fetch_add(1, std::memory_order_relaxed);
atomic_thread_fence(std::memory_order_release);
DCHECK((version & 1) == 0);
// -- Store fence, write membarrier // -- Store fence, write membarrier
} }
...@@ -44,7 +72,8 @@ void OneWriterSeqLock::WriteEnd() { ...@@ -44,7 +72,8 @@ void OneWriterSeqLock::WriteEnd() {
// Increment the sequence to an even number to indicate the completion of // Increment the sequence to an even number to indicate the completion of
// a write update. // a write update.
// -- Store fence, write membarrier // -- Store fence, write membarrier
base::subtle::Barrier_AtomicIncrement(&sequence_, 1); int32_t version = sequence_.fetch_add(1, std::memory_order_release);
DCHECK((version & 1) != 0);
} }
} // namespace device } // namespace device
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#ifndef DEVICE_BASE_SYNCHRONIZATION_ONE_WRITER_SEQLOCK_H_ #ifndef DEVICE_BASE_SYNCHRONIZATION_ONE_WRITER_SEQLOCK_H_
#define DEVICE_BASE_SYNCHRONIZATION_ONE_WRITER_SEQLOCK_H_ #define DEVICE_BASE_SYNCHRONIZATION_ONE_WRITER_SEQLOCK_H_
#include <atomic>
#include "base/atomicops.h" #include "base/atomicops.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
...@@ -36,16 +38,22 @@ namespace device { ...@@ -36,16 +38,22 @@ namespace device {
class OneWriterSeqLock { class OneWriterSeqLock {
public: public:
OneWriterSeqLock(); OneWriterSeqLock();
// Copies data from src into dest using atomic stores. This should be used by
// writer of SeqLock. Data must be 4-byte aligned.
static void AtomicWriterMemcpy(void* dest, const void* src, size_t size);
// Copies data from src into dest using atomic loads. This should be used by
// readers of SeqLock. Data must be 4-byte aligned.
static void AtomicReaderMemcpy(void* dest, const void* src, size_t size);
// ReadBegin returns |sequence_| when it is even, or when it has retried // ReadBegin returns |sequence_| when it is even, or when it has retried
// |max_retries| times. Omitting |max_retries| results in ReadBegin not // |max_retries| times. Omitting |max_retries| results in ReadBegin not
// returning until |sequence_| is even. // returning until |sequence_| is even.
base::subtle::Atomic32 ReadBegin(uint32_t max_retries = UINT32_MAX) const; int32_t ReadBegin(uint32_t max_retries = UINT32_MAX) const;
bool ReadRetry(base::subtle::Atomic32 version) const; bool ReadRetry(int32_t version) const;
void WriteBegin(); void WriteBegin();
void WriteEnd(); void WriteEnd();
private: private:
base::subtle::Atomic32 sequence_; std::atomic<int32_t> sequence_;
DISALLOW_COPY_AND_ASSIGN(OneWriterSeqLock); DISALLOW_COPY_AND_ASSIGN(OneWriterSeqLock);
}; };
......
...@@ -5,8 +5,8 @@ ...@@ -5,8 +5,8 @@
#include "device/base/synchronization/one_writer_seqlock.h" #include "device/base/synchronization/one_writer_seqlock.h"
#include <stdlib.h> #include <stdlib.h>
#include <atomic>
#include "base/atomic_ref_count.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/third_party/dynamic_annotations/dynamic_annotations.h" #include "base/third_party/dynamic_annotations/dynamic_annotations.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
...@@ -18,7 +18,8 @@ namespace device { ...@@ -18,7 +18,8 @@ namespace device {
// Basic test to make sure that basic operation works correctly. // Basic test to make sure that basic operation works correctly.
// Payload deliberately larger than a cache line, so concurrent copies can
// tear and the seqlock's retry protocol is actually exercised.
struct TestData {
  uint32_t buffer[32];
};
class BasicSeqLockTestThread : public base::PlatformThread::Delegate { class BasicSeqLockTestThread : public base::PlatformThread::Delegate {
...@@ -27,13 +28,13 @@ class BasicSeqLockTestThread : public base::PlatformThread::Delegate { ...@@ -27,13 +28,13 @@ class BasicSeqLockTestThread : public base::PlatformThread::Delegate {
void Init(OneWriterSeqLock* seqlock, void Init(OneWriterSeqLock* seqlock,
TestData* data, TestData* data,
base::AtomicRefCount* ready) { std::atomic<int>* ready) {
seqlock_ = seqlock; seqlock_ = seqlock;
data_ = data; data_ = data;
ready_ = ready; ready_ = ready;
} }
void ThreadMain() override { void ThreadMain() override {
while (ready_->IsZero()) { while (!*ready_) {
base::PlatformThread::YieldCurrentThread(); base::PlatformThread::YieldCurrentThread();
} }
...@@ -42,24 +43,54 @@ class BasicSeqLockTestThread : public base::PlatformThread::Delegate { ...@@ -42,24 +43,54 @@ class BasicSeqLockTestThread : public base::PlatformThread::Delegate {
base::subtle::Atomic32 version; base::subtle::Atomic32 version;
do { do {
version = seqlock_->ReadBegin(); version = seqlock_->ReadBegin();
copy = *data_; OneWriterSeqLock::AtomicReaderMemcpy(&copy, data_, sizeof(TestData));
} while (seqlock_->ReadRetry(version)); } while (seqlock_->ReadRetry(version));
EXPECT_EQ(copy.a + 100, copy.b); for (unsigned j = 1; j < 32; ++j)
EXPECT_EQ(copy.c, copy.b + copy.a); EXPECT_EQ(copy.buffer[j], copy.buffer[0] + copy.buffer[j - 1]);
} }
ready_->Decrement(); --(*ready_);
} }
private: private:
OneWriterSeqLock* seqlock_; OneWriterSeqLock* seqlock_;
TestData* data_; TestData* data_;
base::AtomicRefCount* ready_; std::atomic<int>* ready_;
DISALLOW_COPY_AND_ASSIGN(BasicSeqLockTestThread); DISALLOW_COPY_AND_ASSIGN(BasicSeqLockTestThread);
}; };
// Reader thread for the MaxRetries test. The writer holds the lock (odd
// sequence) for the entire test, so every bounded ReadBegin() must give up
// after |max_retries| attempts and return the odd sequence value.
class MaxRetriesSeqLockTestThread : public base::PlatformThread::Delegate {
 public:
  MaxRetriesSeqLockTestThread() = default;

  void Init(OneWriterSeqLock* seqlock, std::atomic<int>* ready) {
    seqlock_ = seqlock;
    ready_ = ready;
  }

  void ThreadMain() override {
    // Spin until the writer signals that WriteBegin() has been called.
    while (!*ready_) {
      base::PlatformThread::YieldCurrentThread();
    }

    for (unsigned i = 0; i < 10; ++i) {
      // ReadBegin() now returns int32_t; use it instead of the legacy
      // base::subtle::Atomic32 this commit is migrating away from.
      int32_t version = seqlock_->ReadBegin(100);

      // The sequence stays odd while the writer is mid-update, so a bounded
      // ReadBegin() must have bailed out after exhausting its retries.
      EXPECT_NE(version & 1, 0);
    }

    --*ready_;
  }

 private:
  OneWriterSeqLock* seqlock_;
  std::atomic<int>* ready_;

  DISALLOW_COPY_AND_ASSIGN(MaxRetriesSeqLockTestThread);
};
#if defined(OS_ANDROID) #if defined(OS_ANDROID)
#define MAYBE_ManyThreads FLAKY_ManyThreads #define MAYBE_ManyThreads FLAKY_ManyThreads
#else #else
...@@ -67,8 +98,8 @@ class BasicSeqLockTestThread : public base::PlatformThread::Delegate { ...@@ -67,8 +98,8 @@ class BasicSeqLockTestThread : public base::PlatformThread::Delegate {
#endif #endif
TEST(OneWriterSeqLockTest, MAYBE_ManyThreads) { TEST(OneWriterSeqLockTest, MAYBE_ManyThreads) {
OneWriterSeqLock seqlock; OneWriterSeqLock seqlock;
TestData data = {0, 0, 0}; TestData data;
base::AtomicRefCount ready(0); std::atomic<int> ready(0);
ANNOTATE_BENIGN_RACE_SIZED(&data, sizeof(data), "Racey reads are discarded"); ANNOTATE_BENIGN_RACE_SIZED(&data, sizeof(data), "Racey reads are discarded");
...@@ -76,24 +107,27 @@ TEST(OneWriterSeqLockTest, MAYBE_ManyThreads) { ...@@ -76,24 +107,27 @@ TEST(OneWriterSeqLockTest, MAYBE_ManyThreads) {
BasicSeqLockTestThread threads[kNumReaderThreads]; BasicSeqLockTestThread threads[kNumReaderThreads];
base::PlatformThreadHandle handles[kNumReaderThreads]; base::PlatformThreadHandle handles[kNumReaderThreads];
for (unsigned i = 0; i < kNumReaderThreads; ++i) for (uint32_t i = 0; i < kNumReaderThreads; ++i)
threads[i].Init(&seqlock, &data, &ready); threads[i].Init(&seqlock, &data, &ready);
for (unsigned i = 0; i < kNumReaderThreads; ++i) for (uint32_t i = 0; i < kNumReaderThreads; ++i)
ASSERT_TRUE(base::PlatformThread::Create(0, &threads[i], &handles[i])); ASSERT_TRUE(base::PlatformThread::Create(0, &threads[i], &handles[i]));
// The main thread is the writer, and the spawned are readers. // The main thread is the writer, and the spawned are readers.
unsigned counter = 0; uint32_t counter = 0;
for (;;) { for (;;) {
TestData new_data;
new_data.buffer[0] = counter++;
for (unsigned i = 1; i < 32; ++i) {
new_data.buffer[i] = new_data.buffer[0] + new_data.buffer[i - 1];
}
seqlock.WriteBegin(); seqlock.WriteBegin();
data.a = counter++; OneWriterSeqLock::AtomicWriterMemcpy(&data, &new_data, sizeof(TestData));
data.b = data.a + 100;
data.c = data.b + data.a;
seqlock.WriteEnd(); seqlock.WriteEnd();
if (counter == 1) if (counter == 1)
ready.Increment(kNumReaderThreads); ready += kNumReaderThreads;
if (ready.IsZero()) if (!ready)
break; break;
} }
...@@ -101,4 +135,29 @@ TEST(OneWriterSeqLockTest, MAYBE_ManyThreads) { ...@@ -101,4 +135,29 @@ TEST(OneWriterSeqLockTest, MAYBE_ManyThreads) {
base::PlatformThread::Join(handles[i]); base::PlatformThread::Join(handles[i]);
} }
// Verifies the |max_retries| feature of ReadBegin(): while a write is held
// open, bounded readers must return (with an odd version) instead of
// spinning forever.
TEST(OneWriterSeqLockTest, MaxRetries) {
  static const unsigned kNumReaderThreads = 3;
  OneWriterSeqLock seqlock;
  std::atomic<int> ready(0);

  MaxRetriesSeqLockTestThread threads[kNumReaderThreads];
  base::PlatformThreadHandle handles[kNumReaderThreads];
  for (uint32_t i = 0; i < kNumReaderThreads; ++i)
    threads[i].Init(&seqlock, &ready);
  for (uint32_t i = 0; i < kNumReaderThreads; ++i)
    ASSERT_TRUE(base::PlatformThread::Create(0, &threads[i], &handles[i]));

  // The main thread is the writer, and the spawned threads are readers.
  // Enter a write so the sequence stays odd, release the readers, and keep
  // the write open until every reader has finished.
  seqlock.WriteBegin();
  ready += kNumReaderThreads;
  while (ready) {
    base::PlatformThread::YieldCurrentThread();
  }
  seqlock.WriteEnd();

  for (unsigned i = 0; i < kNumReaderThreads; ++i)
    base::PlatformThread::Join(handles[i]);
}
} // namespace device } // namespace device
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.