Commit 956058ab authored by Qingsi Wang's avatar Qingsi Wang Committed by Commit Bot

Block following mDNS packets on an incomplete send in the mDNS responder

service and ignore msg-too-big read errors.

Blocked packets are buffered in a send queue with a limited capacity,
and are flushed after the previous send is complete.

We originally remove a socket when it encounters read errors and reboot
the service when no socket left. The msg-too-big error will be ignored
after this change.

Bug: 933869, 964951
Change-Id: I3665dd732526591f350834757bc0e6d1c0d6b3dd
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1617614
Commit-Queue: Qingsi Wang <qingsi@chromium.org>
Reviewed-by: default avatarEric Orth <ericorth@chromium.org>
Cr-Commit-Position: refs/heads/master@{#662837}
parent 4367d762
......@@ -4,6 +4,7 @@
#include <algorithm>
#include <numeric>
#include <queue>
#include <utility>
#include "services/network/mdns_responder.h"
......@@ -19,6 +20,7 @@
#include "base/sys_byteorder.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/time/default_tick_clock.h"
#include "base/timer/timer.h"
#include "net/base/address_family.h"
#include "net/base/io_buffer.h"
#include "net/base/ip_address.h"
......@@ -45,9 +47,11 @@
//
// 3) Support parsing the authority section of a query in the wire format to
// correctly implement the detection of probe queries.
namespace network {
namespace {
using MdnsResponderServiceError = network::MdnsResponderManager::ServiceError;
using MdnsResponderServiceError = MdnsResponderManager::ServiceError;
// RFC 6762, Section 6.
//
......@@ -74,6 +78,8 @@ const uint16_t kFlagCacheFlush = 0x8000;
// Maximum number of retries for the same response due to send failure.
const uint8_t kMaxMdnsResponseRetries = 2;
// The capacity of the send queue for packets blocked by an incomplete send.
const uint8_t kSendQueueCapacity = 100;
// Maximum delay allowed for per-response rate-limited responses.
const base::TimeDelta kMaxScheduledDelay = base::TimeDelta::FromSeconds(10);
......@@ -198,9 +204,25 @@ void ReportServiceError(MdnsResponderServiceError error) {
UMA_HISTOGRAM_ENUMERATION("NetworkService.MdnsResponder.ServiceError", error);
}
struct PendingPacket {
PendingPacket(scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option,
const base::TimeTicks& send_ready_time)
: buf(std::move(buf)),
option(std::move(option)),
send_ready_time(send_ready_time) {}
bool operator<(const PendingPacket& other) const {
return send_ready_time > other.send_ready_time;
}
scoped_refptr<net::IOBufferWithSize> buf;
scoped_refptr<MdnsResponseSendOption> option;
base::TimeTicks send_ready_time;
};
} // namespace
namespace network {
namespace mdns_helper {
......@@ -293,8 +315,8 @@ class MdnsResponderManager::SocketHandler {
bool Send(scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option);
void DoSend(scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option);
// Returns a net error code, or ERR_IO_PENDING if the IO is in progress.
int DoSend(PendingPacket pending_packet);
uint16_t id() const { return id_; }
......@@ -392,11 +414,11 @@ class MdnsResponderManager::SocketHandler::ResponseScheduler {
explicit ResponseScheduler(MdnsResponderManager::SocketHandler* handler)
: handler_(handler),
task_runner_(base::SequencedTaskRunnerHandle::Get()),
tick_clock_(base::DefaultTickClock::GetInstance()),
dispatch_timer_(std::make_unique<base::OneShotTimer>(tick_clock_)),
next_available_time_per_resp_sched_(tick_clock_->NowTicks()),
weak_factory_(this) {}
~ResponseScheduler() = default;
~ResponseScheduler() { dispatch_timer_->Stop(); }
// Implements the rate limit scheme on the underlying interface managed by
// |handler_|. Returns true if the send is scheduled on this interface.
......@@ -404,25 +426,28 @@ class MdnsResponderManager::SocketHandler::ResponseScheduler {
// Pending sends scheduled are cancelled after |handler_| becomes invalid;
bool ScheduleNextSend(scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option);
void OnResponseSent(scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option,
int result) {
void OnResponseSent(PendingPacket pending_packet, int result) {
DCHECK(send_pending_);
send_pending_ = false;
scoped_refptr<MdnsResponseSendOption>& option = pending_packet.option;
if (result < 0) {
VLOG(1) << "Socket send error, socket=" << handler_->id()
<< ", error=" << result;
if (CanBeRetriedAfterSendFailure(*option)) {
++option->num_send_retries_done;
handler_->DoSend(std::move(buf), std::move(option));
send_queue_.push(std::move(pending_packet));
} else {
VLOG(1) << "Response cannot be sent after " << kMaxMdnsResponseRetries
<< " retries.";
}
}
DispatchPendingPackets();
}
// Also resets the scheduler.
void SetTickClockForTesting(const base::TickClock* tick_clock) {
tick_clock_ = tick_clock;
dispatch_timer_ = std::make_unique<base::OneShotTimer>(tick_clock_);
next_available_time_per_resp_sched_ = tick_clock_->NowTicks();
next_available_time_for_name_.clear();
}
......@@ -454,6 +479,10 @@ class MdnsResponderManager::SocketHandler::ResponseScheduler {
ComputeResponseDelayAndUpdateNextAvailableTime(
RateLimitScheme rate_limit_scheme,
const MdnsResponseSendOption& option);
// Dispatches packets in the send queue serially with retries.
void DispatchPendingPackets();
// Determines if a response can be retried after send failure.
bool CanBeRetriedAfterSendFailure(const MdnsResponseSendOption& option) {
if (option.num_send_retries_done >= kMaxMdnsResponseRetries)
......@@ -472,10 +501,13 @@ class MdnsResponderManager::SocketHandler::ResponseScheduler {
// reference should be used to access the handler when there is no such
// guarantee in an operation.
MdnsResponderManager::SocketHandler* const handler_;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
const base::TickClock* tick_clock_;
std::unique_ptr<base::OneShotTimer> dispatch_timer_;
std::map<std::string, base::TimeTicks> next_available_time_for_name_;
base::TimeTicks next_available_time_per_resp_sched_;
bool send_pending_ = false;
// Packets with earlier ready time have higher priorities.
std::priority_queue<PendingPacket> send_queue_;
base::WeakPtrFactory<ResponseScheduler> weak_factory_;
......@@ -488,15 +520,13 @@ bool MdnsResponderManager::SocketHandler::Send(
return scheduler_->ScheduleNextSend(std::move(buf), std::move(option));
}
void MdnsResponderManager::SocketHandler::DoSend(
scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option) {
auto* buf_data = buf.get();
size_t buf_size = buf->size();
socket_->SendTo(buf_data, buf_size, multicast_addr_,
base::BindOnce(&ResponseScheduler::OnResponseSent,
scheduler_->GetWeakPtr(), std::move(buf),
std::move(option)));
int MdnsResponderManager::SocketHandler::DoSend(PendingPacket pending_packet) {
auto* buf_data = pending_packet.buf.get();
size_t buf_size = pending_packet.buf->size();
return socket_->SendTo(
buf_data, buf_size, multicast_addr_,
base::BindOnce(&ResponseScheduler::OnResponseSent,
scheduler_->GetWeakPtr(), std::move(pending_packet)));
}
void MdnsResponderManager::SocketHandler::SetTickClockForTesting(
......@@ -507,31 +537,35 @@ void MdnsResponderManager::SocketHandler::SetTickClockForTesting(
bool MdnsResponderManager::SocketHandler::ResponseScheduler::ScheduleNextSend(
scoped_refptr<net::IOBufferWithSize> buf,
scoped_refptr<MdnsResponseSendOption> option) {
if (send_queue_.size() >= kSendQueueCapacity) {
VLOG(1)
<< "mDNS packet discarded after reaching the capacity of send queue.";
return false;
}
auto rate_limit_scheme = GetRateLimitSchemeForClass(option->klass);
base::Optional<base::TimeDelta> delay;
if (rate_limit_scheme == RateLimitScheme::NO_LIMIT) {
// Skip the scheduling for this response. Currently the zero delay is only
// used for negative responses generated by the responder itself. Responses
// with positive name resolution generated by the responder and also those
// triggered via the Mojo connection (i.e. announcements and goodbye
// packets) are rate limited via the scheduled delay below.
handler_->DoSend(std::move(buf), std::move(option));
return true;
delay = base::TimeDelta();
} else {
// TODO(qingsi): The computation of the delay is done statically below at
// schedule-time. Change it to computing dynamically so that the delay is
// based on the time of the last send completion.
delay = ComputeResponseDelayAndUpdateNextAvailableTime(rate_limit_scheme,
*option);
}
const base::Optional<base::TimeDelta> delay =
ComputeResponseDelayAndUpdateNextAvailableTime(rate_limit_scheme,
*option);
if (!delay)
return false;
// Note that the owning handler of this scheduler may be removed if it
// encounters read error as we process in OnSocketHandlerReadError. We should
// guarantee any posted task can be cancelled if the handler goes away, which
// we do via the weak pointer.
task_runner_->PostDelayedTask(
FROM_HERE,
base::BindOnce(&MdnsResponderManager::SocketHandler::DoSend,
handler_->GetWeakPtr(), std::move(buf), std::move(option)),
delay.value());
PendingPacket pending_packet(std::move(buf), std::move(option),
tick_clock_->NowTicks() + delay.value());
send_queue_.push(std::move(pending_packet));
DispatchPendingPackets();
return true;
}
......@@ -598,6 +632,39 @@ base::Optional<base::TimeDelta> MdnsResponderManager::SocketHandler::
return delay;
}
void MdnsResponderManager::SocketHandler::ResponseScheduler::
DispatchPendingPackets() {
while (!send_pending_ && !send_queue_.empty()) {
const auto now = tick_clock_->NowTicks();
const auto next_send_ready_time = send_queue_.top().send_ready_time;
if (now >= next_send_ready_time) {
auto pending_packet = std::move(send_queue_.top());
send_queue_.pop();
int rv = handler_->DoSend(std::move(pending_packet));
if (rv == net::ERR_IO_PENDING) {
send_pending_ = true;
} else if (rv < net::OK) {
VLOG(1) << "mDNS packet discarded due to socket send error, socket="
<< handler_->id() << ", error=" << rv;
}
} else {
// We have no packet due; post a task to flush the send queue later.
//
// Note that the owning handler of this scheduler may be removed if it
// encounters read error as we process in OnSocketHandlerReadError. We
// should guarantee any posted task can be cancelled if the scheduler goes
// away, which we do via the weak pointer.
const base::TimeDelta time_to_next_packet = next_send_ready_time - now;
dispatch_timer_->Start(
FROM_HERE, time_to_next_packet,
base::BindOnce(&MdnsResponderManager::SocketHandler::
ResponseScheduler::DispatchPendingPackets,
GetWeakPtr()));
return;
}
}
}
MdnsResponseSendOption::MdnsResponseSendOption() = default;
MdnsResponseSendOption::~MdnsResponseSendOption() = default;
......@@ -744,14 +811,17 @@ void MdnsResponderManager::OnMdnsQueryReceived(
void MdnsResponderManager::OnSocketHandlerReadError(uint16_t socket_handler_id,
int result) {
VLOG(1) << "Socket read error, socket=" << socket_handler_id
<< ", error=" << result;
if (IsNonFatalError(result))
return;
auto it = socket_handler_by_id_.find(socket_handler_id);
DCHECK(it != socket_handler_by_id_.end());
// It is safe to remove the handler in error since this error handler is
// invoked by the callback after the asynchronous return of RecvFrom, when the
// handler has exited the read loop.
socket_handler_by_id_.erase(it);
VLOG(1) << "Socket read error, socket=" << socket_handler_id
<< ", error=" << result;
if (socket_handler_by_id_.empty()) {
LOG(ERROR)
<< "All socket handlers failed. Restarting the mDNS responder manager.";
......@@ -761,6 +831,14 @@ void MdnsResponderManager::OnSocketHandlerReadError(uint16_t socket_handler_id,
}
}
bool MdnsResponderManager::IsNonFatalError(int result) {
DCHECK(result < net::OK);
if (result == net::ERR_MSG_TOO_BIG)
return true;
return false;
}
void MdnsResponderManager::SocketHandler::HandlePacket(int result) {
if (result <= 0)
return;
......
......@@ -198,6 +198,7 @@ class COMPONENT_EXPORT(NETWORK_SERVICE) MdnsResponderManager {
void OnMdnsQueryReceived(const net::DnsQuery& query,
uint16_t recv_socket_handler_id);
void OnSocketHandlerReadError(uint16_t socket_handler_id, int result);
bool IsNonFatalError(int result);
std::unique_ptr<net::MDnsSocketFactory> owned_socket_factory_;
net::MDnsSocketFactory* socket_factory_;
......
......@@ -31,10 +31,11 @@
namespace network {
namespace {
using ::testing::_;
using ::testing::AnyNumber;
using ::testing::Invoke;
using ::testing::NiceMock;
using ::testing::Return;
using ::testing::_;
using ServiceError = MdnsResponderManager::ServiceError;
const net::IPAddress kPublicAddrs[2] = {net::IPAddress(11, 11, 11, 11),
......@@ -107,6 +108,37 @@ class MockFailingMdnsSocketFactory : public net::MDnsSocketFactory {
return -1;
}
// Emulates IO blocking in sending packets if |BlockSend()| is called, in
// which case the completion callback is not invoked until |ResumeSend()| is
// called.
int MaybeBlockSend(const std::string& packet,
const std::string& address,
net::CompletionRepeatingCallback callback) {
OnSendTo(packet);
if (block_send_) {
blocked_packet_size_ = packet.size();
blocked_send_callback_ = std::move(callback);
} else {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce([](net::CompletionRepeatingCallback callback,
size_t packet_size) { callback.Run(packet_size); },
callback, packet.size()));
}
return -1;
}
void BlockSend() {
DCHECK(!block_send_);
block_send_ = true;
}
void ResumeSend() {
DCHECK(block_send_);
block_send_ = false;
blocked_send_callback_.Run(blocked_packet_size_);
}
// Emulates the asynchronous contract of invoking |callback| in the RecvFrom
// primitive but failed receiving;
int FailToRecv(net::IOBuffer* buffer,
......@@ -122,6 +154,9 @@ class MockFailingMdnsSocketFactory : public net::MDnsSocketFactory {
}
private:
bool block_send_ = false;
size_t blocked_packet_size_ = 0;
net::CompletionRepeatingCallback blocked_send_callback_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
};
......@@ -283,6 +318,14 @@ class MdnsResponderTest : public testing::Test {
Reset();
}
~MdnsResponderTest() {
// Goodbye messages are scheduled when the responder service |host_manager_|
// is destroyed and can be synchronously sent if the rate limiting permits.
// See ResponseScheduler::DispatchPendingPackets().
EXPECT_CALL(socket_factory_, OnSendTo(_)).Times(AnyNumber());
EXPECT_CALL(failing_socket_factory_, OnSendTo(_)).Times(AnyNumber());
}
void Reset(bool use_failing_socket_factory = false) {
client_[0].reset();
client_[1].reset();
......@@ -365,12 +408,12 @@ class MdnsResponderTest : public testing::Test {
base::test::ScopedTaskEnvironment scoped_task_environment_{
base::test::ScopedTaskEnvironment::MainThreadType::MOCK_TIME};
mojom::MdnsResponderPtr client_[2];
std::unique_ptr<MdnsResponderManager> host_manager_;
// Overrides the current thread task runner, so we can simulate the passage
// of time and avoid any actual sleeps.
NiceMock<net::MockMDnsSocketFactory> socket_factory_;
NiceMock<MockFailingMdnsSocketFactory> failing_socket_factory_;
mojom::MdnsResponderPtr client_[2];
std::unique_ptr<MdnsResponderManager> host_manager_;
std::string last_name_created_;
};
......@@ -1013,4 +1056,58 @@ TEST_F(MdnsResponderTest, ManagerCanRestartAfterAllSocketHandlersFailToRead) {
tester.ExpectTotalCount(kServiceErrorHistogram, 2);
}
// Test that sending packets on an interface can be blocked by an incomplete
// send on the same interface. Blocked packets are later flushed when sending is
// unblocked.
TEST_F(MdnsResponderTest, IncompleteSendBlocksFollowingSends) {
auto create_send_blocking_socket =
[this](std::vector<std::unique_ptr<net::DatagramServerSocket>>* sockets) {
auto socket =
std::make_unique<NiceMock<net::MockMDnsDatagramServerSocket>>(
net::ADDRESS_FAMILY_IPV4);
ON_CALL(*socket, SendToInternal(_, _, _))
.WillByDefault(
Invoke(&failing_socket_factory_,
&MockFailingMdnsSocketFactory::MaybeBlockSend));
ON_CALL(*socket, RecvFromInternal(_, _, _, _))
.WillByDefault(Return(-1));
sockets->push_back(std::move(socket));
};
EXPECT_CALL(failing_socket_factory_, CreateSockets(_))
.WillOnce(Invoke(create_send_blocking_socket));
Reset(true /* use_failing_socket_factory */);
const auto& addr1 = kPublicAddrs[0];
std::string expected_announcement1 =
CreateResolutionResponse(kDefaultTtl, {{"0.local", addr1}});
// Mocked CreateSockets above only creates one socket.
// We schedule to send the announcement for |kNumAnnouncementsPerInterface|
// times but the second announcement is blocked by the first one in this case.
EXPECT_CALL(failing_socket_factory_, OnSendTo(expected_announcement1))
.Times(1);
failing_socket_factory_.BlockSend();
const auto name1 = CreateNameForAddress(0, addr1);
RunUntilNoTasksRemain();
const auto& addr2 = kPublicAddrs[1];
std::string expected_announcement2 =
CreateResolutionResponse(kDefaultTtl, {{"1.local", addr2}});
// The announcement for the following name should also be blocked.
const auto name2 = CreateNameForAddress(0, addr2);
EXPECT_CALL(failing_socket_factory_, OnSendTo(expected_announcement2))
.Times(0);
RunUntilNoTasksRemain();
// We later unblock sending packets. Previously scheduled announcements should
// be flushed.
EXPECT_CALL(failing_socket_factory_, OnSendTo(expected_announcement1))
.Times(kNumAnnouncementsPerInterface - 1);
EXPECT_CALL(failing_socket_factory_, OnSendTo(expected_announcement2))
.Times(kNumAnnouncementsPerInterface);
failing_socket_factory_.ResumeSend();
RunUntilNoTasksRemain();
}
} // namespace network
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment