Merge 'Unstall get_range_to_address_map' from Benny Halevy
Prevent stalls in this path as seen in performance testing. Also, add a respective rest_api test. Fixes #11114 Closes #11115 * github.com:scylladb/scylla: storage_service: reserve space in get_range_to_address_map and friends storage_service: coroutinize get_range_to_address_map and friends storage_service: pass replication map to get_range_to_address_map and friends storage_service: get_range_to_address_map: move selection of arbitrary ks to api layer test: rest_api: test range_to_endpoint_map and describe_ring
This commit is contained in:
@@ -514,10 +514,10 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
return ctx.db.local().get_config().saved_caches_directory();
|
||||
});
|
||||
|
||||
ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
|
||||
ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
std::vector<ss::maplist_mapper> res;
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_range_to_address_map(keyspace),
|
||||
co_return stream_range_as_array(co_await ss.local().get_range_to_address_map(keyspace),
|
||||
[](const std::pair<dht::token_range, inet_address_vector_replica_set>& entry){
|
||||
ss::maplist_mapper m;
|
||||
if (entry.first.start()) {
|
||||
@@ -534,7 +534,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
m.value.push(address.to_sstring());
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
});
|
||||
});
|
||||
|
||||
ss::get_pending_range_to_endpoint_map.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
@@ -546,7 +546,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
});
|
||||
|
||||
ss::describe_any_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
|
||||
return describe_ring_as_json(ss, "");
|
||||
// Find an arbitrary non-system keyspace.
|
||||
auto keyspaces = ctx.db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
auto ks = keyspaces[0];
|
||||
return describe_ring_as_json(ss, ks);
|
||||
});
|
||||
|
||||
ss::describe_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
|
||||
|
||||
@@ -746,51 +746,50 @@ storage_service::get_rpc_address(const inet_address& endpoint) const {
|
||||
return boost::lexical_cast<std::string>(endpoint);
|
||||
}
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace) const {
|
||||
return get_range_to_address_map(keyspace, get_token_metadata().sorted_tokens());
|
||||
return get_range_to_address_map(_db.local().find_keyspace(keyspace).get_effective_replication_map());
|
||||
}
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const {
|
||||
return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens());
|
||||
}
|
||||
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
storage_service::get_range_to_address_map_in_local_dc(
|
||||
const sstring& keyspace) const {
|
||||
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
|
||||
locator::effective_replication_map_ptr erm) const {
|
||||
auto orig_map = co_await get_range_to_address_map(erm, co_await get_tokens_in_local_dc(*erm->get_token_metadata_ptr()));
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> filtered_map;
|
||||
filtered_map.reserve(orig_map.size());
|
||||
for (auto entry : orig_map) {
|
||||
auto& addresses = filtered_map[entry.first];
|
||||
addresses.reserve(entry.second.size());
|
||||
std::copy_if(entry.second.begin(), entry.second.end(), std::back_inserter(addresses), db::is_local);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
return filtered_map;
|
||||
co_return filtered_map;
|
||||
}
|
||||
|
||||
std::vector<token>
|
||||
storage_service::get_tokens_in_local_dc() const {
|
||||
// Caller is responsible to hold token_metadata valid until the returned future is resolved
|
||||
future<std::vector<token>>
|
||||
storage_service::get_tokens_in_local_dc(const locator::token_metadata& tm) const {
|
||||
std::vector<token> filtered_tokens;
|
||||
const auto& tm = get_token_metadata();
|
||||
for (auto token : tm.sorted_tokens()) {
|
||||
auto endpoint = tm.get_endpoint(token);
|
||||
if (db::is_local(*endpoint))
|
||||
filtered_tokens.push_back(token);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
return filtered_tokens;
|
||||
co_return filtered_tokens;
|
||||
}
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
// Caller is responsible to hold token_metadata valid until the returned future is resolved
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node) {
|
||||
@@ -3066,11 +3065,13 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
|
||||
std::vector<token_range_endpoints> ranges;
|
||||
//Token.TokenFactory tf = getPartitioner().getTokenFactory();
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_to_address_map =
|
||||
auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map();
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_to_address_map = co_await (
|
||||
include_only_local_dc
|
||||
? get_range_to_address_map_in_local_dc(keyspace)
|
||||
: get_range_to_address_map(keyspace);
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
? get_range_to_address_map_in_local_dc(erm)
|
||||
: get_range_to_address_map(erm)
|
||||
);
|
||||
auto tmptr = erm->get_token_metadata_ptr();
|
||||
for (auto entry : range_to_address_map) {
|
||||
const auto& topology = tmptr->get_topology();
|
||||
auto range = entry.first;
|
||||
@@ -3113,17 +3114,18 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
|
||||
co_return ranges;
|
||||
}
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
storage_service::construct_range_to_endpoint_map(
|
||||
const sstring& keyspace,
|
||||
locator::effective_replication_map_ptr erm,
|
||||
const dht::token_range_vector& ranges) const {
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> res;
|
||||
auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map();
|
||||
res.reserve(ranges.size());
|
||||
for (auto r : ranges) {
|
||||
res[r] = erm->get_natural_endpoints(
|
||||
r.end() ? r.end()->value() : dht::maximum_token());
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
return res;
|
||||
co_return res;
|
||||
}
|
||||
|
||||
|
||||
@@ -3378,20 +3380,24 @@ storage_service::get_ranges_for_endpoint(const sstring& name, const gms::inet_ad
|
||||
return _db.local().find_keyspace(name).get_effective_replication_map()->get_ranges(ep);
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
// Caller is responsible to hold token_metadata valid until the returned future is resolved
|
||||
future<dht::token_range_vector>
|
||||
storage_service::get_all_ranges(const std::vector<token>& sorted_tokens) const {
|
||||
if (sorted_tokens.empty())
|
||||
return dht::token_range_vector();
|
||||
co_return dht::token_range_vector();
|
||||
int size = sorted_tokens.size();
|
||||
dht::token_range_vector ranges;
|
||||
ranges.reserve(size);
|
||||
ranges.push_back(dht::token_range::make_ending_with(range_bound<token>(sorted_tokens[0], true)));
|
||||
co_await coroutine::maybe_yield();
|
||||
for (int i = 1; i < size; ++i) {
|
||||
dht::token_range r(range<token>::bound(sorted_tokens[i - 1], false), range<token>::bound(sorted_tokens[i], true));
|
||||
ranges.push_back(r);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
ranges.push_back(dht::token_range::make_starting_with(range_bound<token>(sorted_tokens[size-1], false)));
|
||||
|
||||
return ranges;
|
||||
co_return ranges;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set
|
||||
|
||||
@@ -384,14 +384,15 @@ public:
|
||||
*/
|
||||
sstring get_rpc_address(const inet_address& endpoint) const;
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_to_address_map(const sstring& keyspace) const;
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(const sstring& keyspace) const;
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::effective_replication_map_ptr erm) const;
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_to_address_map_in_local_dc(
|
||||
const sstring& keyspace) const;
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map_in_local_dc(
|
||||
locator::effective_replication_map_ptr erm) const;
|
||||
|
||||
std::vector<token> get_tokens_in_local_dc() const;
|
||||
future<std::vector<token>> get_tokens_in_local_dc(const locator::token_metadata&) const;
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_to_address_map(const sstring& keyspace,
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::effective_replication_map_ptr erm,
|
||||
const std::vector<token>& sorted_tokens) const;
|
||||
|
||||
/**
|
||||
@@ -424,8 +425,8 @@ public:
|
||||
* @param ranges
|
||||
* @return mapping of ranges to the replicas responsible for them.
|
||||
*/
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> construct_range_to_endpoint_map(
|
||||
const sstring& keyspace,
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> construct_range_to_endpoint_map(
|
||||
locator::effective_replication_map_ptr erm,
|
||||
const dht::token_range_vector& ranges) const;
|
||||
public:
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
@@ -633,7 +634,7 @@ public:
|
||||
* ranges.
|
||||
* @return ranges in sorted order
|
||||
*/
|
||||
dht::token_range_vector get_all_ranges(const std::vector<token>& sorted_tokens) const;
|
||||
future<dht::token_range_vector> get_all_ranges(const std::vector<token>& sorted_tokens) const;
|
||||
/**
|
||||
* This method returns the N endpoints that are responsible for storing the
|
||||
* specified key i.e for replication.
|
||||
|
||||
@@ -386,3 +386,16 @@ def test_materialized_view_pre_scrub_snapshot(cql, this_dc, rest_api):
|
||||
with new_secondary_index(cql, table, 'v') as si:
|
||||
resp = rest_api.send("GET", f"storage_service/keyspace_scrub/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
|
||||
def test_range_to_endpoint_map(cql, this_dc, rest_api):
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
resp = rest_api.send("GET", f"storage_service/range_to_endpoint_map/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
|
||||
def test_describe_ring(cql, this_dc, rest_api):
|
||||
resp = rest_api.send("GET", "storage_service/describe_ring")
|
||||
resp.raise_for_status()
|
||||
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
resp = rest_api.send("GET", f"storage_service/describe_ring/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
|
||||
Reference in New Issue
Block a user