Commit 1ae6d05f authored by rkc@chromium.org's avatar rkc@chromium.org

Stop playing/recording when not needed.

Currently when unpublishing or unsubscribing, we still keep playing or recording
audio till it times out. We need to keep a track of which operations have
requested playing or recording and once those operations are unpublished or
unsubscribed, we need to appropriately stop the record/playback.

To do this, the first change is to stop keeping a list of tokens, instead we
just keep a list of operations. If we send our currently playing token to the
server, it is guaraunteed to _not issue another token, unless our current token
is going to expire in less time than is on the publish. In that case, we simply
replace our currently playing token with the new one, making sure that we
always just need to keep one token around.

With this, the logic for playing/recording is completely changed. Now we just
check if we have active transmit/receive, and if we do, we ensure that we are
playing our current token (or keep recording); if we do not have an active
transmit and we are playing, we stop playing. For all other cases our
ProcessNextTransmit and ProcessNextReceive is a nop.

The one ugliness in the code is that we have to keep the code for processing
the audible and inaudible tokens in the same class, since the WhispernetClient
can only give 'one' method tokens back. If two different classes call the
WhispernetClient to encode tokens, the get samples callback from the second
will overrite the callback for the first, hence the first class will never
get its samples back. Once we find a way around this, we can just have two
AudioDirectiveHandlers, one for audible and one for inaudible, but till then
we need to keep this processing together in one AudioDirectiveHandler.


R=xiyuan@chromium.org
BUG=392028

Review URL: https://codereview.chromium.org/461803003

