service: range_streamer: Propagate topology_guard to receivers

This commit is contained in:
Tomasz Grabiec
2023-10-26 00:35:19 +02:00
parent 063095ea50
commit fd3c089ccc
17 changed files with 105 additions and 52 deletions

View File

@@ -27,7 +27,8 @@ static logging::logger blogger("boot_strapper");
namespace dht {
future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, inet_address replace_address) {
future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, service::frozen_topology_guard topo_guard,
inet_address replace_address) {
blogger.debug("Beginning bootstrap process: sorted_tokens={}", get_token_metadata().sorted_tokens());
sstring description;
if (reason == streaming::stream_reason::bootstrap) {
@@ -38,7 +39,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
throw std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap");
}
try {
auto streamer = make_lw_shared<range_streamer>(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason);
auto streamer = make_lw_shared<range_streamer>(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason, topo_guard);
auto nodes_to_filter = gossiper.get_unreachable_members();
if (reason == streaming::stream_reason::replace) {
nodes_to_filter.insert(std::move(replace_address));

View File

@@ -14,6 +14,7 @@
#include <unordered_set>
#include "replica/database_fwd.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
@@ -52,7 +53,7 @@ public:
, _token_metadata_ptr(std::move(tmptr)) {
}
future<> bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, inet_address replace_address = {});
future<> bootstrap(streaming::stream_reason reason, gms::gossiper& gossiper, service::frozen_topology_guard, inet_address replace_address = {});
/**
* if initialtoken was specified, use that (split on comma).

View File

@@ -265,7 +265,8 @@ future<> range_streamer::stream_async() {
unsigned nr_ranges_streamed = 0;
size_t nr_ranges_total = range_vec.size();
auto do_streaming = [&] (dht::token_range_vector&& ranges_to_stream) {
auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++), _reason);
auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++),
_reason, _topo_guard);
auto abort_listener = _abort_source.subscribe([&] () noexcept { sp.abort(); });
_abort_source.check();
logger.info("{} with {} for keyspace={}, streaming [{}, {}) out of {} ranges",

View File

@@ -14,6 +14,7 @@
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
#include "gms/inet_address.hh"
#include "range.hh"
#include <seastar/core/distributed.hh>
@@ -78,6 +79,7 @@ public:
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
service::frozen_topology_guard topo_guard,
std::vector<sstring> tables = {})
: _db(db)
, _stream_manager(sm)
@@ -89,13 +91,14 @@ public:
, _description(std::move(description))
, _reason(reason)
, _tables(std::move(tables))
, _topo_guard(topo_guard)
{
_abort_source.check();
}
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, std::vector<sstring> tables = {})
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(tables)) {
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector<sstring> tables = {})
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(topo_guard), std::move(tables)) {
}
void add_source_filter(std::unique_ptr<i_source_filter> filter) {
@@ -158,6 +161,7 @@ private:
sstring _description;
streaming::stream_reason _reason;
std::vector<sstring> _tables;
service::frozen_topology_guard _topo_guard;
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
// Number of tx and rx ranges added

View File

@@ -994,15 +994,15 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
}
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id) {
using value_type = std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>;
if (is_shutting_down()) {
return make_exception_future<value_type>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, service::session_id, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, session, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
return make_ready_future<value_type>(value_type(std::move(sink), source.get0()));
});
@@ -1010,7 +1010,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche
});
}
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::optional<service::session_id>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
}
@@ -1107,13 +1107,13 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea
// PREPARE_MESSAGE
void messaging_service::register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func) {
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason, rpc::optional<service::session_id>)>&& func) {
register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func));
}
future<streaming::prepare_message> messaging_service::send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id,
sstring description, streaming::stream_reason reason) {
sstring description, streaming::stream_reason reason, service::session_id session) {
return send_message<streaming::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
std::move(msg), plan_id, std::move(description), reason);
std::move(msg), plan_id, std::move(description), reason, session);
}
future<> messaging_service::unregister_prepare_message() {
return unregister_handler(messaging_verb::PREPARE_MESSAGE);

View File

@@ -20,6 +20,7 @@
#include "schema/schema_fwd.hh"
#include "streaming/stream_fwd.hh"
#include "locator/host_id.hh"
#include "service/session.hh"
#include <list>
#include <vector>
@@ -354,9 +355,9 @@ public:
// Wrapper for PREPARE_MESSAGE verb
void register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func);
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason, rpc::optional<service::session_id>)>&& func);
future<streaming::prepare_message> send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id,
sstring description, streaming::stream_reason);
sstring description, streaming::stream_reason, service::session_id);
future<> unregister_prepare_message();
// Wrapper for PREPARE_DONE_MESSAGE verb
@@ -366,10 +367,10 @@ public:
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::optional<service::session_id>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
future<> unregister_stream_mutation_fragments();
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id);
// Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM
future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);

View File

@@ -495,8 +495,9 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
service::frozen_topology_guard topo_guard = service::null_topology_guard; // FIXME: propagate
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, _schema->get_sharder(), std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);

View File

@@ -10,6 +10,7 @@
*/
#include "storage_service.hh"
#include "service/topology_guard.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
#include <seastar/core/distributed.hh>
@@ -3539,7 +3540,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
_gossiper.wait_for_range_setup().get();
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
slogger.info("Starting to bootstrap...");
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get();
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard).get();
} else {
// Even with RBNO bootstrap we need to announce the new CDC generation immediately after it's created.
_gossiper.add_local_application_state({
@@ -4965,7 +4966,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
} else {
slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid);
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get();
bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, replace_address).get();
}
on_streaming_finished();
@@ -5289,6 +5290,7 @@ void storage_service::node_ops_insert(node_ops_id ops_uuid,
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
return seastar::async([this, coordinator, req = std::move(req)] () mutable {
auto ops_uuid = req.ops_uuid;
auto topo_guard = null_topology_guard;
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid);
if (req.cmd == node_ops_cmd::query_pending_ops) {
@@ -5360,7 +5362,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
_repair.local().removenode_with_repair(get_token_metadata_ptr(), node, ops).get();
} else {
slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator);
removenode_with_stream(node, as).get();
removenode_with_stream(node, topo_guard, as).get();
}
}
} else if (req.cmd == node_ops_cmd::removenode_abort) {
@@ -5661,7 +5663,7 @@ future<> storage_service::rebuild(sstring source_dc) {
co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr, ss._abort_source,
ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild);
ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -5820,8 +5822,10 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streame
}
}
future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr) {
return seastar::async([this, leaving_node, as_ptr] {
future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
frozen_topology_guard topo_guard,
shared_ptr<abort_source> as_ptr) {
return seastar::async([this, leaving_node, as_ptr, topo_guard] {
auto tmptr = get_token_metadata_ptr();
abort_source as;
auto sub = _abort_source.subscribe([&as] () noexcept {
@@ -5837,7 +5841,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
as.request_abort();
}
});
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode);
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard);
removenode_add_ranges(streamer, leaving_node).get();
try {
streamer->stream_async().get();
@@ -5888,7 +5892,12 @@ future<> storage_service::leave_ring() {
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission);
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source,
get_broadcast_address(),
_snitch.local()->get_location(),
"Unbootstrap",
streaming::stream_reason::decommission,
null_topology_guard);
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -6246,7 +6255,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper);
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard);
}
}));
}
@@ -6273,7 +6282,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
auto existing_ip = _group0->address_map().find(replaced_id);
assert(existing_ip);
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, *existing_ip);
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, *existing_ip);
}
}));
}
@@ -6325,7 +6334,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto ops = seastar::make_shared<node_ops_info>(node_ops_id::create_random_id(), as, std::move(ignored_ips));
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), *ip, ops);
} else {
return removenode_with_stream(*ip, as);
return removenode_with_stream(*ip, null_topology_guard, as);
}
}));
result.status = raft_topology_cmd_result::command_status::success;
@@ -6340,7 +6349,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild);
get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild,
null_topology_guard);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -6527,7 +6537,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
std::vector<sstring> tables = {table.schema()->cf_name()};
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm), guard.get_abort_source(),
get_broadcast_address(), _snitch.local()->get_location(),
"Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables));
"Tablet migration", streaming::stream_reason::tablet_migration, null_topology_guard, std::move(tables));
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
_gossiper.get_unreachable_members()));

