Commit df423457 authored by Alex Chau's avatar Alex Chau Committed by Commit Bot

[Nearby] Implement incoming payload NearbyConnections interface

chrome/services/sharing/*
- Introduced PayloadListener::GetFileForPayload for requesting
  file from browser process based on payload_id
- Introduced PayloadListener::OnPayloadReceived for passing incoming
  payload

chrome/browser/nearby_sharing/nearby_connections_manager*
- Browser side usage of the above APIs
  NearbyConnectionsManager stores the payload in a map, and can be
  retrieved via GetIncomingPayload
- NearbyConnectionsManager provides API to register base::FilePath for
  specific payload_id. payload_id are sent via Nearby Share specific
  introduction frames from NearbyDecoder.

Bug: 1076008
Change-Id: I177986dba2a66805085539d2b30a282ec948cf43
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2352792Reviewed-by: default avatarAlex Gough <ajgo@chromium.org>
Reviewed-by: default avatarRichard Knoll <knollr@chromium.org>
Reviewed-by: default avatarHimanshu Jaju <himanshujaju@chromium.org>
Commit-Queue: Alex Chau <alexchau@chromium.org>
Cr-Commit-Position: refs/heads/master@{#800127}
parent 18572d12
...@@ -78,6 +78,14 @@ void FakeNearbyConnectionsManager::RegisterPayloadStatusListener( ...@@ -78,6 +78,14 @@ void FakeNearbyConnectionsManager::RegisterPayloadStatusListener(
// TODO(alexchau): Implement. // TODO(alexchau): Implement.
} }
void FakeNearbyConnectionsManager::RegisterPayloadPath(
int64_t payload_id,
const base::FilePath& file_path,
ConnectionsCallback callback) {
DCHECK(!IsShutdown());
// TODO(alexchau): Implement.
}
FakeNearbyConnectionsManager::Payload* FakeNearbyConnectionsManager::Payload*
FakeNearbyConnectionsManager::GetIncomingPayload(int64_t payload_id) { FakeNearbyConnectionsManager::GetIncomingPayload(int64_t payload_id) {
DCHECK(!IsShutdown()); DCHECK(!IsShutdown());
......
...@@ -44,6 +44,9 @@ class FakeNearbyConnectionsManager ...@@ -44,6 +44,9 @@ class FakeNearbyConnectionsManager
PayloadStatusListener* listener) override; PayloadStatusListener* listener) override;
void RegisterPayloadStatusListener(int64_t payload_id, void RegisterPayloadStatusListener(int64_t payload_id,
PayloadStatusListener* listener) override; PayloadStatusListener* listener) override;
void RegisterPayloadPath(int64_t payload_id,
const base::FilePath& file_path,
ConnectionsCallback callback) override;
Payload* GetIncomingPayload(int64_t payload_id) override; Payload* GetIncomingPayload(int64_t payload_id) override;
void Cancel(int64_t payload_id) override; void Cancel(int64_t payload_id) override;
void ClearIncomingPayloads() override; void ClearIncomingPayloads() override;
......
...@@ -87,6 +87,13 @@ class MockNearbyConnections : public NearbyConnectionsMojom { ...@@ -87,6 +87,13 @@ class MockNearbyConnections : public NearbyConnectionsMojom {
(const std::string& endpoint_id, (const std::string& endpoint_id,
InitiateBandwidthUpgradeCallback callback), InitiateBandwidthUpgradeCallback callback),
(override)); (override));
MOCK_METHOD(void,
RegisterPayloadFile,
(int64_t payload_id,
base::File input_file,
base::File output_file,
RegisterPayloadFileCallback callback),
(override));
}; };
#endif // CHROME_BROWSER_NEARBY_SHARING_MOCK_NEARBY_CONNECTIONS_H_ #endif // CHROME_BROWSER_NEARBY_SHARING_MOCK_NEARBY_CONNECTIONS_H_
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <vector> #include <vector>
#include "base/callback.h" #include "base/callback.h"
#include "base/files/file_path.h"
#include "base/optional.h" #include "base/optional.h"
#include "chrome/browser/nearby_sharing/common/nearby_share_enums.h" #include "chrome/browser/nearby_sharing/common/nearby_share_enums.h"
#include "chrome/browser/nearby_sharing/nearby_connection.h" #include "chrome/browser/nearby_sharing/nearby_connection.h"
...@@ -114,6 +115,11 @@ class NearbyConnectionsManager { ...@@ -114,6 +115,11 @@ class NearbyConnectionsManager {
int64_t payload_id, int64_t payload_id,
PayloadStatusListener* listener) = 0; PayloadStatusListener* listener) = 0;
// Register a |file_path| for receiving incoming payload with |payload_id|.
virtual void RegisterPayloadPath(int64_t payload_id,
const base::FilePath& file_path,
ConnectionsCallback callback) = 0;
// Gets the payload associated with |payload_id| if available. // Gets the payload associated with |payload_id| if available.
virtual Payload* GetIncomingPayload(int64_t payload_id) = 0; virtual Payload* GetIncomingPayload(int64_t payload_id) = 0;
......
...@@ -4,7 +4,9 @@ ...@@ -4,7 +4,9 @@
#include "chrome/browser/nearby_sharing/nearby_connections_manager_impl.h" #include "chrome/browser/nearby_sharing/nearby_connections_manager_impl.h"
#include "base/files/file_util.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "base/task/post_task.h"
#include "base/unguessable_token.h" #include "base/unguessable_token.h"
#include "chrome/browser/nearby_sharing/logging/logging.h" #include "chrome/browser/nearby_sharing/logging/logging.h"
#include "chrome/services/sharing/public/mojom/nearby_connections_types.mojom.h" #include "chrome/services/sharing/public/mojom/nearby_connections_types.mojom.h"
...@@ -44,6 +46,17 @@ bool ShouldEnableWebRtc(bool is_advertising, ...@@ -44,6 +46,17 @@ bool ShouldEnableWebRtc(bool is_advertising,
return true; return true;
} }
InitializeFileResult CreateAndOpenFile(base::FilePath file_path) {
base::FilePath unique_path = base::GetUniquePath(file_path);
InitializeFileResult result;
result.output_file.Initialize(
unique_path,
base::File::Flags::FLAG_CREATE_ALWAYS | base::File::Flags::FLAG_WRITE);
result.input_file.Initialize(
unique_path, base::File::Flags::FLAG_OPEN | base::File::Flags::FLAG_READ);
return result;
}
} // namespace } // namespace
NearbyConnectionsManagerImpl::NearbyConnectionsManagerImpl( NearbyConnectionsManagerImpl::NearbyConnectionsManagerImpl(
...@@ -239,10 +252,38 @@ void NearbyConnectionsManagerImpl::RegisterPayloadStatusListener( ...@@ -239,10 +252,38 @@ void NearbyConnectionsManagerImpl::RegisterPayloadStatusListener(
payload_status_listeners_.insert_or_assign(payload_id, listener); payload_status_listeners_.insert_or_assign(payload_id, listener);
} }
void NearbyConnectionsManagerImpl::RegisterPayloadPath(
int64_t payload_id,
const base::FilePath& file_path,
ConnectionsCallback callback) {
if (!nearby_connections_)
return;
DCHECK(!file_path.empty());
base::ThreadPool::PostTaskAndReplyWithResult(
FROM_HERE, {base::MayBlock()},
base::BindOnce(&CreateAndOpenFile, file_path),
base::BindOnce(&NearbyConnectionsManagerImpl::OnFileInitialized,
weak_ptr_factory_.GetWeakPtr(), payload_id,
std::move(callback)));
}
void NearbyConnectionsManagerImpl::OnFileInitialized(
int64_t payload_id,
ConnectionsCallback callback,
InitializeFileResult result) {
nearby_connections_->RegisterPayloadFile(
payload_id, std::move(result.input_file), std::move(result.output_file),
std::move(callback));
}
NearbyConnectionsManagerImpl::Payload* NearbyConnectionsManagerImpl::Payload*
NearbyConnectionsManagerImpl::GetIncomingPayload(int64_t payload_id) { NearbyConnectionsManagerImpl::GetIncomingPayload(int64_t payload_id) {
// TOOD(crbug/1076008): Implement. auto it = incoming_payloads_.find(payload_id);
return nullptr; if (it == incoming_payloads_.end())
return nullptr;
return it->second.get();
} }
void NearbyConnectionsManagerImpl::Cancel(int64_t payload_id) { void NearbyConnectionsManagerImpl::Cancel(int64_t payload_id) {
...@@ -271,7 +312,7 @@ void NearbyConnectionsManagerImpl::Cancel(int64_t payload_id) { ...@@ -271,7 +312,7 @@ void NearbyConnectionsManagerImpl::Cancel(int64_t payload_id) {
} }
void NearbyConnectionsManagerImpl::ClearIncomingPayloads() { void NearbyConnectionsManagerImpl::ClearIncomingPayloads() {
// TOOD(crbug/1076008): Implement. incoming_payloads_.clear();
} }
base::Optional<std::vector<uint8_t>> base::Optional<std::vector<uint8_t>>
...@@ -455,27 +496,57 @@ void NearbyConnectionsManagerImpl::OnBandwidthChanged( ...@@ -455,27 +496,57 @@ void NearbyConnectionsManagerImpl::OnBandwidthChanged(
// TODO(crbug/1111458): Support TransferManager. // TODO(crbug/1111458): Support TransferManager.
} }
void NearbyConnectionsManagerImpl::OnPayloadReceived(
const std::string& endpoint_id,
PayloadPtr payload) {
auto result = incoming_payloads_.emplace(payload->id, std::move(payload));
DCHECK(result.second);
}
void NearbyConnectionsManagerImpl::OnPayloadTransferUpdate( void NearbyConnectionsManagerImpl::OnPayloadTransferUpdate(
const std::string& endpoint_id, const std::string& endpoint_id,
PayloadTransferUpdatePtr update) { PayloadTransferUpdatePtr update) {
// If this is a payload we've registered for, then forward its status to the // If this is a payload we've registered for, then forward its status to the
// PayloadStatusListener. We don't need to do anything more with the payload. // PayloadStatusListener. We don't need to do anything more with the payload.
auto it = payload_status_listeners_.find(update->payload_id); auto listener_it = payload_status_listeners_.find(update->payload_id);
if (it != payload_status_listeners_.end()) { if (listener_it != payload_status_listeners_.end()) {
PayloadStatusListener* listener = it->second; PayloadStatusListener* listener = listener_it->second;
switch (update->status) { switch (update->status) {
case PayloadStatus::kInProgress: case PayloadStatus::kInProgress:
break; break;
case PayloadStatus::kSuccess: case PayloadStatus::kSuccess:
case PayloadStatus::kCanceled: case PayloadStatus::kCanceled:
case PayloadStatus::kFailure: case PayloadStatus::kFailure:
payload_status_listeners_.erase(it); payload_status_listeners_.erase(listener_it);
break; break;
} }
listener->OnStatusUpdate(std::move(update)); listener->OnStatusUpdate(std::move(update));
return;
}
// If this is an incoming payload that we have not registered for, then we'll
// treat it as a control frame (eg. IntroductionFrame) and forward it to the
// associated NearbyConnection.
auto payload_it = incoming_payloads_.find(update->payload_id);
if (payload_it == incoming_payloads_.end())
return;
if (!payload_it->second->content->is_bytes()) {
NS_LOG(WARNING) << "Received unknown payload of file type. Cancelling.";
nearby_connections_->CancelPayload(payload_it->first, base::DoNothing());
return;
} }
// TOOD(crbug/1076008): Handle incoming payload transfer. if (update->status != PayloadStatus::kSuccess)
return;
auto connections_it = connections_.find(endpoint_id);
if (connections_it == connections_.end())
return;
NS_LOG(INFO) << "Writing incoming byte message to NearbyConnection.";
connections_it->second->WriteMessage(
payload_it->second->content->get_bytes()->bytes);
} }
bool NearbyConnectionsManagerImpl::BindNearbyConnections() { bool NearbyConnectionsManagerImpl::BindNearbyConnections() {
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "base/containers/flat_map.h" #include "base/containers/flat_map.h"
#include "base/containers/flat_set.h" #include "base/containers/flat_set.h"
#include "base/files/file.h"
#include "base/gtest_prod_util.h" #include "base/gtest_prod_util.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "chrome/browser/nearby_sharing/nearby_connection_impl.h" #include "chrome/browser/nearby_sharing/nearby_connection_impl.h"
...@@ -19,6 +20,11 @@ ...@@ -19,6 +20,11 @@
class Profile; class Profile;
struct InitializeFileResult {
base::File input_file;
base::File output_file;
};
// Concrete NearbyConnectionsManager implementation. // Concrete NearbyConnectionsManager implementation.
class NearbyConnectionsManagerImpl class NearbyConnectionsManagerImpl
: public NearbyConnectionsManager, : public NearbyConnectionsManager,
...@@ -56,6 +62,9 @@ class NearbyConnectionsManagerImpl ...@@ -56,6 +62,9 @@ class NearbyConnectionsManagerImpl
PayloadStatusListener* listener) override; PayloadStatusListener* listener) override;
void RegisterPayloadStatusListener(int64_t payload_id, void RegisterPayloadStatusListener(int64_t payload_id,
PayloadStatusListener* listener) override; PayloadStatusListener* listener) override;
void RegisterPayloadPath(int64_t payload_id,
const base::FilePath& file_path,
ConnectionsCallback callback) override;
Payload* GetIncomingPayload(int64_t payload_id) override; Payload* GetIncomingPayload(int64_t payload_id) override;
void Cancel(int64_t payload_id) override; void Cancel(int64_t payload_id) override;
void ClearIncomingPayloads() override; void ClearIncomingPayloads() override;
...@@ -112,6 +121,8 @@ class NearbyConnectionsManagerImpl ...@@ -112,6 +121,8 @@ class NearbyConnectionsManagerImpl
int32_t quality) override; int32_t quality) override;
// PayloadListener: // PayloadListener:
void OnPayloadReceived(const std::string& endpoint_id,
PayloadPtr payload) override;
void OnPayloadTransferUpdate(const std::string& endpoint_id, void OnPayloadTransferUpdate(const std::string& endpoint_id,
PayloadTransferUpdatePtr update) override; PayloadTransferUpdatePtr update) override;
...@@ -121,6 +132,10 @@ class NearbyConnectionsManagerImpl ...@@ -121,6 +132,10 @@ class NearbyConnectionsManagerImpl
bool BindNearbyConnections(); bool BindNearbyConnections();
void Reset(); void Reset();
void OnFileInitialized(int64_t payload_id,
ConnectionsCallback callback,
InitializeFileResult result);
NearbyProcessManager* process_manager_; NearbyProcessManager* process_manager_;
Profile* profile_; Profile* profile_;
IncomingConnectionListener* incoming_connection_listener_ = nullptr; IncomingConnectionListener* incoming_connection_listener_ = nullptr;
...@@ -136,6 +151,8 @@ class NearbyConnectionsManagerImpl ...@@ -136,6 +151,8 @@ class NearbyConnectionsManagerImpl
connections_; connections_;
// A map of payload_id to PayloadStatusListener*. // A map of payload_id to PayloadStatusListener*.
base::flat_map<int64_t, PayloadStatusListener*> payload_status_listeners_; base::flat_map<int64_t, PayloadStatusListener*> payload_status_listeners_;
// A map of payload_id to PayloadPtr.
base::flat_map<int64_t, PayloadPtr> incoming_payloads_;
ScopedObserver<NearbyProcessManager, NearbyProcessManager::Observer> ScopedObserver<NearbyProcessManager, NearbyProcessManager::Observer>
nearby_process_observer_{this}; nearby_process_observer_{this};
......
...@@ -10,7 +10,6 @@ source_set("sharing") { ...@@ -10,7 +10,6 @@ source_set("sharing") {
deps = [ deps = [
"nearby", "nearby",
"nearby/platform_v2",
"webrtc", "webrtc",
"//jingle:webrtc_glue", "//jingle:webrtc_glue",
"//third_party/webrtc_overrides:webrtc_component", "//third_party/webrtc_overrides:webrtc_component",
......
...@@ -8,10 +8,12 @@ source_set("nearby") { ...@@ -8,10 +8,12 @@ source_set("nearby") {
"nearby_connections.h", "nearby_connections.h",
"nearby_connections_conversions.cc", "nearby_connections_conversions.cc",
"nearby_connections_conversions.h", "nearby_connections_conversions.h",
"platform.cc",
] ]
public_deps = [ public_deps = [
"decoder", "decoder",
"platform_v2",
"//base", "//base",
"//chrome/services/sharing/public/cpp", "//chrome/services/sharing/public/cpp",
"//chrome/services/sharing/public/mojom", "//chrome/services/sharing/public/mojom",
......
...@@ -4,12 +4,13 @@ ...@@ -4,12 +4,13 @@
#include "chrome/services/sharing/nearby/nearby_connections.h" #include "chrome/services/sharing/nearby/nearby_connections.h"
#include "base/files/file_util.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "chrome/browser/nearby_sharing/logging/logging.h" #include "chrome/browser/nearby_sharing/logging/logging.h"
#include "chrome/services/sharing/nearby/nearby_connections_conversions.h" #include "chrome/services/sharing/nearby/nearby_connections_conversions.h"
#include "chrome/services/sharing/nearby/platform_v2/input_file.h"
#include "chrome/services/sharing/public/mojom/nearby_connections_types.mojom.h" #include "chrome/services/sharing/public/mojom/nearby_connections_types.mojom.h"
#include "third_party/nearby/src/cpp/platform_v2/public/file.h"
#include "third_party/nearby/src/cpp/platform_v2/public/single_thread_executor.h"
namespace location { namespace location {
namespace nearby { namespace nearby {
...@@ -283,19 +284,66 @@ void NearbyConnections::AcceptConnection( ...@@ -283,19 +284,66 @@ void NearbyConnections::AcceptConnection(
mojo::PendingRemote<mojom::PayloadListener> listener, mojo::PendingRemote<mojom::PayloadListener> listener,
AcceptConnectionCallback callback) { AcceptConnectionCallback callback) {
mojo::SharedRemote<mojom::PayloadListener> remote(std::move(listener)); mojo::SharedRemote<mojom::PayloadListener> remote(std::move(listener));
// Capturing Core* is safe as Core owns PayloadListener.
PayloadListener payload_listener = { PayloadListener payload_listener = {
.payload_progress_cb = [remote](const std::string& endpoint_id, .payload_cb =
const PayloadProgressInfo& info) { [remote, core = core_.get()](const std::string& endpoint_id,
if (!remote) Payload payload) {
return; if (!remote)
return;
DCHECK_GE(info.total_bytes, 0);
DCHECK_GE(info.bytes_transferred, 0); switch (payload.GetType()) {
remote->OnPayloadTransferUpdate( case Payload::Type::kBytes: {
endpoint_id, mojom::PayloadTransferUpdate::New( mojom::BytesPayloadPtr bytes_payload = mojom::BytesPayload::New(
info.payload_id, PayloadStatusToMojom(info.status), ByteArrayToMojom(payload.AsBytes()));
info.total_bytes, info.bytes_transferred)); remote->OnPayloadReceived(
}}; endpoint_id,
mojom::Payload::New(payload.GetId(),
mojom::PayloadContent::NewBytes(
std::move(bytes_payload))));
break;
}
case Payload::Type::kFile: {
DCHECK(payload.AsFile());
// InputFile is created by Chrome, so it's safe to downcast.
chrome::InputFile& input_file = static_cast<chrome::InputFile&>(
payload.AsFile()->GetInputStream());
base::File file = input_file.ExtractUnderlyingFile();
if (!file.IsValid()) {
core->CancelPayload(payload.GetId(), /*callback=*/{});
return;
}
mojom::FilePayloadPtr file_payload =
mojom::FilePayload::New(std::move(file));
remote->OnPayloadReceived(
endpoint_id,
mojom::Payload::New(payload.GetId(),
mojom::PayloadContent::NewFile(
std::move(file_payload))));
break;
}
case Payload::Type::kStream:
// Stream payload is not supported.
case Payload::Type::kUnknown:
core->CancelPayload(payload.GetId(), /*callback=*/{});
return;
}
},
.payload_progress_cb =
[remote](const std::string& endpoint_id,
const PayloadProgressInfo& info) {
if (!remote)
return;
DCHECK_GE(info.total_bytes, 0);
DCHECK_GE(info.bytes_transferred, 0);
remote->OnPayloadTransferUpdate(
endpoint_id,
mojom::PayloadTransferUpdate::New(
info.payload_id, PayloadStatusToMojom(info.status),
info.total_bytes, info.bytes_transferred));
}};
core_->AcceptConnection(endpoint_id, std::move(payload_listener), core_->AcceptConnection(endpoint_id, std::move(payload_listener),
ResultCallbackFromMojom(std::move(callback))); ResultCallbackFromMojom(std::move(callback)));
...@@ -320,8 +368,11 @@ void NearbyConnections::SendPayload( ...@@ -320,8 +368,11 @@ void NearbyConnections::SendPayload(
break; break;
case mojom::PayloadContent::Tag::FILE: case mojom::PayloadContent::Tag::FILE:
int64_t file_size = payload->content->get_file()->file.GetLength(); int64_t file_size = payload->content->get_file()->file.GetLength();
outgoing_file_map_.insert_or_assign( {
payload->id, std::move(payload->content->get_file()->file)); base::AutoLock al(input_file_lock_);
input_file_map_.insert_or_assign(
payload->id, std::move(payload->content->get_file()->file));
}
core_payload = std::make_unique<Payload>( core_payload = std::make_unique<Payload>(
payload->id, InputFile(payload->id, file_size)); payload->id, InputFile(payload->id, file_size));
break; break;
...@@ -348,16 +399,49 @@ void NearbyConnections::InitiateBandwidthUpgrade( ...@@ -348,16 +399,49 @@ void NearbyConnections::InitiateBandwidthUpgrade(
ResultCallbackFromMojom(std::move(callback))); ResultCallbackFromMojom(std::move(callback)));
} }
base::File NearbyConnections::ExtractFileForPayload(int64_t payload_id) { void NearbyConnections::RegisterPayloadFile(
auto file_it = outgoing_file_map_.find(payload_id); int64_t payload_id,
if (file_it == outgoing_file_map_.end()) base::File input_file,
base::File output_file,
RegisterPayloadFileCallback callback) {
if (!input_file.IsValid() || !output_file.IsValid()) {
std::move(callback).Run(mojom::Status::kError);
return;
}
{
base::AutoLock al(input_file_lock_);
input_file_map_.insert_or_assign(payload_id, std::move(input_file));
}
{
base::AutoLock al(output_file_lock_);
output_file_map_.insert_or_assign(payload_id, std::move(output_file));
}
std::move(callback).Run(mojom::Status::kSuccess);
}
base::File NearbyConnections::ExtractInputFile(int64_t payload_id) {
base::AutoLock al(input_file_lock_);
auto file_it = input_file_map_.find(payload_id);
if (file_it == input_file_map_.end())
return base::File(); return base::File();
base::File file = std::move(file_it->second); base::File file = std::move(file_it->second);
outgoing_file_map_.erase(file_it); input_file_map_.erase(file_it);
return file; return file;
}
base::File NearbyConnections::ExtractOutputFile(int64_t payload_id) {
base::AutoLock al(output_file_lock_);
auto file_it = output_file_map_.find(payload_id);
if (file_it == output_file_map_.end())
return base::File();
// TOOD(crbug/1076008): Handle incoming file payload. base::File file = std::move(file_it->second);
output_file_map_.erase(file_it);
return file;
} }
} // namespace connections } // namespace connections
......
...@@ -13,7 +13,9 @@ ...@@ -13,7 +13,9 @@
#include "base/files/file.h" #include "base/files/file.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/synchronization/lock.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "base/thread_annotations.h"
#include "chrome/services/sharing/public/mojom/nearby_connections.mojom.h" #include "chrome/services/sharing/public/mojom/nearby_connections.mojom.h"
#include "chrome/services/sharing/public/mojom/webrtc_signaling_messenger.mojom.h" #include "chrome/services/sharing/public/mojom/webrtc_signaling_messenger.mojom.h"
#include "device/bluetooth/public/mojom/adapter.mojom.h" #include "device/bluetooth/public/mojom/adapter.mojom.h"
...@@ -97,9 +99,16 @@ class NearbyConnections : public mojom::NearbyConnections { ...@@ -97,9 +99,16 @@ class NearbyConnections : public mojom::NearbyConnections {
void InitiateBandwidthUpgrade( void InitiateBandwidthUpgrade(
const std::string& endpoint_id, const std::string& endpoint_id,
InitiateBandwidthUpgradeCallback callback) override; InitiateBandwidthUpgradeCallback callback) override;
void RegisterPayloadFile(int64_t payload_id,
base::File input_file,
base::File output_file,
RegisterPayloadFileCallback callback) override;
// Return the file associated with |payload_id|. // Returns the file associated with |payload_id| for InputFile.
base::File ExtractFileForPayload(int64_t payload_id); base::File ExtractInputFile(int64_t payload_id);
// Returns the file associated with |payload_id| for OutputFile.
base::File ExtractOutputFile(int64_t payload_id);
private: private:
void OnDisconnect(); void OnDisconnect();
...@@ -116,8 +125,21 @@ class NearbyConnections : public mojom::NearbyConnections { ...@@ -116,8 +125,21 @@ class NearbyConnections : public mojom::NearbyConnections {
mojo::SharedRemote<sharing::mojom::WebRtcSignalingMessenger> mojo::SharedRemote<sharing::mojom::WebRtcSignalingMessenger>
webrtc_signaling_messenger_; webrtc_signaling_messenger_;
// Core is thread-safe as its operations are always dispatched to a
// single-thread executor.
std::unique_ptr<Core> core_; std::unique_ptr<Core> core_;
base::flat_map<int64_t, base::File> outgoing_file_map_;
// input_file_map_ is accessed from background threads.
base::Lock input_file_lock_;
// A map of payload_id to file for InputFile.
base::flat_map<int64_t, base::File> input_file_map_
GUARDED_BY(input_file_lock_);
// output_file_map_ is accessed from background threads.
base::Lock output_file_lock_;
// A map of payload_id to file for OutputFile.
base::flat_map<int64_t, base::File> output_file_map_
GUARDED_BY(output_file_lock_);
base::WeakPtrFactory<NearbyConnections> weak_ptr_factory_{this}; base::WeakPtrFactory<NearbyConnections> weak_ptr_factory_{this};
}; };
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "chrome/services/sharing/nearby/platform_v2/input_file.h" #include "chrome/services/sharing/nearby/platform_v2/input_file.h"
#include "chrome/services/sharing/nearby/platform_v2/log_message.h" #include "chrome/services/sharing/nearby/platform_v2/log_message.h"
#include "chrome/services/sharing/nearby/platform_v2/mutex.h" #include "chrome/services/sharing/nearby/platform_v2/mutex.h"
#include "chrome/services/sharing/nearby/platform_v2/output_file.h"
#include "chrome/services/sharing/nearby/platform_v2/recursive_mutex.h" #include "chrome/services/sharing/nearby/platform_v2/recursive_mutex.h"
#include "chrome/services/sharing/nearby/platform_v2/scheduled_executor.h" #include "chrome/services/sharing/nearby/platform_v2/scheduled_executor.h"
#include "chrome/services/sharing/nearby/platform_v2/submittable_executor.h" #include "chrome/services/sharing/nearby/platform_v2/submittable_executor.h"
...@@ -42,13 +43,6 @@ namespace location { ...@@ -42,13 +43,6 @@ namespace location {
namespace nearby { namespace nearby {
namespace api { namespace api {
namespace {
std::string GetPayloadPath(std::int64_t payload_id) {
// TODO(alexchau): Get file path mapping from connections::NearbyConnections.
return std::string();
}
} // namespace
int GetCurrentTid() { int GetCurrentTid() {
// SubmittableExecutor and ScheduledExecutor does not own a thread pool // SubmittableExecutor and ScheduledExecutor does not own a thread pool
// directly nor manages threads, thus cannot support this debug feature. // directly nor manages threads, thus cannot support this debug feature.
...@@ -106,16 +100,15 @@ std::unique_ptr<InputFile> ImplementationPlatform::CreateInputFile( ...@@ -106,16 +100,15 @@ std::unique_ptr<InputFile> ImplementationPlatform::CreateInputFile(
std::int64_t payload_id, std::int64_t payload_id,
std::int64_t total_size) { std::int64_t total_size) {
auto& connections = connections::NearbyConnections::GetInstance(); auto& connections = connections::NearbyConnections::GetInstance();
auto file = connections.ExtractFileForPayload(payload_id); return std::make_unique<chrome::InputFile>(
if (!file.IsValid()) connections.ExtractInputFile(payload_id));
return nullptr;
return std::make_unique<chrome::InputFile>(std::move(file));
} }
std::unique_ptr<OutputFile> ImplementationPlatform::CreateOutputFile( std::unique_ptr<OutputFile> ImplementationPlatform::CreateOutputFile(
std::int64_t payload_id) { std::int64_t payload_id) {
return std::make_unique<shared::OutputFile>(GetPayloadPath(payload_id)); auto& connections = connections::NearbyConnections::GetInstance();
return std::make_unique<chrome::OutputFile>(
connections.ExtractOutputFile(payload_id));
} }
std::unique_ptr<LogMessage> ImplementationPlatform::CreateLogMessage( std::unique_ptr<LogMessage> ImplementationPlatform::CreateLogMessage(
......
...@@ -29,7 +29,8 @@ source_set("platform_v2") { ...@@ -29,7 +29,8 @@ source_set("platform_v2") {
"log_message.h", "log_message.h",
"mutex.cc", "mutex.cc",
"mutex.h", "mutex.h",
"platform.cc", "output_file.cc",
"output_file.h",
"recursive_mutex.cc", "recursive_mutex.cc",
"recursive_mutex.h", "recursive_mutex.h",
"scheduled_executor.cc", "scheduled_executor.cc",
...@@ -48,7 +49,6 @@ source_set("platform_v2") { ...@@ -48,7 +49,6 @@ source_set("platform_v2") {
deps = [ deps = [
"//base", "//base",
"//chrome/services/sharing/nearby",
"//chrome/services/sharing/public/mojom", "//chrome/services/sharing/public/mojom",
"//chrome/services/sharing/webrtc", "//chrome/services/sharing/webrtc",
"//crypto", "//crypto",
......
...@@ -10,10 +10,7 @@ namespace location { ...@@ -10,10 +10,7 @@ namespace location {
namespace nearby { namespace nearby {
namespace chrome { namespace chrome {
InputFile::InputFile(base::File file) : file_(std::move(file)) { InputFile::InputFile(base::File file) : file_(std::move(file)) {}
DCHECK(file_.IsValid());
seek_succeeded_ = file_.Seek(base::File::FROM_BEGIN, 0) == 0;
}
InputFile::~InputFile() = default; InputFile::~InputFile() = default;
...@@ -23,26 +20,36 @@ std::string InputFile::GetFilePath() const { ...@@ -23,26 +20,36 @@ std::string InputFile::GetFilePath() const {
} }
std::int64_t InputFile::GetTotalSize() const { std::int64_t InputFile::GetTotalSize() const {
if (!file_.IsValid())
return 0;
return file_.GetLength(); return file_.GetLength();
} }
ExceptionOr<ByteArray> InputFile::Read(std::int64_t size) { ExceptionOr<ByteArray> InputFile::Read(std::int64_t size) {
if (!seek_succeeded_) if (!file_.IsValid())
return Exception::kFailed; return Exception::kIo;
ByteArray bytes(size); ByteArray bytes(size);
int bytes_read = file_.ReadAtCurrentPos(bytes.data(), bytes.size()); int bytes_read = file_.ReadAtCurrentPos(bytes.data(), bytes.size());
if (bytes_read != size) if (bytes_read != size)
return Exception::kFailed; return Exception::kIo;
return ExceptionOr<ByteArray>(std::move(bytes)); return ExceptionOr<ByteArray>(std::move(bytes));
} }
Exception InputFile::Close() { Exception InputFile::Close() {
if (!file_.IsValid())
return {Exception::kIo};
file_.Close(); file_.Close();
return {Exception::kSuccess}; return {Exception::kSuccess};
} }
base::File InputFile::ExtractUnderlyingFile() {
return std::move(file_);
}
} // namespace chrome } // namespace chrome
} // namespace nearby } // namespace nearby
} // namespace location } // namespace location
...@@ -27,10 +27,12 @@ class InputFile : public api::InputFile { ...@@ -27,10 +27,12 @@ class InputFile : public api::InputFile {
ExceptionOr<ByteArray> Read(std::int64_t size) override; ExceptionOr<ByteArray> Read(std::int64_t size) override;
Exception Close() override; Exception Close() override;
// Extract the underlying base::File.
base::File ExtractUnderlyingFile();
private: private:
// File::GetLength is not const but api::InputFile::GetTotalSize is const. // File::GetLength is not const but api::InputFile::GetTotalSize is const.
mutable base::File file_; mutable base::File file_;
bool seek_succeeded_;
}; };
} // namespace chrome } // namespace chrome
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/services/sharing/nearby/platform_v2/output_file.h"
#include "base/numerics/safe_conversions.h"
namespace location {
namespace nearby {
namespace chrome {
OutputFile::OutputFile(base::File file) : file_(std::move(file)) {}
OutputFile::~OutputFile() = default;
Exception OutputFile::Write(const ByteArray& data) {
if (!file_.IsValid())
return {Exception::kIo};
int bytes_written = file_.WriteAtCurrentPos(data.data(), data.size());
if (bytes_written != base::checked_cast<int>(data.size()))
return {Exception::kIo};
return {Exception::kSuccess};
}
Exception OutputFile::Flush() {
if (!file_.IsValid())
return {Exception::kIo};
if (!file_.Flush())
return {Exception::kIo};
return {Exception::kSuccess};
}
Exception OutputFile::Close() {
if (!file_.IsValid())
return {Exception::kIo};
file_.Close();
return {Exception::kSuccess};
}
} // namespace chrome
} // namespace nearby
} // namespace location
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROME_SERVICES_SHARING_NEARBY_PLATFORM_V2_OUTPUT_FILE_H_
#define CHROME_SERVICES_SHARING_NEARBY_PLATFORM_V2_OUTPUT_FILE_H_
#include "base/files/file.h"
#include "third_party/nearby/src/cpp/platform_v2/api/output_file.h"
namespace location {
namespace nearby {
namespace chrome {
// Concrete OutputFile implementation.
class OutputFile : public api::OutputFile {
public:
explicit OutputFile(base::File file);
~OutputFile() override;
OutputFile(const OutputFile&) = delete;
OutputFile& operator=(const OutputFile&) = delete;
// api::OutputFile:
Exception Write(const ByteArray& data) override;
Exception Flush() override;
Exception Close() override;
private:
base::File file_;
};
} // namespace chrome
} // namespace nearby
} // namespace location
#endif // CHROME_SERVICES_SHARING_NEARBY_PLATFORM_V2_OUTPUT_FILE_H_
...@@ -8,6 +8,7 @@ import "chrome/services/sharing/public/mojom/nearby_connections_types.mojom"; ...@@ -8,6 +8,7 @@ import "chrome/services/sharing/public/mojom/nearby_connections_types.mojom";
import "chrome/services/sharing/public/mojom/webrtc_signaling_messenger.mojom"; import "chrome/services/sharing/public/mojom/webrtc_signaling_messenger.mojom";
import "chrome/services/sharing/public/mojom/webrtc.mojom"; import "chrome/services/sharing/public/mojom/webrtc.mojom";
import "device/bluetooth/public/mojom/adapter.mojom"; import "device/bluetooth/public/mojom/adapter.mojom";
import "mojo/public/mojom/base/file.mojom";
import "services/network/public/mojom/p2p.mojom"; import "services/network/public/mojom/p2p.mojom";
import "services/network/public/mojom/mdns_responder.mojom"; import "services/network/public/mojom/mdns_responder.mojom";
...@@ -95,6 +96,16 @@ interface ConnectionLifecycleListener { ...@@ -95,6 +96,16 @@ interface ConnectionLifecycleListener {
// utiltiiy process, and is used by the browser process to listen for payload // utiltiiy process, and is used by the browser process to listen for payload
// status associated with remote endpoints. // status associated with remote endpoints.
interface PayloadListener { interface PayloadListener {
// Called when a Payload is received from a remote endpoint. Depending on the
// type of the Payload, all of the data may or may not have been received at
// the time of this call. Use OnPayloadTransferUpdate() to get updates on the
// status of the data received.
//
// endpoint_id - The identifier for the remote endpoint that sent the
// payload.
// payload - The Payload object received.
OnPayloadReceived(string endpoint_id, Payload payload);
// Called with progress information about an active Payload transfer, either // Called with progress information about an active Payload transfer, either
// incoming or outgoing. // incoming or outgoing.
// //
...@@ -254,9 +265,9 @@ interface NearbyConnections { ...@@ -254,9 +265,9 @@ interface NearbyConnections {
// Cancels a Payload currently in-flight to or from remote endpoint(s). // Cancels a Payload currently in-flight to or from remote endpoint(s).
// //
// payload_id - The identifier for the Payload to be canceled. // payload_id - The identifier for the Payload to be cancelled.
// Possible return values include: // Possible return values include:
// Status::kSuccess if the payload got canceled. // Status::kSuccess if the payload got cancelled.
CancelPayload(int64 payload_id) => (Status status); CancelPayload(int64 payload_id) => (Status status);
// Disconnects from, and removes all traces of, all connected and/or // Disconnects from, and removes all traces of, all connected and/or
...@@ -278,6 +289,17 @@ interface NearbyConnections { ...@@ -278,6 +289,17 @@ interface NearbyConnections {
// Possible return values include: // Possible return values include:
// Status::kSuccess if upgraded successfully. // Status::kSuccess if upgraded successfully.
InitiateBandwidthUpgrade(string endpoint_id) => (Status status); InitiateBandwidthUpgrade(string endpoint_id) => (Status status);
// Register a pair of input and output file for handling incoming file
// payload associated with |payload_id_|, which is determined by feature
// specific design. The |input_file| should be opened for read access, and
// |output_file| should be opened for write access.
// Possible return values include:
// Status::kSuccess if file is registered successfully.
// Status::kError if file is not opened correctly.
RegisterPayloadFile(int64 payload_id, mojo_base.mojom.File input_file,
mojo_base.mojom.File output_file)
=> (Status status);
}; };
// Provide all the dependencies that NearbyConnections library requires. // Provide all the dependencies that NearbyConnections library requires.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment