Commit 8cf4d09e authored by Tom Anderson's avatar Tom Anderson Committed by Chromium LUCI CQ

[XProto] Refactor response handling

* Now that every request is made by XProto,
  x11::Connection::Request::sequence is no longer needed.  Now sequences
  can be determined by (their index in the queue) +
  x11::Connection::first_request_id_.
  * This makes it easier to work with requests out-of-order, which will
    be necessary for creating property/window caches in the future.
* Move some xcb calls from xproto_types.cc into connection.cc.  I want
  to dlopen() libxcb in the future to make it possible to run Chrome on
  systems without any X11 libraries, so having everything consolidated
  will make that easier.
* Add Future::Wait().  Similar to Sync(), this blocks until a response
  is received, but instead of returning the response, it calls the
  installed response handler out-of-order.

R=sky

Bug: 739898
Change-Id: I10c4c18d3c3090ba09d660a5090fd7c0f01830f5
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2565440
Commit-Queue: Thomas Anderson <thomasanderson@chromium.org>
Reviewed-by: default avatarScott Violet <sky@chromium.org>
Cr-Commit-Position: refs/heads/master@{#834386}
parent 6f438eb0
......@@ -16,6 +16,7 @@
#include "base/no_destructor.h"
#include "base/strings/string16.h"
#include "base/threading/thread_local.h"
#include "base/trace_event/trace_event.h"
#include "ui/gfx/x/bigreq.h"
#include "ui/gfx/x/event.h"
#include "ui/gfx/x/keyboard_state.h"
......@@ -160,11 +161,16 @@ Connection::Connection(const std::string& address)
for (const auto& format : setup_.pixmap_formats)
formats[format.depth] = &format;
std::vector<std::pair<VisualId, VisualInfo>> default_screen_visuals;
for (const auto& depth : default_screen().allowed_depths) {
const Format* format = formats[depth.depth];
for (const auto& visual : depth.visuals)
default_screen_visuals_[visual.visual_id] = VisualInfo{format, &visual};
for (const auto& visual : depth.visuals) {
default_screen_visuals.emplace_back(visual.visual_id,
VisualInfo{format, &visual});
}
}
default_screen_visuals_ =
base::flat_map<VisualId, VisualInfo>(std::move(default_screen_visuals));
keyboard_state_ = CreateKeyboardState(this);
......@@ -192,16 +198,16 @@ XlibDisplayWrapper Connection::GetXlibDisplay(XlibDisplayType type) {
return XlibDisplayWrapper(xlib_display_->display_, type);
}
Connection::Request::Request(unsigned int sequence,
FutureBase::ResponseCallback callback)
: sequence(sequence), callback(std::move(callback)) {}
Connection::Request::Request(FutureBase::ResponseCallback callback,
const char* request_name_for_tracing,
bool generates_reply)
: callback(std::move(callback)),
request_name_for_tracing(request_name_for_tracing),
generates_reply(generates_reply) {
DCHECK(this->callback);
}
Connection::Request::Request(Request&& other)
: sequence(other.sequence),
callback(std::move(other.callback)),
have_response(other.have_response),
reply(std::move(other.reply)),
error(std::move(other.error)) {}
Connection::Request::Request(Request&& other) = default;
Connection::Request::~Request() = default;
......@@ -215,7 +221,7 @@ bool Connection::HasNextResponse() {
void* reply = nullptr;
xcb_generic_error_t* error = nullptr;
request.have_response =
xcb_poll_for_reply(XcbConnection(), request.sequence, &reply, &error);
xcb_poll_for_reply(XcbConnection(), first_request_id_, &reply, &error);
if (reply)
request.reply = base::MakeRefCounted<MallocedRefCountedMemory>(reply);
if (error)
......@@ -346,8 +352,14 @@ void Connection::Dispatch(Delegate* delegate) {
DCHECK(requests_.front().have_response);
Request request = std::move(requests_.front());
requests_.pop();
std::move(request.callback).Run(request.reply, request.error);
requests_.pop_front();
if (last_non_void_request_id_.has_value() &&
last_non_void_request_id_.value() == first_request_id_) {
last_non_void_request_id_ = base::nullopt;
}
first_request_id_++;
if (request.callback)
std::move(request.callback).Run(request.reply, request.error);
};
auto process_next_event = [&] {
......@@ -370,7 +382,7 @@ void Connection::Dispatch(Delegate* delegate) {
continue;
}
auto next_response_sequence = requests_.front().sequence;
auto next_response_sequence = first_request_id_;
auto next_event_sequence = events_.front().sequence();
// All events have the sequence number of the last processed request
......@@ -415,13 +427,137 @@ void Connection::InitRootDepthAndVisual() {
NOTREACHED();
}
void Connection::AddRequest(unsigned int sequence,
FutureBase::ResponseCallback callback) {
void Connection::AddRequest(SequenceType sequence,
FutureBase::ResponseCallback callback,
const char* request_name_for_tracing,
bool generates_reply) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(callback);
SequenceType next_request_id = first_request_id_ + requests_.size();
DCHECK_EQ(CompareSequenceIds(next_request_id, sequence), 0);
// If we ever reach 2^32 outstanding requests, then bail because sequence IDs
// would no longer be unique.
next_request_id++;
CHECK_NE(next_request_id, first_request_id_);
requests_.emplace_back(std::move(callback), request_name_for_tracing,
generates_reply);
if (generates_reply)
last_non_void_request_id_ = sequence;
if (synchronous_)
Sync();
}
void Connection::UpdateRequestHandler(SequenceType sequence,
FutureBase::ResponseCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(callback);
auto* request = GetRequestForSequence(sequence);
// Make sure we haven't processed this request yet.
DCHECK(request->callback);
request->callback = std::move(callback);
}
void Connection::WaitForResponse(SequenceType sequence) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
auto* request = GetRequestForSequence(sequence);
DCHECK(request->callback);
if (request->have_response)
return;
if (request->generates_reply) {
xcb_generic_error_t* error = nullptr;
void* reply = nullptr;
if (!xcb_poll_for_reply(XcbConnection(), sequence, &reply, &error)) {
TRACE_EVENT1("ui", "xcb_wait_for_reply", "request",
request->request_name_for_tracing);
reply = xcb_wait_for_reply(XcbConnection(), sequence, &error);
}
if (reply)
request->reply = base::MakeRefCounted<MallocedRefCountedMemory>(reply);
if (error)
request->error = base::MakeRefCounted<MallocedRefCountedMemory>(error);
} else {
// There's a special case here. This request doesn't generate a reply, and
// may not generate an error, so the only way to know if it finished is to
// send another request that we know will generate a reply or error. Once
// the new request finishes, we know this request has finished, since the
// server is guaranteed to process requests in order. Normally, the
// xcb_request_check() below would do this for us automatically, but we need
// to keep track of the sequence count ourselves, so we explicitly make a
// GetInputFocus request if necessary (which is the request xcb would have
// made -- GetInputFocus is chosen since it has the minimum size request and
// reply, and can be made at any time).
if (NeedsExtraRequestForCheck(sequence)) {
GetInputFocus({}).IgnoreError();
// The circular_deque may have swapped buffers, so we need to get a fresh
// pointer to the request.
request = GetRequestForSequence(sequence);
}
// libxcb has a bug where it doesn't flush in xcb_request_check() under some
// circumstances, leading to deadlock [1], so always perform a manual flush.
// [1] https://gitlab.freedesktop.org/xorg/lib/libxcb/-/issues/53
Flush();
xcb_generic_error_t* error = nullptr;
{
TRACE_EVENT1("ui", "xcb_request_check", "request",
request->request_name_for_tracing);
error = xcb_request_check(XcbConnection(), {sequence});
}
if (error)
request->error = base::MakeRefCounted<MallocedRefCountedMemory>(error);
}
request->have_response = true;
}
void Connection::ProcessResponse(SequenceType sequence) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
auto* request = GetRequestForSequence(sequence);
DCHECK(request->callback);
DCHECK(request->have_response);
std::move(request->callback).Run(request->reply, request->error);
}
void Connection::TakeResponse(SequenceType sequence,
FutureBase::RawReply* raw_reply,
FutureBase::RawError* raw_error) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
auto* request = GetRequestForSequence(sequence);
DCHECK(request->callback);
DCHECK(request->have_response);
*raw_reply = std::move(request->reply);
*raw_error = std::move(request->error);
request->callback.Reset();
}
bool Connection::NeedsExtraRequestForCheck(SequenceType sequence) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!last_non_void_request_id_.has_value())
return true;
SequenceType last_non_void_offset =
last_non_void_request_id_.value() - first_request_id_;
SequenceType sequence_offset = sequence - first_request_id_;
return sequence_offset > last_non_void_offset;
}
Connection::Request* Connection::GetRequestForSequence(SequenceType sequence) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(requests_.empty() ||
CompareSequenceIds(requests_.back().sequence, sequence) < 0);
requests_.emplace(sequence, std::move(callback));
SequenceType offset = sequence - first_request_id_;
DCHECK_LT(offset, requests_.size());
return &requests_[offset];
}
void Connection::PreDispatchEvent(const Event& event) {
......
......@@ -9,6 +9,9 @@
#include <queue>
#include "base/component_export.h"
#include "base/containers/circular_deque.h"
#include "base/containers/flat_map.h"
#include "base/optional.h"
#include "base/sequence_checker.h"
#include "ui/events/platform/platform_event_source.h"
#include "ui/gfx/x/event.h"
......@@ -119,8 +122,6 @@ class COMPONENT_EXPORT(X11) Connection : public XProto,
// If |synchronous| is true, this makes all requests Sync().
void SynchronizeForTest(bool synchronous);
bool synchronous() const { return synchronous_; }
// Read all responses from the socket without blocking.
void ReadResponses();
......@@ -165,13 +166,29 @@ class COMPONENT_EXPORT(X11) Connection : public XProto,
private:
friend class FutureBase;
// xcb returns unsigned int when making requests. This may be updated to
// uint16_t if/when we stop using xcb for socket IO.
using SequenceType = unsigned int;
struct Request {
Request(unsigned int sequence, FutureBase::ResponseCallback callback);
Request(FutureBase::ResponseCallback callback,
const char* request_name_for_tracing,
bool generates_reply);
Request(Request&& other);
~Request();
const unsigned int sequence;
// If |callback| is nullptr, then this request has already been processed
// out-of-order.
FutureBase::ResponseCallback callback;
const char* const request_name_for_tracing;
const bool generates_reply;
// Indicates if |reply| and |error| are available. A separate
// |have_response| flag is necessary to distinguish the case where a request
// hasn't finished yet from the case where a request finished but didn't
// generate a reply or error.
bool have_response = false;
FutureBase::RawReply reply;
FutureBase::RawError error;
......@@ -179,7 +196,36 @@ class COMPONENT_EXPORT(X11) Connection : public XProto,
void InitRootDepthAndVisual();
void AddRequest(unsigned int sequence, FutureBase::ResponseCallback callback);
// Creates a new Request and adds it to the end of the queue.
// |request_name_for_tracing| must be valid until the response is dispatched;
// currently the string values are only stored in .rodata, so this constraint
// is satisfied.
void AddRequest(SequenceType sequence,
FutureBase::ResponseCallback callback,
const char* request_name_for_tracing,
bool generates_reply);
// Update an existing Request with a new handler. |sequence| must correspond
// to a request in the queue that has not already been processed out-of-order.
void UpdateRequestHandler(SequenceType sequence,
FutureBase::ResponseCallback callback);
// Block until the reply or error for request |sequence| is received.
void WaitForResponse(SequenceType sequence);
// Call the response handler for request |sequence| now (out-of-order). The
// response must already have been obtained from a call to WaitForResponse().
void ProcessResponse(SequenceType sequence);
// Clear the response handler for request |sequence| and take the response.
// The response must already have been obtained using WaitForResponse().
void TakeResponse(SequenceType sequence,
FutureBase::RawReply* reply,
FutureBase::RawError* error);
bool NeedsExtraRequestForCheck(SequenceType sequence) const;
Request* GetRequestForSequence(SequenceType sequence);
bool HasNextResponse();
......@@ -209,13 +255,20 @@ class COMPONENT_EXPORT(X11) Connection : public XProto,
Depth* default_root_depth_ = nullptr;
VisualType* default_root_visual_ = nullptr;
std::unordered_map<VisualId, VisualInfo> default_screen_visuals_;
base::flat_map<VisualId, VisualInfo> default_screen_visuals_;
std::unique_ptr<KeyboardState> keyboard_state_;
std::list<Event> events_;
std::queue<Request> requests_;
base::circular_deque<Request> requests_;
// The sequence ID of requests_.front(), or if |requests_| is empty, then the
// ID of the next request that will go in the queue. This starts at 1 because
// the 0'th request is handled internally by XCB when opening the connection.
SequenceType first_request_id_ = 1;
// If any request in |requests_| will generate a reply, this is the ID of the
// latest one, otherwise this is base::nullopt.
base::Optional<SequenceType> last_non_void_request_id_;
using ErrorParser =
std::unique_ptr<Error> (*)(FutureBase::RawError error_bytes);
......
......@@ -61,11 +61,11 @@ UnretainedRefCountedMemory::~UnretainedRefCountedMemory() = default;
base::Optional<unsigned int> SendRequestImpl(x11::Connection* connection,
WriteBuffer* buf,
bool is_void,
bool generates_reply,
bool reply_has_fds) {
xcb_protocol_request_t xpr{
.ext = nullptr,
.isvoid = is_void,
.isvoid = !generates_reply,
};
struct RequestHeader {
......@@ -125,8 +125,6 @@ base::Optional<unsigned int> SendRequestImpl(x11::Connection* connection,
if (xcb_connection_has_error(conn))
return base::nullopt;
if (connection->synchronous())
connection->Sync();
return sequence;
}
......
......@@ -134,7 +134,7 @@ inline void Align(ReadBuffer* buf, size_t align) {
base::Optional<unsigned int> SendRequestImpl(x11::Connection* connection,
WriteBuffer* buf,
bool is_void,
bool generates_reply,
bool reply_has_fds);
template <typename Reply>
......@@ -142,7 +142,7 @@ Future<Reply> SendRequest(x11::Connection* connection,
WriteBuffer* buf,
bool reply_has_fds,
const char* request_name) {
auto sequence = SendRequestImpl(connection, buf, std::is_void<Reply>::value,
auto sequence = SendRequestImpl(connection, buf, !std::is_void<Reply>::value,
reply_has_fds);
return {sequence ? connection : nullptr, sequence,
sequence ? request_name : nullptr};
......
......@@ -7,7 +7,6 @@
#include <xcb/xcbext.h>
#include "base/memory/scoped_refptr.h"
#include "base/trace_event/trace_event.h"
#include "ui/gfx/x/connection.h"
#include "ui/gfx/x/xproto_internal.h"
......@@ -81,79 +80,49 @@ void WriteBuffer::AppendCurrentBuffer() {
FutureBase::FutureBase(Connection* connection,
base::Optional<unsigned int> sequence,
const char* request_name)
const char* request_name,
bool generates_reply)
: connection_(connection),
sequence_(sequence),
request_name_(request_name) {}
// If a user-defined response-handler is not installed before this object goes
// out of scope, a default response handler will be installed. The default
// handler throws away the reply and prints the error if there is one.
FutureBase::~FutureBase() {
if (!sequence_)
sequence_valid_(sequence.has_value()),
sequence_(sequence_valid_ ? sequence.value() : 0) {
if (!sequence_valid_)
return;
OnResponseImpl(base::BindOnce(
[](Connection* connection, const char* request_name,
Connection::ErrorHandler error_handler, RawReply raw_reply,
RawError raw_error) {
if (!raw_error)
return;
auto error = connection->ParseError(raw_error);
error_handler.Run(error.get(), request_name);
},
connection_, request_name_, connection_->error_handler_));
}
FutureBase::FutureBase(FutureBase&& future)
: connection_(future.connection_),
sequence_(future.sequence_),
request_name_(future.request_name_) {
future.Reset();
}
FutureBase& FutureBase::operator=(FutureBase&& future) {
connection_ = future.connection_;
sequence_ = future.sequence_;
request_name_ = future.request_name_;
future.Reset();
return *this;
// Install a default response-handler that throws away the reply and prints
// the error if there is one. This handler may be overridden by clients.
connection_->AddRequest(
sequence_,
base::BindOnce(
[](Connection* connection, const char* request_name,
Connection::ErrorHandler error_handler, RawReply raw_reply,
RawError raw_error) {
if (!raw_error)
return;
auto error = connection->ParseError(raw_error);
error_handler.Run(error.get(), request_name);
},
connection_, request_name, connection_->error_handler_),
request_name, generates_reply);
}
void FutureBase::SyncImpl(RawError* raw_error, RawReply* raw_reply) {
if (!sequence_)
void FutureBase::Wait() {
if (!sequence_valid_)
return;
xcb_generic_error_t* error = nullptr;
void* reply = nullptr;
if (!xcb_poll_for_reply(connection_->XcbConnection(), *sequence_, &reply,
&error)) {
TRACE_EVENT1("ui", "xcb_wait_for_reply", "request", request_name_);
reply =
xcb_wait_for_reply(connection_->XcbConnection(), *sequence_, &error);
}
if (reply)
*raw_reply = base::MakeRefCounted<MallocedRefCountedMemory>(reply);
if (error)
*raw_error = base::MakeRefCounted<MallocedRefCountedMemory>(error);
sequence_ = base::nullopt;
connection_->WaitForResponse(sequence_);
connection_->ProcessResponse(sequence_);
}
void FutureBase::SyncImpl(RawError* raw_error) {
if (!sequence_)
void FutureBase::SyncImpl(RawError* raw_reply, RawReply* raw_error) {
if (!sequence_valid_)
return;
if (xcb_generic_error_t* error =
xcb_request_check(connection_->XcbConnection(), {*sequence_})) {
*raw_error = base::MakeRefCounted<MallocedRefCountedMemory>(error);
}
sequence_ = base::nullopt;
connection_->WaitForResponse(sequence_);
connection_->TakeResponse(sequence_, raw_reply, raw_error);
}
void FutureBase::OnResponseImpl(ResponseCallback callback) {
if (!sequence_)
if (!sequence_valid_)
return;
connection_->AddRequest(*sequence_, std::move(callback));
sequence_ = base::nullopt;
connection_->UpdateRequestHandler(sequence_, std::move(callback));
}
// static
......@@ -164,10 +133,4 @@ std::unique_ptr<Error> FutureBase::ParseErrorImpl(x11::Connection* connection,
return connection->ParseError(raw_error);
}
void FutureBase::Reset() {
connection_ = nullptr;
sequence_ = base::nullopt;
request_name_ = nullptr;
}
} // namespace x11
......@@ -142,20 +142,18 @@ class COMPONENT_EXPORT(X11) FutureBase {
using ResponseCallback =
base::OnceCallback<void(RawReply reply, RawError error)>;
FutureBase(const FutureBase&) = delete;
FutureBase& operator=(const FutureBase&) = delete;
// Block until this request is handled by the server. Unlike Sync(), this
// method doesn't return the response. Rather, it calls the response handler
// installed for this request out-of-order.
void Wait();
protected:
FutureBase(Connection* connection,
base::Optional<unsigned int> sequence,
const char* request_name);
~FutureBase();
const char* request_name,
bool generates_reply);
FutureBase(FutureBase&& future);
FutureBase& operator=(FutureBase&& future);
void SyncImpl(RawError* raw_error, RawReply* raw_reply);
void SyncImpl(RawError* raw_error);
void SyncImpl(RawReply* raw_reply, RawError* raw_error);
void OnResponseImpl(ResponseCallback callback);
......@@ -165,11 +163,9 @@ class COMPONENT_EXPORT(X11) FutureBase {
RawError raw_error);
private:
void Reset();
Connection* connection_ = nullptr;
base::Optional<unsigned int> sequence_;
const char* request_name_ = nullptr;
bool sequence_valid_ = false;
unsigned int sequence_ = 0;
};
// An x11::Future wraps an asynchronous response from the X11 server. The
......@@ -180,13 +176,13 @@ class Future : public FutureBase {
public:
using Callback = base::OnceCallback<void(Response<Reply> response)>;
Future() : FutureBase(nullptr, base::nullopt, nullptr) {}
Future() : FutureBase(nullptr, base::nullopt, nullptr, false) {}
// Blocks until we receive the response from the server. Returns the response.
Response<Reply> Sync() {
RawError raw_error;
RawReply raw_reply;
SyncImpl(&raw_error, &raw_reply);
RawError raw_error;
SyncImpl(&raw_reply, &raw_error);
std::unique_ptr<Reply> reply;
if (raw_reply) {
......@@ -229,15 +225,20 @@ class Future : public FutureBase {
Future(Connection* connection,
base::Optional<unsigned int> sequence,
const char* request_name)
: FutureBase(connection, sequence, request_name) {}
: FutureBase(connection,
sequence,
request_name,
!std::is_void<Reply>::value) {}
};
// Sync() specialization for requests that don't generate replies. The returned
// response will only contain an error if there was one.
template <>
inline Response<void> Future<void>::Sync() {
RawReply raw_reply;
RawError raw_error;
SyncImpl(&raw_error);
SyncImpl(&raw_reply, &raw_error);
DCHECK(!raw_reply);
return Response<void>{ParseErrorImpl(connection(), raw_error)};
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment