Merge 'message: messaging_service: fix topology_ignored for pending endpoints in get_rpc_client' from Kamil Braun

`get_rpc_client` calculates a `topology_ignored` field when creating a
client which says whether the client's endpoint had topology information
when this client was created. This is later used to check if that client
needs to be dropped and replaced with a new client which uses the
correct topology information.

The `topology_ignored` field was incorrectly calculated as `true` for
pending endpoints even though we had topology information for them. This
would lead to unnecessary drops of RPC clients later. Fix this.

Remove the default parameter for `with_pending` from
`topology::has_endpoint` to avoid similar bugs in the future.

Apparently this fixes #11780. The verbs used by decommission operation
use RPC client index 1 (see `do_get_rpc_client_idx` in
message/messaging_service.cc). From local testing with additional
logging I found that by the time this client is created (i.e. the first
verb in this group is used), we already know the topology. The node is
pending at that point - hence the bug would cause us to assume we don't
know the topology, leading us to dropping the RPC client later, possibly
in the middle of a decommission operation.

Fixes: #11780

Closes #11942

* github.com:scylladb/scylladb:
  message: messaging_service: check for known topology before calling is_same_dc/rack
  test: reenable test_topology::test_decommission_node_add_column
  test/pylib: util: configurable period in wait_for
  message: messaging_service: fix topology_ignored for pending endpoints in get_rpc_client
  message: messaging_service: topology independent connection settings for GOSSIP verbs
This commit is contained in:
Pavel Emelyanov
2022-11-17 20:14:32 +03:00
8 changed files with 100 additions and 39 deletions

View File

@@ -555,7 +555,7 @@ const std::unordered_map<inet_address, host_id>& token_metadata_impl::get_endpoi
}
bool token_metadata_impl::is_member(inet_address endpoint) const {
return _topology.has_endpoint(endpoint);
return _topology.has_endpoint(endpoint, topology::pending::no);
}
void token_metadata_impl::add_bootstrap_token(token t, inet_address endpoint) {

View File

@@ -51,9 +51,10 @@ public:
void remove_endpoint(inet_address ep);
/**
* Returns true iff contains given endpoint
* Returns true iff contains given endpoint.
* Excludes pending endpoints if `with_pending == pending::no`.
*/
bool has_endpoint(inet_address, pending with_pending = pending::no) const;
bool has_endpoint(inet_address, pending with_pending) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&

View File

@@ -255,27 +255,21 @@ future<> messaging_service::start_listen(locator::shared_token_metadata& stm) {
return make_ready_future<>();
}
bool messaging_service::is_same_dc(inet_address addr) const {
// It's a "safety check". The token metadata pointer is nullptr before
bool messaging_service::topology_known_for(inet_address addr) const {
// The token metadata pointer is nullptr before
// the service is start_listen()-ed and after it's being shutdown()-ed.
// No new clients should appear in those period, but if they do it's
// better to classify them somehow. Telling that all endpoints live in
// different DCs/RACKs would make messaging apply the most restrictive
// compression/encryption rules which can be sub-optimal but not bad.
if (_token_metadata == nullptr) {
return false;
}
return _token_metadata
&& _token_metadata->get()->get_topology().has_endpoint(addr, locator::topology::pending::yes);
}
// Precondition: `topology_known_for(addr)`.
bool messaging_service::is_same_dc(inet_address addr) const {
const auto& topo = _token_metadata->get()->get_topology();
return topo.get_datacenter(addr) == topo.get_datacenter();
}
// Precondition: `topology_known_for(addr)`.
bool messaging_service::is_same_rack(inet_address addr) const {
// See comment in is_same_dc() about this check
if (_token_metadata == nullptr) {
return false;
}
const auto& topo = _token_metadata->get()->get_topology();
return topo.get_rack(addr) == topo.get_rack();
}
@@ -308,13 +302,13 @@ void messaging_service::do_start_listen() {
case encrypt_what::dc:
so.filter_connection = [this](const seastar::socket_address& caddr) {
auto addr = get_public_endpoint_for(caddr);
return is_same_dc(addr);
return topology_known_for(addr) && is_same_dc(addr);
};
break;
case encrypt_what::rack:
so.filter_connection = [this](const seastar::socket_address& caddr) {
auto addr = get_public_endpoint_for(caddr);
return is_same_dc(addr) && is_same_rack(addr);
return topology_known_for(addr) && is_same_dc(addr) && is_same_rack(addr);
};
break;
}
@@ -472,6 +466,24 @@ rpc::no_wait_type messaging_service::no_wait() {
return rpc::no_wait;
}
// The verbs using this RPC client use the following connection settings,
// regardless of whether the peer is in the same DC/Rack or not:
// - tcp_nodelay
// - encryption (unless completely disabled in config)
// - compression (unless completely disabled in config)
//
// The reason for having topology-independent setting for encryption is to ensure
// that gossiper verbs can reach the peer, even though the peer may not know our topology yet.
// See #11992 for detailed explanations.
//
// We also always want `tcp_nodelay` for gossiper verbs so they have low latency
// (and there's no advantage from batching verbs in this group anyway).
//
// And since we fixed a topology-independent setting for encryption and tcp_nodelay,
// to keep things simple, we also fix a setting for compression. This allows this RPC client
// to be established without checking the topology (which may not be known anyway
// when we first start gossiping).
static constexpr unsigned TOPOLOGY_INDEPENDENT_IDX = 0;
static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
// *_CONNECTION_COUNT constants needs to be updated after allocating a new index.
@@ -492,8 +504,11 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GROUP0_PEER_EXCHANGE:
case messaging_verb::GROUP0_MODIFY_CONFIG:
case messaging_verb::GET_GROUP0_UPGRADE_STATE:
// ATTN -- if moving GOSSIP_ verbs elsewhere, mind updating the tcp_nodelay
// setting in get_rpc_client(), which assumes gossiper verbs live in idx 0
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
// DO NOT move GOSSIP_ verbs outside this group.
static_assert(TOPOLOGY_INDEPENDENT_IDX == 0);
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -654,7 +669,7 @@ gms::inet_address messaging_service::get_preferred_ip(gms::inet_address ep) {
auto it = _preferred_ip_cache.find(ep);
if (it != _preferred_ip_cache.end()) {
if (is_same_dc(ep)) {
if (topology_known_for(ep) && is_same_dc(ep)) {
return it->second;
}
}
@@ -715,7 +730,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
std::optional<bool> topology_status;
auto has_topology = [&] {
if (!topology_status.has_value()) {
topology_status = _token_metadata ? _token_metadata->get()->get_topology().has_endpoint(id.addr) : false;
topology_status = topology_known_for(id.addr);
}
return *topology_status;
};
@@ -724,12 +739,14 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
if (_cfg.encrypt == encrypt_what::none) {
return false;
}
if (_cfg.encrypt == encrypt_what::all || !has_topology()) {
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
if (_cfg.encrypt == encrypt_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) {
return true;
}
// either rack/dc need to be in same dc to use non-tls
if (!is_same_dc(id.addr)) {
if (!has_topology() || !is_same_dc(id.addr)) {
return true;
}
@@ -758,22 +775,21 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
return false;
}
if (_cfg.compress == compress_what::all || !has_topology()) {
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
if (_cfg.compress == compress_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) {
return true;
}
return !is_same_dc(id.addr);
return !has_topology() || !is_same_dc(id.addr);
}();
auto must_tcp_nodelay = [&] {
if (idx == 0) {
return true; // gossip
}
if (_cfg.tcp_nodelay == tcp_nodelay_what::all || !has_topology()) {
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
if (_cfg.tcp_nodelay == tcp_nodelay_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) {
return true;
}
return is_same_dc(id.addr);
return !has_topology() || is_same_dc(id.addr);
}();
auto addr = get_preferred_ip(id.addr);
@@ -795,7 +811,13 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
::make_shared<rpc_protocol_client_wrapper>(_rpc->protocol(), std::move(opts),
remote_addr, laddr);
bool topology_ignored = topology_status.has_value() ? *topology_status == false : false;
// Remember if we had the peer's topology information when creating the client;
// if not, we shall later drop the client and create a new one after we learn the peer's
// topology (so we can use optimal encryption settings and so on for intra-dc/rack messages).
// But we don't want to apply this logic for TOPOLOGY_INDEPENDENT_IDX client - its settings
// are independent of topology, so there's no point in dropping it later after we learn
// the topology (so we always set `topology_ignored` to `false` in that case).
bool topology_ignored = idx != TOPOLOGY_INDEPENDENT_IDX && topology_status.has_value() && *topology_status == false;
auto res = _clients[idx].emplace(id, shard_info(std::move(client), topology_ignored));
assert(res.second);
it = res.first;

View File

@@ -504,6 +504,7 @@ private:
void find_and_remove_client(clients_map& clients, msg_addr id, Fn&& filter);
void do_start_listen();
bool topology_known_for(inet_address) const;
bool is_same_dc(inet_address ep) const;
bool is_same_rack(inet_address ep) const;

View File

@@ -64,6 +64,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "utils/stall_free.hh"
#include "utils/error_injection.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
@@ -2583,6 +2584,8 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
} else if (req.cmd == node_ops_cmd::removenode_abort) {
node_ops_abort(ops_uuid).get();
} else if (req.cmd == node_ops_cmd::decommission_prepare) {
utils::get_local_injector().inject(
"storage_service_decommission_prepare_handler_sleep", std::chrono::milliseconds{1500}).get();
if (req.leaving_nodes.size() > 1) {
auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
slogger.warn("{}", msg);
@@ -3612,6 +3615,9 @@ future<> storage_service::notify_joined(inet_address endpoint) {
co_return;
}
co_await utils::get_local_injector().inject(
"storage_service_notify_joined_sleep", std::chrono::milliseconds{500});
co_await container().invoke_on_all([endpoint] (auto&& ss) {
ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0});
return ss._lifecycle_notifier.notify_joined(endpoint);

View File

@@ -177,6 +177,12 @@ class ScyllaRESTAPIClient():
assert(type(data) == int)
return data
async def get_joining_nodes(self, node_ip: str) -> list:
"""Get the list of joining nodes according to `node_ip`."""
data = await self.client.get_json(f"/storage_service/nodes/joining", host=node_ip)
assert(type(data) == list)
return data
async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None:
"""Enable error injection named `injection` on `node_ip`. Depending on `one_shot`,
the injection will be executed only once or every time the process passes the injection point.

View File

@@ -20,13 +20,15 @@ def unique_name():
return unique_name_prefix + str(current_ms)
async def wait_for(pred: Callable[[], Awaitable[Optional[T]]], deadline: float) -> T:
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float, period: float = 1) -> T:
while True:
assert(time.time() < deadline), "Deadline exceeded, failing test."
res = await pred()
if res is not None:
return res
await asyncio.sleep(1)
await asyncio.sleep(period)
unique_name.last_ms = 0

View File

@@ -10,6 +10,9 @@ import pytest
import logging
import asyncio
import random
import time
from test.pylib.util import wait_for
logger = logging.getLogger(__name__)
@@ -57,13 +60,33 @@ async def test_remove_node_add_column(manager, random_tables):
@pytest.mark.asyncio
@pytest.mark.skip(reason="Flaky due to #11780")
async def test_decommission_node_add_column(manager, random_tables):
"""Add a node, remove an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await manager.decommission_node(servers[1].server_id) # Decommission [1]
servers = await manager.running_servers()
decommission_target = servers[1]
# The sleep injections significantly increase the probability of reproducing #11780:
# 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
# 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
# enters sleep before calling storage_service::notify_joined
# 3. we start decommission on decommission_target
# 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
# 5. bootstrapped_server receives the RPC and enters sleep
# 6. decommission_target handle_state_normal wakes up,
# calls storage_service::notify_joined which drops some RPC clients
# 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
await manager.api.enable_injection(
decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
bootstrapped_server = await manager.server_add()
async def no_joining_nodes():
joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
return not joining_nodes
# Wait until decommission_target thinks that bootstrapped_server is NORMAL
# note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
await wait_for(no_joining_nodes, time.time() + 30, period=.1)
await manager.api.enable_injection(
bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
await manager.decommission_node(decommission_target.server_id)
await table.add_column()
await random_tables.verify_schema()