View File

@@ -14,6 +14,7 @@
#include <seastar/core/shared_future.hh>
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/topology_guard.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
@@ -538,7 +539,7 @@ private:
*/
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
future<> removenode_with_stream(gms::inet_address leaving_node, frozen_topology_guard, shared_ptr<abort_source> as_ptr);
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
// needs to be modified to accept either a keyspace or ARS.

View File

@@ -184,7 +184,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
for (auto& node : current_targets) {
if (!metas.contains(node)) {
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_mutation_fragments(reader.schema()->version(),
ops_uuid, cf_id, estimated_partitions, reason, netw::messaging_service::msg_addr(node));
ops_uuid, cf_id, estimated_partitions, reason, service::default_session_id, netw::messaging_service::msg_addr(node));
llog.debug("load_and_stream: ops_uuid={}, make sink and source for node={}", ops_uuid, node);
metas.emplace(node, send_meta_data(node, std::move(sink), std::move(source)));
metas.at(node).receive();

View File

@@ -24,11 +24,13 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
sharded<db::view::view_update_generator>& vug,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader_v2 reader) -> future<> {
sstables::offstrategy offstrategy,
service::frozen_topology_guard frozen_guard) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
std::exception_ptr ex;
try {
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
auto guard = service::topology_guard(*cf, frozen_guard);
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};

