Commit cb0454d5 authored by Steve Anton's avatar Steve Anton Committed by Commit Bot

Reflect P2PQuicStream changes in Blink bindings

Bug: 874296
Change-Id: Ic86b9420c7cbfc190a79fef13dfe4494bf039c3c
Reviewed-on: https://chromium-review.googlesource.com/c/1318392
Commit-Queue: Steve Anton <steveanton@chromium.org>
Reviewed-by: default avatarHenrik Boström <hbos@chromium.org>
Cr-Commit-Position: refs/heads/master@{#605859}
parent ed886866
...@@ -49,16 +49,22 @@ void QuicStreamHost::Reset() { ...@@ -49,16 +49,22 @@ void QuicStreamHost::Reset() {
Delete(); Delete();
} }
// TODO(https://crbug.com/874296): When the blink binding (RTCQuicStream) is void QuicStreamHost::MarkReceivedDataConsumed(uint32_t amount) {
// updated to support reading/writing, remove this function.
void QuicStreamHost::Finish() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(p2p_stream_); DCHECK(p2p_stream_);
std::vector<uint8_t> data; p2p_stream_->MarkReceivedDataConsumed(amount);
p2p_stream_->WriteData(data, true); }
writeable_ = false;
if (!readable_ && !writeable_) { void QuicStreamHost::WriteData(std::vector<uint8_t> data, bool fin) {
Delete(); DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(p2p_stream_);
p2p_stream_->WriteData(data, fin);
if (fin) {
DCHECK(writable_);
writable_ = false;
if (!readable_ && !writable_) {
Delete();
}
} }
} }
...@@ -70,23 +76,26 @@ void QuicStreamHost::OnRemoteReset() { ...@@ -70,23 +76,26 @@ void QuicStreamHost::OnRemoteReset() {
Delete(); Delete();
} }
// TODO(https://crbug.com/874296): When the blink binding (RTCQuicStream) is
// updated to support reading/writing, update this function to do more than just
// call OnRemoteFinish.
void QuicStreamHost::OnDataReceived(std::vector<uint8_t> data, bool fin) { void QuicStreamHost::OnDataReceived(std::vector<uint8_t> data, bool fin) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!fin) { PostCrossThreadTask(*proxy_thread(), FROM_HERE,
return; CrossThreadBind(&QuicStreamProxy::OnDataReceived,
} stream_proxy_, std::move(data), fin));
PostCrossThreadTask( if (fin) {
*proxy_thread(), FROM_HERE, readable_ = false;
CrossThreadBind(&QuicStreamProxy::OnRemoteFinish, stream_proxy_)); if (!readable_ && !writable_) {
readable_ = false; Delete();
if (!readable_ && !writeable_) { }
Delete();
} }
} }
void QuicStreamHost::OnWriteDataConsumed(uint32_t amount) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
PostCrossThreadTask(*proxy_thread(), FROM_HERE,
CrossThreadBind(&QuicStreamProxy::OnWriteDataConsumed,
stream_proxy_, amount));
}
void QuicStreamHost::Delete() { void QuicStreamHost::Delete() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(transport_host_); DCHECK(transport_host_);
......
...@@ -56,7 +56,8 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>, ...@@ -56,7 +56,8 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>,
scoped_refptr<base::SingleThreadTaskRunner> proxy_thread() const; scoped_refptr<base::SingleThreadTaskRunner> proxy_thread() const;
void Reset(); void Reset();
void Finish(); void MarkReceivedDataConsumed(uint32_t amount);
void WriteData(std::vector<uint8_t> data, bool fin);
private: private:
// Instruct the QuicTransportHost to remove and delete this stream host. // Instruct the QuicTransportHost to remove and delete this stream host.
...@@ -65,6 +66,7 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>, ...@@ -65,6 +66,7 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>,
// P2PQuicStream::Delegate overrides. // P2PQuicStream::Delegate overrides.
void OnRemoteReset() override; void OnRemoteReset() override;
void OnDataReceived(std::vector<uint8_t> data, bool fin) override; void OnDataReceived(std::vector<uint8_t> data, bool fin) override;
void OnWriteDataConsumed(uint32_t amount) override;
// Up reference. Owned by QuicTransportProxy. // Up reference. Owned by QuicTransportProxy.
QuicTransportHost* transport_host_ = nullptr; QuicTransportHost* transport_host_ = nullptr;
...@@ -73,10 +75,10 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>, ...@@ -73,10 +75,10 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>,
// Back reference. Owned by QuicTransportProxy. // Back reference. Owned by QuicTransportProxy.
base::WeakPtr<QuicStreamProxy> stream_proxy_; base::WeakPtr<QuicStreamProxy> stream_proxy_;
// |readable_| transitions to false when OnRemoteFinish() is called. // |readable_| transitions to false when OnDataReceived(_, true) is called.
bool readable_ = true; bool readable_ = true;
// |writeable_| transitions to false when Finish() is called. // |writable_| transitions to false when WriteData(_, true) is called.
bool writeable_ = true; bool writable_ = true;
THREAD_CHECKER(thread_checker_); THREAD_CHECKER(thread_checker_);
}; };
......
...@@ -51,13 +51,23 @@ void QuicStreamProxy::Reset() { ...@@ -51,13 +51,23 @@ void QuicStreamProxy::Reset() {
Delete(); Delete();
} }
void QuicStreamProxy::Finish() { void QuicStreamProxy::MarkReceivedDataConsumed(uint32_t amount) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
PostCrossThreadTask(*host_thread(), FROM_HERE, PostCrossThreadTask(*host_thread(), FROM_HERE,
CrossThreadBind(&QuicStreamHost::Finish, stream_host_)); CrossThreadBind(&QuicStreamHost::MarkReceivedDataConsumed,
writeable_ = false; stream_host_, amount));
if (!readable_ && !writeable_) { }
Delete();
void QuicStreamProxy::WriteData(std::vector<uint8_t> data, bool fin) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
PostCrossThreadTask(*host_thread(), FROM_HERE,
CrossThreadBind(&QuicStreamHost::WriteData, stream_host_,
std::move(data), fin));
if (fin) {
writable_ = false;
if (!readable_ && !writable_) {
Delete();
}
} }
} }
...@@ -70,16 +80,24 @@ void QuicStreamProxy::OnRemoteReset() { ...@@ -70,16 +80,24 @@ void QuicStreamProxy::OnRemoteReset() {
delegate_copy->OnRemoteReset(); delegate_copy->OnRemoteReset();
} }
void QuicStreamProxy::OnRemoteFinish() { void QuicStreamProxy::OnDataReceived(std::vector<uint8_t> data, bool fin) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(delegate_); DCHECK(delegate_);
// Need to copy the |delegate_| member since Delete() will destroy |this|. // Need to copy the |delegate_| member since Delete() will destroy |this|.
Delegate* delegate_copy = delegate_; Delegate* delegate_copy = delegate_;
readable_ = false; if (fin) {
if (!readable_ && !writeable_) { readable_ = false;
Delete(); if (!readable_ && !writable_) {
Delete();
}
} }
delegate_copy->OnRemoteFinish(); delegate_copy->OnDataReceived(std::move(data), fin);
}
void QuicStreamProxy::OnWriteDataConsumed(uint32_t amount) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(delegate_);
delegate_->OnWriteDataConsumed(amount);
} }
void QuicStreamProxy::Delete() { void QuicStreamProxy::Delete() {
......
...@@ -43,8 +43,10 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> { ...@@ -43,8 +43,10 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> {
// Called when the remote side resets the stream. // Called when the remote side resets the stream.
virtual void OnRemoteReset() {} virtual void OnRemoteReset() {}
// Called when the remote side finishes the stream. // Called when the remote side receives data and/or the finish bit.
virtual void OnRemoteFinish() {} virtual void OnDataReceived(std::vector<uint8_t> data, bool fin) {}
// Called when data written with WriteData() has been consumed by QUIC.
virtual void OnWriteDataConsumed(uint32_t amount) {}
}; };
QuicStreamProxy(); QuicStreamProxy();
...@@ -67,7 +69,8 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> { ...@@ -67,7 +69,8 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> {
scoped_refptr<base::SingleThreadTaskRunner> host_thread() const; scoped_refptr<base::SingleThreadTaskRunner> host_thread() const;
void Reset(); void Reset();
void Finish(); void MarkReceivedDataConsumed(uint32_t amount);
void WriteData(std::vector<uint8_t> data, bool fin);
private: private:
// Instruct the QuicTransportProxy to remove and delete this stream proxy. // Instruct the QuicTransportProxy to remove and delete this stream proxy.
...@@ -76,7 +79,8 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> { ...@@ -76,7 +79,8 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> {
// Callbacks from QuicStreamHost. // Callbacks from QuicStreamHost.
friend class QuicStreamHost; friend class QuicStreamHost;
void OnRemoteReset(); void OnRemoteReset();
void OnRemoteFinish(); void OnDataReceived(std::vector<uint8_t> data, bool fin);
void OnWriteDataConsumed(uint32_t amount);
// Up reference. Owned by the QuicTransportProxy client. // Up reference. Owned by the QuicTransportProxy client.
QuicTransportProxy* transport_proxy_ = nullptr; QuicTransportProxy* transport_proxy_ = nullptr;
...@@ -85,10 +89,10 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> { ...@@ -85,10 +89,10 @@ class QuicStreamProxy final : public base::SupportsWeakPtr<QuicStreamProxy> {
// Back reference. Owned by the RTCQuicTransport. // Back reference. Owned by the RTCQuicTransport.
Delegate* delegate_ = nullptr; Delegate* delegate_ = nullptr;
// |readable_| transitions to false when OnRemoteFinish() is called. // |readable_| transitions to false when OnDataReceived(_, true) is called.
bool readable_ = true; bool readable_ = true;
// |writeable_| transitions to false when Finish() is called. // |writable_| transitions to false when WriteData(_, true) is called.
bool writeable_ = true; bool writable_ = true;
THREAD_CHECKER(thread_checker_); THREAD_CHECKER(thread_checker_);
}; };
......
...@@ -86,6 +86,12 @@ struct CrossThreadCopier<std::pair<cricket::Candidate, cricket::Candidate>> ...@@ -86,6 +86,12 @@ struct CrossThreadCopier<std::pair<cricket::Candidate, cricket::Candidate>>
STATIC_ONLY(CrossThreadCopier); STATIC_ONLY(CrossThreadCopier);
}; };
template <>
struct CrossThreadCopier<std::vector<uint8_t>>
: public CrossThreadCopierPassThrough<std::vector<uint8_t>> {
STATIC_ONLY(CrossThreadCopier);
};
} // namespace blink } // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_PEERCONNECTION_ADAPTERS_WEB_RTC_CROSS_THREAD_COPIER_H_ #endif // THIRD_PARTY_BLINK_RENDERER_MODULES_PEERCONNECTION_ADAPTERS_WEB_RTC_CROSS_THREAD_COPIER_H_
...@@ -53,7 +53,7 @@ void RTCQuicStream::finish() { ...@@ -53,7 +53,7 @@ void RTCQuicStream::finish() {
if (!writeable_) { if (!writeable_) {
return; return;
} }
proxy_->Finish(); proxy_->WriteData({}, /*fin=*/true);
writeable_ = false; writeable_ = false;
if (readable_) { if (readable_) {
DCHECK_EQ(state_, RTCQuicStreamState::kOpen); DCHECK_EQ(state_, RTCQuicStreamState::kOpen);
...@@ -92,7 +92,10 @@ void RTCQuicStream::OnRemoteReset() { ...@@ -92,7 +92,10 @@ void RTCQuicStream::OnRemoteReset() {
DispatchEvent(*Event::Create(event_type_names::kStatechange)); DispatchEvent(*Event::Create(event_type_names::kStatechange));
} }
void RTCQuicStream::OnRemoteFinish() { void RTCQuicStream::OnDataReceived(std::vector<uint8_t> data, bool fin) {
if (!fin) {
return;
}
DCHECK_NE(state_, RTCQuicStreamState::kClosed); DCHECK_NE(state_, RTCQuicStreamState::kClosed);
DCHECK(readable_); DCHECK(readable_);
readable_ = false; readable_ = false;
...@@ -106,6 +109,8 @@ void RTCQuicStream::OnRemoteFinish() { ...@@ -106,6 +109,8 @@ void RTCQuicStream::OnRemoteFinish() {
DispatchEvent(*Event::Create(event_type_names::kStatechange)); DispatchEvent(*Event::Create(event_type_names::kStatechange));
} }
void RTCQuicStream::OnWriteDataConsumed(uint32_t amount) {}
const AtomicString& RTCQuicStream::InterfaceName() const { const AtomicString& RTCQuicStream::InterfaceName() const {
return event_target_names::kRTCQuicStream; return event_target_names::kRTCQuicStream;
} }
......
...@@ -55,7 +55,8 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData, ...@@ -55,7 +55,8 @@ class MODULES_EXPORT RTCQuicStream final : public EventTargetWithInlineData,
// QuicStreamProxy::Delegate overrides. // QuicStreamProxy::Delegate overrides.
void OnRemoteReset() override; void OnRemoteReset() override;
void OnRemoteFinish() override; void OnDataReceived(std::vector<uint8_t> data, bool fin) override;
void OnWriteDataConsumed(uint32_t amount) override;
Member<RTCQuicTransport> transport_; Member<RTCQuicTransport> transport_;
RTCQuicStreamState state_ = RTCQuicStreamState::kOpen; RTCQuicStreamState state_ = RTCQuicStreamState::kOpen;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment