diff --git a/locator/tablets.cc b/locator/tablets.cc index 154ad509c1..602883d5c6 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -28,6 +28,53 @@ namespace locator { seastar::logger tablet_logger("tablets"); + +static +write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage) { + switch (stage) { + case tablet_transition_stage::allow_write_both_read_old: + return write_replica_set_selector::previous; + case tablet_transition_stage::write_both_read_old: + return write_replica_set_selector::both; + case tablet_transition_stage::streaming: + return write_replica_set_selector::both; + case tablet_transition_stage::write_both_read_new: + return write_replica_set_selector::both; + case tablet_transition_stage::use_new: + return write_replica_set_selector::next; + case tablet_transition_stage::cleanup: + return write_replica_set_selector::next; + } + on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); +} + +static +read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) { + switch (stage) { + case tablet_transition_stage::allow_write_both_read_old: + return read_replica_set_selector::previous; + case tablet_transition_stage::write_both_read_old: + return read_replica_set_selector::previous; + case tablet_transition_stage::streaming: + return read_replica_set_selector::previous; + case tablet_transition_stage::write_both_read_new: + return read_replica_set_selector::next; + case tablet_transition_stage::use_new: + return read_replica_set_selector::next; + case tablet_transition_stage::cleanup: + return read_replica_set_selector::next; + } + on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); +} + +tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica) + : stage(stage) + , next(std::move(next)) + , pending_replica(std::move(pending_replica)) + , writes(get_selector_for_writes(stage)) + , reads(get_selector_for_reads(stage)) +{ } + const tablet_map& tablet_metadata::get_tablet_map(table_id id) const { try { return _tablets.at(id); @@ -267,7 +314,22 @@ public: virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override { auto&& tablets = get_tablet_map(); auto tablet = tablets.get_tablet_id(search_token); - auto&& replicas = tablets.get_tablet_info(tablet).replicas; + auto* info = tablets.get_tablet_transition_info(tablet); + auto&& replicas = std::invoke([&] () -> const tablet_replica_set& { + if (!info) { + return tablets.get_tablet_info(tablet).replicas; + } + switch (info->writes) { + case write_replica_set_selector::previous: + [[fallthrough]]; + case write_replica_set_selector::both: + return tablets.get_tablet_info(tablet).replicas; + case write_replica_set_selector::next: { + return info->next; + } + } + on_internal_error(tablet_logger, format("Invalid replica selector", static_cast(info->writes))); + }); tablet_logger.trace("get_natural_endpoints({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas); return to_replica_set(replicas); } @@ -285,13 +347,40 @@ public: if (!info) { return {}; } - tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}", - search_token, _table, tablet, info->pending_replica); - return {get_endpoint_for_host_id(info->pending_replica.host)}; + switch (info->writes) { + case write_replica_set_selector::previous: + return {}; + case write_replica_set_selector::both: + tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}", + search_token, _table, tablet, info->pending_replica); + return {get_endpoint_for_host_id(info->pending_replica.host)}; + case write_replica_set_selector::next: + return {}; + } + on_internal_error(tablet_logger, format("Invalid replica selector", static_cast(info->writes))); } virtual inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const override { - return get_natural_endpoints_without_node_being_replaced(search_token); + auto&& tablets = get_tablet_map(); + auto tablet = tablets.get_tablet_id(search_token); + auto&& info = tablets.get_tablet_transition_info(tablet); + auto&& replicas = std::invoke([&] () -> const tablet_replica_set& { + if (!info) { + return tablets.get_tablet_info(tablet).replicas; + } + switch (info->reads) { + case read_replica_set_selector::previous: + return tablets.get_tablet_info(tablet).replicas; + case read_replica_set_selector::next: { + return info->next; + } + } + on_internal_error(tablet_logger, format("Invalid replica selector", static_cast(info->reads))); + }); + tablet_logger.trace("get_endpoints_for_reading({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas); + auto result = to_replica_set(replicas); + maybe_remove_node_being_replaced(*_tmptr, *_rs, result); + return result; } virtual bool has_pending_ranges(inet_address endpoint) const override { diff --git a/locator/tablets.hh b/locator/tablets.hh index 8a0efc9c8c..c55db47f9b 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -80,12 +80,24 @@ enum class tablet_transition_stage { sstring tablet_transition_stage_to_string(tablet_transition_stage); tablet_transition_stage tablet_transition_stage_from_string(const sstring&); +enum class write_replica_set_selector { + previous, both, next +}; + +enum class read_replica_set_selector { + previous, next +}; + /// Used for storing tablet state transition during topology changes. /// Describes transition of a single tablet. struct tablet_transition_info { tablet_transition_stage stage; tablet_replica_set next; tablet_replica pending_replica; // Optimization (next - tablet_info::replicas) + write_replica_set_selector writes; + read_replica_set_selector reads; + + tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica); bool operator==(const tablet_transition_info&) const = default; };