token_metadata: add endpoints for reading
This patch adds the token_metadata::set_topology_transition_state method. If the current state is write_both_read_new, update_pending_ranges will compute new ranges for read requests. The default value of topology_transition_state is null, meaning no read ranges are computed. The appropriate set_topology_transition_state calls will be added later. We also add the endpoints_for_reading method, which returns the read endpoints based on the computed ranges.
This commit is contained in:
@@ -61,14 +61,16 @@ private:
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
// For each keyspace, migration_info contains ranges of tokens and
|
||||
// corresponding replicas to which writes will be directed:
|
||||
// corresponding replicas to which writes or reads will be directed:
|
||||
// - pending_endpoints - will be appended to normal endpoints for writes;
|
||||
// - read_endpoints - will completely replace normal endpoints for reads.
|
||||
// This data structure is filled only during data migration between nodes
|
||||
// when they are added or removed from the cluster.
|
||||
// During normal operation, token mapping to nodes is
|
||||
// implemented in the effective_replication_map.
|
||||
// Pair of token-range -> replica-set mappings kept per keyspace
// (see _keyspace_to_migration_info) while data is migrated between nodes.
struct migration_info {
|
||||
// Replicas that are appended to the normal endpoints for writes.
ring_mapping pending_endpoints;
|
||||
// Replicas that completely replace the normal endpoints for reads.
ring_mapping read_endpoints;
|
||||
};
|
||||
std::unordered_map<sstring, migration_info> _keyspace_to_migration_info;
|
||||
|
||||
@@ -78,6 +80,8 @@ private:
|
||||
|
||||
topology _topology;
|
||||
|
||||
std::optional<service::topology::transition_state> _topology_transition_state;
|
||||
|
||||
long _ring_version = 0;
|
||||
static thread_local long _static_ring_version;
|
||||
|
||||
@@ -282,6 +286,12 @@ public:
|
||||
// returns empty vector if keyspace_name not found.
|
||||
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
// Records the topology transition state for this instance; passing
// std::nullopt clears it. update_pending_ranges fills read_endpoints only
// while the stored state equals write_both_read_new.
void set_topology_transition_state(std::optional<service::topology::transition_state> state) {
|
||||
_topology_transition_state = state;
|
||||
}
|
||||
|
||||
public:
|
||||
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
|
||||
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
|
||||
@@ -371,6 +381,7 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_tok
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
ret->_tablets = _tablets;
|
||||
ret->_topology_transition_state =_topology_transition_state;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -824,7 +835,7 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
const auto token = tokens[i];
|
||||
|
||||
const auto old_endpoints = strategy.calculate_natural_endpoints(token, *base_token_metadata).get0();
|
||||
const auto new_endpoints = strategy.calculate_natural_endpoints(token, new_token_metadata).get0();
|
||||
auto new_endpoints = strategy.calculate_natural_endpoints(token, new_token_metadata).get0();
|
||||
|
||||
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
|
||||
using interval = ring_mapping::interval_type;
|
||||
@@ -851,6 +862,14 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
if (!pending_endpoints.empty()) {
|
||||
add_mapping(migration_info.pending_endpoints, std::move(pending_endpoints));
|
||||
}
|
||||
|
||||
// in order not to waste memory, we update read_endpoints only if the
|
||||
// new endpoints differ from the old ones
|
||||
if (_topology_transition_state == service::topology::transition_state::write_both_read_new &&
|
||||
new_endpoints.get_vector() != old_endpoints.get_vector())
|
||||
{
|
||||
add_mapping(migration_info.read_endpoints, std::move(new_endpoints).extract_set());
|
||||
}
|
||||
}
|
||||
return migration_info;
|
||||
});
|
||||
@@ -928,6 +947,14 @@ inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(c
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
std::optional<inet_address_vector_replica_set> token_metadata_impl::endpoints_for_reading(const token& token, const sstring& keyspace_name) const {
|
||||
const auto* endpoints = maybe_migration_endpoints(endpoints_field::read_endpoints, token, keyspace_name);
|
||||
if (endpoints == nullptr) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return inet_address_vector_replica_set(endpoints->begin(), endpoints->end());
|
||||
}
|
||||
|
||||
std::map<token, inet_address> token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const {
|
||||
std::map<token, inet_address> ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end());
|
||||
ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end());
|
||||
@@ -1225,6 +1252,16 @@ token_metadata::pending_endpoints_for(const token& token, const sstring& keyspac
|
||||
return _impl->pending_endpoints_for(token, keyspace_name);
|
||||
}
|
||||
|
||||
// Public facade: forwards the lookup to the implementation object (_impl)
// and returns its result unchanged.
std::optional<inet_address_vector_replica_set>
|
||||
token_metadata::endpoints_for_reading(const token& token, const sstring& keyspace_name) const {
|
||||
return _impl->endpoints_for_reading(token, keyspace_name);
|
||||
}
|
||||
|
||||
// Public facade: hands the (possibly empty) transition state to the
// implementation object (_impl).
void
|
||||
token_metadata::set_topology_transition_state(std::optional<service::topology::transition_state> state) {
|
||||
_impl->set_topology_transition_state(state);
|
||||
}
|
||||
|
||||
std::multimap<inet_address, token>
|
||||
token_metadata::get_endpoint_to_token_map_for_reading() const {
|
||||
return _impl->get_endpoint_to_token_map_for_reading();
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
|
||||
#include "locator/types.hh"
|
||||
#include "locator/topology.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
|
||||
// forward declaration since replica/database.hh includes this file
|
||||
namespace replica {
|
||||
@@ -267,6 +268,16 @@ public:
|
||||
// returns empty vector if keyspace_name not found.
|
||||
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
// This function returns a list of nodes to which a read request should be directed.
|
||||
// Returns a non-empty optional only during topology changes, if _topology_transition_state == write_both_read_new and
|
||||
// new set of replicas differs from the old one.
|
||||
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
// updates the current topology_transition_state of this instance,
|
||||
// this value is preserved in all clone functions,
|
||||
// by default it's not set
|
||||
void set_topology_transition_state(std::optional<service::topology::transition_state> state);
|
||||
|
||||
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
|
||||
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user