Commit f43db666 authored by sammc's avatar sammc Committed by Commit bot

Change data pipe wrappers used by SerialConnection to use message pipe.

Interprocess mojo data pipe is not ready yet. This converts the data
pipe wrappers to send the data via their control message pipes until
interprocess data pipe is ready.

BUG=389016

Review URL: https://codereview.chromium.org/646063003

Cr-Commit-Position: refs/heads/master@{#301582}
parent 0dfe9ba2
This diff is collapsed.
......@@ -5,7 +5,10 @@
#ifndef DEVICE_SERIAL_DATA_RECEIVER_H_
#define DEVICE_SERIAL_DATA_RECEIVER_H_
#include <queue>
#include "base/callback.h"
#include "base/memory/linked_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "device/serial/buffer.h"
......@@ -14,8 +17,6 @@
namespace device {
class AsyncWaiter;
// A DataReceiver receives data from a DataSource.
class DataReceiver : public base::RefCounted<DataReceiver>,
public serial::DataSourceClient,
......@@ -24,9 +25,9 @@ class DataReceiver : public base::RefCounted<DataReceiver>,
typedef base::Callback<void(scoped_ptr<ReadOnlyBuffer>)> ReceiveDataCallback;
typedef base::Callback<void(int32_t error)> ReceiveErrorCallback;
// Constructs a DataReceiver to receive data from |source|, using a data
// pipe with a buffer size of |buffer_size|, with connection errors reported
// as |fatal_error_value|.
// Constructs a DataReceiver to receive data from |source|, using a buffer
// size of |buffer_size|, with connection errors reported as
// |fatal_error_value|.
DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
uint32_t buffer_size,
int32_t fatal_error_value);
......@@ -40,14 +41,16 @@ class DataReceiver : public base::RefCounted<DataReceiver>,
private:
class PendingReceive;
struct PendingError;
struct DataFrame;
friend class base::RefCounted<DataReceiver>;
~DataReceiver() override;
// serial::DataSourceClient override. Invoked by the DataSource to report
// errors.
void OnError(uint32_t bytes_since_last_error, int32_t error) override;
// serial::DataSourceClient overrides.
// Invoked by the DataSource to report errors.
void OnError(int32_t error) override;
// Invoked by the DataSource transmit data.
void OnData(mojo::Array<uint8_t> data) override;
// mojo::ErrorHandler override. Calls ShutDown().
void OnConnectionError() override;
......@@ -56,10 +59,6 @@ class DataReceiver : public base::RefCounted<DataReceiver>,
// receive buffer, having read |bytes_read| bytes from it.
void Done(uint32_t bytes_read);
// Invoked when |handle_| is ready to read. Unless an error has occurred, this
// calls ReceiveInternal().
void OnDoneWaiting(MojoResult result);
// The implementation of Receive(). If a |pending_error_| is ready to
// dispatch, it does so. Otherwise, this attempts to read from |handle_| and
// dispatches the contents to |pending_receive_|. If |handle_| is not ready,
......@@ -67,13 +66,6 @@ class DataReceiver : public base::RefCounted<DataReceiver>,
// |handle_|, ShutDown() is called.
void ReceiveInternal();
// We may have been notified of an error that occurred at some future point in
// the stream. We should never be able to read past the point at which the
// error occurred until we have dealt with the error and called Resume() on
// the DataSource. If this has occurred, something bad has happened on the
// service side, so we shut down.
bool CheckErrorNotInReadRange(uint32_t num_bytes);
// Called when we encounter a fatal error. If a receive is in progress,
// |fatal_error_value_| will be reported to the user.
void ShutDown();
......@@ -81,26 +73,18 @@ class DataReceiver : public base::RefCounted<DataReceiver>,
// The control connection to the data source.
mojo::InterfacePtr<serial::DataSource> source_;
// The data connection to the data source.
mojo::ScopedDataPipeConsumerHandle handle_;
// The error value to report in the event of a fatal error.
const int32_t fatal_error_value_;
// The number of bytes received from the data source.
uint32_t bytes_received_;
// Whether we have encountered a fatal error and shut down.
bool shut_down_;
// A waiter used to wait until |handle_| is readable if we are waiting.
scoped_ptr<AsyncWaiter> waiter_;
// The current pending receive operation if there is one.
scoped_ptr<PendingReceive> pending_receive_;
// The current pending error if there is one.
scoped_ptr<PendingError> pending_error_;
// The queue of pending data frames (data or an error) received from the
// DataSource.
std::queue<linked_ptr<DataFrame>> pending_data_frames_;
base::WeakPtrFactory<DataReceiver> weak_factory_;
......
......@@ -4,9 +4,10 @@
#include "device/serial/data_sender.h"
#include <algorithm>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "device/serial/async_waiter.h"
namespace device {
......@@ -34,10 +35,8 @@ class DataSender::PendingSend {
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
// Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
// if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
// or the error if one is encountered writing to |handle|.
MojoResult SendData(mojo::DataPipeProducerHandle handle);
// Attempts to send any data not yet sent to |sink|.
bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
private:
// Invoked to update |bytes_acked_| and |num_bytes|.
......@@ -55,7 +54,7 @@ class DataSender::PendingSend {
// The error value to report when DispatchFatalError() is called.
const int32_t fatal_error_value_;
// The number of bytes sent to the data pipe.
// The number of bytes sent to the DataSink.
uint32_t bytes_sent_;
// The number of bytes acked.
......@@ -67,17 +66,11 @@ DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
int32_t fatal_error_value)
: sink_(sink.Pass()),
fatal_error_value_(fatal_error_value),
available_buffer_capacity_(buffer_size),
shut_down_(false) {
sink_.set_error_handler(this);
MojoCreateDataPipeOptions options = {
sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
};
options.struct_size = sizeof(options);
mojo::ScopedDataPipeConsumerHandle remote_handle;
MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
DCHECK_EQ(MOJO_RESULT_OK, result);
sink_->Init(remote_handle.Pass());
sink_.set_client(this);
sink_->Init(buffer_size);
}
DataSender::~DataSender() {
......@@ -115,6 +108,7 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) {
if (shut_down_)
return;
available_buffer_capacity_ += bytes_sent;
while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
sends_awaiting_ack_.pop();
......@@ -133,27 +127,29 @@ void DataSender::ReportBytesSent(uint32_t bytes_sent) {
}
if (pending_sends_.empty() && sends_awaiting_ack_.empty())
RunCancelCallback();
SendInternal();
}
void DataSender::ReportBytesSentAndError(
uint32_t bytes_sent,
int32_t error,
const mojo::Callback<void(uint32_t)>& callback) {
const mojo::Callback<void()>& callback) {
if (shut_down_)
return;
uint32_t bytes_to_flush = 0;
available_buffer_capacity_ += bytes_sent;
while (!sends_awaiting_ack_.empty()) {
bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
&bytes_sent, error);
available_buffer_capacity_ +=
sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
error);
sends_awaiting_ack_.pop();
}
while (!pending_sends_.empty()) {
bytes_to_flush +=
available_buffer_capacity_ +=
pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
pending_sends_.pop();
}
callback.Run(bytes_to_flush);
callback.Run();
RunCancelCallback();
}
......@@ -162,33 +158,15 @@ void DataSender::OnConnectionError() {
}
void DataSender::SendInternal() {
while (!pending_sends_.empty()) {
MojoResult result = pending_sends_.front()->SendData(handle_.get());
if (result == MOJO_RESULT_OK) {
while (!pending_sends_.empty() && available_buffer_capacity_) {
if (pending_sends_.front()->SendData(sink_.get(),
&available_buffer_capacity_)) {
sends_awaiting_ack_.push(pending_sends_.front());
pending_sends_.pop();
} else if (result == MOJO_RESULT_SHOULD_WAIT) {
waiter_.reset(new AsyncWaiter(
handle_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE,
base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
return;
} else {
ShutDown();
return;
}
}
}
void DataSender::OnDoneWaiting(MojoResult result) {
waiter_.reset();
if (result != MOJO_RESULT_OK) {
ShutDown();
return;
}
SendInternal();
}
void DataSender::RunCancelCallback() {
DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
if (pending_cancel_.is_null())
......@@ -200,7 +178,6 @@ void DataSender::RunCancelCallback() {
}
void DataSender::ShutDown() {
waiter_.reset();
shut_down_ = true;
while (!pending_sends_.empty()) {
pending_sends_.front()->DispatchFatalError();
......@@ -253,18 +230,17 @@ void DataSender::PendingSend::DispatchFatalError() {
FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
}
MojoResult DataSender::PendingSend::SendData(
mojo::DataPipeProducerHandle handle) {
uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
MojoResult result = mojo::WriteDataRaw(handle,
data_.data() + bytes_sent_,
&bytes_to_send,
MOJO_WRITE_DATA_FLAG_NONE);
if (result != MOJO_RESULT_OK)
return result;
bytes_sent_ += bytes_to_send;
return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
bool DataSender::PendingSend::SendData(serial::DataSink* sink,
uint32_t* available_buffer_size) {
uint32_t num_bytes_to_send =
std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
*available_buffer_size);
mojo::Array<uint8_t> bytes(num_bytes_to_send);
memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
bytes_sent_ += num_bytes_to_send;
*available_buffer_size -= num_bytes_to_send;
sink->OnData(bytes.Pass());
return bytes_sent_ == data_.size();
}
void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
......
......@@ -16,8 +16,6 @@
namespace device {
class AsyncWaiter;
// A DataSender sends data to a DataSink.
class DataSender : public serial::DataSinkClient, public mojo::ErrorHandler {
public:
......@@ -26,14 +24,13 @@ class DataSender : public serial::DataSinkClient, public mojo::ErrorHandler {
SendErrorCallback;
typedef base::Callback<void()> CancelCallback;
// Constructs a DataSender to send data to |sink|, using a data pipe with a
// buffer size of |buffer_size|, with connection errors reported as
// |fatal_error_value|.
// Constructs a DataSender to send data to |sink|, using a buffer size of
// |buffer_size|, with connection errors reported as |fatal_error_value|.
DataSender(mojo::InterfacePtr<serial::DataSink> sink,
uint32_t buffer_size,
int32_t fatal_error_value);
~DataSender() override;
~DataSender();
// Starts an asynchronous send of |data|. If the send completes successfully,
// |callback| will be called. Otherwise, |error_callback| will be called with
......@@ -54,22 +51,19 @@ class DataSender : public serial::DataSinkClient, public mojo::ErrorHandler {
// serial::DataSinkClient overrides.
void ReportBytesSent(uint32_t bytes_sent) override;
void ReportBytesSentAndError(
uint32_t bytes_sent,
int32_t error,
const mojo::Callback<void(uint32_t)>& callback) override;
void ReportBytesSentAndError(uint32_t bytes_sent,
int32_t error,
const mojo::Callback<void()>& callback) override;
// mojo::ErrorHandler override.
void OnConnectionError() override;
// Copies data from |pending_sends_| into the data pipe and starts |waiter_|
// waiting if the pipe is full. When a PendingSend in |pending_sends_| has
// been fully copied into the data pipe, it moves to |sends_awaiting_ack_|.
// Sends up to |available_buffer_capacity_| bytes of data from
// |pending_sends_| to |sink_|. When a PendingSend in |pending_sends_| has
// been fully copied transmitted to |sink_|, it moves to
// |sends_awaiting_ack_|.
void SendInternal();
// Invoked when |handle_| is ready for writes. Calls SendInternal().
void OnDoneWaiting(MojoResult result);
// Dispatches a cancel callback if one is pending.
void RunCancelCallback();
......@@ -80,27 +74,23 @@ class DataSender : public serial::DataSinkClient, public mojo::ErrorHandler {
// The control connection to the data sink.
mojo::InterfacePtr<serial::DataSink> sink_;
// The data connection to the data sink.
mojo::ScopedDataPipeProducerHandle handle_;
// The error value to report in the event of a fatal error.
const int32_t fatal_error_value_;
// A waiter used to wait until |handle_| is writable if we are waiting.
scoped_ptr<AsyncWaiter> waiter_;
// A queue of PendingSend that have not yet been fully written to the data
// pipe.
// A queue of PendingSend that have not yet been fully sent to |sink_|.
std::queue<linked_ptr<PendingSend> > pending_sends_;
// A queue of PendingSend that have been written to the data pipe, but have
// not yet been acked by the DataSink.
// A queue of PendingSend that have been sent to |sink_|, but have not yet
// been acked by the DataSink.
std::queue<linked_ptr<PendingSend> > sends_awaiting_ack_;
// The callback to report cancel completion if a cancel operation is in
// progress.
CancelCallback pending_cancel_;
// The number of bytes available for buffering in the DataSink.
uint32_t available_buffer_capacity_;
// Whether we have encountered a fatal error and shut down.
bool shut_down_;
......
This diff is collapsed.
......@@ -17,8 +17,6 @@
namespace device {
class AsyncWaiter;
class DataSinkReceiver : public base::RefCounted<DataSinkReceiver>,
public mojo::InterfaceImpl<serial::DataSink> {
public:
......@@ -43,21 +41,19 @@ class DataSinkReceiver : public base::RefCounted<DataSinkReceiver>,
private:
class Buffer;
class PendingFlush;
class DataFrame;
friend class base::RefCounted<DataSinkReceiver>;
~DataSinkReceiver() override;
// mojo::InterfaceImpl<serial::DataSink> overrides.
void Init(mojo::ScopedDataPipeConsumerHandle handle) override;
void Init(uint32_t buffer_size) override;
void Cancel(int32_t error) override;
void OnData(mojo::Array<uint8_t> data) override;
void OnConnectionError() override;
// Starts waiting for |handle_| to be ready for reads.
void StartWaiting();
// Invoked when |handle_| is ready for reads.
void OnDoneWaiting(MojoResult result);
// Dispatches data to |ready_callback_|.
void RunReadyCallback();
// Reports a successful read of |bytes_read|.
void Done(uint32_t bytes_read);
......@@ -66,23 +62,20 @@ class DataSinkReceiver : public base::RefCounted<DataSinkReceiver>,
// with an error of |error|.
void DoneWithError(uint32_t bytes_read, int32_t error);
// Finishes the two-phase data pipe read.
// Marks |bytes_read| bytes as being read.
bool DoneInternal(uint32_t bytes_read);
// Sends an ReportBytesSentAndError message to the client.
void ReportBytesSentAndError(uint32_t bytes_read, int32_t error);
// Invoked in response to an ReportBytesSentAndError call to the client with
// the number of bytes to flush.
void SetNumBytesToFlush(uint32_t bytes_to_flush);
// Invoked in response to an ReportBytesSentAndError call to the client at
// the point in the data stream to flush.
void DoFlush();
// Reports a fatal error to the client and shuts down.
void DispatchFatalError();
// The data connection to the data sender.
mojo::ScopedDataPipeConsumerHandle handle_;
// The callback to call when |handle_| has data ready to read.
// The callback to call when there is data ready to read.
const ReadyCallback ready_callback_;
// The callback to call when the client has requested cancellation.
......@@ -91,16 +84,23 @@ class DataSinkReceiver : public base::RefCounted<DataSinkReceiver>,
// The callback to call if a fatal error occurs.
const ErrorCallback error_callback_;
// The queue of pending flushes.
std::queue<linked_ptr<PendingFlush> > pending_flushes_;
// A waiter used to wait until |handle_| is readable if we are waiting.
scoped_ptr<AsyncWaiter> waiter_;
// Whether we are waiting for a flush.
bool flush_pending_;
// The buffer passed to |ready_callback_| if one exists. This is not owned,
// but the Buffer will call Done or DoneWithError before being deleted.
Buffer* buffer_in_use_;
// Whether this has received an Init() call from the client.
bool initialized_;
// The remaining number of bytes of data that we can buffer.
uint32_t available_buffer_capacity_;
// The data we have received from the client that has not been passed to
// |ready_callback_|.
std::queue<linked_ptr<DataFrame>> pending_data_buffers_;
// Whether we have encountered a fatal error and shut down.
bool shut_down_;
......
......@@ -4,11 +4,11 @@
#include "device/serial/data_source_sender.h"
#include <algorithm>
#include <limits>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "device/serial/async_waiter.h"
namespace device {
......@@ -17,9 +17,9 @@ class DataSourceSender::PendingSend {
public:
PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
// Asynchronously fills |data| with up to |num_bytes| of data. Following this,
// one of Done() and DoneWithError() will be called with the result.
void GetData(void* data, uint32_t num_bytes);
// Asynchronously fills |data_| with up to |num_bytes| of data. Following
// this, one of Done() and DoneWithError() will be called with the result.
void GetData(uint32_t num_bytes);
private:
class Buffer;
......@@ -39,9 +39,12 @@ class DataSourceSender::PendingSend {
// Whether the buffer specified by GetData() has been passed to |callback_|,
// but has not yet called Done() or DoneWithError().
bool buffer_in_use_;
// The data obtained using |callback_| to be dispatched to the client.
std::vector<char> data_;
};
// A Writable implementation that provides a view of a data pipe owned by a
// A Writable implementation that provides a view of a buffer owned by a
// DataSourceSender.
class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
public:
......@@ -58,7 +61,7 @@ class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
void DoneWithError(uint32_t bytes_written, int32_t error) override;
private:
// The DataSourceSender whose data pipe we are providing a view.
// The DataSourceSender of whose buffer we are providing a view.
scoped_refptr<DataSourceSender> sender_;
// The PendingSend to which this buffer has been created in response.
......@@ -72,14 +75,15 @@ DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
const ErrorCallback& error_callback)
: ready_callback_(ready_callback),
error_callback_(error_callback),
bytes_sent_(0),
shut_down_(false) {
available_buffer_capacity_(0),
paused_(false),
shut_down_(false),
weak_factory_(this) {
DCHECK(!ready_callback.is_null() && !error_callback.is_null());
}
void DataSourceSender::ShutDown() {
shut_down_ = true;
waiter_.reset();
ready_callback_.Reset();
error_callback_.Reset();
}
......@@ -88,84 +92,70 @@ DataSourceSender::~DataSourceSender() {
DCHECK(shut_down_);
}
void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
// This should never occur. |handle_| is only valid and |pending_send_| is
// only set after Init is called.
if (pending_send_ || handle_.is_valid() || shut_down_) {
DispatchFatalError();
return;
}
handle_ = handle.Pass();
pending_send_.reset(new PendingSend(this, ready_callback_));
StartWaiting();
void DataSourceSender::Init(uint32_t buffer_size) {
available_buffer_capacity_ = buffer_size;
GetMoreData();
}
void DataSourceSender::Resume() {
if (pending_send_ || !handle_.is_valid()) {
if (pending_send_) {
DispatchFatalError();
return;
}
pending_send_.reset(new PendingSend(this, ready_callback_));
StartWaiting();
paused_ = false;
GetMoreData();
}
void DataSourceSender::OnConnectionError() {
DispatchFatalError();
void DataSourceSender::ReportBytesReceived(uint32_t bytes_sent) {
available_buffer_capacity_ += bytes_sent;
if (!pending_send_ && !paused_)
GetMoreData();
}
void DataSourceSender::StartWaiting() {
DCHECK(pending_send_ && !waiter_);
waiter_.reset(
new AsyncWaiter(handle_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE,
base::Bind(&DataSourceSender::OnDoneWaiting, this)));
void DataSourceSender::OnConnectionError() {
DispatchFatalError();
}
void DataSourceSender::OnDoneWaiting(MojoResult result) {
DCHECK(pending_send_ && !shut_down_ && waiter_);
waiter_.reset();
if (result != MOJO_RESULT_OK) {
DispatchFatalError();
void DataSourceSender::GetMoreData() {
if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_)
return;
}
void* data = NULL;
uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
result = mojo::BeginWriteDataRaw(
handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
if (result != MOJO_RESULT_OK) {
DispatchFatalError();
return;
}
pending_send_->GetData(static_cast<char*>(data), num_bytes);
pending_send_.reset(new PendingSend(this, ready_callback_));
pending_send_->GetData(available_buffer_capacity_);
}
void DataSourceSender::Done(uint32_t bytes_written) {
DoneInternal(bytes_written);
if (!shut_down_)
StartWaiting();
void DataSourceSender::Done(const std::vector<char>& data) {
DoneInternal(data);
if (!shut_down_ && available_buffer_capacity_) {
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
}
}
void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
DoneInternal(bytes_written);
pending_send_.reset();
void DataSourceSender::DoneWithError(const std::vector<char>& data,
int32_t error) {
DoneInternal(data);
if (!shut_down_)
client()->OnError(bytes_sent_, error);
// We don't call StartWaiting here so we don't send any additional data until
client()->OnError(error);
paused_ = true;
// We don't call GetMoreData here so we don't send any additional data until
// Resume() is called.
}
void DataSourceSender::DoneInternal(uint32_t bytes_written) {
void DataSourceSender::DoneInternal(const std::vector<char>& data) {
DCHECK(pending_send_);
if (shut_down_)
return;
bytes_sent_ += bytes_written;
MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
if (result != MOJO_RESULT_OK) {
DispatchFatalError();
return;
available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
if (!data.empty()) {
mojo::Array<uint8_t> data_to_send(data.size());
std::copy(data.begin(), data.end(), &data_to_send[0]);
client()->OnData(data_to_send.Pass());
}
pending_send_.reset();
}
void DataSourceSender::DispatchFatalError() {
......@@ -181,24 +171,30 @@ DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
: sender_(sender), callback_(callback), buffer_in_use_(false) {
}
void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
DCHECK(num_bytes);
DCHECK(!buffer_in_use_);
buffer_in_use_ = true;
data_.resize(num_bytes);
callback_.Run(scoped_ptr<WritableBuffer>(
new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
new Buffer(sender_, this, &data_[0], num_bytes)));
}
void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
DCHECK(buffer_in_use_);
DCHECK_LE(bytes_written, data_.size());
buffer_in_use_ = false;
sender_->Done(bytes_written);
data_.resize(bytes_written);
sender_->Done(data_);
}
void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
int32_t error) {
DCHECK(buffer_in_use_);
DCHECK_LE(bytes_written, data_.size());
buffer_in_use_ = false;
sender_->DoneWithError(bytes_written, error);
data_.resize(bytes_written);
sender_->DoneWithError(data_, error);
}
DataSourceSender::PendingSend::Buffer::Buffer(
......@@ -213,7 +209,7 @@ DataSourceSender::PendingSend::Buffer::Buffer(
}
DataSourceSender::PendingSend::Buffer::~Buffer() {
if (sender_.get())
if (pending_send_)
pending_send_->Done(0);
}
......@@ -227,22 +223,20 @@ uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
DCHECK(sender_.get());
pending_send_->Done(bytes_written);
sender_ = NULL;
pending_send_ = NULL;
buffer_ = NULL;
buffer_size_ = 0;
PendingSend* send = pending_send_;
pending_send_ = nullptr;
send->Done(bytes_written);
sender_ = nullptr;
}
void DataSourceSender::PendingSend::Buffer::DoneWithError(
uint32_t bytes_written,
int32_t error) {
DCHECK(sender_.get());
pending_send_->DoneWithError(bytes_written, error);
sender_ = NULL;
pending_send_ = NULL;
buffer_ = NULL;
buffer_size_ = 0;
PendingSend* send = pending_send_;
pending_send_ = nullptr;
send->DoneWithError(bytes_written, error);
sender_ = nullptr;
}
} // namespace device
......@@ -5,17 +5,19 @@
#ifndef DEVICE_SERIAL_DATA_SOURCE_SENDER_H_
#define DEVICE_SERIAL_DATA_SOURCE_SENDER_H_
#include <vector>
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "device/serial/buffer.h"
#include "device/serial/data_stream.mojom.h"
#include "mojo/public/cpp/system/data_pipe.h"
namespace device {
class AsyncWaiter;
// A DataSourceSender is an interface between a source of data and a data pipe.
// A DataSourceSender is an interface between a source of data and a
// DataSourceClient.
class DataSourceSender : public base::RefCounted<DataSourceSender>,
public mojo::InterfaceImpl<serial::DataSource> {
public:
......@@ -41,34 +43,29 @@ class DataSourceSender : public base::RefCounted<DataSourceSender>,
~DataSourceSender() override;
// mojo::InterfaceImpl<serial::DataSourceSender> overrides.
void Init(mojo::ScopedDataPipeProducerHandle handle) override;
void Init(uint32_t buffer_size) override;
void Resume() override;
void ReportBytesReceived(uint32_t bytes_sent) override;
// Invoked in the event of a connection error. Calls DispatchFatalError().
void OnConnectionError() override;
// Starts waiting for |handle_| to be ready for writes.
void StartWaiting();
// Invoked when |handle_| is ready for writes.
void OnDoneWaiting(MojoResult result);
// Gets more data to send to the DataSourceClient.
void GetMoreData();
// Reports a successful write of |bytes_written|.
void Done(uint32_t bytes_written);
// Invoked to pass |data| obtained in response to |ready_callback_|.
void Done(const std::vector<char>& data);
// Reports a partially successful or unsuccessful write of |bytes_written|
// with an error of |error|.
void DoneWithError(uint32_t bytes_written, int32_t error);
// Invoked to pass |data| and |error| obtained in response to
// |ready_callback_|.
void DoneWithError(const std::vector<char>& data, int32_t error);
// Finishes the two-phase data pipe write.
void DoneInternal(uint32_t bytes_written);
// Dispatches |data| to the client.
void DoneInternal(const std::vector<char>& data);
// Reports a fatal error to the client and shuts down.
void DispatchFatalError();
// The data connection to the data receiver.
mojo::ScopedDataPipeProducerHandle handle_;
// The callback to call when |handle_| is ready for more data.
// The callback to call when the client is ready for more data.
ReadyCallback ready_callback_;
// The callback to call if a fatal error occurs.
......@@ -77,15 +74,17 @@ class DataSourceSender : public base::RefCounted<DataSourceSender>,
// The current pending send operation if there is one.
scoped_ptr<PendingSend> pending_send_;
// A waiter used to wait until |handle_| is writable if we are waiting.
scoped_ptr<AsyncWaiter> waiter_;
// The number of bytes available for buffering in the client.
uint32_t available_buffer_capacity_;
// The number of bytes sent to the data receiver.
uint32_t bytes_sent_;
// Whether sending is paused due to an error.
bool paused_;
// Whether we have encountered a fatal error and shut down.
bool shut_down_;
base::WeakPtrFactory<DataSourceSender> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(DataSourceSender);
};
......
......@@ -6,30 +6,39 @@ module device.serial {
[Client=DataSourceClient]
interface DataSource {
// Initializes this DataSource with a data pipe handle to use for data
// transmission.
Init(handle<data_pipe_producer> producer_handle);
// Initializes this DataSource with the amount of data its client will
// buffer.
Init(uint32 buffer_size);
// Resumes sending data after it has been stopped due to an error.
Resume();
// Reports that |bytes_sent| bytes have been successfully passed to the
// client.
ReportBytesReceived(uint32 bytes_sent);
};
interface DataSourceClient {
// Invoked to report |error| from the DataSource, at |error_location| bytes
// into the data stream. No further bytes beyond |error_location| will be
// Invoked to report |error| from the DataSource. No further bytes will be
// transmitted from the DataSource until Resume() is called.
OnError(uint32 error_location, int32 error);
OnError(int32 error);
// Invoked to transmit data from the DataSource.
OnData(array<uint8> data);
};
[Client=DataSinkClient]
interface DataSink {
// Initializes this DataSink with a data pipe handle to use for data
// transmission.
Init(handle<data_pipe_consumer> consumer_handle);
// Initializes this DataSink with the amount of data it is expected to
// buffer.
Init(uint32 buffer_size);
// Requests the cancellation of any data that has been written to the pipe,
// but has not yet been sent to the sink.
Cancel(int32 error);
// Invoked to pass |data| to the sink.
OnData(array<uint8> data);
};
interface DataSinkClient {
......@@ -37,10 +46,10 @@ interface DataSinkClient {
ReportBytesSent(uint32 bytes_sent);
// Reports that the sink has received |bytes_sent| bytes of data (possibly 0)
// and encountered an error: |error|. The client should respond with
// |bytes_to_flush|, the number of bytes enqueued in the data pipe but not yet
// acked so the correct number of bytes can be flushed from the pipe.
ReportBytesSentAndError(uint32 bytes_sent, int32 error) => (uint32 bytes_to_flush);
// and encountered an error: |error|. Any OnData messages received by the
// DataSink before the response will be discarded. The client should respond
// when it is ready to resume sending data.
ReportBytesSentAndError(uint32 bytes_sent, int32 error) => ();
};
}
......@@ -11,11 +11,11 @@ struct SerializedDataSender {
// The control channel to the DataSink to which this DataSender sends data.
DataSink sink;
// The data pipe this DataSender uses to send data to the DataSink.
handle<data_pipe_producer> data_pipe;
// The error to report for sends in progress when a fatal error occurs.
int32 fatal_error_value;
// The size of the send buffer.
uint32 buffer_size;
};
// A pending receive error.
......@@ -33,9 +33,6 @@ struct SerializedDataReceiver {
// data.
DataSource source;
// The data pipe this DataReceiver uses to receive data from the DataSource.
handle<data_pipe_consumer> data_pipe;
// The error to report for a receive in progress when a fatal error occurs.
int32 fatal_error_value;
......@@ -48,6 +45,8 @@ struct SerializedDataReceiver {
// The pending receive error if there is one.
PendingReceiveError? pending_error;
array<array<uint8>> pending_data;
};
}
......@@ -269,13 +269,18 @@ TEST_F(SerialConnectionTest, Flush) {
EXPECT_EQ(1, io_handler_->flushes());
}
TEST_F(SerialConnectionTest, Disconnect) {
TEST_F(SerialConnectionTest, DisconnectWithSend) {
connection_.reset();
io_handler_->set_send_callback(base::Bind(base::DoNothing));
ASSERT_NO_FATAL_FAILURE(Send("data"));
WaitForEvent(EVENT_SEND_ERROR);
EXPECT_EQ(serial::SEND_ERROR_DISCONNECTED, send_error_);
EXPECT_EQ(0, bytes_sent_);
EXPECT_TRUE(io_handler_->HasOneRef());
}
TEST_F(SerialConnectionTest, DisconnectWithReceive) {
connection_.reset();
ASSERT_NO_FATAL_FAILURE(Receive());
WaitForEvent(EVENT_RECEIVE_ERROR);
EXPECT_EQ(serial::RECEIVE_ERROR_DISCONNECTED, receive_error_);
......
......@@ -63,11 +63,6 @@ class DataReceiverTest : public ApiTestBase {
error = error_to_send_.front();
error_to_send_.pop();
}
if (error == 2) {
sender_->ShutDown();
sender_ = NULL;
return;
}
DCHECK(buffer->GetSize() >= static_cast<uint32_t>(data.size()));
memcpy(buffer->GetData(), data.c_str(), data.size());
if (error)
......@@ -136,9 +131,4 @@ TEST_F(DataReceiverTest, SerializeAfterClose) {
RunTest("data_receiver_unittest.js", "testSerializeAfterClose");
}
TEST_F(DataReceiverTest, SourceShutdown) {
error_to_send_.push(2);
RunTest("data_receiver_unittest.js", "testSourceShutdown");
}
} // namespace extensions
......@@ -3,12 +3,11 @@
// found in the LICENSE file.
define('data_receiver', [
'async_waiter',
'device/serial/data_stream.mojom',
'device/serial/data_stream_serialization.mojom',
'mojo/public/js/bindings/core',
'mojo/public/js/bindings/router',
], function(asyncWaiter, dataStream, serialization, core, router) {
], function(dataStream, serialization, core, router) {
/**
* @module data_receiver
*/
......@@ -65,8 +64,8 @@ define('data_receiver', [
* @param {!PendingReceiveError} error The error to dispatch.
* @param {number} bytesReceived The number of bytes that have been received.
*/
PendingReceive.prototype.dispatchError = function(error, bytesReceived) {
if (bytesReceived != error.offset)
PendingReceive.prototype.dispatchError = function(error) {
if (error.queuePosition > 0)
return false;
var e = new Error();
......@@ -88,22 +87,15 @@ define('data_receiver', [
/**
* A DataReceiver that receives data from a DataSource.
* @param {!MojoHandle} handle The handle to the DataSource.
* @param {number} bufferSize How large a buffer the data pipe should use.
* @param {number} bufferSize How large a buffer to use.
* @param {number} fatalErrorValue The receive error value to report in the
* event of a fatal error.
* @constructor
* @alias module:data_receiver.DataReceiver
*/
function DataReceiver(handle, bufferSize, fatalErrorValue) {
var dataPipeOptions = {
flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
elementNumBytes: 1,
capacityNumBytes: bufferSize,
};
var receivePipe = core.createDataPipe(dataPipeOptions);
this.init_(
handle, receivePipe.consumerHandle, fatalErrorValue, 0, null, false);
this.source_.init(receivePipe.producerHandle);
this.init_(handle, fatalErrorValue, 0, null, [], false);
this.source_.init(bufferSize);
}
DataReceiver.prototype =
......@@ -117,8 +109,6 @@ define('data_receiver', [
return;
this.shutDown_ = true;
this.router_.close();
this.waiter_.stop();
core.close(this.receivePipe_);
if (this.receive_) {
this.receive_.dispatchFatalError(this.fatalErrorValue_);
this.receive_ = null;
......@@ -128,21 +118,21 @@ define('data_receiver', [
/**
* Initialize this DataReceiver.
* @param {!MojoHandle} source A handle to the DataSource
* @param {!MojoHandle} dataPipe A handle to use for receiving data from the
* DataSource.
* @param {number} fatalErrorValue The error to dispatch in the event of a
* fatal error.
* @param {number} bytesReceived The number of bytes already received.
* @param {PendingReceiveError} pendingError The pending error if there is
* one.
* one.
* @param {!Array.<!ArrayBuffer>} pendingData Data received from the
* DataSource not yet requested by the client.
* @param {boolean} paused Whether the DataSource is paused.
* @private
*/
DataReceiver.prototype.init_ = function(source,
dataPipe,
fatalErrorValue,
bytesReceived,
pendingError,
pendingData,
paused) {
/**
* The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
......@@ -156,11 +146,6 @@ define('data_receiver', [
*/
this.source_ = new dataStream.DataSource.proxyClass(this.router_);
this.router_.setIncomingReceiver(this);
/**
* The handle to the data pipe to use for receiving data.
* @private
*/
this.receivePipe_ = dataPipe;
/**
* The current receive operation.
* @type {module:data_receiver~PendingReceive}
......@@ -173,22 +158,6 @@ define('data_receiver', [
* @private
*/
this.fatalErrorValue_ = fatalErrorValue;
/**
* The async waiter used to wait for
* |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| to
* be readable.
* @type {!module:async_waiter.AsyncWaiter}
* @private
*/
this.waiter_ = new asyncWaiter.AsyncWaiter(this.receivePipe_,
core.HANDLE_SIGNAL_READABLE,
this.onHandleReady_.bind(this));
/**
* The number of bytes received from the DataSource.
* @type {number}
* @private
*/
this.bytesReceived_ = bytesReceived;
/**
* The pending error if there is one.
* @type {PendingReceiveError}
......@@ -201,6 +170,13 @@ define('data_receiver', [
* @private
*/
this.paused_ = paused;
/**
* A queue of data that has been received from the DataSource, but not
* consumed by the client.
* @type {module:data_receiver~PendingData[]}
* @private
*/
this.pendingDataBuffers_ = pendingData;
/**
* Whether this DataReceiver has shut down.
* @type {boolean}
......@@ -220,18 +196,19 @@ define('data_receiver', [
if (this.shutDown_)
return Promise.resolve(null);
this.waiter_.stop();
if (this.receive_) {
this.receive_.dispatchFatalError(this.fatalErrorValue_);
this.receive_ = null;
}
var serialized = new serialization.SerializedDataReceiver();
serialized.source = this.router_.connector_.handle_;
serialized.data_pipe = this.receivePipe_;
serialized.fatal_error_value = this.fatalErrorValue_;
serialized.bytes_received = this.bytesReceived_;
serialized.paused = this.paused_;
serialized.pending_error = this.pendingError_;
serialized.pending_data = [];
$Array.forEach(this.pendingDataBuffers_, function(buffer) {
serialized.pending_data.push(new Uint8Array(buffer));
});
this.router_.connector_.handle_ = null;
this.router_.close();
this.shutDown_ = true;
......@@ -259,11 +236,17 @@ define('data_receiver', [
this.shutDown_ = true;
return;
}
var pendingData = [];
$Array.forEach(serialized.pending_data, function(data) {
var buffer = new Uint8Array(data.length);
buffer.set(data);
pendingData.push(buffer.buffer);
});
this.init_(serialized.source,
serialized.data_pipe,
serialized.fatal_error_value,
serialized.bytes_received,
serialized.pending_error,
pendingData,
serialized.paused);
};
......@@ -283,7 +266,7 @@ define('data_receiver', [
var receive = new PendingReceive();
var promise = receive.getPromise();
if (this.pendingError_ &&
receive.dispatchError(this.pendingError_, this.bytesReceived_)) {
receive.dispatchError(this.pendingError_)) {
this.pendingError_ = null;
this.paused_ = true;
return promise;
......@@ -293,32 +276,22 @@ define('data_receiver', [
this.paused_ = false;
}
this.receive_ = receive;
this.waiter_.start();
this.dispatchData_();
return promise;
};
/**
* Invoked when
* |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| is
* ready to read. Reads from the data pipe if the wait is successful.
* @param {number} waitResult The result of the asynchronous wait.
* @private
*/
DataReceiver.prototype.onHandleReady_ = function(waitResult) {
if (waitResult != core.RESULT_OK || !this.receive_) {
DataReceiver.prototype.dispatchData_ = function() {
if (!this.receive_) {
this.close();
return;
}
var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE);
if (result.result == core.RESULT_OK) {
// TODO(sammc): Handle overflow in the same fashion as the C++ receiver.
this.bytesReceived_ += result.buffer.byteLength;
this.receive_.dispatchData(result.buffer);
if (this.pendingDataBuffers_.length) {
this.receive_.dispatchData(this.pendingDataBuffers_[0]);
this.source_.reportBytesReceived(this.pendingDataBuffers_[0].byteLength);
this.receive_ = null;
} else if (result.result == core.RESULT_SHOULD_WAIT) {
this.waiter_.start();
} else {
this.close();
this.pendingDataBuffers_.shift();
if (this.pendingError_)
this.pendingError_.queuePosition--;
}
};
......@@ -328,22 +301,29 @@ define('data_receiver', [
* @param {number} error The error that occurred.
* @private
*/
DataReceiver.prototype.onError = function(offset, error) {
DataReceiver.prototype.onError = function(error) {
if (this.shutDown_)
return;
var pendingError = new serialization.PendingReceiveError();
pendingError.error = error;
pendingError.offset = offset;
if (this.receive_ &&
this.receive_.dispatchError(pendingError, this.bytesReceived_)) {
pendingError.queuePosition = this.pendingDataBuffers_.length;
if (this.receive_ && this.receive_.dispatchError(pendingError)) {
this.receive_ = null;
this.waiter_.stop();
this.paused_ = true;
return;
}
this.pendingError_ = pendingError;
};
DataReceiver.prototype.onData = function(data) {
var buffer = new ArrayBuffer(data.length);
var uintView = new Uint8Array(buffer);
uintView.set(data);
this.pendingDataBuffers_.push(buffer);
if (this.receive_)
this.dispatchData_();
};
return {DataReceiver: DataReceiver};
});
......@@ -175,11 +175,4 @@ unittestBindings.exportTests([
.then(test.succeed, test.fail);
},
function testSourceShutdown() {
createReceiver()
.then(receiveAndCheckError(FATAL_ERROR))
.then(closeReceiver)
.then(test.succeed, test.fail);
},
], test.runTests, exports);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment