diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 72d83d2b46..10027e02f3 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -538,13 +538,13 @@ future manager::end_point_hints_manager::sender::get_last_file_modific }); } -future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept { - return futurize_invoke([this, m = std::move(m), &natural_endpoints] () mutable -> future<> { +future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) noexcept { + return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) { manager_logger.trace("Sending directly to {}", end_point_key()); - return _proxy.send_hint_to_endpoint(std::move(m), end_point_key()); + return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), end_point_key()); } else { manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key()); return _proxy.send_hint_to_all_replicas(std::move(m)); @@ -857,7 +857,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta auto token = dht::get_token(*m.s, m.fm.key()); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints(std::move(token)); - return do_send_one_mutation(std::move(m), natural_endpoints); + return do_send_one_mutation(std::move(m), std::move(erm), std::move(natural_endpoints)); } future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) { diff --git a/db/hints/manager.hh b/db/hints/manager.hh index f42f898276..233eccaafc 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -27,6 +27,7 @@ #include "db/hints/resource_manager.hh" #include "db/hints/host_filter.hh" #include "db/hints/sync_point.hh" +#include "locator/abstract_replication_strategy.hh" class fragmented_temporary_buffer; @@ -276,9 +277,10 @@ public: /// to it, otherwise execute the mutation "from scratch" with CL=ALL. /// /// \param m mutation to send + /// \param ermp points to the effective_replication_map used to obtain \c natural_endpoints /// \param natural_endpoints current replicas for the given mutation /// \return future that resolves when the operation is complete - future<> do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept; + future<> do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) noexcept; /// \brief Send one mutation out. /// diff --git a/db/view/view.cc b/db/view/view.cc index ca900e6f74..864010e3bf 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1553,14 +1553,11 @@ bool needs_static_row(const mutation_partition& mp, const std::vector -get_view_natural_endpoint(replica::database& db, const sstring& keyspace_name, +get_view_natural_endpoint(const locator::vnode_effective_replication_map_ptr& erm, bool network_topology, const dht::token& base_token, const dht::token& view_token) { - auto& ks = db.find_keyspace(keyspace_name); - auto erm = ks.get_effective_replication_map(); auto& topology = erm->get_token_metadata_ptr()->get_topology(); auto my_address = utils::fb_utilities::get_broadcast_address(); auto my_datacenter = topology.get_datacenter(); - bool network_topology = dynamic_cast(&ks.get_replication_strategy()); std::vector base_endpoints, view_endpoints; for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) { if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) { @@ -1595,7 +1592,8 @@ get_view_natural_endpoint(replica::database& db, const sstring& keyspace_name, return view_endpoints[base_it - base_endpoints.begin()]; } -static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints, +static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp, + gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints, frozen_mutation_and_schema&& mut, const dht::token& base_token, const dht::token& view_token, service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) { @@ -1603,6 +1601,7 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token); return proxy.send_to_endpoint( std::move(mut), + std::move(ermp), target, std::move(pending_endpoints), db::write_type::VIEW, @@ -1639,8 +1638,11 @@ future<> view_update_generator::mutate_MV( [this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto& keyspace_name = mut.s->ks_name(); - auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token); - auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token); + auto& ks = _proxy.local().local_db().find_keyspace(keyspace_name); + auto ermp = ks.get_effective_replication_map(); + bool network_topology = dynamic_cast(&ks.get_replication_strategy()); + auto target_endpoint = get_view_natural_endpoint(ermp, network_topology, base_token, view_token); + auto remote_endpoints = ermp->get_pending_endpoints(view_token); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); const bool update_synchronously = should_update_synchronously(*mut.s); @@ -1724,7 +1726,7 @@ future<> view_update_generator::mutate_MV( stats.view_updates_pushed_remote += updates_pushed_remote; cf_stats.total_view_updates_pushed_remote += updates_pushed_remote; schema_ptr s = mut.s; - future<> view_update = apply_to_remote_endpoints(_proxy.local(), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( + future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( [s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote, units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable { if (f.failed()) { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3947626e7f..b3dfd74a67 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3764,6 +3764,7 @@ bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const future<> storage_proxy::send_to_endpoint( std::unique_ptr m, + locator::effective_replication_map_ptr ermp, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, @@ -3782,7 +3783,7 @@ future<> storage_proxy::send_to_endpoint( timeout = clock_type::now() + 5min; } return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(), - [this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, cancellable] ( + [this, tr_state, erm = std::move(ermp), target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, cancellable] ( std::unique_ptr& m, db::consistency_level cl, db::write_type type, service_permit permit) mutable { @@ -3794,8 +3795,6 @@ future<> storage_proxy::send_to_endpoint( std::inserter(targets, targets.begin()), std::back_inserter(dead_endpoints), std::bind_front(&storage_proxy::is_alive, this)); - auto& table = _db.local().find_column_family(m->schema()->id()); - auto erm = table.get_effective_replication_map(); slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints); db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints); return create_write_response_handler( @@ -3820,6 +3819,7 @@ future<> storage_proxy::send_to_endpoint( future<> storage_proxy::send_to_endpoint( frozen_mutation_and_schema fm_a_s, + locator::effective_replication_map_ptr ermp, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, @@ -3828,6 +3828,7 @@ future<> storage_proxy::send_to_endpoint( is_cancellable cancellable) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), + std::move(ermp), std::move(target), std::move(pending_endpoints), type, @@ -3839,6 +3840,7 @@ future<> storage_proxy::send_to_endpoint( future<> storage_proxy::send_to_endpoint( frozen_mutation_and_schema fm_a_s, + locator::effective_replication_map_ptr ermp, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, @@ -3848,6 +3850,7 @@ future<> storage_proxy::send_to_endpoint( is_cancellable cancellable) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), + std::move(ermp), std::move(target), std::move(pending_endpoints), type, @@ -3857,10 +3860,11 @@ future<> storage_proxy::send_to_endpoint( cancellable); } -future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) { +future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target) { if (!_features.hinted_handoff_separate_connection) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), + std::move(ermp), std::move(target), { }, db::write_type::SIMPLE, @@ -3872,6 +3876,7 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, return send_to_endpoint( std::make_unique(std::move(fm_a_s)), + std::move(ermp), std::move(target), { }, db::write_type::SIMPLE, diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 3f2a6fdfee..4e23c1d5e9 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -421,6 +421,7 @@ private: future<> send_to_endpoint( std::unique_ptr m, + locator::effective_replication_map_ptr ermp, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, @@ -588,15 +589,15 @@ public: // Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without // hinted handoff support, and just one target. See also // send_to_live_endpoints() - another take on the same original function. - future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, + future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints, is_cancellable); - future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, + future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, allow_hints, is_cancellable); // Send a mutation to a specific remote target as a hint. // Unlike regular mutations during write operations, hints are sent on the streaming connection // and use different RPC verb. - future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target); + future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target); /** * Performs the truncate operatoin, which effectively deletes all data from