View File

@@ -8,6 +8,7 @@
#include "sstables/sstable_set.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
namespace replica {
class database;
@@ -28,6 +29,7 @@ std::function<future<>(flat_mutation_reader_v2)> make_streaming_consumer(sstring
sharded<db::view::view_update_generator>& vug,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy);
sstables::offstrategy offstrategy,
service::frozen_topology_guard);
}

View File

@@ -25,6 +25,7 @@ stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, dh
auto session = _coordinator->get_or_create_session(_mgr, from);
session->add_stream_request(keyspace, std::move(ranges), std::move(column_families));
session->set_reason(_reason);
session->set_topo_guard(_topo_guard);
return *this;
}
@@ -37,6 +38,7 @@ stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, dht
auto session = _coordinator->get_or_create_session(_mgr, to);
session->add_transfer_ranges(std::move(keyspace), std::move(ranges), std::move(column_families));
session->set_reason(_reason);
session->set_topo_guard(_topo_guard);
return *this;
}

View File

@@ -35,6 +35,7 @@ private:
plan_id _plan_id;
sstring _description;
stream_reason _reason;
service::frozen_topology_guard _topo_guard;
std::vector<stream_event_handler*> _handlers;
shared_ptr<stream_coordinator> _coordinator;
bool _range_added = false;
@@ -46,11 +47,13 @@ public:
*
* @param description Stream type that describes this StreamPlan
*/
stream_plan(stream_manager& mgr, sstring description, stream_reason reason = stream_reason::unspecified)
stream_plan(stream_manager& mgr, sstring description, stream_reason reason = stream_reason::unspecified,
service::frozen_topology_guard topo_guard = {})
: _mgr(mgr)
, _plan_id(plan_id{utils::UUID_gen::get_time_UUID()})
, _description(description)
, _reason(reason)
, _topo_guard(topo_guard)
, _coordinator(make_shared<stream_coordinator>())
{
}

View File

