From 429f110110d23b74d893eaace55e5297ba60abc7 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 25 Jul 2022 10:14:38 +0300 Subject: [PATCH 1/5] test: rest_api: test range_to_endpoint_map and describe_ring Signed-off-by: Benny Halevy --- test/rest_api/test_storage_service.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/rest_api/test_storage_service.py b/test/rest_api/test_storage_service.py index 41320c0cb7..fb717f9abb 100644 --- a/test/rest_api/test_storage_service.py +++ b/test/rest_api/test_storage_service.py @@ -386,3 +386,16 @@ def test_materialized_view_pre_scrub_snapshot(cql, this_dc, rest_api): with new_secondary_index(cql, table, 'v') as si: resp = rest_api.send("GET", f"storage_service/keyspace_scrub/{keyspace}") resp.raise_for_status() + +def test_range_to_endpoint_map(cql, this_dc, rest_api): + with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace: + resp = rest_api.send("GET", f"storage_service/range_to_endpoint_map/{keyspace}") + resp.raise_for_status() + +def test_describe_ring(cql, this_dc, rest_api): + resp = rest_api.send("GET", "storage_service/describe_ring") + resp.raise_for_status() + + with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace: + resp = rest_api.send("GET", f"storage_service/describe_ring/{keyspace}") + resp.raise_for_status() From 0b474866a37f9439bb63fc49f22b06f4902395b9 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 25 Jul 2022 10:24:03 +0300 Subject: [PATCH 2/5] storage_service: get_range_to_address_map: move selection of arbitrary ks to api layer It is only needed for the "storage_service/describe_ring" api and service/storage_service shouldn't bother with it. It's an api sugar coating. Signed-off-by: Benny Halevy --- api/storage_service.cc | 8 +++++++- service/storage_service.cc | 9 --------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 4679a1a280..1ffedf2809 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -546,7 +546,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) { - return describe_ring_as_json(ss, ""); + // Find an arbitrary non-system keyspace. + auto keyspaces = ctx.db.local().get_non_system_keyspaces(); + if (keyspaces.empty()) { + throw std::runtime_error("No keyspace provided and no non system kespace exist"); + } + auto ks = keyspaces[0]; + return describe_ring_as_json(ss, ks); }); ss::describe_ring.set(r, [&ctx, &ss](std::unique_ptr req) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 2e4b7a9bf4..268e8c3a4c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -780,15 +780,6 @@ std::unordered_map storage_service::get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const { sstring ks = keyspace; - // some people just want to get a visual representation of things. Allow null and set it to the first - // non-system keyspace. - if (keyspace == "") { - auto keyspaces = _db.local().get_non_system_keyspaces(); - if (keyspaces.empty()) { - throw std::runtime_error("No keyspace provided and no non system kespace exist"); - } - ks = keyspaces[0]; - } return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens)); } From 3d62a1592f2e241f63a24b843f86e44907f8500c Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 25 Jul 2022 09:52:24 +0300 Subject: [PATCH 3/5] storage_service: pass replication map to get_range_to_address_map and friends Before they are made asynchronous in the next patch, so they work on a coherent snapshot of the token_metadata and replication map as their caller. Signed-off-by: Benny Halevy --- service/storage_service.cc | 29 ++++++++++++++++------------- service/storage_service.hh | 9 +++++---- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 268e8c3a4c..2dd0eb1782 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -747,13 +747,18 @@ storage_service::get_rpc_address(const inet_address& endpoint) const { std::unordered_map storage_service::get_range_to_address_map(const sstring& keyspace) const { - return get_range_to_address_map(keyspace, get_token_metadata().sorted_tokens()); + return get_range_to_address_map(_db.local().find_keyspace(keyspace).get_effective_replication_map()); +} + +std::unordered_map +storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const { + return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens()); } std::unordered_map storage_service::get_range_to_address_map_in_local_dc( - const sstring& keyspace) const { - auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); + locator::effective_replication_map_ptr erm) const { + auto orig_map = get_range_to_address_map(erm, get_tokens_in_local_dc(*erm->get_token_metadata_ptr())); std::unordered_map filtered_map; for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; @@ -765,9 +770,8 @@ storage_service::get_range_to_address_map_in_local_dc( } std::vector -storage_service::get_tokens_in_local_dc() const { +storage_service::get_tokens_in_local_dc(const locator::token_metadata& tm) const { std::vector filtered_tokens; - const auto& tm = get_token_metadata(); for (auto token : tm.sorted_tokens()) { auto endpoint = tm.get_endpoint(token); if (db::is_local(*endpoint)) @@ -777,10 +781,9 @@ storage_service::get_tokens_in_local_dc() const { } std::unordered_map -storage_service::get_range_to_address_map(const sstring& keyspace, +storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm, const std::vector& sorted_tokens) const { - sstring ks = keyspace; - return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens)); + return construct_range_to_endpoint_map(erm, get_all_ranges(sorted_tokens)); } future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node) { @@ -3049,11 +3052,12 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ std::vector ranges; //Token.TokenFactory tf = getPartitioner().getTokenFactory(); + auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map(); std::unordered_map range_to_address_map = include_only_local_dc - ? get_range_to_address_map_in_local_dc(keyspace) - : get_range_to_address_map(keyspace); - auto tmptr = get_token_metadata_ptr(); + ? get_range_to_address_map_in_local_dc(erm) + : get_range_to_address_map(erm); + auto tmptr = erm->get_token_metadata_ptr(); for (auto entry : range_to_address_map) { const auto& topology = tmptr->get_topology(); auto range = entry.first; @@ -3098,10 +3102,9 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ std::unordered_map storage_service::construct_range_to_endpoint_map( - const sstring& keyspace, + locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { std::unordered_map res; - auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map(); for (auto r : ranges) { res[r] = erm->get_natural_endpoints( r.end() ? r.end()->value() : dht::maximum_token()); diff --git a/service/storage_service.hh b/service/storage_service.hh index 515c674543..5081d3e103 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -385,13 +385,14 @@ public: sstring get_rpc_address(const inet_address& endpoint) const; std::unordered_map get_range_to_address_map(const sstring& keyspace) const; + std::unordered_map get_range_to_address_map(locator::effective_replication_map_ptr erm) const; std::unordered_map get_range_to_address_map_in_local_dc( - const sstring& keyspace) const; + locator::effective_replication_map_ptr erm) const; - std::vector get_tokens_in_local_dc() const; + std::vector get_tokens_in_local_dc(const locator::token_metadata&) const; - std::unordered_map get_range_to_address_map(const sstring& keyspace, + std::unordered_map get_range_to_address_map(locator::effective_replication_map_ptr erm, const std::vector& sorted_tokens) const; /** @@ -425,7 +426,7 @@ public: * @return mapping of ranges to the replicas responsible for them. */ std::unordered_map construct_range_to_endpoint_map( - const sstring& keyspace, + locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; public: virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; From 5eb31eff644a1ddad4d67dae97839b0352f1f2ec Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 24 Jul 2022 14:50:02 +0300 Subject: [PATCH 4/5] storage_service: coroutinize get_range_to_address_map and friends And add calls to maybe_yield to prevent stalls in this path as seen in performance testing. Also, add a respective rest_api test. Fixes #11114 Signed-off-by: Benny Halevy --- api/storage_service.cc | 6 +++--- service/storage_service.cc | 41 +++++++++++++++++++++++--------------- service/storage_service.hh | 14 ++++++------- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 1ffedf2809..c44509c7c0 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -514,10 +514,10 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) { + ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr req) -> future { auto keyspace = validate_keyspace(ctx, req->param); std::vector res; - return make_ready_future(stream_range_as_array(ss.local().get_range_to_address_map(keyspace), + co_return stream_range_as_array(co_await ss.local().get_range_to_address_map(keyspace), [](const std::pair& entry){ ss::maplist_mapper m; if (entry.first.start()) { @@ -534,7 +534,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 2dd0eb1782..28152d3f5c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -745,45 +745,49 @@ storage_service::get_rpc_address(const inet_address& endpoint) const { return boost::lexical_cast(endpoint); } -std::unordered_map +future> storage_service::get_range_to_address_map(const sstring& keyspace) const { return get_range_to_address_map(_db.local().find_keyspace(keyspace).get_effective_replication_map()); } -std::unordered_map +future> storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const { return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens()); } -std::unordered_map +future> storage_service::get_range_to_address_map_in_local_dc( locator::effective_replication_map_ptr erm) const { - auto orig_map = get_range_to_address_map(erm, get_tokens_in_local_dc(*erm->get_token_metadata_ptr())); + auto orig_map = co_await get_range_to_address_map(erm, co_await get_tokens_in_local_dc(*erm->get_token_metadata_ptr())); std::unordered_map filtered_map; for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; addresses.reserve(entry.second.size()); std::copy_if(entry.second.begin(), entry.second.end(), std::back_inserter(addresses), db::is_local); + co_await coroutine::maybe_yield(); } - return filtered_map; + co_return filtered_map; } -std::vector +// Caller is responsible to hold token_metadata valid until the returned future is resolved +future> storage_service::get_tokens_in_local_dc(const locator::token_metadata& tm) const { std::vector filtered_tokens; for (auto token : tm.sorted_tokens()) { auto endpoint = tm.get_endpoint(token); if (db::is_local(*endpoint)) filtered_tokens.push_back(token); + co_await coroutine::maybe_yield(); } - return filtered_tokens; + co_return filtered_tokens; } -std::unordered_map +// Caller is responsible to hold token_metadata valid until the returned future is resolved +future> storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm, const std::vector& sorted_tokens) const { - return construct_range_to_endpoint_map(erm, get_all_ranges(sorted_tokens)); + co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens)); } future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node) { @@ -3053,10 +3057,11 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ //Token.TokenFactory tf = getPartitioner().getTokenFactory(); auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map(); - std::unordered_map range_to_address_map = + std::unordered_map range_to_address_map = co_await ( include_only_local_dc ? get_range_to_address_map_in_local_dc(erm) - : get_range_to_address_map(erm); + : get_range_to_address_map(erm) + ); auto tmptr = erm->get_token_metadata_ptr(); for (auto entry : range_to_address_map) { const auto& topology = tmptr->get_topology(); @@ -3100,7 +3105,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ co_return ranges; } -std::unordered_map +future> storage_service::construct_range_to_endpoint_map( locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { @@ -3108,8 +3113,9 @@ storage_service::construct_range_to_endpoint_map( for (auto r : ranges) { res[r] = erm->get_natural_endpoints( r.end() ? r.end()->value() : dht::maximum_token()); + co_await coroutine::maybe_yield(); } - return res; + co_return res; } @@ -3361,20 +3367,23 @@ storage_service::get_ranges_for_endpoint(const sstring& name, const gms::inet_ad return _db.local().find_keyspace(name).get_effective_replication_map()->get_ranges(ep); } -dht::token_range_vector +// Caller is responsible to hold token_metadata valid until the returned future is resolved +future storage_service::get_all_ranges(const std::vector& sorted_tokens) const { if (sorted_tokens.empty()) - return dht::token_range_vector(); + co_return dht::token_range_vector(); int size = sorted_tokens.size(); dht::token_range_vector ranges; ranges.push_back(dht::token_range::make_ending_with(range_bound(sorted_tokens[0], true))); + co_await coroutine::maybe_yield(); for (int i = 1; i < size; ++i) { dht::token_range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); ranges.push_back(r); + co_await coroutine::maybe_yield(); } ranges.push_back(dht::token_range::make_starting_with(range_bound(sorted_tokens[size-1], false))); - return ranges; + co_return ranges; } inet_address_vector_replica_set diff --git a/service/storage_service.hh b/service/storage_service.hh index 5081d3e103..d4ae660af6 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -384,15 +384,15 @@ public: */ sstring get_rpc_address(const inet_address& endpoint) const; - std::unordered_map get_range_to_address_map(const sstring& keyspace) const; - std::unordered_map get_range_to_address_map(locator::effective_replication_map_ptr erm) const; + future> get_range_to_address_map(const sstring& keyspace) const; + future> get_range_to_address_map(locator::effective_replication_map_ptr erm) const; - std::unordered_map get_range_to_address_map_in_local_dc( + future> get_range_to_address_map_in_local_dc( locator::effective_replication_map_ptr erm) const; - std::vector get_tokens_in_local_dc(const locator::token_metadata&) const; + future> get_tokens_in_local_dc(const locator::token_metadata&) const; - std::unordered_map get_range_to_address_map(locator::effective_replication_map_ptr erm, + future> get_range_to_address_map(locator::effective_replication_map_ptr erm, const std::vector& sorted_tokens) const; /** @@ -425,7 +425,7 @@ public: * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - std::unordered_map construct_range_to_endpoint_map( + future> construct_range_to_endpoint_map( locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; public: @@ -634,7 +634,7 @@ public: * ranges. * @return ranges in sorted order */ - dht::token_range_vector get_all_ranges(const std::vector& sorted_tokens) const; + future get_all_ranges(const std::vector& sorted_tokens) const; /** * This method returns the N endpoints that are responsible for storing the * specified key i.e for replication. From bc5f6cf45dfd006fe1747f748339a6ed227d28d7 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 24 Jul 2022 15:02:24 +0300 Subject: [PATCH 5/5] storage_service: reserve space in get_range_to_address_map and friends To reduce the chance of reallocation. Signed-off-by: Benny Halevy --- service/storage_service.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 28152d3f5c..b4f34bd8b9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -760,6 +760,7 @@ storage_service::get_range_to_address_map_in_local_dc( locator::effective_replication_map_ptr erm) const { auto orig_map = co_await get_range_to_address_map(erm, co_await get_tokens_in_local_dc(*erm->get_token_metadata_ptr())); std::unordered_map filtered_map; + filtered_map.reserve(orig_map.size()); for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; addresses.reserve(entry.second.size()); @@ -3110,6 +3111,7 @@ storage_service::construct_range_to_endpoint_map( locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { std::unordered_map res; + res.reserve(ranges.size()); for (auto r : ranges) { res[r] = erm->get_natural_endpoints( r.end() ? r.end()->value() : dht::maximum_token()); @@ -3374,6 +3376,7 @@ storage_service::get_all_ranges(const std::vector& sorted_tokens) const { co_return dht::token_range_vector(); int size = sorted_tokens.size(); dht::token_range_vector ranges; + ranges.reserve(size); ranges.push_back(dht::token_range::make_ending_with(range_bound(sorted_tokens[0], true))); co_await coroutine::maybe_yield(); for (int i = 1; i < size; ++i) {