Commit 89533a51 authored by Reilly Grant's avatar Reilly Grant Committed by Commit Bot

[serial] Flush buffers when readable is canceled

This change enhances the cancel() method on SerialPort.readable so that
is requests a flush of the serial port's receive buffer as required by
the draft specification[1].

[1]: https://reillyeon.github.io/serial/#dfn-serialportsourcecancelalgorithm

Bug: 989653
Change-Id: I8f6550ea6ac7093a04ca814b1f1b6b8d5d08ee76
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2250366
Commit-Queue: Reilly Grant <reillyg@chromium.org>
Reviewed-by: default avatarOvidio de Jesús Ruiz-Henríquez <odejesush@chromium.org>
Reviewed-by: default avatarDominick Ng <dominickn@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#789677}
parent d5b47048
......@@ -278,12 +278,13 @@ ExtensionFunction::ResponseAction SerialFlushFunction::Run() {
if (!connection)
return RespondNow(Error(kErrorSerialConnectionNotFound));
connection->Flush(base::BindOnce(&SerialFlushFunction::OnFlushed, this));
connection->Flush(device::mojom::SerialPortFlushMode::kReceiveAndTransmit,
base::BindOnce(&SerialFlushFunction::OnFlushed, this));
return RespondLater();
}
void SerialFlushFunction::OnFlushed(bool success) {
Respond(OneArgument(std::make_unique<base::Value>(success)));
void SerialFlushFunction::OnFlushed() {
Respond(OneArgument(std::make_unique<base::Value>(true)));
}
SerialSetPausedFunction::SerialSetPausedFunction() = default;
......
......@@ -186,7 +186,7 @@ class SerialFlushFunction : public SerialExtensionFunction {
ResponseAction Run() override;
private:
void OnFlushed(bool success);
void OnFlushed();
};
class SerialGetControlSignalsFunction : public SerialExtensionFunction {
......
......@@ -126,7 +126,16 @@ class FakeSerialPort : public device::mojom::SerialPort {
out_stream_watcher_.ArmOrNotify();
}
void Flush(FlushCallback callback) override { std::move(callback).Run(true); }
void Flush(device::mojom::SerialPortFlushMode mode,
FlushCallback callback) override {
if (mode == device::mojom::SerialPortFlushMode::kReceiveAndTransmit) {
std::move(callback).Run();
return;
}
NOTREACHED();
}
void GetControlSignals(GetControlSignalsCallback callback) override {
auto signals = device::mojom::SerialPortControlSignals::New();
signals->dcd = true;
......
......@@ -474,10 +474,11 @@ void SerialConnection::GetInfo(GetInfoCompleteCallback callback) const {
std::move(resp_callback), nullptr));
}
void SerialConnection::Flush(FlushCompleteCallback callback) const {
void SerialConnection::Flush(device::mojom::SerialPortFlushMode mode,
FlushCompleteCallback callback) const {
DCHECK(serial_port_);
return serial_port_->Flush(
mojo::WrapCallbackWithDefaultInvokeIfNotRun(std::move(callback), false));
mode, mojo::WrapCallbackWithDefaultInvokeIfNotRun(std::move(callback)));
}
void SerialConnection::GetControlSignals(
......
......@@ -110,7 +110,8 @@ class SerialConnection : public ApiResource,
virtual void StartPolling(const ReceiveEventCallback& callback);
// Flushes input and output buffers.
void Flush(FlushCompleteCallback callback) const;
void Flush(device::mojom::SerialPortFlushMode mode,
FlushCompleteCallback callback) const;
// Configures some subset of port options for this connection.
// Omitted options are unchanged.
......
......@@ -47,7 +47,10 @@ class FakeSerialPort : public mojom::SerialPort {
out_stream_ = std::move(producer);
}
void Flush(FlushCallback callback) override { NOTREACHED(); }
void Flush(device::mojom::SerialPortFlushMode mode,
FlushCallback callback) override {
NOTREACHED();
}
void GetControlSignals(GetControlSignalsCallback callback) override {
NOTREACHED();
......
......@@ -66,6 +66,16 @@ enum SerialStopBits {
TWO,
};
enum SerialPortFlushMode {
// Flushes both receive and transmit buffers without discarding any bytes in
// the data pipes. This is for compatibility with chrome.serial.flush().
kReceiveAndTransmit,
// Flushes the receive buffers and discards data in the data_pipe_producer by
// closing it.
kReceive,
};
struct SerialConnectionOptions {
uint32 bitrate = 0;
SerialDataBits data_bits = NONE;
......@@ -146,8 +156,8 @@ interface SerialPort {
// called on |client| to indicate an error.
StartReading(handle<data_pipe_producer> producer);
// Flushes input and output buffers.
Flush() => (bool success);
// Flushes buffers according to the selected |mode|.
Flush(SerialPortFlushMode mode) => ();
// Reads current control signals (DCD, CTS, etc.).
GetControlSignals() => (SerialPortControlSignals signals);
......
......@@ -80,7 +80,7 @@ class SerialIoHandler : public base::RefCountedThreadSafe<SerialIoHandler> {
void CancelWrite(mojom::SerialSendError reason);
// Flushes input and output buffers.
virtual bool Flush() const = 0;
virtual void Flush(mojom::SerialPortFlushMode mode) const = 0;
// Reads current control signals (DCD, CTS, etc.) into an existing
// DeviceControlSignals structure. Returns |true| iff the signals were
......
......@@ -428,12 +428,19 @@ void SerialIoHandlerPosix::StopWatchingFileWrite() {
}
}
bool SerialIoHandlerPosix::Flush() const {
if (tcflush(file().GetPlatformFile(), TCIOFLUSH) != 0) {
VPLOG(1) << "Failed to flush port";
return false;
void SerialIoHandlerPosix::Flush(mojom::SerialPortFlushMode mode) const {
int queue_selector;
switch (mode) {
case mojom::SerialPortFlushMode::kReceiveAndTransmit:
queue_selector = TCIOFLUSH;
break;
case mojom::SerialPortFlushMode::kReceive:
queue_selector = TCIFLUSH;
break;
}
return true;
if (tcflush(file().GetPlatformFile(), queue_selector) != 0)
VPLOG(1) << "Failed to flush port";
}
mojom::SerialPortControlSignalsPtr SerialIoHandlerPosix::GetControlSignals()
......
......@@ -31,7 +31,7 @@ class SerialIoHandlerPosix : public SerialIoHandler {
bool ConfigurePortImpl() override;
bool PostOpen() override;
void PreClose() override;
bool Flush() const override;
void Flush(mojom::SerialPortFlushMode mode) const override;
mojom::SerialPortControlSignalsPtr GetControlSignals() const override;
bool SetControlSignals(
const mojom::SerialHostControlSignals& control_signals) override;
......
......@@ -268,6 +268,11 @@ void SerialIoHandlerWin::ReadImpl() {
DCHECK(pending_read_buffer());
DCHECK(file().IsValid());
if (is_comm_pending_) {
// Reuse the call to WaitCommEvent() from a canceled read.
return;
}
if (!SetCommMask(file().GetPlatformFile(), EV_RXCHAR)) {
VPLOG(1) << "Failed to set serial event flags";
}
......@@ -299,13 +304,23 @@ void SerialIoHandlerWin::WriteImpl() {
void SerialIoHandlerWin::CancelReadImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(file().IsValid());
::CancelIo(file().GetPlatformFile());
if (is_comm_pending_) {
// Clearing the event mask will cause an overlapped call to WaitCommEvent()
// to complete immediately.
if (!SetCommMask(file().GetPlatformFile(), 0))
VPLOG(1) << "Failed to clear event mask";
} else {
if (!PurgeComm(file().GetPlatformFile(), PURGE_RXABORT))
VPLOG(1) << "RX abort failed";
}
}
void SerialIoHandlerWin::CancelWriteImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(file().IsValid());
::CancelIo(file().GetPlatformFile());
if (!PurgeComm(file().GetPlatformFile(), PURGE_TXABORT))
VPLOG(1) << "TX abort failed";
}
bool SerialIoHandlerWin::ConfigurePortImpl() {
......@@ -370,6 +385,8 @@ void SerialIoHandlerWin::OnIOCompleted(
DWORD error) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (context == comm_context_.get()) {
is_comm_pending_ = false;
DWORD errors;
if (!ClearCommError(file().GetPlatformFile(), &errors, nullptr)) {
VPLOG(1) << "Failed to clear communication error";
......@@ -446,12 +463,19 @@ void SerialIoHandlerWin::OnIOCompleted(
}
}
bool SerialIoHandlerWin::Flush() const {
if (!PurgeComm(file().GetPlatformFile(), PURGE_RXCLEAR | PURGE_TXCLEAR)) {
VPLOG(1) << "Failed to flush serial port";
return false;
void SerialIoHandlerWin::Flush(mojom::SerialPortFlushMode mode) const {
DWORD flags;
switch (mode) {
case mojom::SerialPortFlushMode::kReceiveAndTransmit:
flags = PURGE_RXCLEAR | PURGE_TXCLEAR;
break;
case mojom::SerialPortFlushMode::kReceive:
flags = PURGE_RXCLEAR;
break;
}
return true;
if (!PurgeComm(file().GetPlatformFile(), flags))
VPLOG(1) << "Failed to flush serial port";
}
mojom::SerialPortControlSignalsPtr SerialIoHandlerWin::GetControlSignals()
......
......@@ -26,7 +26,7 @@ class SerialIoHandlerWin : public SerialIoHandler,
void CancelReadImpl() override;
void CancelWriteImpl() override;
bool ConfigurePortImpl() override;
bool Flush() const override;
void Flush(mojom::SerialPortFlushMode mode) const override;
mojom::SerialPortControlSignalsPtr GetControlSignals() const override;
bool SetControlSignals(
const mojom::SerialHostControlSignals& control_signals) override;
......
......@@ -22,17 +22,27 @@ void SerialPortImpl::Create(
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher,
scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner) {
// This SerialPortImpl is owned by |receiver| and |watcher|.
new SerialPortImpl(path, std::move(receiver), std::move(watcher),
std::move(ui_task_runner));
new SerialPortImpl(
device::SerialIoHandler::Create(path, std::move(ui_task_runner)),
std::move(receiver), std::move(watcher));
}
// static
void SerialPortImpl::CreateForTesting(
scoped_refptr<SerialIoHandler> io_handler,
mojo::PendingReceiver<mojom::SerialPort> receiver,
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher) {
// This SerialPortImpl is owned by |receiver| and |watcher|.
new SerialPortImpl(std::move(io_handler), std::move(receiver),
std::move(watcher));
}
SerialPortImpl::SerialPortImpl(
const base::FilePath& path,
scoped_refptr<SerialIoHandler> io_handler,
mojo::PendingReceiver<mojom::SerialPort> receiver,
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher,
scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner)
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher)
: receiver_(this, std::move(receiver)),
io_handler_(device::SerialIoHandler::Create(path, ui_task_runner)),
io_handler_(std::move(io_handler)),
watcher_(std::move(watcher)),
in_stream_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
out_stream_watcher_(FROM_HERE,
......@@ -93,8 +103,27 @@ void SerialPortImpl::StartReading(mojo::ScopedDataPipeProducerHandle producer) {
out_stream_watcher_.ArmOrNotify();
}
void SerialPortImpl::Flush(FlushCallback callback) {
std::move(callback).Run(io_handler_->Flush());
void SerialPortImpl::Flush(mojom::SerialPortFlushMode mode,
FlushCallback callback) {
if (mode == mojom::SerialPortFlushMode::kReceive) {
io_handler_->CancelRead(mojom::SerialReceiveError::NONE);
}
io_handler_->Flush(mode);
if (mode == mojom::SerialPortFlushMode::kReceive) {
if (io_handler_->IsReadPending()) {
// Delay closing |out_stream_| because |io_handler_| still holds a pointer
// into the shared memory owned by the pipe.
read_flush_callback_ = std::move(callback);
return;
}
out_stream_watcher_.Cancel();
out_stream_.reset();
}
std::move(callback).Run();
}
void SerialPortImpl::GetControlSignals(GetControlSignalsCallback callback) {
......@@ -203,7 +232,7 @@ void SerialPortImpl::ReadFromPortAndWriteOut(
return;
}
// The code should not reach other cases.
NOTREACHED();
NOTREACHED() << "Unexpected Mojo result: " << result;
}
void SerialPortImpl::WriteToOutStream(uint32_t bytes_read,
......@@ -214,11 +243,20 @@ void SerialPortImpl::WriteToOutStream(uint32_t bytes_read,
if (error != mojom::SerialReceiveError::NONE) {
out_stream_watcher_.Cancel();
out_stream_.reset();
if (client_) {
if (client_)
client_->OnReadError(error);
}
if (read_flush_callback_)
std::move(read_flush_callback_).Run();
return;
}
if (read_flush_callback_) {
std::move(read_flush_callback_).Run();
out_stream_watcher_.Cancel();
out_stream_.reset();
return;
}
out_stream_watcher_.ArmOrNotify();
}
......
......@@ -39,12 +39,16 @@ class SerialPortImpl : public mojom::SerialPort {
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher,
scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner);
static void CreateForTesting(
scoped_refptr<SerialIoHandler> io_handler,
mojo::PendingReceiver<mojom::SerialPort> receiver,
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher);
private:
SerialPortImpl(
const base::FilePath& path,
scoped_refptr<SerialIoHandler> io_handler,
mojo::PendingReceiver<mojom::SerialPort> receiver,
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher,
scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner);
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher);
~SerialPortImpl() override;
// mojom::SerialPort methods:
......@@ -53,7 +57,7 @@ class SerialPortImpl : public mojom::SerialPort {
OpenCallback callback) override;
void StartWriting(mojo::ScopedDataPipeConsumerHandle consumer) override;
void StartReading(mojo::ScopedDataPipeProducerHandle producer) override;
void Flush(FlushCallback callback) override;
void Flush(mojom::SerialPortFlushMode mode, FlushCallback callback) override;
void GetControlSignals(GetControlSignalsCallback callback) override;
void SetControlSignals(mojom::SerialHostControlSignalsPtr signals,
SetControlSignalsCallback callback) override;
......@@ -85,6 +89,10 @@ class SerialPortImpl : public mojom::SerialPort {
mojo::ScopedDataPipeProducerHandle out_stream_;
mojo::SimpleWatcher out_stream_watcher_;
// Holds the callback for a pending flush until pending read operations have
// been completed.
FlushCallback read_flush_callback_;
base::WeakPtrFactory<SerialPortImpl> weak_factory_{this};
DISALLOW_COPY_AND_ASSIGN(SerialPortImpl);
};
......
......@@ -4,74 +4,164 @@
#include "services/device/serial/serial_port_impl.h"
#include "base/macros.h"
#include "base/test/bind_test_util.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "services/device/device_service_test_base.h"
#include "services/device/public/mojom/serial.mojom.h"
#include "services/device/serial/serial_io_handler.h"
namespace device {
namespace {
class FakeSerialIoHandler : public SerialIoHandler {
public:
FakeSerialIoHandler()
: SerialIoHandler(base::FilePath(), /*ui_thread_task_runner=*/nullptr) {}
void Open(const mojom::SerialConnectionOptions& options,
OpenCompleteCallback callback) override {
std::move(callback).Run(true);
}
void Flush(mojom::SerialPortFlushMode mode) const override {}
mojom::SerialPortControlSignalsPtr GetControlSignals() const override {
return mojom::SerialPortControlSignals::New();
}
bool SetControlSignals(
const mojom::SerialHostControlSignals& control_signals) override {
return true;
}
mojom::SerialConnectionInfoPtr GetPortInfo() const override {
return mojom::SerialConnectionInfo::New();
}
void ReadImpl() override {}
void WriteImpl() override {}
void CancelReadImpl() override {
QueueReadCompleted(/*bytes_read=*/0, mojom::SerialReceiveError::NONE);
}
void CancelWriteImpl() override {
QueueWriteCompleted(/*bytes_written=*/0, mojom::SerialSendError::NONE);
}
bool ConfigurePortImpl() override { return true; }
private:
~FakeSerialIoHandler() override = default;
};
} // namespace
class SerialPortImplTest : public DeviceServiceTestBase {
public:
SerialPortImplTest() = default;
SerialPortImplTest(const SerialPortImplTest& other) = delete;
void operator=(const SerialPortImplTest& other) = delete;
~SerialPortImplTest() override = default;
protected:
DISALLOW_COPY_AND_ASSIGN(SerialPortImplTest);
void CreatePort(
mojo::Remote<mojom::SerialPort>* port,
mojo::SelfOwnedReceiverRef<mojom::SerialPortConnectionWatcher>* watcher) {
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher_remote;
*watcher = mojo::MakeSelfOwnedReceiver(
std::make_unique<mojom::SerialPortConnectionWatcher>(),
watcher_remote.InitWithNewPipeAndPassReceiver());
SerialPortImpl::CreateForTesting(
base::MakeRefCounted<FakeSerialIoHandler>(),
port->BindNewPipeAndPassReceiver(), std::move(watcher_remote));
}
void CreateDataPipe(mojo::ScopedDataPipeProducerHandle* producer,
mojo::ScopedDataPipeConsumerHandle* consumer) {
MojoCreateDataPipeOptions options;
options.struct_size = sizeof(MojoCreateDataPipeOptions);
options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
options.element_num_bytes = 1;
options.capacity_num_bytes = 64;
MojoResult result = mojo::CreateDataPipe(&options, producer, consumer);
DCHECK_EQ(result, MOJO_RESULT_OK);
}
};
TEST_F(SerialPortImplTest, WatcherClosedWhenPortClosed) {
mojo::Remote<mojom::SerialPort> serial_port;
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher;
auto watcher_receiver = mojo::MakeSelfOwnedReceiver(
std::make_unique<mojom::SerialPortConnectionWatcher>(),
watcher.InitWithNewPipeAndPassReceiver());
SerialPortImpl::Create(
base::FilePath(), serial_port.BindNewPipeAndPassReceiver(),
std::move(watcher), base::ThreadTaskRunnerHandle::Get());
mojo::SelfOwnedReceiverRef<mojom::SerialPortConnectionWatcher> watcher;
CreatePort(&serial_port, &watcher);
// To start with both the serial port connection and the connection watcher
// connection should remain open.
serial_port.FlushForTesting();
EXPECT_TRUE(serial_port.is_connected());
watcher_receiver->FlushForTesting();
EXPECT_TRUE(watcher_receiver);
watcher->FlushForTesting();
EXPECT_TRUE(watcher);
// When the serial port connection is closed the watcher connection should be
// closed.
serial_port.reset();
watcher_receiver->FlushForTesting();
EXPECT_FALSE(watcher_receiver);
watcher->FlushForTesting();
EXPECT_FALSE(watcher);
}
TEST_F(SerialPortImplTest, PortClosedWhenWatcherClosed) {
mojo::Remote<mojom::SerialPort> serial_port;
mojo::PendingRemote<mojom::SerialPortConnectionWatcher> watcher;
auto watcher_receiver = mojo::MakeSelfOwnedReceiver(
std::make_unique<mojom::SerialPortConnectionWatcher>(),
watcher.InitWithNewPipeAndPassReceiver());
SerialPortImpl::Create(
base::FilePath(), serial_port.BindNewPipeAndPassReceiver(),
std::move(watcher), base::ThreadTaskRunnerHandle::Get());
mojo::SelfOwnedReceiverRef<mojom::SerialPortConnectionWatcher> watcher;
CreatePort(&serial_port, &watcher);
// To start with both the serial port connection and the connection watcher
// connection should remain open.
serial_port.FlushForTesting();
EXPECT_TRUE(serial_port.is_connected());
watcher_receiver->FlushForTesting();
EXPECT_TRUE(watcher_receiver);
watcher->FlushForTesting();
EXPECT_TRUE(watcher);
// When the watcher connection is closed, for safety, the serial port
// connection should also be closed.
watcher_receiver->Close();
watcher->Close();
serial_port.FlushForTesting();
EXPECT_FALSE(serial_port.is_connected());
}
} // namespace
TEST_F(SerialPortImplTest, FlushRead) {
mojo::Remote<mojom::SerialPort> serial_port;
mojo::SelfOwnedReceiverRef<mojom::SerialPortConnectionWatcher> watcher;
CreatePort(&serial_port, &watcher);
mojo::ScopedDataPipeProducerHandle producer;
mojo::ScopedDataPipeConsumerHandle consumer;
CreateDataPipe(&producer, &consumer);
serial_port->StartReading(std::move(producer));
// Calling Flush(kReceive) should cause the data pipe to close.
base::RunLoop watcher_loop;
mojo::SimpleWatcher pipe_watcher(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
EXPECT_EQ(pipe_watcher.Watch(consumer.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindLambdaForTesting(
[&](MojoResult result,
const mojo::HandleSignalsState& state) {
EXPECT_EQ(result, MOJO_RESULT_OK);
EXPECT_TRUE(state.peer_closed());
watcher_loop.Quit();
})),
MOJO_RESULT_OK);
base::RunLoop loop;
serial_port->Flush(mojom::SerialPortFlushMode::kReceive, loop.QuitClosure());
loop.Run();
watcher_loop.Run();
}
} // namespace device
......@@ -449,12 +449,21 @@ void SerialPort::AbortClose() {
closing_ = false;
}
void SerialPort::Flush(
device::mojom::blink::SerialPortFlushMode mode,
device::mojom::blink::SerialPort::FlushCallback callback) {
DCHECK(port_.is_bound());
port_->Flush(mode, std::move(callback));
}
void SerialPort::UnderlyingSourceClosed() {
DCHECK(readable_);
readable_ = nullptr;
underlying_source_ = nullptr;
}
void SerialPort::UnderlyingSinkClosed() {
DCHECK(writable_);
writable_ = nullptr;
underlying_sink_ = nullptr;
}
......
......@@ -59,11 +59,14 @@ class SerialPort final : public ScriptWrappable,
const base::UnguessableToken& token() const { return info_->token; }
void UnderlyingSourceClosed();
void UnderlyingSinkClosed();
ScriptPromise ContinueClose(ScriptState*);
void AbortClose();
void Flush(device::mojom::blink::SerialPortFlushMode mode,
device::mojom::blink::SerialPort::FlushCallback callback);
void UnderlyingSourceClosed();
void UnderlyingSinkClosed();
void ContextDestroyed();
void Trace(Visitor*) const override;
......
......@@ -4,10 +4,12 @@
#include "third_party/blink/renderer/modules/serial/serial_port_underlying_source.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/modules/serial/serial_port.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
namespace blink {
......@@ -41,12 +43,16 @@ ScriptPromise SerialPortUnderlyingSource::pull(ScriptState* script_state) {
ScriptPromise SerialPortUnderlyingSource::Cancel(ScriptState* script_state,
ScriptValue reason) {
// TODO(crbug.com/989653): Rather than calling Close(), cancel() should
// trigger a purge of the serial read buffer and wait for the pipe to close to
// indicate the purge has been completed.
DCHECK(data_pipe_);
Close();
ExpectPipeClose();
return ScriptPromise::CastUndefined(script_state);
auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(script_state);
serial_port_->Flush(
device::mojom::blink::SerialPortFlushMode::kReceive,
WTF::Bind(&SerialPortUnderlyingSource::OnFlush, WrapPersistent(this),
WrapPersistent(resolver)));
return resolver->Promise();
}
void SerialPortUnderlyingSource::ContextDestroyed() {
......@@ -136,6 +142,11 @@ void SerialPortUnderlyingSource::OnHandleReady(
}
}
void SerialPortUnderlyingSource::OnFlush(ScriptPromiseResolver* resolver) {
serial_port_->UnderlyingSourceClosed();
resolver->Resolve();
}
void SerialPortUnderlyingSource::ExpectPipeClose() {
if (data_pipe_) {
// The pipe is still open. Wait for PipeClosed() to be called.
......
......@@ -12,6 +12,7 @@
namespace blink {
class DOMException;
class ScriptPromiseResolver;
class SerialPort;
class SerialPortUnderlyingSource : public UnderlyingSourceBase {
......@@ -37,6 +38,7 @@ class SerialPortUnderlyingSource : public UnderlyingSourceBase {
void ArmWatcher();
void OnHandleReady(MojoResult, const mojo::HandleSignalsState&);
void OnFlush(ScriptPromiseResolver*);
void ExpectPipeClose();
void PipeClosed();
void Close();
......
......@@ -93,15 +93,18 @@ class DataPipeSink {
}
async write(chunk, controller) {
let {result, numBytes} = this._producer.writeData(chunk);
if (result == Mojo.RESULT_OK) {
if (numBytes < chunk.byteLength)
return this.write(chunk.slice(numBytes), controller);
} else if (result == Mojo.RESULT_FAILED_PRECONDITION) {
throw new DOMException("The pipe is closed.", "InvalidStateError");
} else if (result == Mojo.RESULT_SHOULD_WAIT) {
await this.writable();
return this.write(chunk, controller);
while (true) {
let {result, numBytes} = this._producer.writeData(chunk);
if (result == Mojo.RESULT_OK) {
if (numBytes == chunk.byteLength) {
return;
}
chunk = chunk.slice(numBytes);
} else if (result == Mojo.RESULT_FAILED_PRECONDITION) {
throw new DOMException('The pipe is closed.', 'InvalidStateError');
} else if (result == Mojo.RESULT_SHOULD_WAIT) {
await this.writable();
}
}
}
......@@ -145,7 +148,7 @@ class FakeSerialPort {
}
write(data) {
this.writer_.write(data);
return this.writer_.write(data);
}
async read() {
......@@ -246,8 +249,15 @@ class FakeSerialPort {
}
}
async flush() {
return { success: false };
async flush(mode) {
switch (mode) {
case device.mojom.SerialPortFlushMode.kReceive:
this.writer_.abort();
this.writer_.releaseLock();
this.writer_ = undefined;
this.writable_ = undefined;
break;
}
}
async getControlSignals() {
......
......@@ -7,7 +7,6 @@
serial_test(async (t, fake) => {
const {port, fakePort} = await getFakeSerialPort(fake);
// Select a buffer size smaller than the amount of data transferred.
await port.open({baudrate: 9600, buffersize: 64});
const reader = port.readable.getReader();
......@@ -19,3 +18,37 @@ serial_test(async (t, fake) => {
await port.close();
}, 'Can cancel while reading');
serial_test(async (t, fake) => {
const {port, fakePort} = await getFakeSerialPort(fake);
await port.open({baudrate: 9600, buffersize: 64});
const reader = port.readable.getReader();
await fakePort.writable();
const data = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
await fakePort.write(data);
await reader.cancel();
await port.close();
}, 'Cancel discards a small amount of data waiting to be read');
serial_test(async (t, fake) => {
const {port, fakePort} = await getFakeSerialPort(fake);
// Select a buffer size smaller than the amount of data transferred.
await port.open({baudrate: 9600, buffersize: 64});
const reader = port.readable.getReader();
await fakePort.writable();
const data = new Uint8Array(1024);
// Writing will fail because there was more data to send than could fit in the
// buffer and none of it was read.
const writePromise =
promise_rejects_dom(t, 'InvalidStateError', fakePort.write(data));
await reader.cancel();
await writePromise;
await port.close();
}, 'Cancel discards a large amount of data waiting to be read');
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment