Commit 4a058243 authored by ckehoe's avatar ckehoe Committed by Commit bot

Moving the DirectiveHandler to be owned by CopresenceManager. This also adds...

Moving the DirectiveHandler to be owned by CopresenceManager. This also adds the ability to queue Directives. Tests will be forthcoming.

BUG=424253

Review URL: https://codereview.chromium.org/684273004

Cr-Commit-Position: refs/heads/master@{#302362}
parent a34136a3
...@@ -6,7 +6,8 @@ ...@@ -6,7 +6,8 @@
#include "base/bind.h" #include "base/bind.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "components/copresence/public/copresence_delegate.h" #include "components/copresence/handlers/directive_handler.h"
#include "components/copresence/proto/rpcs.pb.h"
#include "components/copresence/public/whispernet_client.h" #include "components/copresence/public/whispernet_client.h"
#include "components/copresence/rpc/rpc_handler.h" #include "components/copresence/rpc/rpc_handler.h"
...@@ -37,7 +38,7 @@ scoped_ptr<CopresenceManager> CopresenceManager::Create( ...@@ -37,7 +38,7 @@ scoped_ptr<CopresenceManager> CopresenceManager::Create(
} }
// Public methods // Public functions.
CopresenceManagerImpl::~CopresenceManagerImpl() { CopresenceManagerImpl::~CopresenceManagerImpl() {
whispernet_init_callback_.Cancel(); whispernet_init_callback_.Cancel();
...@@ -57,7 +58,6 @@ void CopresenceManagerImpl::ExecuteReportRequest( ...@@ -57,7 +58,6 @@ void CopresenceManagerImpl::ExecuteReportRequest(
// Check if we are initialized enough to execute this request. // Check if we are initialized enough to execute this request.
// If we haven't seen this auth token yet, we need to register for it. // If we haven't seen this auth token yet, we need to register for it.
// TODO(ckehoe): Queue per device ID instead of globally. // TODO(ckehoe): Queue per device ID instead of globally.
DCHECK(rpc_handler_);
const std::string& auth_token = delegate_->GetAuthToken(); const std::string& auth_token = delegate_->GetAuthToken();
if (!rpc_handler_->IsRegisteredForToken(auth_token)) { if (!rpc_handler_->IsRegisteredForToken(auth_token)) {
std::string token_str = auth_token.empty() ? "(anonymous)" : std::string token_str = auth_token.empty() ? "(anonymous)" :
...@@ -87,18 +87,20 @@ void CopresenceManagerImpl::ExecuteReportRequest( ...@@ -87,18 +87,20 @@ void CopresenceManagerImpl::ExecuteReportRequest(
} }
} }
// Private methods
// Private functions.
CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
: init_failed_(false), : delegate_(delegate),
pending_init_operations_(0),
// This callback gets cancelled when we are destroyed. // This callback gets cancelled when we are destroyed.
whispernet_init_callback_( whispernet_init_callback_(
base::Bind(&CopresenceManagerImpl::InitStepComplete, base::Bind(&CopresenceManagerImpl::InitStepComplete,
base::Unretained(this), base::Unretained(this),
"Whispernet proxy initialization")), "Whispernet proxy initialization")),
pending_init_operations_(0), init_failed_(false),
delegate_(delegate), directive_handler_(new DirectiveHandler),
rpc_handler_(new RpcHandler(delegate)) { rpc_handler_(new RpcHandler(delegate, directive_handler_.get())) {
DCHECK(delegate); DCHECK(delegate);
DCHECK(delegate->GetWhispernetClient()); DCHECK(delegate->GetWhispernetClient());
...@@ -111,9 +113,14 @@ void CopresenceManagerImpl::CompleteInitialization() { ...@@ -111,9 +113,14 @@ void CopresenceManagerImpl::CompleteInitialization() {
if (pending_init_operations_) if (pending_init_operations_)
return; return;
DCHECK(rpc_handler_.get()); if (!init_failed_) {
if (!init_failed_) // When RpcHandler is destroyed, it disconnects this callback.
rpc_handler_->ConnectToWhispernet(); // TODO(ckehoe): Use a CancelableCallback instead.
delegate_->GetWhispernetClient()->RegisterTokensCallback(
base::Bind(&RpcHandler::ReportTokens,
base::Unretained(rpc_handler_.get())));
directive_handler_->Start(delegate_->GetWhispernetClient());
}
// Not const because SendReportRequest takes ownership of the ReportRequests. // Not const because SendReportRequest takes ownership of the ReportRequests.
// This is ok though, as the entire queue is deleted afterwards. // This is ok though, as the entire queue is deleted afterwards.
......
...@@ -6,14 +6,11 @@ ...@@ -6,14 +6,11 @@
#define COMPONENTS_COPRESENCE_COPRESENCE_MANAGER_IMPL_H_ #define COMPONENTS_COPRESENCE_COPRESENCE_MANAGER_IMPL_H_
#include <string> #include <string>
#include <vector>
#include "base/callback.h"
#include "base/cancelable_callback.h" #include "base/cancelable_callback.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "base/memory/scoped_vector.h" #include "base/memory/scoped_vector.h"
#include "components/copresence/proto/rpcs.pb.h"
#include "components/copresence/public/copresence_manager.h" #include "components/copresence/public/copresence_manager.h"
namespace net { namespace net {
...@@ -22,6 +19,8 @@ class URLContextGetter; ...@@ -22,6 +19,8 @@ class URLContextGetter;
namespace copresence { namespace copresence {
class DirectiveHandler;
class ReportRequest;
class RpcHandler; class RpcHandler;
struct PendingRequest { struct PendingRequest {
...@@ -55,18 +54,17 @@ class CopresenceManagerImpl : public CopresenceManager { ...@@ -55,18 +54,17 @@ class CopresenceManagerImpl : public CopresenceManager {
void CompleteInitialization(); void CompleteInitialization();
void InitStepComplete(const std::string& step, bool success); void InitStepComplete(const std::string& step, bool success);
bool init_failed_; // Belongs to the caller.
ScopedVector<PendingRequest> pending_requests_queue_; CopresenceDelegate* const delegate_;
int pending_init_operations_;
base::CancelableCallback<void(bool)> whispernet_init_callback_; base::CancelableCallback<void(bool)> whispernet_init_callback_;
bool init_failed_;
// TODO(rkc): This code is almost identical to what we use in feedback to ScopedVector<PendingRequest> pending_requests_queue_;
// perform multiple blocking tasks and then run a post process method. Look
// into refactoring it all out to a common construct, like maybe a
// PostMultipleTasksAndReply?
int pending_init_operations_;
CopresenceDelegate* const delegate_; // The RpcHandler depends on the directive handler.
scoped_ptr<DirectiveHandler> directive_handler_;
scoped_ptr<RpcHandler> rpc_handler_; scoped_ptr<RpcHandler> rpc_handler_;
DISALLOW_COPY_AND_ASSIGN(CopresenceManagerImpl); DISALLOW_COPY_AND_ASSIGN(CopresenceManagerImpl);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "components/copresence/handlers/directive_handler.h" #include "components/copresence/handlers/directive_handler.h"
#include "base/bind.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/time/time.h" #include "base/time/time.h"
#include "components/copresence/handlers/audio/audio_directive_handler.h" #include "components/copresence/handlers/audio/audio_directive_handler.h"
...@@ -11,20 +12,38 @@ ...@@ -11,20 +12,38 @@
namespace copresence { namespace copresence {
DirectiveHandler::DirectiveHandler() {} // Public functions
void DirectiveHandler::Initialize( DirectiveHandler::DirectiveHandler()
const AudioManager::DecodeSamplesCallback& decode_cb, : audio_handler_(new AudioDirectiveHandler),
const AudioManager::EncodeTokenCallback& encode_cb) { whispernet_client_(nullptr) {}
audio_handler_.reset(new AudioDirectiveHandler());
audio_handler_->Initialize(decode_cb, encode_cb); DirectiveHandler::~DirectiveHandler() {}
}
void DirectiveHandler::Start(WhispernetClient* whispernet_client) {
DCHECK(whispernet_client);
whispernet_client_ = whispernet_client;
// TODO(ckehoe): Just pass Whispernet all the way down to the AudioManager.
// We shouldn't be concerned with these details here.
audio_handler_->Initialize(
base::Bind(&WhispernetClient::DecodeSamples,
base::Unretained(whispernet_client_)),
base::Bind(&DirectiveHandler::EncodeToken,
base::Unretained(this)));
DirectiveHandler::~DirectiveHandler() { // Run all the queued directives.
for (const auto& op_id : pending_directives_) {
for (const Directive& directive : op_id.second) {
StartDirective(op_id.first, directive);
}
}
pending_directives_.clear();
} }
void DirectiveHandler::AddDirective(const Directive& directive) { void DirectiveHandler::AddDirective(const Directive& directive) {
// We only handle Token directives; wifi/ble requests aren't implemented. // We only handle transmit and receive directives.
// WiFi and BLE scans aren't implemented.
DCHECK_EQ(directive.instruction_type(), TOKEN); DCHECK_EQ(directive.instruction_type(), TOKEN);
std::string op_id; std::string op_id;
...@@ -37,25 +56,57 @@ void DirectiveHandler::AddDirective(const Directive& directive) { ...@@ -37,25 +56,57 @@ void DirectiveHandler::AddDirective(const Directive& directive) {
return; return;
} }
if (!whispernet_client_) {
pending_directives_[op_id].push_back(directive);
} else {
StartDirective(op_id, directive);
}
}
void DirectiveHandler::RemoveDirectives(const std::string& op_id) {
// If whispernet_client_ is null, audio_handler_ hasn't been Initialized.
if (whispernet_client_) {
audio_handler_->RemoveInstructions(op_id);
} else {
pending_directives_.erase(op_id);
}
}
const std::string DirectiveHandler::GetCurrentAudioToken(AudioType type) const {
// If whispernet_client_ is null, audio_handler_ hasn't been Initialized.
return whispernet_client_ ? audio_handler_->PlayingToken(type) : "";
}
// Private functions
void DirectiveHandler::StartDirective(const std::string& op_id,
const Directive& directive) {
const TokenInstruction& ti = directive.token_instruction(); const TokenInstruction& ti = directive.token_instruction();
DCHECK(audio_handler_.get()) << "Clients must call Initialize() before "
<< "any other DirectiveHandler methods.";
// We currently only support audio.
if (ti.medium() == AUDIO_ULTRASOUND_PASSBAND || if (ti.medium() == AUDIO_ULTRASOUND_PASSBAND ||
ti.medium() == AUDIO_AUDIBLE_DTMF) { ti.medium() == AUDIO_AUDIBLE_DTMF) {
audio_handler_->AddInstruction( audio_handler_->AddInstruction(
ti, op_id, base::TimeDelta::FromMilliseconds(directive.ttl_millis())); ti, op_id, base::TimeDelta::FromMilliseconds(directive.ttl_millis()));
} else {
// We should only get audio directives.
NOTREACHED() << "Received directive for unimplemented medium "
<< ti.medium();
} }
} }
void DirectiveHandler::RemoveDirectives(const std::string& op_id) { // TODO(ckehoe): We don't need to re-register the samples callback
DCHECK(audio_handler_.get()) << "Clients must call Initialize() before " // every time. Which means this whole function is unnecessary.
<< "any other DirectiveHandler methods."; void DirectiveHandler::EncodeToken(
audio_handler_->RemoveInstructions(op_id); const std::string& token,
} AudioType type,
const WhispernetClient::SamplesCallback& samples_callback) {
const std::string DirectiveHandler::GetCurrentAudioToken(AudioType type) const { DCHECK(type == AUDIBLE || type == INAUDIBLE);
return audio_handler_->PlayingToken(type); // TODO(ckehoe): This null check shouldn't be necessary.
// It's only here for tests.
if (whispernet_client_) {
whispernet_client_->RegisterSamplesCallback(samples_callback);
whispernet_client_->EncodeToken(token, type);
}
} }
} // namespace copresence } // namespace copresence
...@@ -5,41 +5,57 @@ ...@@ -5,41 +5,57 @@
#ifndef COMPONENTS_COPRESENCE_HANDLERS_DIRECTIVE_HANDLER_H_ #ifndef COMPONENTS_COPRESENCE_HANDLERS_DIRECTIVE_HANDLER_H_
#define COMPONENTS_COPRESENCE_HANDLERS_DIRECTIVE_HANDLER_H_ #define COMPONENTS_COPRESENCE_HANDLERS_DIRECTIVE_HANDLER_H_
#include <map>
#include <string> #include <string>
#include <vector>
#include "base/callback.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "components/copresence/handlers/audio/audio_directive_handler.h" #include "components/copresence/public/whispernet_client.h"
#include "components/copresence/mediums/audio/audio_manager.h"
namespace copresence { namespace copresence {
class AudioDirectiveHandler;
class Directive; class Directive;
// The directive handler manages transmit and receive directives // The directive handler manages transmit and receive directives.
// given to it by the manager. // TODO(ckehoe): Add tests for this class.
// TODO(ckehoe): Turn this into an interface.
class DirectiveHandler { class DirectiveHandler {
public: public:
DirectiveHandler(); DirectiveHandler();
virtual ~DirectiveHandler(); virtual ~DirectiveHandler();
// Initialize the |audio_handler_| with the appropriate callbacks. // Starts processing directives with the provided Whispernet client.
// This function must be called before any others. // Directives will be queued until this function is called.
// TODO(ckehoe): Instead of this, use a static Create() method // |whispernet_client| is owned by the caller
// and make the constructor private. // and must outlive the DirectiveHandler.
virtual void Initialize(const AudioManager::DecodeSamplesCallback& decode_cb, virtual void Start(WhispernetClient* whispernet_client);
const AudioManager::EncodeTokenCallback& encode_cb);
// Adds a directive to handle. // Adds a directive to handle.
virtual void AddDirective(const copresence::Directive& directive); virtual void AddDirective(const Directive& directive);
// Removes any directives associated with the given operation id. // Removes any directives associated with the given operation id.
virtual void RemoveDirectives(const std::string& op_id); virtual void RemoveDirectives(const std::string& op_id);
const std::string GetCurrentAudioToken(AudioType type) const; virtual const std::string GetCurrentAudioToken(AudioType type) const;
private: private:
// Starts actually running a directive.
void StartDirective(const std::string& op_id, const Directive& directive);
// Forwards the request to encode a token to whispernet,
// and instructs it to call samples_callback when encoding is complete.
void EncodeToken(
const std::string& token,
AudioType type,
const WhispernetClient::SamplesCallback& samples_callback);
scoped_ptr<AudioDirectiveHandler> audio_handler_; scoped_ptr<AudioDirectiveHandler> audio_handler_;
std::map<std::string, std::vector<Directive>> pending_directives_;
// Belongs to the caller.
WhispernetClient* whispernet_client_;
DISALLOW_COPY_AND_ASSIGN(DirectiveHandler); DISALLOW_COPY_AND_ASSIGN(DirectiveHandler);
}; };
......
...@@ -29,13 +29,14 @@ class CopresenceDelegate { ...@@ -29,13 +29,14 @@ class CopresenceDelegate {
public: public:
virtual ~CopresenceDelegate() {} virtual ~CopresenceDelegate() {}
// This method will be called when we have subscribed messages that need to // This method will be called when we have subscribed messages
// be sent to their respective apps. // that need to be sent to their respective apps.
virtual void HandleMessages( virtual void HandleMessages(
const std::string& app_id, const std::string& app_id,
const std::string& subscription_id, const std::string& subscription_id,
const std::vector<Message>& message) = 0; const std::vector<Message>& message) = 0;
// Thw URLRequestContextGetter must outlive the CopresenceManager.
virtual net::URLRequestContextGetter* GetRequestContext() const = 0; virtual net::URLRequestContextGetter* GetRequestContext() const = 0;
virtual const std::string GetPlatformVersionString() const = 0; virtual const std::string GetPlatformVersionString() const = 0;
...@@ -43,6 +44,7 @@ class CopresenceDelegate { ...@@ -43,6 +44,7 @@ class CopresenceDelegate {
virtual const std::string GetAPIKey(const std::string& app_id) const = 0; virtual const std::string GetAPIKey(const std::string& app_id) const = 0;
virtual const std::string GetAuthToken() const = 0; virtual const std::string GetAuthToken() const = 0;
// Thw WhispernetClient must outlive the CopresenceManager.
virtual WhispernetClient* GetWhispernetClient() = 0; virtual WhispernetClient* GetWhispernetClient() = 0;
}; };
......
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#include "components/copresence/proto/rpcs.pb.h" #include "components/copresence/proto/rpcs.pb.h"
#include "components/copresence/public/copresence_constants.h" #include "components/copresence/public/copresence_constants.h"
#include "components/copresence/public/copresence_delegate.h" #include "components/copresence/public/copresence_delegate.h"
#include "components/copresence/public/whispernet_client.h"
#include "components/copresence/rpc/http_post.h"
#include "net/http/http_status_code.h" #include "net/http/http_status_code.h"
// TODO(ckehoe): Return error messages for bad requests. // TODO(ckehoe): Return error messages for bad requests.
...@@ -150,20 +152,26 @@ void AddTokenToRequest(const AudioToken& token, ReportRequest* request) { ...@@ -150,20 +152,26 @@ void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
// Public methods // Public methods
RpcHandler::RpcHandler(CopresenceDelegate* delegate) RpcHandler::RpcHandler(CopresenceDelegate* delegate,
DirectiveHandler* directive_handler)
: delegate_(delegate), : delegate_(delegate),
directive_handler_(directive_handler),
invalid_audio_token_cache_( invalid_audio_token_cache_(
base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
kMaxInvalidTokens), kMaxInvalidTokens),
server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
base::Unretained(this))) {} base::Unretained(this))) {
DCHECK(delegate_);
DCHECK(directive_handler_);
}
RpcHandler::~RpcHandler() { RpcHandler::~RpcHandler() {
for (HttpPost* post : pending_posts_) { for (HttpPost* post : pending_posts_) {
delete post; delete post;
} }
if (delegate_ && delegate_->GetWhispernetClient()) { if (delegate_->GetWhispernetClient()) {
// TODO(ckehoe): Use CancelableCallbacks instead.
delegate_->GetWhispernetClient()->RegisterTokensCallback( delegate_->GetWhispernetClient()->RegisterTokensCallback(
WhispernetClient::TokensCallback()); WhispernetClient::TokensCallback());
delegate_->GetWhispernetClient()->RegisterSamplesCallback( delegate_->GetWhispernetClient()->RegisterSamplesCallback(
...@@ -277,27 +285,6 @@ void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) { ...@@ -277,27 +285,6 @@ void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
} }
} }
void RpcHandler::ConnectToWhispernet() {
// Check if we are already connected.
if (directive_handler_)
return;
WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
// |directive_handler_| will be destructed with us, so unretained is safe.
directive_handler_.reset(new DirectiveHandler);
directive_handler_->Initialize(
base::Bind(&WhispernetClient::DecodeSamples,
base::Unretained(whispernet_client)),
base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
base::Unretained(this)));
whispernet_client->RegisterTokensCallback(
base::Bind(&RpcHandler::ReportTokens,
// On destruction, this callback will be disconnected.
base::Unretained(this)));
}
// Private methods // Private methods
void RpcHandler::RegisterResponseHandler( void RpcHandler::RegisterResponseHandler(
...@@ -383,12 +370,8 @@ void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, ...@@ -383,12 +370,8 @@ void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
response.update_signals_response(); response.update_signals_response();
DispatchMessages(update_response.message()); DispatchMessages(update_response.message());
if (directive_handler_.get()) { for (const Directive& directive : update_response.directive())
for (const Directive& directive : update_response.directive()) directive_handler_->AddDirective(directive);
directive_handler_->AddDirective(directive);
} else {
DVLOG(1) << "No directive handler.";
}
for (const Token& token : update_response.token()) { for (const Token& token : update_response.token()) {
switch (token.status()) { switch (token.status()) {
...@@ -433,9 +416,6 @@ void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) { ...@@ -433,9 +416,6 @@ void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
} }
void RpcHandler::AddPlayingTokens(ReportRequest* request) { void RpcHandler::AddPlayingTokens(ReportRequest* request) {
if (!directive_handler_)
return;
const std::string& audible_token = const std::string& audible_token =
directive_handler_->GetCurrentAudioToken(AUDIBLE); directive_handler_->GetCurrentAudioToken(AUDIBLE);
const std::string& inaudible_token = const std::string& inaudible_token =
...@@ -472,6 +452,8 @@ void RpcHandler::DispatchMessages( ...@@ -472,6 +452,8 @@ void RpcHandler::DispatchMessages(
} }
} }
// TODO(ckehoe): Pass in the version string and
// group this with the local functions up top.
RequestHeader* RpcHandler::CreateRequestHeader( RequestHeader* RpcHandler::CreateRequestHeader(
const std::string& client_name, const std::string& client_name,
const std::string& device_id) const { const std::string& device_id) const {
...@@ -538,16 +520,4 @@ void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter, ...@@ -538,16 +520,4 @@ void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
pending_posts_.insert(http_post); pending_posts_.insert(http_post);
} }
void RpcHandler::AudioDirectiveListToWhispernetConnector(
const std::string& token,
AudioType type,
const WhispernetClient::SamplesCallback& samples_callback) {
DCHECK(type == AUDIBLE || type == INAUDIBLE);
WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
if (whispernet_client) {
whispernet_client->RegisterSamplesCallback(samples_callback);
whispernet_client->EncodeToken(token, type);
}
}
} // namespace copresence } // namespace copresence
...@@ -9,17 +9,18 @@ ...@@ -9,17 +9,18 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "base/callback.h" #include "base/callback_forward.h"
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "components/copresence/proto/enums.pb.h" #include "components/copresence/proto/enums.pb.h"
#include "components/copresence/public/copresence_delegate.h" #include "components/copresence/public/copresence_delegate.h"
#include "components/copresence/public/whispernet_client.h"
#include "components/copresence/rpc/http_post.h"
#include "components/copresence/timed_map.h" #include "components/copresence/timed_map.h"
namespace copresence { namespace copresence {
struct AudioToken;
class CopresenceDelegate;
class DirectiveHandler; class DirectiveHandler;
class HttpPost;
class ReportRequest; class ReportRequest;
class RequestHeader; class RequestHeader;
class SubscribedMessage; class SubscribedMessage;
...@@ -33,9 +34,9 @@ class RpcHandler { ...@@ -33,9 +34,9 @@ class RpcHandler {
// Report rpc name to send to Apiary. // Report rpc name to send to Apiary.
static const char kReportRequestRpcName[]; static const char kReportRequestRpcName[];
// Constructor. |delegate| is owned by the caller, // Constructor. |delegate| and |directive_handler|
// and must be valid as long as the RpcHandler exists. // are owned by the caller and must outlive the RpcHandler.
explicit RpcHandler(CopresenceDelegate* delegate); RpcHandler(CopresenceDelegate* delegate, DirectiveHandler* directive_handler);
virtual ~RpcHandler(); virtual ~RpcHandler();
...@@ -64,10 +65,6 @@ class RpcHandler { ...@@ -64,10 +65,6 @@ class RpcHandler {
// Uses all active auth tokens (if any). // Uses all active auth tokens (if any).
void ReportTokens(const std::vector<AudioToken>& tokens); void ReportTokens(const std::vector<AudioToken>& tokens);
// Create the directive handler and connect it to
// the whispernet client specified by the delegate.
void ConnectToWhispernet();
private: private:
// An HttpPost::ResponseCallback along with an HttpPost object to be deleted. // An HttpPost::ResponseCallback along with an HttpPost object to be deleted.
// Arguments: // Arguments:
...@@ -85,7 +82,7 @@ class RpcHandler { ...@@ -85,7 +82,7 @@ class RpcHandler {
// string: The auth token to pass with the request. // string: The auth token to pass with the request.
// MessageLite: Contents of POST request to be sent. This needs to be // MessageLite: Contents of POST request to be sent. This needs to be
// a (scoped) pointer to ease handling of the abstract MessageLite class. // a (scoped) pointer to ease handling of the abstract MessageLite class.
// ResponseCallback: Receives the response to the request. // PostCleanupCallback: Receives the response to the request.
typedef base::Callback<void(net::URLRequestContextGetter*, typedef base::Callback<void(net::URLRequestContextGetter*,
const std::string&, const std::string&,
const std::string&, const std::string&,
...@@ -140,19 +137,16 @@ class RpcHandler { ...@@ -140,19 +137,16 @@ class RpcHandler {
scoped_ptr<google::protobuf::MessageLite> request_proto, scoped_ptr<google::protobuf::MessageLite> request_proto,
const PostCleanupCallback& callback); const PostCleanupCallback& callback);
// This method receives the request to encode a token and forwards it to // These belong to the caller.
// whispernet, setting the samples return callback to samples_callback. CopresenceDelegate* delegate_;
void AudioDirectiveListToWhispernetConnector( DirectiveHandler* directive_handler_;
const std::string& token,
AudioType type,
const WhispernetClient::SamplesCallback& samples_callback);
CopresenceDelegate* delegate_; // Belongs to the caller.
TimedMap<std::string, bool> invalid_audio_token_cache_; TimedMap<std::string, bool> invalid_audio_token_cache_;
// TODO(ckehoe): Allow passing this into the constructor for testing.
PostCallback server_post_callback_; PostCallback server_post_callback_;
std::map<std::string, std::string> device_id_by_auth_token_; std::map<std::string, std::string> device_id_by_auth_token_;
scoped_ptr<DirectiveHandler> directive_handler_;
std::set<HttpPost*> pending_posts_; std::set<HttpPost*> pending_posts_;
DISALLOW_COPY_AND_ASSIGN(RpcHandler); DISALLOW_COPY_AND_ASSIGN(RpcHandler);
......
...@@ -18,11 +18,15 @@ ...@@ -18,11 +18,15 @@
#include "components/copresence/proto/enums.pb.h" #include "components/copresence/proto/enums.pb.h"
#include "components/copresence/proto/rpcs.pb.h" #include "components/copresence/proto/rpcs.pb.h"
#include "net/http/http_status_code.h" #include "net/http/http_status_code.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gmock/include/gmock/gmock.h"
using google::protobuf::MessageLite; using google::protobuf::MessageLite;
using google::protobuf::RepeatedPtrField; using google::protobuf::RepeatedPtrField;
using testing::Property;
using testing::SizeIs;
using testing::ElementsAre;
namespace copresence { namespace copresence {
namespace { namespace {
...@@ -48,8 +52,8 @@ class FakeDirectiveHandler : public DirectiveHandler { ...@@ -48,8 +52,8 @@ class FakeDirectiveHandler : public DirectiveHandler {
return added_directives_; return added_directives_;
} }
void Initialize(const AudioManager::DecodeSamplesCallback& decode_cb, void Start(WhispernetClient* whispernet_client) override {
const AudioManager::EncodeTokenCallback& encode_cb) override { NOTREACHED();
} }
void AddDirective(const Directive& directive) override { void AddDirective(const Directive& directive) override {
...@@ -57,7 +61,11 @@ class FakeDirectiveHandler : public DirectiveHandler { ...@@ -57,7 +61,11 @@ class FakeDirectiveHandler : public DirectiveHandler {
} }
void RemoveDirectives(const std::string& op_id) override { void RemoveDirectives(const std::string& op_id) override {
// TODO(ckehoe): Add a parallel implementation when prod has one. NOTREACHED();
}
const std::string GetCurrentAudioToken(AudioType type) const override {
return type == AUDIBLE ? "current audible" : "current inaudible";
} }
private: private:
...@@ -70,7 +78,7 @@ class FakeDirectiveHandler : public DirectiveHandler { ...@@ -70,7 +78,7 @@ class FakeDirectiveHandler : public DirectiveHandler {
class RpcHandlerTest : public testing::Test, public CopresenceDelegate { class RpcHandlerTest : public testing::Test, public CopresenceDelegate {
public: public:
RpcHandlerTest() : rpc_handler_(this), status_(SUCCESS) { RpcHandlerTest() : rpc_handler_(this, &directive_handler_), status_(SUCCESS) {
rpc_handler_.server_post_callback_ = rpc_handler_.server_post_callback_ =
base::Bind(&RpcHandlerTest::CaptureHttpPost, base::Unretained(this)); base::Bind(&RpcHandlerTest::CaptureHttpPost, base::Unretained(this));
} }
...@@ -112,12 +120,6 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate { ...@@ -112,12 +120,6 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate {
response); response);
} }
FakeDirectiveHandler* InstallFakeDirectiveHandler() {
FakeDirectiveHandler* handler = new FakeDirectiveHandler;
rpc_handler_.directive_handler_.reset(handler);
return handler;
}
void SetDeviceIdAndAuthToken(const std::string& device_id, void SetDeviceIdAndAuthToken(const std::string& device_id,
const std::string& auth_token) { const std::string& auth_token) {
rpc_handler_.device_id_by_auth_token_[auth_token] = device_id; rpc_handler_.device_id_by_auth_token_[auth_token] = device_id;
...@@ -135,6 +137,7 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate { ...@@ -135,6 +137,7 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate {
// For rpc_handler_.invalid_audio_token_cache_ // For rpc_handler_.invalid_audio_token_cache_
base::MessageLoop message_loop_; base::MessageLoop message_loop_;
FakeDirectiveHandler directive_handler_;
RpcHandler rpc_handler_; RpcHandler rpc_handler_;
CopresenceStatus status_; CopresenceStatus status_;
...@@ -166,7 +169,7 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate { ...@@ -166,7 +169,7 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate {
TEST_F(RpcHandlerTest, RegisterDevice) { TEST_F(RpcHandlerTest, RegisterDevice) {
EXPECT_FALSE(rpc_handler_.IsRegisteredForToken("")); EXPECT_FALSE(rpc_handler_.IsRegisteredForToken(""));
rpc_handler_.RegisterForToken("", RpcHandler::SuccessCallback()); rpc_handler_.RegisterForToken("", RpcHandler::SuccessCallback());
EXPECT_EQ(1u, request_protos_.size()); EXPECT_THAT(request_protos_, SizeIs(1));
const RegisterDeviceRequest* registration = const RegisterDeviceRequest* registration =
static_cast<RegisterDeviceRequest*>(request_protos_[0]); static_cast<RegisterDeviceRequest*>(request_protos_[0]);
Identity identity = registration->device_identifiers().registrant(); Identity identity = registration->device_identifiers().registrant();
...@@ -175,7 +178,7 @@ TEST_F(RpcHandlerTest, RegisterDevice) { ...@@ -175,7 +178,7 @@ TEST_F(RpcHandlerTest, RegisterDevice) {
EXPECT_FALSE(rpc_handler_.IsRegisteredForToken("abc")); EXPECT_FALSE(rpc_handler_.IsRegisteredForToken("abc"));
rpc_handler_.RegisterForToken("abc", RpcHandler::SuccessCallback()); rpc_handler_.RegisterForToken("abc", RpcHandler::SuccessCallback());
EXPECT_EQ(2u, request_protos_.size()); EXPECT_THAT(request_protos_, SizeIs(2));
registration = static_cast<RegisterDeviceRequest*>(request_protos_[1]); registration = static_cast<RegisterDeviceRequest*>(request_protos_[1]);
EXPECT_FALSE(registration->has_device_identifiers()); EXPECT_FALSE(registration->has_device_identifiers());
} }
...@@ -206,8 +209,8 @@ TEST_F(RpcHandlerTest, CreateRequestHeader) { ...@@ -206,8 +209,8 @@ TEST_F(RpcHandlerTest, CreateRequestHeader) {
TEST_F(RpcHandlerTest, ReportTokens) { TEST_F(RpcHandlerTest, ReportTokens) {
std::vector<AudioToken> test_tokens; std::vector<AudioToken> test_tokens;
test_tokens.push_back(AudioToken("token 1", false)); test_tokens.push_back(AudioToken("token 1", false));
test_tokens.push_back(AudioToken("token 2", true)); test_tokens.push_back(AudioToken("token 2", false));
test_tokens.push_back(AudioToken("token 3", false)); test_tokens.push_back(AudioToken("token 3", true));
AddInvalidToken("token 2"); AddInvalidToken("token 2");
SetDeviceIdAndAuthToken("ReportTokens Device 1", ""); SetDeviceIdAndAuthToken("ReportTokens Device 1", "");
...@@ -216,13 +219,15 @@ TEST_F(RpcHandlerTest, ReportTokens) { ...@@ -216,13 +219,15 @@ TEST_F(RpcHandlerTest, ReportTokens) {
rpc_handler_.ReportTokens(test_tokens); rpc_handler_.ReportTokens(test_tokens);
EXPECT_EQ(RpcHandler::kReportRequestRpcName, rpc_name_); EXPECT_EQ(RpcHandler::kReportRequestRpcName, rpc_name_);
EXPECT_EQ(" API Key", api_key_); EXPECT_EQ(" API Key", api_key_);
EXPECT_EQ(2u, request_protos_.size()); EXPECT_THAT(request_protos_, SizeIs(2));
const ReportRequest* report = static_cast<ReportRequest*>(request_protos_[0]); const ReportRequest* report = static_cast<ReportRequest*>(request_protos_[0]);
RepeatedPtrField<TokenObservation> tokens_sent = RepeatedPtrField<TokenObservation> tokens_sent =
report->update_signals_request().token_observation(); report->update_signals_request().token_observation();
ASSERT_EQ(2, tokens_sent.size()); EXPECT_THAT(tokens_sent, ElementsAre(
EXPECT_EQ("token 1", tokens_sent.Get(0).token_id()); Property(&TokenObservation::token_id, "token 1"),
EXPECT_EQ("token 3", tokens_sent.Get(1).token_id()); Property(&TokenObservation::token_id, "token 3"),
Property(&TokenObservation::token_id, "current audible"),
Property(&TokenObservation::token_id, "current inaudible")));
} }
TEST_F(RpcHandlerTest, ReportResponseHandler) { TEST_F(RpcHandlerTest, ReportResponseHandler) {
...@@ -259,7 +264,6 @@ TEST_F(RpcHandlerTest, ReportResponseHandler) { ...@@ -259,7 +264,6 @@ TEST_F(RpcHandlerTest, ReportResponseHandler) {
update_response->add_directive()->set_subscription_id("Subscription 2"); update_response->add_directive()->set_subscription_id("Subscription 2");
messages_by_subscription_.clear(); messages_by_subscription_.clear();
FakeDirectiveHandler* directive_handler = InstallFakeDirectiveHandler();
std::string serialized_proto; std::string serialized_proto;
ASSERT_TRUE(test_response.SerializeToString(&serialized_proto)); ASSERT_TRUE(test_response.SerializeToString(&serialized_proto));
status_ = FAIL; status_ = FAIL;
...@@ -267,23 +271,18 @@ TEST_F(RpcHandlerTest, ReportResponseHandler) { ...@@ -267,23 +271,18 @@ TEST_F(RpcHandlerTest, ReportResponseHandler) {
EXPECT_EQ(SUCCESS, status_); EXPECT_EQ(SUCCESS, status_);
EXPECT_TRUE(TokenIsInvalid("bad token")); EXPECT_TRUE(TokenIsInvalid("bad token"));
ASSERT_EQ(2U, messages_by_subscription_.size());
ASSERT_EQ(2U, messages_by_subscription_["Subscription 1"].size()); EXPECT_THAT(messages_by_subscription_["Subscription 1"], ElementsAre(
ASSERT_EQ(2U, messages_by_subscription_["Subscription 2"].size()); Property(&Message::payload, "Message A"),
EXPECT_EQ("Message A", Property(&Message::payload, "Message C")));
messages_by_subscription_["Subscription 1"][0].payload());
EXPECT_EQ("Message B", EXPECT_THAT(messages_by_subscription_["Subscription 2"], ElementsAre(
messages_by_subscription_["Subscription 2"][0].payload()); Property(&Message::payload, "Message B"),
EXPECT_EQ("Message C", Property(&Message::payload, "Message C")));
messages_by_subscription_["Subscription 1"][1].payload());
EXPECT_EQ("Message C", EXPECT_THAT(directive_handler_.added_directives(), ElementsAre(
messages_by_subscription_["Subscription 2"][1].payload()); Property(&Directive::subscription_id, "Subscription 1"),
Property(&Directive::subscription_id, "Subscription 2")));
ASSERT_EQ(2U, directive_handler->added_directives().size());
EXPECT_EQ("Subscription 1",
directive_handler->added_directives()[0].subscription_id());
EXPECT_EQ("Subscription 2",
directive_handler->added_directives()[1].subscription_id());
} }
} // namespace copresence } // namespace copresence
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment