token_metadata: replace set_topology_transition_state with set_read_new
This helps isolate topology::transition_state dependencies, token_metadata doesn't need the entire enum, just this boolean flag.
This commit is contained in:
@@ -80,7 +80,7 @@ private:
|
||||
|
||||
topology _topology;
|
||||
|
||||
std::optional<service::topology::transition_state> _topology_transition_state;
|
||||
token_metadata::read_new_t _read_new = token_metadata::read_new_t::no;
|
||||
|
||||
long _ring_version = 0;
|
||||
static thread_local long _static_ring_version;
|
||||
@@ -288,8 +288,8 @@ public:
|
||||
|
||||
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
void set_topology_transition_state(std::optional<service::topology::transition_state> state) {
|
||||
_topology_transition_state = state;
|
||||
void set_read_new(token_metadata::read_new_t read_new) {
|
||||
_read_new = read_new;
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -381,7 +381,7 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_tok
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
ret->_tablets = _tablets;
|
||||
ret->_topology_transition_state =_topology_transition_state;
|
||||
ret->_read_new = _read_new;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -865,7 +865,7 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
|
||||
// in order not to waste memory, we update read_endpoints only if the
|
||||
// new endpoints differs from the old one
|
||||
if (_topology_transition_state == service::topology::transition_state::write_both_read_new &&
|
||||
if (_read_new == token_metadata::read_new_t::yes &&
|
||||
new_endpoints.get_vector() != old_endpoints.get_vector())
|
||||
{
|
||||
add_mapping(migration_info.read_endpoints, std::move(new_endpoints).extract_set());
|
||||
@@ -1258,8 +1258,8 @@ token_metadata::endpoints_for_reading(const token& token, const sstring& keyspac
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::set_topology_transition_state(std::optional<service::topology::transition_state> state) {
|
||||
_impl->set_topology_transition_state(state);
|
||||
token_metadata::set_read_new(read_new_t read_new) {
|
||||
_impl->set_read_new(read_new);
|
||||
}
|
||||
|
||||
std::multimap<inet_address, token>
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
|
||||
#include "locator/types.hh"
|
||||
#include "locator/topology.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
|
||||
// forward declaration since replica/database.hh includes this file
|
||||
namespace replica {
|
||||
@@ -273,10 +272,13 @@ public:
|
||||
// new set of replicas differs from the old one.
|
||||
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
// updates the current topology_transition_state of this instance,
|
||||
// this value is preserved in all clone functions,
|
||||
// by default it's not set
|
||||
void set_topology_transition_state(std::optional<service::topology::transition_state> state);
|
||||
// Updates the read_new flag, switching read requests from
|
||||
// the old endpoints to the new ones during topology changes:
|
||||
// read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads goes to normal endpoints;
|
||||
// read_new_t::yes - triggers update_pending_ranges to compute and store new ranges for read requests.
|
||||
// The value is preserved in all clone functions, the default is read_new_t::no.
|
||||
using read_new_t = bool_class<class read_new_tag>;
|
||||
void set_read_new(read_new_t value);
|
||||
|
||||
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
|
||||
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
|
||||
|
||||
@@ -360,7 +360,19 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
co_await add_normal_node(id, rs);
|
||||
}
|
||||
|
||||
tmptr->set_topology_transition_state(_topology_state_machine._topology.tstate);
|
||||
tmptr->set_read_new(std::invoke([](std::optional<topology::transition_state> state) {
|
||||
using read_new_t = locator::token_metadata::read_new_t;
|
||||
if (!state.has_value()) {
|
||||
return read_new_t::no;
|
||||
}
|
||||
switch (*state) {
|
||||
case topology::transition_state::commit_cdc_generation:
|
||||
case topology::transition_state::write_both_read_old:
|
||||
return read_new_t::no;
|
||||
case topology::transition_state::write_both_read_new:
|
||||
return read_new_t::yes;
|
||||
}
|
||||
}, _topology_state_machine._topology.tstate));
|
||||
|
||||
for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
|
||||
@@ -234,7 +234,7 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
check_no_endpoints(2);
|
||||
|
||||
token_metadata.set_topology_transition_state(service::topology::transition_state::write_both_read_new);
|
||||
token_metadata.set_read_new(locator::token_metadata::read_new_t::yes);
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
|
||||
check_endpoints(2, {e3, e1});
|
||||
|
||||
Reference in New Issue
Block a user