view, storage_proxy: carry effective_replication_map along with endpoints

When sending mutation to remote endpoint,
the selected endpoints must be in sync with
the current effective_replication_map.

Currently, the endpoints are sent down the storage_proxy
stack, and later on an effective_replication_map is retrieved
again, and it might not match the target or pending endpoints,
similar to the case seen in https://github.com/scylladb/scylladb/issues/15138

The correct way is to carry the same effective replication map
used to select said endpoints and pass it down the stack.
See also https://github.com/scylladb/scylladb/pull/15141

Fixes scylladb/scylladb#15144
Fixes scylladb/scylladb#14730

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #15142
This commit is contained in:
Benny Halevy
2023-08-23 21:20:42 +03:00
committed by Botond Dénes
parent 0b9f221f2a
commit 2c54d7a35a
5 changed files with 30 additions and 20 deletions

View File

@@ -1553,14 +1553,11 @@ bool needs_static_row(const mutation_partition& mp, const std::vector<view_and_b
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
static std::optional<gms::inet_address>
get_view_natural_endpoint(replica::database& db, const sstring& keyspace_name,
get_view_natural_endpoint(const locator::vnode_effective_replication_map_ptr& erm, bool network_topology,
const dht::token& base_token, const dht::token& view_token) {
auto& ks = db.find_keyspace(keyspace_name);
auto erm = ks.get_effective_replication_map();
auto& topology = erm->get_token_metadata_ptr()->get_topology();
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_datacenter = topology.get_datacenter();
bool network_topology = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy());
std::vector<gms::inet_address> base_endpoints, view_endpoints;
for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) {
if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) {
@@ -1595,7 +1592,8 @@ get_view_natural_endpoint(replica::database& db, const sstring& keyspace_name,
return view_endpoints[base_it - base_endpoints.begin()];
}
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints,
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints,
frozen_mutation_and_schema&& mut, const dht::token& base_token, const dht::token& view_token,
service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) {
@@ -1603,6 +1601,7 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in
mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token);
return proxy.send_to_endpoint(
std::move(mut),
std::move(ermp),
target,
std::move(pending_endpoints),
db::write_type::VIEW,
@@ -1639,8 +1638,11 @@ future<> view_update_generator::mutate_MV(
[this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> {
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto& keyspace_name = mut.s->ks_name();
auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token);
auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token);
auto& ks = _proxy.local().local_db().find_keyspace(keyspace_name);
auto ermp = ks.get_effective_replication_map();
bool network_topology = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy());
auto target_endpoint = get_view_natural_endpoint(ermp, network_topology, base_token, view_token);
auto remote_endpoints = ermp->get_pending_endpoints(view_token);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());
const bool update_synchronously = should_update_synchronously(*mut.s);
@@ -1724,7 +1726,7 @@ future<> view_update_generator::mutate_MV(
stats.view_updates_pushed_remote += updates_pushed_remote;
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
schema_ptr s = mut.s;
future<> view_update = apply_to_remote_endpoints(_proxy.local(), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable {
if (f.failed()) {