@@ -36,6 +36,7 @@
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "consumer.hh"
#include "readers/generating_v2.hh"
#include "service/topology_guard.hh"
namespace streaming {
@@ -89,17 +90,19 @@ public:
void stream_manager::init_messaging_service_handler(abort_source& as) {
auto& ms = _ms.local();
ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<stream_reason> reason_opt) {
ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<stream_reason> reason_opt, rpc::optional<service::session_id> session) {
const auto& src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
auto dst_cpu_id = this_shard_id();
auto reason = reason_opt ? *reason_opt : stream_reason::unspecified;
return container().invoke_on(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, reason] (auto& sm) mutable {
auto topo_guard = service::frozen_topology_guard(session.value_or(service::default_session_id));
return container().invoke_on(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, reason, topo_guard] (auto& sm) mutable {
auto sr = stream_result_future::init_receiving_side(sm, plan_id, description, from);
auto session = sm.get_session(plan_id, from, "PREPARE_MESSAGE");
session->init(sr);
session->dst_cpu_id = src_cpu_id;
session->set_reason(reason);
session->set_topo_guard(topo_guard);
return session->prepare(std::move(msg.requests), std::move(msg.summaries));
});
});
@@ -111,23 +114,29 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
return make_ready_future<>();
});
});
ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions,
rpc::optional<stream_reason> reason_opt,
rpc::optional<service::session_id> session,
rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
utils::fb_utilities::get_broadcast_address())));
}
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable {
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s] (reader_permit permit) mutable {
struct stream_mutation_fragments_cmd_status {
bool got_cmd = false;
bool got_end_of_stream = false;
};
auto cmd_status = make_lw_shared<stream_mutation_fragments_cmd_status>();
auto offstrategy_update = make_lw_shared<offstrategy_trigger>(_db, cf_id, plan_id);
auto get_next_mutation_fragment = [&sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
auto guard = service::topology_guard(s->table(), topo_guard);
auto get_next_mutation_fragment = [guard = std::move(guard), &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
guard.check();
return source().then([&sm, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
if (opt) {
auto cmd = std::get<1>(*opt);
@@ -172,7 +181,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
//FIXME: discarded future.
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, erm->get_sharder(*s),
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)),
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard),
std::move(op)
).then_wrapped([s, plan_id, from, sink, estimated_partitions, erm] (future<uint64_t> f) mutable {
int32_t status = 0;
@@ -271,7 +280,7 @@ future<> stream_session::on_initialization_complete() {
}
auto id = msg_addr{this->peer, 0};
sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id);
return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) {
return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) {
try {
auto msg = f.get0();
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer);

View File

@@ -22,6 +22,7 @@
#include "streaming/stream_manager.hh"
#include "streaming/stream_reason.hh"
#include "streaming/session_info.hh"
#include "service/topology_guard.hh"
#include "query-request.hh"
#include <map>
#include <vector>
@@ -156,6 +157,7 @@ private:
session_info _session_info;
stream_reason _reason = stream_reason::unspecified;
service::frozen_topology_guard _topo_guard;
public:
stream_reason get_reason() const {
return _reason;
@@ -164,6 +166,14 @@ public:
_reason = reason;
}
void set_topo_guard(service::frozen_topology_guard topo_guard) {
_topo_guard = topo_guard;
}
service::frozen_topology_guard topo_guard() const {
return _topo_guard;
}
void add_bytes_sent(int64_t bytes) {
_bytes_sent += bytes;
}

View File

@@ -16,6 +16,7 @@
#include "streaming/stream_manager.hh"
#include "streaming/stream_reason.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "service/topology_guard.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "mutation/frozen_mutation.hh"
@@ -50,6 +51,7 @@ struct send_info {
netw::messaging_service::msg_addr id;
uint32_t dst_cpu_id;
stream_reason reason;
service::frozen_topology_guard topo_guard;
size_t mutations_nr{0};
semaphore mutations_done{0};
bool error_logged = false;
@@ -60,13 +62,15 @@ struct send_info {
noncopyable_function<void(size_t)> update;
send_info(netw::messaging_service& ms_, streaming::plan_id plan_id_, replica::table& tbl_, reader_permit permit_,
dht::token_range_vector ranges_, netw::messaging_service::msg_addr id_,
uint32_t dst_cpu_id_, stream_reason reason_, noncopyable_function<void(size_t)> update_fn)
uint32_t dst_cpu_id_, stream_reason reason_, service::frozen_topology_guard topo_guard_,
noncopyable_function<void(size_t)> update_fn)
: ms(ms_)
, plan_id(plan_id_)
, cf_id(tbl_.schema()->id())
, id(id_)
, dst_cpu_id(dst_cpu_id_)
, reason(reason_)
, topo_guard(topo_guard_)
, cf(tbl_)
, ranges(std::move(ranges_))
, prs(dht::to_partition_ranges(ranges))
@@ -116,7 +120,7 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
}
return si->estimate_partitions().then([si] (size_t estimated_partitions) {
sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions);
return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then_unpack([si] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd> sink, rpc::source<int32_t> source) mutable {
return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->topo_guard, si->id).then_unpack([si] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd> sink, rpc::source<int32_t> source) mutable {
auto got_error_from_peer = make_lw_shared<bool>(false);
auto table_is_dropped = make_lw_shared<bool>(false);
@@ -205,10 +209,11 @@ future<> stream_transfer_task::execute() {
sort_and_merge_ranges();
auto reason = session->get_reason();
auto& sm = session->manager();
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (stream_manager& sm) mutable {
auto topo_guard = session->topo_guard();
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
auto& tbl = sm.db().find_column_family(cf_id);
return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) {
return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) {
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
});
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {