From 840be34b5f067841b09d1d263ccf60f13b8c926f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 15 Nov 2022 18:00:37 +0100 Subject: [PATCH 1/5] message: messaging_service: topology independent connection settings for GOSSIP verbs The gossip verbs are used to learn about topology of other nodes. If inter-dc/rack encryption is enabled, the knowledge of topology is necessary to decide whether it's safe to send unencrypted messages to nodes (i.e., whether the destination lies in the same dc/rack). The logic in `messaging_service::get_rpc_client`, which decided whether a connection must be encrypted, was this (given that encryption is enabled): if the topology of the peer is known, and the peer is in the same dc/rack, don't encrypt. Otherwise encrypt. However, it may happen that node A knows node B's topology, but B doesn't know A's topology. A deduces that B is in the same DC and rack and tries sending B an unencrypted message. As the code currently stands, this would cause B to call `on_internal_error`. This is what I encountered when attempting to fix #11780. To guarantee that it's always possible to deliver gossiper verbs (even if one or both sides don't know each other's topology), and to simplify reasoning about the system in general, choose connection settings that are independent of the topology - for the connection used by gossiper verbs (other connections are still topology-dependent and use complex logic to handle the situation of unknown-and-later-known topology). This connection only contains 'rare' and 'cheap' verbs, so it's not a performance problem to always encrypt it (given that encryption is configured). And this is what already was happening in the past; it was at some point removed during topology knowledge management refactors. We just bring this logic back. Fixes #11992. Inspired by xemul/scylla@45d48f3d02fd48c6d186cc955cf83d747ac080b9. --- message/messaging_service.cc | 52 +++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index f9982c09c4..837d85cc30 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -472,6 +472,24 @@ rpc::no_wait_type messaging_service::no_wait() { return rpc::no_wait; } +// The verbs using this RPC client use the following connection settings, +// regardless of whether the peer is in the same DC/Rack or not: +// - tcp_nodelay +// - encryption (unless completely disabled in config) +// - compression (unless completely disabled in config) +// +// The reason for having topology-independent setting for encryption is to ensure +// that gossiper verbs can reach the peer, even though the peer may not know our topology yet. +// See #11992 for detailed explanations. +// +// We also always want `tcp_nodelay` for gossiper verbs so they have low latency +// (and there's no advantage from batching verbs in this group anyway). +// +// And since we fixed a topology-independent setting for encryption and tcp_nodelay, +// to keep things simple, we also fix a setting for compression. This allows this RPC client +// to be established without checking the topology (which may not be known anyway +// when we first start gossiping). +static constexpr unsigned TOPOLOGY_INDEPENDENT_IDX = 0; static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { // *_CONNECTION_COUNT constants needs to be updated after allocating a new index. @@ -492,8 +510,11 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::GROUP0_PEER_EXCHANGE: case messaging_verb::GROUP0_MODIFY_CONFIG: case messaging_verb::GET_GROUP0_UPGRADE_STATE: - // ATTN -- if moving GOSSIP_ verbs elsewhere, mind updating the tcp_nodelay - // setting in get_rpc_client(), which assumes gossiper verbs live in idx 0 + // See comment above `TOPOLOGY_INDEPENDENT_IDX`. + // DO NOT put any 'hot' (e.g. data path) verbs in this group, + // only verbs which are 'rare' and 'cheap'. + // DO NOT move GOSSIP_ verbs outside this group. + static_assert(TOPOLOGY_INDEPENDENT_IDX == 0); return 0; case messaging_verb::PREPARE_MESSAGE: case messaging_verb::PREPARE_DONE_MESSAGE: @@ -719,12 +740,14 @@ shared_ptr messaging_service::ge if (_cfg.encrypt == encrypt_what::none) { return false; } - if (_cfg.encrypt == encrypt_what::all || !has_topology()) { + + // See comment above `TOPOLOGY_INDEPENDENT_IDX`. + if (_cfg.encrypt == encrypt_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) { return true; } // either rack/dc need to be in same dc to use non-tls - if (!is_same_dc(id.addr)) { + if (!has_topology() || !is_same_dc(id.addr)) { return true; } @@ -753,22 +776,21 @@ shared_ptr messaging_service::ge return false; } - if (_cfg.compress == compress_what::all || !has_topology()) { + // See comment above `TOPOLOGY_INDEPENDENT_IDX`. + if (_cfg.compress == compress_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) { return true; } - return !is_same_dc(id.addr); + return !has_topology() || !is_same_dc(id.addr); }(); auto must_tcp_nodelay = [&] { - if (idx == 0) { - return true; // gossip - } - if (_cfg.tcp_nodelay == tcp_nodelay_what::all || !has_topology()) { + // See comment above `TOPOLOGY_INDEPENDENT_IDX`. + if (_cfg.tcp_nodelay == tcp_nodelay_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) { return true; } - return is_same_dc(id.addr); + return !has_topology() || is_same_dc(id.addr); }(); auto addr = get_preferred_ip(id.addr); @@ -790,7 +812,13 @@ shared_ptr messaging_service::ge ::make_shared(_rpc->protocol(), std::move(opts), remote_addr, laddr); - bool topology_ignored = topology_status.has_value() ? *topology_status == false : false; + // Remember if we had the peer's topology information when creating the client; + // if not, we shall later drop the client and create a new one after we learn the peer's + // topology (so we can use optimal encryption settings and so on for intra-dc/rack messages). + // But we don't want to apply this logic for TOPOLOGY_INDEPENDENT_IDX client - its settings + // are independent of topology, so there's no point in dropping it later after we learn + // the topology (so we always set `topology_ignored` to `false` in that case). + bool topology_ignored = idx != TOPOLOGY_INDEPENDENT_IDX && topology_status.has_value() && *topology_status == false; auto res = _clients[idx].emplace(id, shard_info(std::move(client), topology_ignored)); assert(res.second); it = res.first; From 1bd2471c190af9b75a671c75bc4930ce05fd1c0f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 9 Nov 2022 13:47:27 +0100 Subject: [PATCH 2/5] message: messaging_service: fix topology_ignored for pending endpoints in get_rpc_client `get_rpc_client` calculates a `topology_ignored` field when creating a client which says whether the client's endpoint had topology information when topology was created. This is later used to check if that client needs to be dropped and replaced with a new client which uses the correct topology information. The `topology_ignored` field was incorrectly calculated as `true` for pending endpoints even though we had topology information for them. This would lead to unnecessary drops of RPC clients later. Fix this. Remove the default parameter for `with_pending` from `topology::has_endpoint` to avoid similar bugs in the future. Apparently this fixes #11780. The verbs used by decommission operation use RPC client index 1 (see `do_get_rpc_client_idx` in message/messaging_service.cc). From local testing with additional logging I found that by the time this client is created (i.e. the first verb in this group is used), we already know the topology. The node is pending at that point - hence the bug would cause us to assume we don't know the topology, leading us to dropping the RPC client later, possibly in the middle of a decommission operation. Fixes: #11780 --- locator/token_metadata.cc | 2 +- locator/token_metadata.hh | 5 +++-- message/messaging_service.cc | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 887c42dc40..60d5e73c48 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -562,7 +562,7 @@ const std::unordered_map& token_metadata_impl::get_endpoi } bool token_metadata_impl::is_member(inet_address endpoint) const { - return _topology.has_endpoint(endpoint); + return _topology.has_endpoint(endpoint, topology::pending::no); } void token_metadata_impl::add_bootstrap_token(token t, inet_address endpoint) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index a3cfc43064..68e8680b77 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -67,9 +67,10 @@ public: void remove_endpoint(inet_address ep); /** - * Returns true iff contains given endpoint + * Returns true iff contains given endpoint. + * Excludes pending endpoints if `with_pending == pending::no`. */ - bool has_endpoint(inet_address, pending with_pending = pending::no) const; + bool has_endpoint(inet_address, pending with_pending) const; const std::unordered_map>& diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 837d85cc30..d04d7690bb 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -731,7 +731,9 @@ shared_ptr messaging_service::ge std::optional topology_status; auto has_topology = [&] { if (!topology_status.has_value()) { - topology_status = _token_metadata ? _token_metadata->get()->get_topology().has_endpoint(id.addr) : false; + topology_status = _token_metadata + ? _token_metadata->get()->get_topology().has_endpoint(id.addr, locator::topology::pending::yes) + : false; } return *topology_status; }; From 0f49813312f119315f6dcbf4df538c3f44e41b8f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 9 Nov 2022 15:38:35 +0100 Subject: [PATCH 3/5] test/pylib: util: configurable period in wait_for --- test/pylib/util.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/pylib/util.py b/test/pylib/util.py index bc06b796cd..c3099dc271 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -20,13 +20,15 @@ def unique_name(): return unique_name_prefix + str(current_ms) -async def wait_for(pred: Callable[[], Awaitable[Optional[T]]], deadline: float) -> T: +async def wait_for( + pred: Callable[[], Awaitable[Optional[T]]], + deadline: float, period: float = 1) -> T: while True: assert(time.time() < deadline), "Deadline exceeded, failing test." res = await pred() if res is not None: return res - await asyncio.sleep(1) + await asyncio.sleep(period) unique_name.last_ms = 0 From 9b2449d3eab30f99083ffde6afade251dfd7d878 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 10 Nov 2022 10:22:26 +0100 Subject: [PATCH 4/5] test: reenable test_topology::test_decommission_node_add_column Also improve the test to increase the probability of reproducing #11780 by injecting sleeps in appropriate places. Without the fix for #11780 from the earlier commit, the test reproduces the issue in roughly half of all runs in dev build on my laptop. --- service/storage_service.cc | 6 ++++++ test/pylib/rest_client.py | 6 ++++++ test/topology/test_topology.py | 31 +++++++++++++++++++++++++++---- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 8e5d9c09e1..4254c9704e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -64,6 +64,7 @@ #include #include #include "utils/stall_free.hh" +#include "utils/error_injection.hh" #include #include @@ -2584,6 +2585,8 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } else if (req.cmd == node_ops_cmd::removenode_abort) { node_ops_abort(ops_uuid).get(); } else if (req.cmd == node_ops_cmd::decommission_prepare) { + utils::get_local_injector().inject( + "storage_service_decommission_prepare_handler_sleep", std::chrono::milliseconds{1500}).get(); if (req.leaving_nodes.size() > 1) { auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); slogger.warn("{}", msg); @@ -3613,6 +3616,9 @@ future<> storage_service::notify_joined(inet_address endpoint) { co_return; } + co_await utils::get_local_injector().inject( + "storage_service_notify_joined_sleep", std::chrono::milliseconds{500}); + co_await container().invoke_on_all([endpoint] (auto&& ss) { ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0}); return ss._lifecycle_notifier.notify_joined(endpoint); diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 1211eee481..268e51fe51 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -177,6 +177,12 @@ class ScyllaRESTAPIClient(): assert(type(data) == int) return data + async def get_joining_nodes(self, node_ip: str) -> list: + """Get the list of joining nodes according to `node_ip`.""" + data = await self.client.get_json(f"/storage_service/nodes/joining", host=node_ip) + assert(type(data) == list) + return data + async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None: """Enable error injection named `injection` on `node_ip`. Depending on `one_shot`, the injection will be executed only once or every time the process passes the injection point. diff --git a/test/topology/test_topology.py b/test/topology/test_topology.py index b1f8074557..d198e3a18f 100644 --- a/test/topology/test_topology.py +++ b/test/topology/test_topology.py @@ -10,6 +10,9 @@ import pytest import logging import asyncio import random +import time + +from test.pylib.util import wait_for logger = logging.getLogger(__name__) @@ -57,13 +60,33 @@ async def test_remove_node_add_column(manager, random_tables): @pytest.mark.asyncio -@pytest.mark.skip(reason="Flaky due to #11780") async def test_decommission_node_add_column(manager, random_tables): """Add a node, remove an original node, add a column""" - servers = await manager.running_servers() table = await random_tables.add_table(ncolumns=5) - await manager.server_add() - await manager.decommission_node(servers[1].server_id) # Decommission [1] + servers = await manager.running_servers() + decommission_target = servers[1] + # The sleep injections significantly increase the probability of reproducing #11780: + # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state + # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server), + # enters sleep before calling storage_service::notify_joined + # 3. we start decommission on decommission_target + # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server + # 5. bootstrapped_server receives the RPC and enters sleep + # 6. decommission_target handle_state_normal wakes up, + # calls storage_service::notify_joined which drops some RPC clients + # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail + await manager.api.enable_injection( + decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True) + bootstrapped_server = await manager.server_add() + async def no_joining_nodes(): + joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr) + return not joining_nodes + # Wait until decommission_target thinks that bootstrapped_server is NORMAL + # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal + await wait_for(no_joining_nodes, time.time() + 30, period=.1) + await manager.api.enable_injection( + bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True) + await manager.decommission_node(decommission_target.server_id) await table.add_column() await random_tables.verify_schema() From a83789160d0515617c2b87935a31cf65ad569894 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 15 Nov 2022 18:18:11 +0100 Subject: [PATCH 5/5] message: messaging_service: check for known topology before calling is_same_dc/rack `is_same_dc` and `is_same_rack` assume that the peer's topology is known. If it's unknown, `on_internal_error` will be called inside topology. When these functions are used in `get_rpc_client`, they are already protected by an earlier check for knowing the peer's topology (the `has_topology()` lambda). Another use is in `do_start_listen()`, where we create a filter for RPC module to check if it should accept incoming connections. If cross-dc or cross-rack encryption is enabled, we will reject connections attempts to the regular (non-ssl) port from other dcs/rack using `is_same_dc/rack`. However, it might happen that something (other Scylla node or otherwise) tries to contact us on the regular port and we don't know that thing's topology, which would result in `on_internal_error`. But this is not a fatal error; we simply want to reject that connection. So protect these calls as well. Finally, there's `get_preferred_ip` with an unprotected `is_same_dc` call which, for a given peer, may return a different IP from preferred IP cache if the endpoint resides in the same DC. If there is not entry in the preferred IP cache, we return the original (external) IP of the peer. We can do the same if we don't know the peer's topology. It's interesting that we didn't see this particular place blowing up. Perhaps the preferred IP cache is always populated after we know the topology. --- message/messaging_service.cc | 32 ++++++++++++-------------------- message/messaging_service.hh | 1 + 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index d04d7690bb..dc95842826 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -255,27 +255,21 @@ future<> messaging_service::start_listen(locator::shared_token_metadata& stm) { return make_ready_future<>(); } -bool messaging_service::is_same_dc(inet_address addr) const { - // It's a "safety check". The token metadata pointer is nullptr before +bool messaging_service::topology_known_for(inet_address addr) const { + // The token metadata pointer is nullptr before // the service is start_listen()-ed and after it's being shutdown()-ed. - // No new clients should appear in those period, but if they do it's - // better to classify them somehow. Telling that all endpoints live in - // different DCs/RACKs would make messaging apply the most restrictive - // compression/encryption rules which can be sub-optimal but not bad. - if (_token_metadata == nullptr) { - return false; - } + return _token_metadata + && _token_metadata->get()->get_topology().has_endpoint(addr, locator::topology::pending::yes); +} +// Precondition: `topology_known_for(addr)`. +bool messaging_service::is_same_dc(inet_address addr) const { const auto& topo = _token_metadata->get()->get_topology(); return topo.get_datacenter(addr) == topo.get_datacenter(); } +// Precondition: `topology_known_for(addr)`. bool messaging_service::is_same_rack(inet_address addr) const { - // See comment in is_same_dc() about this check - if (_token_metadata == nullptr) { - return false; - } - const auto& topo = _token_metadata->get()->get_topology(); return topo.get_rack(addr) == topo.get_rack(); } @@ -308,13 +302,13 @@ void messaging_service::do_start_listen() { case encrypt_what::dc: so.filter_connection = [this](const seastar::socket_address& caddr) { auto addr = get_public_endpoint_for(caddr); - return is_same_dc(addr); + return topology_known_for(addr) && is_same_dc(addr); }; break; case encrypt_what::rack: so.filter_connection = [this](const seastar::socket_address& caddr) { auto addr = get_public_endpoint_for(caddr); - return is_same_dc(addr) && is_same_rack(addr); + return topology_known_for(addr) && is_same_dc(addr) && is_same_rack(addr); }; break; } @@ -675,7 +669,7 @@ gms::inet_address messaging_service::get_preferred_ip(gms::inet_address ep) { auto it = _preferred_ip_cache.find(ep); if (it != _preferred_ip_cache.end()) { - if (is_same_dc(ep)) { + if (topology_known_for(ep) && is_same_dc(ep)) { return it->second; } } @@ -731,9 +725,7 @@ shared_ptr messaging_service::ge std::optional topology_status; auto has_topology = [&] { if (!topology_status.has_value()) { - topology_status = _token_metadata - ? _token_metadata->get()->get_topology().has_endpoint(id.addr, locator::topology::pending::yes) - : false; + topology_status = topology_known_for(id.addr); } return *topology_status; }; diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 9395966366..3814c19747 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -504,6 +504,7 @@ private: void find_and_remove_client(clients_map& clients, msg_addr id, Fn&& filter); void do_start_listen(); + bool topology_known_for(inet_address) const; bool is_same_dc(inet_address ep) const; bool is_same_rack(inet_address ep) const;