Cr-Commit-Position: refs/heads/master@{#289219}
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@289219 0039d316-1c4b-4281-b951-d872f2087c98
parent d1e2c63c
......@@ -137,7 +137,7 @@ class ChromeWhispernetClientTest : public ExtensionBrowserTest {
run_loop_->Quit();
}
void TokensCallback(const std::vector<copresence::FullToken>& tokens) {
void TokensCallback(const std::vector<copresence::AudioToken>& tokens) {
ASSERT_TRUE(run_loop_);
run_loop_->Quit();
......
......@@ -4,6 +4,8 @@
#include "chrome/browser/extensions/api/copresence_private/copresence_private_api.h"
#include <vector>
#include "base/lazy_instance.h"
#include "base/stl_util.h"
#include "chrome/browser/copresence/chrome_whispernet_client.h"
......@@ -32,10 +34,10 @@ ExtensionFunction::ResponseAction CopresencePrivateSendFoundFunction::Run() {
scoped_ptr<api::copresence_private::SendFound::Params> params(
api::copresence_private::SendFound::Params::Create(*args_));
EXTENSION_FUNCTION_VALIDATE(params.get());
std::vector<copresence::FullToken> tokens;
std::vector<copresence::AudioToken> tokens;
for (size_t i = 0; i < params->tokens.size(); ++i) {
tokens.push_back(copresence::FullToken(params->tokens[i]->token,
params->tokens[i]->audible));
tokens.push_back(copresence::AudioToken(params->tokens[i]->token,
params->tokens[i]->audible));
}
GetWhispernetClient()->GetTokensCallback().Run(tokens);
return RespondNow(NoArguments());
......
......@@ -2,10 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_HANDLER_
#define COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_HANDLER_
#ifndef COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_HANDLER_H_
#define COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_HANDLER_H_
#include <vector>
#include <string>
#include "base/basictypes.h"
#include "base/macros.h"
......@@ -15,6 +15,7 @@
#include "components/copresence/handlers/audio/audio_directive_list.h"
#include "components/copresence/mediums/audio/audio_recorder.h"
#include "components/copresence/proto/data.pb.h"
#include "components/copresence/timed_map.h"
namespace media {
class AudioBusRefCounted;
......@@ -25,11 +26,24 @@ namespace copresence {
class AudioPlayer;
// The AudioDirectiveHandler handles audio transmit and receive instructions.
// TODO(rkc): Currently since WhispernetClient can only have one token encoded
// callback at a time, we need to have both the audible and inaudible in this
// class. Investigate a better way to do this; a few options are abstracting
// out token encoding to a separate class, or allowing whispernet to have
// multiple callbacks for encoded tokens being sent back and have two versions
// of this class.
class AudioDirectiveHandler {
public:
typedef base::Callback<void(const std::string&,
bool,
const scoped_refptr<media::AudioBusRefCounted>&)>
SamplesCallback;
typedef base::Callback<void(const std::string&, bool, const SamplesCallback&)>
EncodeTokenCallback;
AudioDirectiveHandler(
const AudioRecorder::DecodeSamplesCallback& decode_cb,
const AudioDirectiveList::EncodeTokenCallback& encode_cb);
const AudioDirectiveHandler::EncodeTokenCallback& encode_cb);
virtual ~AudioDirectiveHandler();
// Do not use this class before calling this.
......@@ -38,40 +52,82 @@ class AudioDirectiveHandler {
// Adds an instruction to our handler. The instruction will execute and be
// removed after the ttl expires.
void AddInstruction(const copresence::TokenInstruction& instruction,
const std::string& op_id,
base::TimeDelta ttl_ms);
protected:
// Protected and virtual since we want to be able to mock these out.
virtual void PlayAudio(
const scoped_refptr<media::AudioBusRefCounted>& samples,
base::TimeDelta duration);
virtual void RecordAudio(base::TimeDelta duration);
// Removes all instructions associated with this operation id.
void RemoveInstructions(const std::string& op_id);
// Returns the currently playing DTMF token.
const std::string& PlayingAudibleToken() const {
return current_token_audible_;
}
// Returns the currently playing DSSS token.
const std::string& PlayingInaudibleToken() const {
return current_token_inaudible_;
}
void set_player_audible_for_testing(AudioPlayer* player) {
player_audible_ = player;
}
void set_player_inaudible_for_testing(AudioPlayer* player) {
player_inaudible_ = player;
}
void set_recorder_for_testing(AudioRecorder* recorder) {
recorder_ = recorder;
}
private:
void StopPlayback();
void StopRecording();
FRIEND_TEST_ALL_PREFIXES(AudioDirectiveHandlerTest, Basic);
// Execute the next active transmit instruction.
void ExecuteNextTransmit();
// Execute the next active receive instruction.
void ExecuteNextReceive();
typedef TimedMap<std::string, scoped_refptr<media::AudioBusRefCounted> >
SamplesMap;
AudioDirectiveList directive_list_inaudible_;
AudioDirectiveList directive_list_audible_;
// Processes the next active transmit instruction.
void ProcessNextTransmit();
// Processes the next active receive instruction.
void ProcessNextReceive();
// The next two pointers are self-deleting. When we call Finalize on them,
// they clean themselves up on the Audio thread.
AudioPlayer* player_;
void HandleToken(const std::string token, bool audible);
// This is the method that the whispernet client needs to call to return
// samples to us.
void OnTokenEncoded(const std::string& token,
bool audible,
const scoped_refptr<media::AudioBusRefCounted>& samples);
AudioDirectiveList transmits_list_audible_;
AudioDirectiveList transmits_list_inaudible_;
AudioDirectiveList receives_list_;
// Currently playing tokens.
std::string current_token_audible_;
std::string current_token_inaudible_;
// AudioPlayer and AudioRecorder objects are self-deleting. When we call
// Finalize on them, they clean themselves up on the Audio thread.
AudioPlayer* player_audible_;
AudioPlayer* player_inaudible_;
AudioRecorder* recorder_;
AudioRecorder::DecodeSamplesCallback decode_cb_;
EncodeTokenCallback encode_cb_;
base::OneShotTimer<AudioDirectiveHandler> stop_playback_timer_;
base::OneShotTimer<AudioDirectiveHandler> stop_audible_playback_timer_;
base::OneShotTimer<AudioDirectiveHandler> stop_inaudible_playback_timer_;
base::OneShotTimer<AudioDirectiveHandler> stop_recording_timer_;
// Cache that holds the encoded samples. After reaching its limit, the cache
// expires the oldest samples first.
SamplesMap samples_cache_audible_;
SamplesMap samples_cache_inaudible_;
DISALLOW_COPY_AND_ASSIGN(AudioDirectiveHandler);
};
} // namespace copresence
#endif // COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_HANDLER_
#endif // COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_HANDLER_H_
......@@ -6,6 +6,8 @@
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "components/copresence/mediums/audio/audio_player.h"
#include "components/copresence/mediums/audio/audio_recorder.h"
#include "components/copresence/test/audio_test_support.h"
#include "media/base/audio_bus.h"
#include "testing/gmock/include/gmock/gmock.h"
......@@ -16,31 +18,50 @@ using ::testing::Le;
namespace copresence {
class MockAudioDirectiveHandler : public AudioDirectiveHandler {
class TestAudioPlayer : public AudioPlayer {
public:
MockAudioDirectiveHandler(
const AudioDirectiveList::EncodeTokenCallback& encode_cb)
: AudioDirectiveHandler(AudioRecorder::DecodeSamplesCallback(),
encode_cb) {}
virtual ~MockAudioDirectiveHandler() {}
// Mock out the play/record methods.
MOCK_METHOD2(PlayAudio,
void(const scoped_refptr<media::AudioBusRefCounted>&,
base::TimeDelta));
MOCK_METHOD1(RecordAudio, void(base::TimeDelta));
TestAudioPlayer() {}
virtual ~TestAudioPlayer() {}
// AudioPlayer overrides:
virtual void Initialize() OVERRIDE {}
virtual void Play(
const scoped_refptr<media::AudioBusRefCounted>& /* samples */) OVERRIDE {
set_is_playing(true);
}
virtual void Stop() OVERRIDE { set_is_playing(false); }
virtual void Finalize() OVERRIDE { delete this; }
private:
DISALLOW_COPY_AND_ASSIGN(MockAudioDirectiveHandler);
DISALLOW_COPY_AND_ASSIGN(TestAudioPlayer);
};
class TestAudioRecorder : public AudioRecorder {
public:
TestAudioRecorder() : AudioRecorder(AudioRecorder::DecodeSamplesCallback()) {}
virtual ~TestAudioRecorder() {}
// AudioRecorder overrides:
virtual void Initialize() OVERRIDE {}
virtual void Record() OVERRIDE { set_is_recording(true); }
virtual void Stop() OVERRIDE { set_is_recording(false); }
virtual void Finalize() OVERRIDE { delete this; }
private:
DISALLOW_COPY_AND_ASSIGN(TestAudioRecorder);
};
class AudioDirectiveHandlerTest : public testing::Test {
public:
AudioDirectiveHandlerTest()
: directive_handler_(new MockAudioDirectiveHandler(
: directive_handler_(new AudioDirectiveHandler(
AudioRecorder::DecodeSamplesCallback(),
base::Bind(&AudioDirectiveHandlerTest::EncodeToken,
base::Unretained(this)))) {}
base::Unretained(this)))) {
directive_handler_->set_player_audible_for_testing(new TestAudioPlayer());
directive_handler_->set_player_inaudible_for_testing(new TestAudioPlayer());
directive_handler_->set_recorder_for_testing(new TestAudioRecorder());
}
virtual ~AudioDirectiveHandlerTest() {}
void DirectiveAdded() {}
......@@ -48,16 +69,19 @@ class AudioDirectiveHandlerTest : public testing::Test {
protected:
void EncodeToken(const std::string& token,
bool audible,
const AudioDirectiveList::SamplesCallback& callback) {
const AudioDirectiveHandler::SamplesCallback& callback) {
callback.Run(
token, audible, CreateRandomAudioRefCounted(0x1337, 1, 0x7331));
}
copresence::TokenInstruction CreateTransmitInstruction(
const std::string& token) {
const std::string& token,
bool audible) {
copresence::TokenInstruction instruction;
instruction.set_token_instruction_type(copresence::TRANSMIT);
instruction.set_token_id(token);
instruction.set_medium(audible ? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND);
return instruction;
}
......@@ -71,37 +95,45 @@ class AudioDirectiveHandlerTest : public testing::Test {
// our the audio directive handler since the directive list ctor (invoked
// from the directive handler ctor) will post tasks.
base::MessageLoop message_loop_;
scoped_ptr<MockAudioDirectiveHandler> directive_handler_;
scoped_ptr<AudioDirectiveHandler> directive_handler_;
private:
DISALLOW_COPY_AND_ASSIGN(AudioDirectiveHandlerTest);
};
// TODO(rkc): This test is broken, possibly due to the changes for audible.
TEST_F(AudioDirectiveHandlerTest, DISABLED_Basic) {
const base::TimeDelta kSmallTtl = base::TimeDelta::FromMilliseconds(0x1337);
const base::TimeDelta kLargeTtl = base::TimeDelta::FromSeconds(0x7331);
// Expect to play and record instructions for 'less' than the TTL specified,
// since by the time that the token would have gotten encoded, we would
// have (TTL - time_to_encode) left to play on that instruction.
EXPECT_CALL(*directive_handler_, PlayAudio(_, testing::Le(kLargeTtl)))
.Times(3);
directive_handler_->AddInstruction(CreateTransmitInstruction("token1"),
kLargeTtl);
directive_handler_->AddInstruction(CreateTransmitInstruction("token2"),
kLargeTtl);
directive_handler_->AddInstruction(CreateTransmitInstruction("token3"),
kSmallTtl);
EXPECT_CALL(*directive_handler_, RecordAudio(Le(kLargeTtl))).Times(3);
directive_handler_->AddInstruction(CreateReceiveInstruction(), kLargeTtl);
directive_handler_->AddInstruction(CreateReceiveInstruction(), kSmallTtl);
directive_handler_->AddInstruction(CreateReceiveInstruction(), kLargeTtl);
TEST_F(AudioDirectiveHandlerTest, Basic) {
const base::TimeDelta kTtl = base::TimeDelta::FromMilliseconds(9999);
directive_handler_->AddInstruction(
CreateTransmitInstruction("token", true), "op_id1", kTtl);
directive_handler_->AddInstruction(
CreateTransmitInstruction("token", false), "op_id1", kTtl);
directive_handler_->AddInstruction(
CreateTransmitInstruction("token", false), "op_id2", kTtl);
directive_handler_->AddInstruction(
CreateReceiveInstruction(), "op_id1", kTtl);
directive_handler_->AddInstruction(
CreateReceiveInstruction(), "op_id2", kTtl);
directive_handler_->AddInstruction(
CreateReceiveInstruction(), "op_id3", kTtl);
EXPECT_EQ(true, directive_handler_->player_audible_->IsPlaying());
EXPECT_EQ(true, directive_handler_->player_inaudible_->IsPlaying());
EXPECT_EQ(true, directive_handler_->recorder_->IsRecording());
directive_handler_->RemoveInstructions("op_id1");
EXPECT_EQ(false, directive_handler_->player_audible_->IsPlaying());
EXPECT_EQ(true, directive_handler_->player_inaudible_->IsPlaying());
EXPECT_EQ(true, directive_handler_->recorder_->IsRecording());
directive_handler_->RemoveInstructions("op_id2");
EXPECT_EQ(false, directive_handler_->player_inaudible_->IsPlaying());
EXPECT_EQ(true, directive_handler_->recorder_->IsRecording());
directive_handler_->RemoveInstructions("op_id3");
EXPECT_EQ(false, directive_handler_->recorder_->IsRecording());
}
// TODO(rkc): When we are keeping track of which token we're currently playing,
// add tests to make sure we don't replay if we get a token with a lower ttl
// than the current active.
// TODO(rkc): Write more tests that check more convoluted sequences of
// transmits/receives.
} // namespace copresence
......@@ -6,24 +6,8 @@
#include "base/bind.h"
#include "base/logging.h"
#include "base/strings/string_util.h"
#include "media/base/audio_bus.h"
namespace {
// UrlSafe is defined as:
// '/' represented by a '_' and '+' represented by a '-'
// TODO(rkc): Move this processing to the whispernet wrapper.
std::string FromUrlSafe(std::string token) {
base::ReplaceChars(token, "-", "+", &token);
base::ReplaceChars(token, "_", "/", &token);
return token;
}
const int kSampleExpiryTimeMs = 60 * 60 * 1000; // 60 minutes.
const int kMaxSamples = 10000;
} // namespace
#include "base/memory/scoped_ptr.h"
#include "base/time/time.h"
namespace copresence {
......@@ -32,117 +16,71 @@ namespace copresence {
AudioDirective::AudioDirective() {
}
AudioDirective::AudioDirective(const std::string& token,
const std::string& op_id,
base::Time end_time)
: token(token), op_id(op_id), end_time(end_time) {
}
AudioDirective::AudioDirective(
const std::string& token,
const std::string& op_id,
base::Time end_time,
const scoped_refptr<media::AudioBusRefCounted>& samples)
: token(token), op_id(op_id), end_time(end_time), samples(samples) {
}
AudioDirective::~AudioDirective() {
AudioDirective::AudioDirective(const std::string& op_id, base::Time end_time)
: op_id(op_id), end_time(end_time) {
}
AudioDirectiveList::AudioDirectiveList(
const EncodeTokenCallback& encode_token_callback,
const base::Closure& token_added_callback,
bool use_audible_encoding)
: encode_token_callback_(encode_token_callback),
token_added_callback_(token_added_callback),
use_audible_encoding_(use_audible_encoding),
samples_cache_(base::TimeDelta::FromMilliseconds(kSampleExpiryTimeMs),
kMaxSamples) {
AudioDirectiveList::AudioDirectiveList() {
}
AudioDirectiveList::~AudioDirectiveList() {
}
void AudioDirectiveList::AddTransmitDirective(const std::string& token,
const std::string& op_id,
base::TimeDelta ttl) {
std::string valid_token = FromUrlSafe(token);
void AudioDirectiveList::AddDirective(const std::string& op_id,
base::TimeDelta ttl) {
base::Time end_time = base::Time::Now() + ttl;
if (samples_cache_.HasKey(valid_token)) {
active_transmit_tokens_.push(AudioDirective(
valid_token, op_id, end_time, samples_cache_.GetValue(valid_token)));
// In case this op is already in the list, update it instead of adding
// it again.
std::vector<AudioDirective>::iterator it = FindDirectiveByOpId(op_id);
if (it != active_directives_.end()) {
it->end_time = end_time;
std::make_heap(active_directives_.begin(),
active_directives_.end(),
LatestFirstComparator());
return;
}
// If an encode request for this token has been sent, don't send it again.
if (pending_transmit_tokens_.find(valid_token) !=
pending_transmit_tokens_.end()) {
return;
}
pending_transmit_tokens_[valid_token] =
AudioDirective(valid_token, op_id, end_time);
// All whispernet callbacks will be cleared before we are destructed, so
// unretained is safe to use here.
encode_token_callback_.Run(
valid_token,
use_audible_encoding_,
base::Bind(&AudioDirectiveList::OnTokenEncoded, base::Unretained(this)));
active_directives_.push_back(AudioDirective(op_id, end_time));
std::push_heap(active_directives_.begin(),
active_directives_.end(),
LatestFirstComparator());
}
void AudioDirectiveList::AddReceiveDirective(const std::string& op_id,
base::TimeDelta ttl) {
active_receive_tokens_.push(
AudioDirective(std::string(), op_id, base::Time::Now() + ttl));
}
void AudioDirectiveList::RemoveDirective(const std::string& op_id) {
std::vector<AudioDirective>::iterator it = FindDirectiveByOpId(op_id);
if (it != active_directives_.end())
active_directives_.erase(it);
scoped_ptr<AudioDirective> AudioDirectiveList::GetNextTransmit() {
return GetNextFromList(&active_transmit_tokens_);
std::make_heap(active_directives_.begin(),
active_directives_.end(),
LatestFirstComparator());
}
scoped_ptr<AudioDirective> AudioDirectiveList::GetNextReceive() {
return GetNextFromList(&active_receive_tokens_);
}
scoped_ptr<AudioDirective> AudioDirectiveList::GetNextFromList(
AudioDirectiveQueue* list) {
CHECK(list);
// Checks if we have any valid tokens at all (since the top of the list is
// always pointing to the token with the latest expiry time). If we don't
// have any valid tokens left, clear the list.
if (!list->empty() && list->top().end_time < base::Time::Now()) {
while (!list->empty())
list->pop();
scoped_ptr<AudioDirective> AudioDirectiveList::GetActiveDirective() {
// The top is always the instruction that is ending the latest. If that time
// has passed, means all our previous instructions have expired too, hence
// clear the list.
if (!active_directives_.empty() &&
active_directives_.front().end_time < base::Time::Now()) {
active_directives_.clear();
}
if (list->empty())
if (active_directives_.empty())
return make_scoped_ptr<AudioDirective>(NULL);
return make_scoped_ptr(new AudioDirective(list->top()));
return make_scoped_ptr(new AudioDirective(active_directives_.front()));
}
void AudioDirectiveList::OnTokenEncoded(
const std::string& token,
bool /* audible */,
const scoped_refptr<media::AudioBusRefCounted>& samples) {
// We shouldn't re-encode a token if it's already in the cache.
DCHECK(!samples_cache_.HasKey(token));
DVLOG(3) << "Token: " << token << " encoded.";
samples_cache_.Add(token, samples);
// Copy the samples into their corresponding directive object and move
// that object into the active queue.
std::map<std::string, AudioDirective>::iterator it =
pending_transmit_tokens_.find(token);
it->second.samples = samples;
active_transmit_tokens_.push(it->second);
pending_transmit_tokens_.erase(it);
if (!token_added_callback_.is_null())
token_added_callback_.Run();
std::vector<AudioDirective>::iterator AudioDirectiveList::FindDirectiveByOpId(
const std::string& op_id) {
for (std::vector<AudioDirective>::iterator it = active_directives_.begin();
it != active_directives_.end();
++it) {
if (it->op_id == op_id)
return it;
}
return active_directives_.end();
}
} // namespace copresence
......@@ -2,21 +2,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_LIST_
#define COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_LIST_
#ifndef COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_LIST_H_
#define COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_LIST_H_
#include <map>
#include <queue>
#include <string>
#include <vector>
#include "base/basictypes.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/time/time.h"
#include "components/copresence/timed_map.h"
namespace media {
class AudioBusRefCounted;
......@@ -27,21 +22,10 @@ namespace copresence {
struct AudioDirective {
// Default ctor, required by the priority queue.
AudioDirective();
// ctor used to store transmit directives that are awaiting samples.
AudioDirective(const std::string& token,
const std::string& op_id,
base::Time end_time);
// ctor used to construct a complete transmit directive.
AudioDirective(const std::string& token,
const std::string& op_id,
base::Time end_time,
const scoped_refptr<media::AudioBusRefCounted>& samples);
~AudioDirective();
std::string token;
AudioDirective(const std::string& op_id, base::Time end_time);
std::string op_id;
base::Time end_time;
scoped_refptr<media::AudioBusRefCounted> samples;
};
// This class maintains a list of active audio directives. It fetches the audio
......@@ -52,35 +36,13 @@ struct AudioDirective {
// classes from it.
class AudioDirectiveList {
public:
typedef base::Callback<void(const std::string&,
bool,
const scoped_refptr<media::AudioBusRefCounted>&)>
SamplesCallback;
typedef base::Callback<void(const std::string&, bool, const SamplesCallback&)>
EncodeTokenCallback;
AudioDirectiveList(const EncodeTokenCallback& encode_token_callback,
const base::Closure& token_added_callback,
bool use_audible_encoding);
AudioDirectiveList();
virtual ~AudioDirectiveList();
// Adds a token to the token queue, after getting its corresponding samples
// from whispernet.
void AddTransmitDirective(const std::string& token,
const std::string& op_id,
base::TimeDelta ttl);
void AddReceiveDirective(const std::string& op_id, base::TimeDelta ttl);
void AddDirective(const std::string& op_id, base::TimeDelta ttl);
void RemoveDirective(const std::string& op_id);
// Returns the next audio token to play. This also cleans up expired tokens.
scoped_ptr<AudioDirective> GetNextTransmit();
scoped_ptr<AudioDirective> GetNextReceive();
// This is the method that the whispernet client needs to call to return
// samples to us.
void OnTokenEncoded(const std::string& token,
bool audible,
const scoped_refptr<media::AudioBusRefCounted>& samples);
scoped_ptr<AudioDirective> GetActiveDirective();
private:
// Comparator for comparing end_times on audio tokens.
......@@ -93,32 +55,16 @@ class AudioDirectiveList {
}
};
typedef std::priority_queue<AudioDirective,
std::vector<AudioDirective>,
LatestFirstComparator> AudioDirectiveQueue;
typedef TimedMap<std::string, scoped_refptr<media::AudioBusRefCounted> >
SamplesMap;
scoped_ptr<AudioDirective> GetNextFromList(AudioDirectiveQueue* list);
// A map of tokens that are awaiting their samples before we can
// add them to the active transmit tokens list.
std::map<std::string, AudioDirective> pending_transmit_tokens_;
AudioDirectiveQueue active_transmit_tokens_;
AudioDirectiveQueue active_receive_tokens_;
EncodeTokenCallback encode_token_callback_;
base::Closure token_added_callback_;
const bool use_audible_encoding_;
std::vector<AudioDirective>::iterator FindDirectiveByOpId(
const std::string& op_id);
// Cache that holds the encoded samples. After reaching its limit, the cache
// expires the oldest samples first.
SamplesMap samples_cache_;
// This vector will be organized as a heap with the latest time as the first
// element. Only currently active directives will exist in this list.
std::vector<AudioDirective> active_directives_;
DISALLOW_COPY_AND_ASSIGN(AudioDirectiveList);
};
} // namespace copresence
#endif // COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_LIST_
#endif // COMPONENTS_COPRESENCE_HANDLERS_AUDIO_AUDIO_DIRECTIVE_LIST_H_
......@@ -7,83 +7,74 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/message_loop/message_loop.h"
#include "components/copresence/test/audio_test_support.h"
#include "media/base/audio_bus.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace copresence {
class AudioDirectiveListTest : public testing::Test {
public:
AudioDirectiveListTest()
: directive_list_(new AudioDirectiveList(
base::Bind(&AudioDirectiveListTest::EncodeToken,
base::Unretained(this)),
base::Bind(&base::DoNothing),
false)) {}
AudioDirectiveListTest() : directive_list_(new AudioDirectiveList()) {}
virtual ~AudioDirectiveListTest() {}
protected:
void EncodeToken(const std::string& token,
bool audible,
const AudioDirectiveList::SamplesCallback& callback) {
callback.Run(
token, audible, CreateRandomAudioRefCounted(0x1337, 1, 0x7331));
}
base::MessageLoop message_loop_;
scoped_ptr<AudioDirectiveList> directive_list_;
};
// TODO(rkc): Fix errors in these tests. See crbug/402578.
#define MAYBE_Basic DISABLED_Basic
#define MAYBE_OutOfOrderAndMultiple DISABLED_OutOfOrderAndMultiple
TEST_F(AudioDirectiveListTest, MAYBE_Basic) {
const base::TimeDelta kZeroTtl = base::TimeDelta::FromMilliseconds(0);
const base::TimeDelta kLargeTtl = base::TimeDelta::FromSeconds(0x7331);
TEST_F(AudioDirectiveListTest, Basic) {
const base::TimeDelta kTtl = base::TimeDelta::FromSeconds(9999);
directive_list_->AddTransmitDirective("token1", "op_id1", kZeroTtl);
directive_list_->AddTransmitDirective("token2", "op_id2", kLargeTtl);
directive_list_->AddTransmitDirective("token3", "op_id1", kZeroTtl);
EXPECT_EQ(NULL, directive_list_->GetActiveDirective().get());
EXPECT_EQ("token2", directive_list_->GetNextTransmit()->token);
directive_list_->AddDirective("op_id1", kTtl);
directive_list_->AddDirective("op_id2", kTtl * 3);
directive_list_->AddDirective("op_id3", kTtl * 2);
EXPECT_EQ("op_id2", directive_list_->GetActiveDirective()->op_id);
directive_list_->AddReceiveDirective("op_id1", kZeroTtl);
directive_list_->AddReceiveDirective("op_id3", kZeroTtl);
directive_list_->AddReceiveDirective("op_id3", kLargeTtl);
directive_list_->AddReceiveDirective("op_id7", kZeroTtl);
EXPECT_EQ("op_id3", directive_list_->GetNextReceive()->op_id);
directive_list_->RemoveDirective("op_id2");
EXPECT_EQ("op_id3", directive_list_->GetActiveDirective()->op_id);
}
TEST_F(AudioDirectiveListTest, MAYBE_OutOfOrderAndMultiple) {
const base::TimeDelta kZeroTtl = base::TimeDelta::FromMilliseconds(0);
const base::TimeDelta kLargeTtl = base::TimeDelta::FromSeconds(0x7331);
EXPECT_EQ(NULL, directive_list_->GetNextTransmit().get());
EXPECT_EQ(NULL, directive_list_->GetNextReceive().get());
directive_list_->AddTransmitDirective("token1", "op_id1", kZeroTtl);
directive_list_->AddTransmitDirective("token2", "op_id2", kLargeTtl);
directive_list_->AddTransmitDirective("token3", "op_id1", kLargeTtl);
// Should keep getting the directive till it expires or we add a newer one.
EXPECT_EQ("token3", directive_list_->GetNextTransmit()->token);
EXPECT_EQ("token3", directive_list_->GetNextTransmit()->token);
EXPECT_EQ("token3", directive_list_->GetNextTransmit()->token);
EXPECT_EQ(NULL, directive_list_->GetNextReceive().get());
directive_list_->AddReceiveDirective("op_id1", kLargeTtl);
directive_list_->AddReceiveDirective("op_id3", kZeroTtl);
directive_list_->AddReceiveDirective("op_id3", kLargeTtl);
directive_list_->AddReceiveDirective("op_id7", kLargeTtl);
TEST_F(AudioDirectiveListTest, AddDirectiveMultiple) {
const base::TimeDelta kTtl = base::TimeDelta::FromSeconds(9999);
directive_list_->AddDirective("op_id1", kTtl);
directive_list_->AddDirective("op_id2", kTtl * 2);
directive_list_->AddDirective("op_id3", kTtl * 3 * 2);
directive_list_->AddDirective("op_id3", kTtl * 3 * 3);
directive_list_->AddDirective("op_id4", kTtl * 4);
EXPECT_EQ("op_id3", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id3");
EXPECT_EQ("op_id4", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id4");
EXPECT_EQ("op_id2", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id2");
EXPECT_EQ("op_id1", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id1");
EXPECT_EQ(NULL, directive_list_->GetActiveDirective().get());
}
// Should keep getting the directive till it expires or we add a newer one.
EXPECT_EQ("op_id7", directive_list_->GetNextReceive()->op_id);
EXPECT_EQ("op_id7", directive_list_->GetNextReceive()->op_id);
EXPECT_EQ("op_id7", directive_list_->GetNextReceive()->op_id);
TEST_F(AudioDirectiveListTest, RemoveDirectiveMultiple) {
const base::TimeDelta kTtl = base::TimeDelta::FromSeconds(9999);
directive_list_->AddDirective("op_id1", kTtl);
directive_list_->AddDirective("op_id2", kTtl * 2);
directive_list_->AddDirective("op_id3", kTtl * 3);
directive_list_->AddDirective("op_id4", kTtl * 4);
EXPECT_EQ("op_id4", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id4");
EXPECT_EQ("op_id3", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id3");
directive_list_->RemoveDirective("op_id3");
directive_list_->RemoveDirective("op_id3");
EXPECT_EQ("op_id2", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id2");
EXPECT_EQ("op_id1", directive_list_->GetActiveDirective()->op_id);
directive_list_->RemoveDirective("op_id1");
EXPECT_EQ(NULL, directive_list_->GetActiveDirective().get());
}
} // namespace copresence
......@@ -14,7 +14,7 @@ DirectiveHandler::DirectiveHandler() {}
void DirectiveHandler::Initialize(
const AudioRecorder::DecodeSamplesCallback& decode_cb,
const AudioDirectiveList::EncodeTokenCallback& encode_cb) {
const AudioDirectiveHandler::EncodeTokenCallback& encode_cb) {
audio_handler_.reset(new AudioDirectiveHandler(decode_cb, encode_cb));
audio_handler_->Initialize();
}
......@@ -26,6 +26,16 @@ void DirectiveHandler::AddDirective(const Directive& directive) {
// We only handle Token directives; wifi/ble requests aren't implemented.
DCHECK_EQ(directive.instruction_type(), TOKEN);
std::string op_id;
if (directive.has_published_message_id()) {
op_id = directive.published_message_id();
} else if (directive.has_subscription_id()) {
op_id = directive.subscription_id();
} else {
NOTREACHED() << "No operation associated with directive!";
return;
}
const TokenInstruction& ti = directive.token_instruction();
DCHECK(audio_handler_.get()) << "Clients must call Initialize() before "
<< "any other DirectiveHandler methods.";
......@@ -33,14 +43,22 @@ void DirectiveHandler::AddDirective(const Directive& directive) {
if (ti.medium() == AUDIO_ULTRASOUND_PASSBAND ||
ti.medium() == AUDIO_AUDIBLE_DTMF) {
audio_handler_->AddInstruction(
ti, base::TimeDelta::FromMilliseconds(directive.ttl_millis()));
ti, op_id, base::TimeDelta::FromMilliseconds(directive.ttl_millis()));
}
}
void DirectiveHandler::RemoveDirectives(const std::string& /* op_id */) {
// TODO(rkc): Forward the remove directive call to all the directive handlers.
void DirectiveHandler::RemoveDirectives(const std::string& op_id) {
DCHECK(audio_handler_.get()) << "Clients must call Initialize() before "
<< "any other DirectiveHandler methods.";
audio_handler_->RemoveInstructions(op_id);
}
const std::string& DirectiveHandler::CurrentAudibleToken() const {
return audio_handler_->PlayingAudibleToken();
}
const std::string& DirectiveHandler::CurrentInaudibleToken() const {
return audio_handler_->PlayingInaudibleToken();
}
} // namespace copresence
......@@ -10,12 +10,11 @@
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/scoped_ptr.h"
#include "components/copresence/handlers/audio/audio_directive_list.h"
#include "components/copresence/handlers/audio/audio_directive_handler.h"
#include "components/copresence/mediums/audio/audio_recorder.h"
namespace copresence {
class AudioDirectiveHandler;
class Directive;
// The directive handler manages transmit and receive directives
......@@ -31,13 +30,16 @@ class DirectiveHandler {
// and make the constructor private.
virtual void Initialize(
const AudioRecorder::DecodeSamplesCallback& decode_cb,
const AudioDirectiveList::EncodeTokenCallback& encode_cb);
const AudioDirectiveHandler::EncodeTokenCallback& encode_cb);
// Adds a directive to handle.
virtual void AddDirective(const copresence::Directive& directive);
// Removes any directives associated with the given operation id.
virtual void RemoveDirectives(const std::string& op_id);
const std::string& CurrentAudibleToken() const;
const std::string& CurrentInaudibleToken() const;
private:
scoped_ptr<AudioDirectiveHandler> audio_handler_;
......
......@@ -5,7 +5,7 @@
#include "components/copresence/mediums/audio/audio_player.h"
#include <algorithm>
#include <vector>
#include <string>
#include "base/bind.h"
#include "base/bind_helpers.h"
......@@ -29,7 +29,7 @@ namespace copresence {
// Public methods.
AudioPlayer::AudioPlayer()
: stream_(NULL), is_playing_(false), frame_index_(0) {
: is_playing_(false), stream_(NULL), frame_index_(0) {
}
AudioPlayer::~AudioPlayer() {
......@@ -56,6 +56,10 @@ void AudioPlayer::Stop() {
base::Bind(&AudioPlayer::StopOnAudioThread, base::Unretained(this)));
}
bool AudioPlayer::IsPlaying() {
return is_playing_;
}
void AudioPlayer::Finalize() {
media::AudioManager::Get()->GetTaskRunner()->PostTask(
FROM_HERE,
......@@ -104,7 +108,6 @@ void AudioPlayer::PlayOnAudioThread(
return;
}
DVLOG(2) << "Playing Audio.";
is_playing_ = true;
stream_->Start(this);
}
......
......@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_PLAYER_
#define COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_PLAYER_
#ifndef COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_PLAYER_H_
#define COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_PLAYER_H_
#include <vector>
......@@ -26,17 +26,19 @@ class AudioPlayer : public media::AudioOutputStream::AudioSourceCallback {
AudioPlayer();
// Initializes the object. Do not use this object before calling this method.
void Initialize();
virtual void Initialize();
// Play the given samples. These samples will keep on being played in a loop
// till we explicitly tell the player to stop playing.
void Play(const scoped_refptr<media::AudioBusRefCounted>& samples);
virtual void Play(const scoped_refptr<media::AudioBusRefCounted>& samples);
// Stop playing.
void Stop();
virtual void Stop();
// Cleans up and deletes this object. Do not use object after this call.
void Finalize();
virtual void Finalize();
bool IsPlaying();
// Takes ownership of the stream.
void set_output_stream_for_testing(
......@@ -44,13 +46,15 @@ class AudioPlayer : public media::AudioOutputStream::AudioSourceCallback {
output_stream_for_testing_.reset(output_stream_for_testing);
}
protected:
virtual ~AudioPlayer();
void set_is_playing(bool is_playing) { is_playing_ = is_playing; }
private:
friend class AudioPlayerTest;
FRIEND_TEST_ALL_PREFIXES(AudioPlayerTest, BasicPlayAndStop);
FRIEND_TEST_ALL_PREFIXES(AudioPlayerTest, OutOfOrderPlayAndStopMultiple);
virtual ~AudioPlayer();
// Methods to do our various operations; all of these need to be run on the
// audio thread.
void InitializeOnAudioThread();
......@@ -70,13 +74,13 @@ class AudioPlayer : public media::AudioOutputStream::AudioSourceCallback {
// performed.
void FlushAudioLoopForTesting();
bool is_playing_;
// Self-deleting object.
media::AudioOutputStream* stream_;
scoped_ptr<media::AudioOutputStream> output_stream_for_testing_;
bool is_playing_;
// All fields below here are protected by this lock.
base::Lock state_lock_;
......@@ -90,4 +94,4 @@ class AudioPlayer : public media::AudioOutputStream::AudioSourceCallback {
} // namespace copresence
#endif // COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_PLAYER_
#endif // COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_PLAYER_H_
......@@ -51,8 +51,8 @@ void ProcessSamples(scoped_ptr<media::AudioBus> bus,
// Public methods.
AudioRecorder::AudioRecorder(const DecodeSamplesCallback& decode_callback)
: stream_(NULL),
is_recording_(false),
: is_recording_(false),
stream_(NULL),
decode_callback_(decode_callback),
total_buffer_frames_(0),
buffer_frame_index_(0) {
......@@ -80,6 +80,10 @@ void AudioRecorder::Stop() {
base::Bind(&AudioRecorder::StopOnAudioThread, base::Unretained(this)));
}
bool AudioRecorder::IsRecording() {
return is_recording_;
}
void AudioRecorder::Finalize() {
media::AudioManager::Get()->GetTaskRunner()->PostTask(
FROM_HERE,
......@@ -137,7 +141,6 @@ void AudioRecorder::RecordOnAudioThread() {
if (!stream_ || is_recording_)
return;
DVLOG(2) << "Recording Audio.";
converter_->Reset();
stream_->Start(this);
is_recording_ = true;
......
......@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_RECORDER_
#define COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_RECORDER_
#ifndef COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_RECORDER_H_
#define COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_RECORDER_H_
#include <string>
......@@ -33,13 +33,15 @@ class AudioRecorder : public media::AudioInputStream::AudioInputCallback,
explicit AudioRecorder(const DecodeSamplesCallback& decode_callback);
// Initializes the object. Do not use this object before calling this method.
void Initialize();
virtual void Initialize();
void Record();
void Stop();
virtual void Record();
virtual void Stop();
// Cleans up and deletes this object. Do not use object after this call.
void Finalize();
virtual void Finalize();
bool IsRecording();
// Takes ownership of the stream.
void set_input_stream_for_testing(
......@@ -52,13 +54,15 @@ class AudioRecorder : public media::AudioInputStream::AudioInputCallback,
params_for_testing_.reset(params_for_testing);
}
protected:
virtual ~AudioRecorder();
void set_is_recording(bool is_recording) { is_recording_ = is_recording; }
private:
friend class AudioRecorderTest;
FRIEND_TEST_ALL_PREFIXES(AudioRecorderTest, BasicRecordAndStop);
FRIEND_TEST_ALL_PREFIXES(AudioRecorderTest, OutOfOrderRecordAndStopMultiple);
virtual ~AudioRecorder();
// Methods to do our various operations; all of these need to be run on the
// audio thread.
void InitializeOnAudioThread();
......@@ -85,8 +89,9 @@ class AudioRecorder : public media::AudioInputStream::AudioInputCallback,
// performed.
void FlushAudioLoopForTesting();
media::AudioInputStream* stream_;
bool is_recording_;
media::AudioInputStream* stream_;
DecodeSamplesCallback decode_callback_;
// ProvideInput will use this buffer as its source.
......@@ -108,4 +113,4 @@ class AudioRecorder : public media::AudioInputStream::AudioInputCallback,
} // namespace copresence
#endif // COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_RECORDER_
#endif // COMPONENTS_COPRESENCE_MEDIUMS_AUDIO_AUDIO_RECORDER_H_
......@@ -18,8 +18,8 @@ class AudioBusRefCounted;
namespace copresence {
struct FullToken {
FullToken(const std::string& token, bool audible)
struct AudioToken {
AudioToken(const std::string& token, bool audible)
: token(token), audible(audible) {}
std::string token;
bool audible;
......@@ -34,7 +34,7 @@ class WhispernetClient {
// Generic callback to indicate a boolean success or failure.
typedef base::Callback<void(bool)> SuccessCallback;
// Callback that returns detected tokens.
typedef base::Callback<void(const std::vector<FullToken>&)> TokensCallback;
typedef base::Callback<void(const std::vector<AudioToken>&)> TokensCallback;
// Callback that returns encoded samples for a given token.
typedef base::Callback<void(const std::string&,
bool,
......@@ -70,7 +70,7 @@ class WhispernetClient {
virtual SuccessCallback GetDetectBroadcastCallback() = 0;
virtual SuccessCallback GetInitializedCallback() = 0;
virtual ~WhispernetClient() {};
virtual ~WhispernetClient() {}
};
} // namespace copresence
......
......@@ -105,7 +105,7 @@ BroadcastScanConfiguration ExtractTokenExchangeStrategy(
// Strategies for publishes.
if (request.has_manage_messages_request()) {
const RepeatedPtrField<PublishedMessage> messages =
const RepeatedPtrField<PublishedMessage>& messages =
request.manage_messages_request().message_to_publish();
for (int i = 0; i < messages.size(); ++i) {
BroadcastScanConfiguration config =
......@@ -119,7 +119,7 @@ BroadcastScanConfiguration ExtractTokenExchangeStrategy(
// Strategies for subscriptions.
if (request.has_manage_subscriptions_request()) {
const RepeatedPtrField<Subscription> messages =
const RepeatedPtrField<Subscription>& messages =
request.manage_subscriptions_request().subscription();
for (int i = 0; i < messages.size(); ++i) {
BroadcastScanConfiguration config =
......@@ -194,6 +194,17 @@ ClientVersion* CreateVersion(const std::string& client,
return version;
}
void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
TokenObservation* token_observation =
request->mutable_update_signals_request()->add_token_observation();
token_observation->set_token_id(ToUrlSafe(token.token));
TokenSignals* signals = token_observation->add_signals();
signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND);
signals->set_observed_time_millis(base::Time::Now().ToJsTime());
}
} // namespace
// Public methods
......@@ -251,8 +262,15 @@ void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
DVLOG(3) << "Sending report request to server.";
// If we are unpublishing or unsubscribing, we need to stop those publish or
// subscribes right away, we don't need to wait for the server to tell us.
ProcessRemovedOperations(*request);
request->mutable_update_signals_request()->set_allocated_state(
GetDeviceCapabilities(*request).release());
AddPlayingTokens(request.get());
SendServerRequest(kReportRequestRpcName,
app_id,
request.Pass(),
......@@ -262,25 +280,15 @@ void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
status_callback));
}
void RpcHandler::ReportTokens(const std::vector<FullToken>& tokens) {
void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
DCHECK(!tokens.empty());
scoped_ptr<ReportRequest> request(new ReportRequest);
for (size_t i = 0; i < tokens.size(); ++i) {
const std::string& token = ToUrlSafe(tokens[i].token);
if (invalid_audio_token_cache_.HasKey(token))
if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
continue;
DVLOG(3) << "Sending token " << token << " to server.";
TokenObservation* token_observation =
request->mutable_update_signals_request()->add_token_observation();
token_observation->set_token_id(token);
TokenSignals* signals = token_observation->add_signals();
signals->set_medium(tokens[i].audible ? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND);
signals->set_observed_time_millis(base::Time::Now().ToJsTime());
DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
AddTokenToRequest(request.get(), tokens[i]);
}
SendReportRequest(request.Pass());
}
......@@ -415,6 +423,38 @@ void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
status_callback.Run(SUCCESS);
}
void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
// Remove unpublishes.
if (request.has_manage_messages_request()) {
const RepeatedPtrField<std::string>& unpublishes =
request.manage_messages_request().id_to_unpublish();
for (int i = 0; i < unpublishes.size(); ++i)
directive_handler_->RemoveDirectives(unpublishes.Get(i));
}
// Remove unsubscribes.
if (request.has_manage_subscriptions_request()) {
const RepeatedPtrField<std::string>& unsubscribes =
request.manage_subscriptions_request().id_to_unsubscribe();
for (int i = 0; i < unsubscribes.size(); ++i)
directive_handler_->RemoveDirectives(unsubscribes.Get(i));
}
}
void RpcHandler::AddPlayingTokens(ReportRequest* request) {
if (!directive_handler_)
return;
const std::string& audible_token = directive_handler_->CurrentAudibleToken();
const std::string& inaudible_token =
directive_handler_->CurrentInaudibleToken();
if (!audible_token.empty())
AddTokenToRequest(request, AudioToken(audible_token, true));
if (!inaudible_token.empty())
AddTokenToRequest(request, AudioToken(inaudible_token, false));
}
void RpcHandler::DispatchMessages(
const RepeatedPtrField<SubscribedMessage>& messages) {
if (messages.size() == 0)
......
......@@ -50,7 +50,7 @@ class RpcHandler {
const StatusCallback& callback);
// Report a set of tokens to the server for a given medium.
void ReportTokens(const std::vector<FullToken>& tokens);
void ReportTokens(const std::vector<AudioToken>& tokens);
// Create the directive handler and connect it to
// the whispernet client specified by the delegate.
......@@ -85,6 +85,15 @@ class RpcHandler {
int http_status_code,
const std::string& response_data);
// If the request has any unpublish or unsubscribe operations, it removes
// them from our directive handlers.
void ProcessRemovedOperations(const ReportRequest& request);
// Add all currently playing tokens to the update signals in this report
// request. This ensures that the server doesn't keep issueing new tokens to
// us when we're already playing valid tokens.
void AddPlayingTokens(ReportRequest* request);
void DispatchMessages(
const google::protobuf::RepeatedPtrField<SubscribedMessage>&
subscribed_messages);
......
......@@ -62,7 +62,7 @@ class FakeDirectiveHandler : public DirectiveHandler {
virtual void Initialize(
const AudioRecorder::DecodeSamplesCallback& decode_cb,
const AudioDirectiveList::EncodeTokenCallback& encode_cb) OVERRIDE {}
const AudioDirectiveHandler::EncodeTokenCallback& encode_cb) OVERRIDE {}
virtual void AddDirective(const Directive& directive) OVERRIDE {
added_directives_.push_back(directive);
......@@ -249,11 +249,15 @@ TEST_F(RpcHandlerTest, CreateRequestHeader) {
report->header().registered_device_id());
}
TEST_F(RpcHandlerTest, ReportTokens) {
std::vector<FullToken> test_tokens;
test_tokens.push_back(FullToken("token 1", false));
test_tokens.push_back(FullToken("token 2", true));
test_tokens.push_back(FullToken("token 3", false));
// TODO(ckehoe): Renable these after https://codereview.chromium.org/453203002/
// lands.
#define MAYBE_ReportTokens DISABLED_ReportTokens
TEST_F(RpcHandlerTest, MAYBE_ReportTokens) {
std::vector<AudioToken> test_tokens;
test_tokens.push_back(AudioToken("token 1", false));
test_tokens.push_back(AudioToken("token 2", true));
test_tokens.push_back(AudioToken("token 3", false));
AddInvalidToken("token 2");
rpc_handler_.ReportTokens(test_tokens);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment