Commit 677d1a8e authored by zea@chromium.org's avatar zea@chromium.org

[Sync] Ensure all accesses to SyncInternal's observers are thread safe.

BUG=72913
TEST=unit tests with tsan


Review URL: http://codereview.chromium.org/7044059

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@88472 0039d316-1c4b-4281-b951-d872f2087c98
parent fb7e7492
......@@ -1247,8 +1247,10 @@ class SyncManager::SyncInternal
virtual void StoreState(const std::string& cookie);
// Thread-safe observers_ accessors.
void CopyObservers(ObserverList<SyncManager::Observer>* observers_copy);
bool HaveObservers() const;
void AddObserver(SyncManager::Observer* observer);
void RemoveObserver(SyncManager::Observer* observer);
// Accessors for the private members.
......@@ -1539,6 +1541,9 @@ class SyncManager::SyncInternal
MessageLoop* core_message_loop_;
// We have to lock around every observers_ access because it can get accessed
// from any thread and added to/removed from on the core thread.
mutable base::Lock observers_lock_;
ObserverList<SyncManager::Observer> observers_;
browser_sync::JsEventRouter* parent_router_;
......@@ -1820,8 +1825,10 @@ void SyncManager::SyncInternal::BootstrapEncryption(
nigori.CopyFrom(node.GetNigoriSpecifics());
Cryptographer::UpdateResult result = cryptographer->Update(nigori);
if (result == Cryptographer::NEEDS_PASSPHRASE) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
OnPassphraseRequired(sync_api::REASON_DECRYPTION));
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnPassphraseRequired(sync_api::REASON_DECRYPTION));
}
// Refresh list of encrypted datatypes.
......@@ -1835,10 +1842,10 @@ void SyncManager::SyncInternal::BootstrapEncryption(
}
void SyncManager::SyncInternal::StartSyncingNormally() {
// Start the syncer thread. This won't actually
// result in any syncing until at least the
// DirectoryManager broadcasts the OPENED event,
// and a valid server connection is detected.
// Start the syncer thread. This won't actually
// result in any syncing until at least the
// DirectoryManager broadcasts the OPENED event,
// and a valid server connection is detected.
if (syncer_thread()) // NULL during certain unittests.
syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL);
}
......@@ -1857,7 +1864,9 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() {
}
// Notify that initialization is complete.
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnInitializationComplete());
}
......@@ -1876,7 +1885,9 @@ bool SyncManager::SyncInternal::OpenDirectory() {
bool share_opened = dir_manager()->Open(username_for_share());
DCHECK(share_opened);
if (!share_opened) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnStopSyncingPermanently());
LOG(ERROR) << "Could not open share for:" << username_for_share();
......@@ -1961,8 +1972,9 @@ void SyncManager::SyncInternal::UpdateEnabledTypes() {
}
void SyncManager::SyncInternal::RaiseAuthNeededEvent() {
FOR_EACH_OBSERVER(
SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnAuthError(AuthError(AuthError::INVALID_GAIA_CREDENTIALS)));
}
......@@ -1984,7 +1996,9 @@ void SyncManager::SyncInternal::SetPassphrase(
// We do not accept empty passphrases.
if (passphrase.empty()) {
VLOG(1) << "Rejecting empty passphrase.";
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnPassphraseRequired(sync_api::REASON_SET_PASSPHRASE_FAILED));
return;
}
......@@ -1997,7 +2011,9 @@ void SyncManager::SyncInternal::SetPassphrase(
if (cryptographer->has_pending_keys()) {
if (!cryptographer->DecryptPendingKeys(params)) {
VLOG(1) << "Passphrase failed to decrypt pending keys.";
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnPassphraseRequired(sync_api::REASON_SET_PASSPHRASE_FAILED));
return;
}
......@@ -2043,7 +2059,9 @@ void SyncManager::SyncInternal::SetPassphrase(
std::string bootstrap_token;
cryptographer->GetBootstrapToken(&bootstrap_token);
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnPassphraseAccepted(bootstrap_token));
}
......@@ -2162,7 +2180,9 @@ void SyncManager::SyncInternal::ReEncryptEverything(WriteTransaction* trans) {
}
}
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnEncryptionComplete(encrypted_types));
}
......@@ -2256,13 +2276,16 @@ void SyncManager::SyncInternal::OnServerConnectionEvent(
allstatus_.HandleServerConnectionEvent(event);
if (event.connection_code ==
browser_sync::HttpResponse::SERVER_CONNECTION_OK) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnAuthError(AuthError::None()));
}
if (event.connection_code == browser_sync::HttpResponse::SYNC_AUTH_ERROR) {
FOR_EACH_OBSERVER(
SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnAuthError(AuthError(AuthError::INVALID_GAIA_CREDENTIALS)));
}
}
......@@ -2272,13 +2295,15 @@ void SyncManager::SyncInternal::HandleTransactionCompleteChangeEvent(
// This notification happens immediately after the transaction mutex is
// released. This allows work to be performed without blocking other threads
// from acquiring a transaction.
if (observers_.size() <= 0)
if (!HaveObservers())
return;
// Call commit.
for (int i = 0; i < syncable::MODEL_TYPE_COUNT; ++i) {
if (models_with_changes.test(i)) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnChangesComplete(syncable::ModelTypeFromInt(i)));
}
}
......@@ -2289,7 +2314,7 @@ ModelTypeBitSet SyncManager::SyncInternal::HandleTransactionEndingChangeEvent(
// This notification happens immediately before a syncable WriteTransaction
// falls out of scope. It happens while the channel mutex is still held,
// and while the transaction mutex is held, so it cannot be re-entrant.
if (observers_.size() <= 0 || ChangeBuffersAreEmpty())
if (!HaveObservers() || ChangeBuffersAreEmpty())
return ModelTypeBitSet();
// This will continue the WriteTransaction using a read only wrapper.
......@@ -2306,8 +2331,9 @@ ModelTypeBitSet SyncManager::SyncInternal::HandleTransactionEndingChangeEvent(
vector<ChangeRecord> ordered_changes;
change_buffers_[i].GetAllChangesInTreeOrder(&read_trans, &ordered_changes);
if (!ordered_changes.empty()) {
FOR_EACH_OBSERVER(
SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnChangesApplied(syncable::ModelTypeFromInt(i), &read_trans,
&ordered_changes[0], ordered_changes.size()));
models_with_changes.set(i, true);
......@@ -2456,7 +2482,7 @@ void SyncManager::SyncInternal::RequestNudgeWithDataTypes(
void SyncManager::SyncInternal::OnSyncEngineEvent(
const SyncEngineEvent& event) {
if (observers_.size() <= 0) {
if (!HaveObservers()) {
VLOG(0) << "OnSyncEngineEvent returning because observers_.size() is zero";
return;
}
......@@ -2504,12 +2530,16 @@ void SyncManager::SyncInternal::OnSyncEngineEvent(
// yet, prompt the user for a passphrase.
if (cryptographer->has_pending_keys()) {
VLOG(1) << "OnPassPhraseRequired Sent";
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnPassphraseRequired(sync_api::REASON_DECRYPTION));
} else if (!cryptographer->is_ready()) {
VLOG(1) << "OnPassphraseRequired sent because cryptographer is not "
<< "ready";
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnPassphraseRequired(sync_api::REASON_ENCRYPTION));
}
// If everything is in order(we have the passphrase) then there is no
......@@ -2527,7 +2557,9 @@ void SyncManager::SyncInternal::OnSyncEngineEvent(
if (!event.snapshot->has_more_to_sync) {
VLOG(1) << "OnSyncCycleCompleted sent";
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnSyncCycleCompleted(event.snapshot));
}
......@@ -2549,25 +2581,33 @@ void SyncManager::SyncInternal::OnSyncEngineEvent(
}
if (event.what_happened == SyncEngineEvent::STOP_SYNCING_PERMANENTLY) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnStopSyncingPermanently());
return;
}
if (event.what_happened == SyncEngineEvent::CLEAR_SERVER_DATA_SUCCEEDED) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnClearServerDataSucceeded());
return;
}
if (event.what_happened == SyncEngineEvent::CLEAR_SERVER_DATA_FAILED) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnClearServerDataFailed());
return;
}
if (event.what_happened == SyncEngineEvent::UPDATED_TOKEN) {
FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
ObserverList<SyncManager::Observer> temp_obs_list;
CopyObservers(&temp_obs_list);
FOR_EACH_OBSERVER(SyncManager::Observer, temp_obs_list,
OnUpdatedToken(event.updated_token));
return;
}
......@@ -2872,13 +2912,38 @@ void SyncManager::SyncInternal::StoreState(
lookup->SaveChanges();
}
// Note: it is possible that an observer will remove itself after we have made
// a copy, but before the copy is consumed. This could theoretically result
// in accessing a garbage pointer, but can only occur when an about:sync window
// is closed in the middle of a notification.
// See crbug.com/85481.
void SyncManager::SyncInternal::CopyObservers(
ObserverList<SyncManager::Observer>* observers_copy) {
DCHECK_EQ(0U, observers_copy->size());
base::AutoLock lock(observers_lock_);
if (observers_.size() == 0)
return;
ObserverListBase<SyncManager::Observer>::Iterator it(observers_);
SyncManager::Observer* obs;
while ((obs = it.GetNext()) != NULL)
observers_copy->AddObserver(obs);
}
bool SyncManager::SyncInternal::HaveObservers() const {
base::AutoLock lock(observers_lock_);
return observers_.size() > 0;
}
void SyncManager::SyncInternal::AddObserver(
SyncManager::Observer* observer) {
base::AutoLock lock(observers_lock_);
observers_.AddObserver(observer);
}
void SyncManager::SyncInternal::RemoveObserver(
SyncManager::Observer* observer) {
base::AutoLock lock(observers_lock_);
observers_.RemoveObserver(observer);
}
......
......@@ -116,18 +116,10 @@
fun:JSC::Yarr::interpret*
}
{
bug_72913
ThreadSanitizer:Race
...
fun:ObserverListBase*
...
fun:sync_api::SyncManager::SyncInternal::*
}
{
bug_82279
ThreadSanitizer:Race
...
fun:ObserverListBase::Iterator::Iterator
fun:syncable::BaseTransaction::NotifyTransactionChangingAndEnding
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment