Commit b5120650 authored by Reilly Grant's avatar Reilly Grant Committed by Commit Bot

serial: Flush and drain writable on abort() and close()

This change enhances the abort() and close() methods on the writable
attribute of a SerialPort object so that it is possible to request
either a flush[1] (discarding buffered data) or drain[2] (waiting until
buffered data has been transmitted).

[1]: https://reillyeon.github.io/serial/#serialportsinkabortalgorithm-port
[2]: https://reillyeon.github.io/serial/#serialportsinkclosealgorithm-port

Bug: 989653, 989656
Change-Id: I95226e6f3069214b483935abdf4ae1e87d5961a9
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2277074
Commit-Queue: Reilly Grant <reillyg@chromium.org>
Reviewed-by: default avatarDominick Ng <dominickn@chromium.org>
Reviewed-by: default avatarOvidio de Jesús Ruiz-Henríquez <odejesush@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#789724}
parent 806fbc53
......@@ -136,6 +136,8 @@ class FakeSerialPort : public device::mojom::SerialPort {
NOTREACHED();
}
void Drain(DrainCallback callback) override { NOTREACHED(); }
void GetControlSignals(GetControlSignalsCallback callback) override {
auto signals = device::mojom::SerialPortControlSignals::New();
signals->dcd = true;
......
......@@ -52,6 +52,8 @@ class FakeSerialPort : public mojom::SerialPort {
NOTREACHED();
}
void Drain(DrainCallback callback) override { NOTREACHED(); }
void GetControlSignals(GetControlSignalsCallback callback) override {
NOTREACHED();
}
......
......@@ -74,6 +74,10 @@ enum SerialPortFlushMode {
// Flushes the receive buffers and discards data in the data_pipe_producer by
// closing it.
kReceive,
// Flushes the send buffers and discards data in the data_pipe_consumer by
// closing it.
kTransmit,
};
struct SerialConnectionOptions {
......@@ -159,6 +163,10 @@ interface SerialPort {
// Flushes buffers according to the selected |mode|.
Flush(SerialPortFlushMode mode) => ();
// Waits for the data_pipe_consumer passed to StartWriting() to be closed and
// all buffered data to be transmitted by the port.
Drain() => ();
// Reads current control signals (DCD, CTS, etc.).
GetControlSignals() => (SerialPortControlSignals signals);
......
......@@ -82,6 +82,9 @@ class SerialIoHandler : public base::RefCountedThreadSafe<SerialIoHandler> {
// Flushes input and output buffers.
virtual void Flush(mojom::SerialPortFlushMode mode) const = 0;
// Drains output buffers.
virtual void Drain() = 0;
// Reads current control signals (DCD, CTS, etc.) into an existing
// DeviceControlSignals structure. Returns |true| iff the signals were
// successfully read.
......
......@@ -437,12 +437,20 @@ void SerialIoHandlerPosix::Flush(mojom::SerialPortFlushMode mode) const {
case mojom::SerialPortFlushMode::kReceive:
queue_selector = TCIFLUSH;
break;
case mojom::SerialPortFlushMode::kTransmit:
queue_selector = TCOFLUSH;
break;
}
if (tcflush(file().GetPlatformFile(), queue_selector) != 0)
VPLOG(1) << "Failed to flush port";
}
void SerialIoHandlerPosix::Drain() {
if (tcdrain(file().GetPlatformFile()) != 0)
VPLOG(1) << "Failed to drain port";
}
mojom::SerialPortControlSignalsPtr SerialIoHandlerPosix::GetControlSignals()
const {
int status;
......
......@@ -32,6 +32,7 @@ class SerialIoHandlerPosix : public SerialIoHandler {
bool PostOpen() override;
void PreClose() override;
void Flush(mojom::SerialPortFlushMode mode) const override;
void Drain() override;
mojom::SerialPortControlSignalsPtr GetControlSignals() const override;
bool SetControlSignals(
const mojom::SerialHostControlSignals& control_signals) override;
......
......@@ -472,12 +472,20 @@ void SerialIoHandlerWin::Flush(mojom::SerialPortFlushMode mode) const {
case mojom::SerialPortFlushMode::kReceive:
flags = PURGE_RXCLEAR;
break;
case mojom::SerialPortFlushMode::kTransmit:
flags = PURGE_TXCLEAR;
break;
}
if (!PurgeComm(file().GetPlatformFile(), flags))
VPLOG(1) << "Failed to flush serial port";
}
void SerialIoHandlerWin::Drain() {
if (!FlushFileBuffers(file().GetPlatformFile()))
VPLOG(1) << "Failed to drain serial port";
}
mojom::SerialPortControlSignalsPtr SerialIoHandlerWin::GetControlSignals()
const {
DWORD status;
......
......@@ -27,6 +27,7 @@ class SerialIoHandlerWin : public SerialIoHandler,
void CancelWriteImpl() override;
bool ConfigurePortImpl() override;
void Flush(mojom::SerialPortFlushMode mode) const override;
void Drain() override;
mojom::SerialPortControlSignalsPtr GetControlSignals() const override;
bool SetControlSignals(
const mojom::SerialHostControlSignals& control_signals) override;
......
......@@ -105,27 +105,62 @@ void SerialPortImpl::StartReading(mojo::ScopedDataPipeProducerHandle producer) {
void SerialPortImpl::Flush(mojom::SerialPortFlushMode mode,
FlushCallback callback) {
if (mode == mojom::SerialPortFlushMode::kReceive) {
io_handler_->CancelRead(mojom::SerialReceiveError::NONE);
switch (mode) {
case mojom::SerialPortFlushMode::kReceiveAndTransmit:
// Do nothing. This case exists to support the chrome.serial.flush()
// method.
break;
case mojom::SerialPortFlushMode::kReceive:
io_handler_->CancelRead(mojom::SerialReceiveError::NONE);
break;
case mojom::SerialPortFlushMode::kTransmit:
io_handler_->CancelWrite(mojom::SerialSendError::NONE);
break;
}
io_handler_->Flush(mode);
if (mode == mojom::SerialPortFlushMode::kReceive) {
if (io_handler_->IsReadPending()) {
// Delay closing |out_stream_| because |io_handler_| still holds a pointer
// into the shared memory owned by the pipe.
read_flush_callback_ = std::move(callback);
return;
}
out_stream_watcher_.Cancel();
out_stream_.reset();
switch (mode) {
case mojom::SerialPortFlushMode::kReceiveAndTransmit:
// Do nothing. This case exists to support the chrome.serial.flush()
// method.
break;
case mojom::SerialPortFlushMode::kReceive:
if (io_handler_->IsReadPending()) {
// Delay closing |out_stream_| because |io_handler_| still holds a
// pointer into the shared memory owned by the pipe.
read_flush_callback_ = std::move(callback);
return;
}
out_stream_watcher_.Cancel();
out_stream_.reset();
break;
case mojom::SerialPortFlushMode::kTransmit:
if (io_handler_->IsWritePending()) {
// Delay closing |in_stream_| because |io_handler_| still holds a
// pointer into the shared memory owned by the pipe.
write_flush_callback_ = std::move(callback);
return;
}
in_stream_watcher_.Cancel();
in_stream_.reset();
break;
}
std::move(callback).Run();
}
void SerialPortImpl::Drain(DrainCallback callback) {
if (!in_stream_) {
std::move(callback).Run();
return;
}
drain_callback_ = std::move(callback);
}
void SerialPortImpl::GetControlSignals(GetControlSignalsCallback callback) {
std::move(callback).Run(io_handler_->GetControlSignals());
}
......@@ -178,6 +213,11 @@ void SerialPortImpl::WriteToPort(MojoResult result,
// The |in_stream_| has been closed.
in_stream_watcher_.Cancel();
in_stream_.reset();
if (drain_callback_) {
io_handler_->Drain();
std::move(drain_callback_).Run();
}
return;
}
// The code should not reach other cases.
......
......@@ -58,6 +58,7 @@ class SerialPortImpl : public mojom::SerialPort {
void StartWriting(mojo::ScopedDataPipeConsumerHandle consumer) override;
void StartReading(mojo::ScopedDataPipeProducerHandle producer) override;
void Flush(mojom::SerialPortFlushMode mode, FlushCallback callback) override;
void Drain(DrainCallback callback) override;
void GetControlSignals(GetControlSignalsCallback callback) override;
void SetControlSignals(mojom::SerialHostControlSignalsPtr signals,
SetControlSignalsCallback callback) override;
......@@ -89,9 +90,11 @@ class SerialPortImpl : public mojom::SerialPort {
mojo::ScopedDataPipeProducerHandle out_stream_;
mojo::SimpleWatcher out_stream_watcher_;
// Holds the callback for a pending flush until pending read operations have
// been completed.
// Holds the callback for a flush or drain until pending operations have been
// completed.
FlushCallback read_flush_callback_;
FlushCallback write_flush_callback_;
DrainCallback drain_callback_;
base::WeakPtrFactory<SerialPortImpl> weak_factory_{this};
DISALLOW_COPY_AND_ASSIGN(SerialPortImpl);
......
......@@ -29,6 +29,7 @@ class FakeSerialIoHandler : public SerialIoHandler {
}
void Flush(mojom::SerialPortFlushMode mode) const override {}
void Drain() override {}
mojom::SerialPortControlSignalsPtr GetControlSignals() const override {
return mojom::SerialPortControlSignals::New();
......@@ -164,4 +165,53 @@ TEST_F(SerialPortImplTest, FlushRead) {
watcher_loop.Run();
}
TEST_F(SerialPortImplTest, FlushWrite) {
mojo::Remote<mojom::SerialPort> serial_port;
mojo::SelfOwnedReceiverRef<mojom::SerialPortConnectionWatcher> watcher;
CreatePort(&serial_port, &watcher);
mojo::ScopedDataPipeProducerHandle producer;
mojo::ScopedDataPipeConsumerHandle consumer;
CreateDataPipe(&producer, &consumer);
serial_port->StartWriting(std::move(consumer));
// Calling Flush(kTransmit) should cause the data pipe to close.
base::RunLoop watcher_loop;
mojo::SimpleWatcher pipe_watcher(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
EXPECT_EQ(pipe_watcher.Watch(producer.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindLambdaForTesting(
[&](MojoResult result,
const mojo::HandleSignalsState& state) {
EXPECT_EQ(result, MOJO_RESULT_OK);
EXPECT_TRUE(state.peer_closed());
watcher_loop.Quit();
})),
MOJO_RESULT_OK);
base::RunLoop loop;
serial_port->Flush(mojom::SerialPortFlushMode::kTransmit, loop.QuitClosure());
loop.Run();
watcher_loop.Run();
}
TEST_F(SerialPortImplTest, Drain) {
mojo::Remote<mojom::SerialPort> serial_port;
mojo::SelfOwnedReceiverRef<mojom::SerialPortConnectionWatcher> watcher;
CreatePort(&serial_port, &watcher);
mojo::ScopedDataPipeProducerHandle producer;
mojo::ScopedDataPipeConsumerHandle consumer;
CreateDataPipe(&producer, &consumer);
serial_port->StartWriting(std::move(consumer));
// Drain() will wait for the data pipe to close before replying.
producer.reset();
base::RunLoop loop;
serial_port->Drain(loop.QuitClosure());
loop.Run();
}
} // namespace device
......@@ -456,6 +456,12 @@ void SerialPort::Flush(
port_->Flush(mode, std::move(callback));
}
void SerialPort::Drain(
device::mojom::blink::SerialPort::DrainCallback callback) {
DCHECK(port_.is_bound());
port_->Drain(std::move(callback));
}
void SerialPort::UnderlyingSourceClosed() {
DCHECK(readable_);
readable_ = nullptr;
......
......@@ -64,6 +64,7 @@ class SerialPort final : public ScriptWrappable,
void Flush(device::mojom::blink::SerialPortFlushMode mode,
device::mojom::blink::SerialPort::FlushCallback callback);
void Drain(device::mojom::blink::SerialPort::DrainCallback callback);
void UnderlyingSourceClosed();
void UnderlyingSinkClosed();
......
......@@ -38,7 +38,7 @@ ScriptPromise SerialPortUnderlyingSink::write(
// There can only be one call to write() in progress at a time.
DCHECK(buffer_source_.IsNull());
DCHECK_EQ(0u, offset_);
DCHECK(!pending_write_);
DCHECK(!pending_operation_);
if (pending_exception_) {
DOMException* exception = pending_exception_;
......@@ -54,8 +54,9 @@ ScriptPromise SerialPortUnderlyingSink::write(
if (exception_state.HadException())
return ScriptPromise();
pending_write_ = MakeGarbageCollected<ScriptPromiseResolver>(script_state);
ScriptPromise promise = pending_write_->Promise();
pending_operation_ =
MakeGarbageCollected<ScriptPromiseResolver>(script_state);
ScriptPromise promise = pending_operation_->Promise();
WriteData();
return promise;
......@@ -65,22 +66,24 @@ ScriptPromise SerialPortUnderlyingSink::close(ScriptState* script_state,
ExceptionState& exception_state) {
// The specification guarantees that this will only be called after all
// pending writes have been completed.
DCHECK(!pending_write_);
DCHECK(!pending_operation_);
watcher_.Cancel();
data_pipe_.reset();
serial_port_->UnderlyingSinkClosed();
if (pending_exception_) {
DOMException* exception = pending_exception_;
pending_exception_ = nullptr;
exception_state.RethrowV8Exception(ToV8(exception, script_state));
serial_port_->UnderlyingSinkClosed();
return ScriptPromise();
}
// TODO(crbug.com/989656): close() should wait for data to be flushed before
// resolving. This will require waiting for |data_pipe_| to close.
return ScriptPromise::CastUndefined(script_state);
pending_operation_ =
MakeGarbageCollected<ScriptPromiseResolver>(script_state);
serial_port_->Drain(WTF::Bind(&SerialPortUnderlyingSink::OnFlushOrDrain,
WrapPersistent(this)));
return pending_operation_->Promise();
}
ScriptPromise SerialPortUnderlyingSink::abort(ScriptState* script_state,
......@@ -88,22 +91,38 @@ ScriptPromise SerialPortUnderlyingSink::abort(ScriptState* script_state,
ExceptionState& exception_state) {
// The specification guarantees that this will only be called after all
// pending writes have been completed.
// TODO(crbug.com/969653): abort() should trigger a purge of the serial write
// buffers.
return close(script_state, exception_state);
DCHECK(!pending_operation_);
watcher_.Cancel();
data_pipe_.reset();
if (pending_exception_) {
DOMException* exception = pending_exception_;
pending_exception_ = nullptr;
exception_state.RethrowV8Exception(ToV8(exception, script_state));
serial_port_->UnderlyingSinkClosed();
return ScriptPromise();
}
pending_operation_ =
MakeGarbageCollected<ScriptPromiseResolver>(script_state);
serial_port_->Flush(device::mojom::blink::SerialPortFlushMode::kTransmit,
WTF::Bind(&SerialPortUnderlyingSink::OnFlushOrDrain,
WrapPersistent(this)));
return pending_operation_->Promise();
}
void SerialPortUnderlyingSink::SignalErrorOnClose(DOMException* exception) {
if (data_pipe_ || !pending_write_) {
if (data_pipe_ || !pending_operation_) {
// Pipe is still open or we don't have a write operation that can be failed.
// Wait for PipeClosed() to be called.
pending_exception_ = exception;
return;
}
if (pending_write_) {
pending_write_->Reject(exception);
pending_write_ = nullptr;
if (pending_operation_) {
pending_operation_->Reject(exception);
pending_operation_ = nullptr;
serial_port_->UnderlyingSinkClosed();
}
}
......@@ -112,7 +131,7 @@ void SerialPortUnderlyingSink::Trace(Visitor* visitor) const {
visitor->Trace(serial_port_);
visitor->Trace(pending_exception_);
visitor->Trace(buffer_source_);
visitor->Trace(pending_write_);
visitor->Trace(pending_operation_);
UnderlyingSinkBase::Trace(visitor);
}
......@@ -130,9 +149,25 @@ void SerialPortUnderlyingSink::OnHandleReady(MojoResult result,
}
}
void SerialPortUnderlyingSink::OnFlushOrDrain() {
ScriptPromiseResolver* resolver = pending_operation_;
pending_operation_ = nullptr;
DOMException* exception = pending_exception_;
pending_exception_ = nullptr;
serial_port_->UnderlyingSinkClosed();
if (exception) {
resolver->Reject(exception);
} else {
resolver->Resolve();
}
}
void SerialPortUnderlyingSink::WriteData() {
DCHECK(data_pipe_);
DCHECK(pending_write_);
DCHECK(pending_operation_);
DCHECK(!buffer_source_.IsNull());
const uint8_t* data = nullptr;
......@@ -167,8 +202,8 @@ void SerialPortUnderlyingSink::WriteData() {
if (offset_ == length) {
buffer_source_ = ArrayBufferOrArrayBufferView();
offset_ = 0;
pending_write_->Resolve();
pending_write_ = nullptr;
pending_operation_->Resolve();
pending_operation_ = nullptr;
break;
}
FALLTHROUGH;
......@@ -184,7 +219,7 @@ void SerialPortUnderlyingSink::WriteData() {
}
void SerialPortUnderlyingSink::PipeClosed() {
DCHECK(pending_write_);
DCHECK(pending_operation_);
watcher_.Cancel();
data_pipe_.reset();
......@@ -193,8 +228,8 @@ void SerialPortUnderlyingSink::PipeClosed() {
DOMException* exception = pending_exception_;
pending_exception_ = nullptr;
serial_port_->UnderlyingSinkClosed();
pending_write_->Reject(exception);
pending_write_ = nullptr;
pending_operation_->Reject(exception);
pending_operation_ = nullptr;
}
}
......
......@@ -41,6 +41,7 @@ class SerialPortUnderlyingSink final : public UnderlyingSinkBase {
private:
void OnHandleReady(MojoResult, const mojo::HandleSignalsState&);
void OnFlushOrDrain();
void WriteData();
void PipeClosed();
......@@ -51,7 +52,11 @@ class SerialPortUnderlyingSink final : public UnderlyingSinkBase {
ArrayBufferOrArrayBufferView buffer_source_;
uint32_t offset_ = 0;
Member<ScriptPromiseResolver> pending_write_;
// Only one outstanding call to write(), close() or abort() is allowed at a
// time. This holds the ScriptPromiseResolver for the Promise returned by any
// of these functions.
Member<ScriptPromiseResolver> pending_operation_;
};
} // namespace blink
......
......@@ -28,7 +28,7 @@ async function readWithLength(reader, targetLength) {
let actualLength = 0;
while (true) {
let { value, done } = await reader.read();
let {value, done} = await reader.read();
chunks.push(value);
actualLength += value.byteLength;
......@@ -69,7 +69,8 @@ class DataPipeSource {
}
cancel() {
this.watcher_.cancel();
if (this.watcher_)
this.watcher_.cancel();
this.consumer_.close();
}
......@@ -151,11 +152,14 @@ class FakeSerialPort {
return this.writer_.write(data);
}
async read() {
let reader = this.readable_.getReader();
let result = await reader.read();
reader.releaseLock();
return result;
read() {
return this.reader_.read();
}
// Reads from the port until at least |targetLength| is read or the stream is
// closed. The data is returned as a combined Uint8Array.
readWithLength(targetLength) {
return readWithLength(this.reader_, targetLength);
}
simulateReadError(error) {
......@@ -175,7 +179,8 @@ class FakeSerialPort {
}
simulateWriteError(error) {
this.readable_.cancel();
this.reader_.cancel();
this.reader_ = undefined;
this.readable_ = undefined;
this.client_.onSendError(error);
}
......@@ -232,6 +237,7 @@ class FakeSerialPort {
async startWriting(in_stream) {
this.readable_ = new ReadableStream(new DataPipeSource(in_stream));
this.reader_ = this.readable_.getReader();
if (this.readableResolver_) {
this.readableResolver_();
this.readableResolver_ = undefined;
......@@ -257,9 +263,18 @@ class FakeSerialPort {
this.writer_ = undefined;
this.writable_ = undefined;
break;
case device.mojom.SerialPortFlushMode.kTransmit:
this.reader_.cancel();
this.reader_ = undefined;
this.readable_ = undefined;
break;
}
}
async drain() {
await this.reader_.closed;
}
async getControlSignals() {
return { signals: this.inputSignals_ };
}
......
......@@ -45,7 +45,7 @@ serial_test(async (t, fake) => {
const writer = port.writable.getWriter();
const data = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
let writePromise = writer.write(data);
writer.close();
writer.releaseLock();
await fakePort.readable();
let {value, done} = await fakePort.read();
......@@ -66,12 +66,10 @@ serial_test(async (t, fake) => {
for (let i = 0; i < data.byteLength; ++i)
data[i] = i & 0xff;
writer.write(data);
writer.close();
writer.releaseLock();
await fakePort.readable();
const reader = fakePort.readable_.getReader();
const value = await readWithLength(reader, data.byteLength);
reader.releaseLock();
const value = await fakePort.readWithLength(data.byteLength);
compareArrays(data, value);
await port.close();
......@@ -95,7 +93,7 @@ serial_test(async (t, fake) => {
writer = port.writable.getWriter();
let writePromise = writer.write(data);
writer.close();
writer.releaseLock();
await fakePort.readable();
let {value, done} = await fakePort.read();
await writePromise;
......@@ -121,6 +119,64 @@ serial_test(async (t, fake) => {
await port.close();
}, 'Disconnect error closes writable and sets it to null');
serial_test(async (t, fake) => {
const {port, fakePort} = await getFakeSerialPort(fake);
await port.open({baudrate: 9600, buffersize: 64});
const originalWritable = port.writable;
assert_true(originalWritable instanceof WritableStream);
let writer = originalWritable.getWriter();
let data = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
// The buffer size is large enough to allow this write to complete without
// the data being read from the fake port.
await writer.write(data);
await writer.abort();
assert_true(port.writable instanceof WritableStream);
assert_true(port.writable !== originalWritable);
writer = port.writable.getWriter();
data = new Uint8Array([9, 10, 11, 12, 13, 14, 15, 16]);
const writePromise = writer.write(data);
writer.releaseLock();
await fakePort.readable();
const {value, done} = await fakePort.read();
await writePromise;
compareArrays(value, data);
await port.close();
assert_equals(port.writable, null);
}, 'abort() discards the write buffer');
serial_test(async (t, fake) => {
const {port, fakePort} = await getFakeSerialPort(fake);
// Select a buffer size smaller than the amount of data transferred.
await port.open({baudrate: 9600, buffersize: 64});
const writer = port.writable.getWriter();
const data = new Uint8Array(1024); // Much larger than buffersize above.
for (let i = 0; i < data.byteLength; ++i)
data[i] = i & 0xff;
writer.write(data);
let readComplete = false;
let writePromise = writer.close().then(() => {
assert_true(readComplete);
});
await fakePort.readable();
let readPromise = fakePort.readWithLength(data.byteLength).then(result => {
readComplete = true;
return result;
});
const value = await readPromise;
compareArrays(data, value);
await writePromise;
await port.close();
}, 'close() waits for the write buffer to be cleared');
serial_test(async (t, fake) => {
const {port, fakePort} = await getFakeSerialPort(fake);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment