Merge 'Initialize storage_proxy early, without messaging_service and gossiper' from Kamil Braun

Move the initialization of `storage_proxy` early in the startup procedure, before starting
`system_keyspace`, `messaging_service`, `gossiper`, `storage_service` and more.

As a follow-up, we'll be able to move initialization of `query_processor` right
after `storage_proxy` (but this requires a bit of refactoring in
`query_processor` too).

Local queries through `storage_proxy` can be done after the early initialization step.
In a follow-up, when we do a similar thing for `query_processor`, we'll be able
to perform local CQL queries early as well. (Before starting `gossiper` etc.)

Closes #14231

* github.com:scylladb/scylladb:
  main, cql_test_env: initialize `storage_proxy` early
  main, cql_test_env: initialize `database` early
  storage_proxy: rename `init_messaging_service` to `start_remote`
  storage_proxy: don't pass `gossiper&` and `messaging_service&` during initialization
  storage_proxy: prepare for missing `remote`
  storage_proxy: don't access `remote` during local queries in `query_partition_key_range_concurrent`
  db: consistency_level: remove overload of `filter_for_query`
  storage_proxy: don't access `remote` when calculating target replicas for local queries
  storage_proxy: introduce const version of `remote()`
  replica: table: introduce `get_my_hit_rate`
  storage_proxy: `endpoint_filter`: remove gossiper dependency
This commit is contained in:
Botond Dénes
2023-06-14 15:37:33 +03:00
8 changed files with 339 additions and 264 deletions

View File

@@ -344,15 +344,6 @@ filter_for_query(consistency_level cl,
return selected_endpoints;
}
inet_address_vector_replica_set filter_for_query(consistency_level cl,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
const gms::gossiper& g,
replica::column_family* cf) {
return filter_for_query(cl, erm, live_endpoints, preferred_endpoints, read_repair_decision::NONE, g, nullptr, cf);
}
bool
is_sufficient_live_nodes(consistency_level cl,
const locator::effective_replication_map& erm,

View File

@@ -56,13 +56,6 @@ filter_for_query(consistency_level cl,
std::optional<gms::inet_address>* extra,
replica::column_family* cf);
inet_address_vector_replica_set filter_for_query(consistency_level cl,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
const gms::gossiper& g,
replica::column_family* cf);
struct dc_node_count {
size_t live = 0;
size_t pending = 0;

258
main.cc
View File

@@ -888,6 +888,132 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
mm_notifier.stop().get();
});
supervisor::notify("starting per-shard database core");
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer_verbose_shutdown("sst_dir_semaphore", [&sst_dir_semaphore] {
sst_dir_semaphore.stop().get();
});
service_memory_limiter.start(memory::stats().total_memory()).get();
auto stop_mem_limiter = defer_verbose_shutdown("service_memory_limiter", [] {
// Uncomment this once services release all the memory on stop
// service_memory_limiter.stop().get();
});
supervisor::notify("creating and verifying directories");
utils::directories::set dir_set;
dir_set.add(cfg->data_file_directories());
dir_set.add(cfg->commitlog_directory());
dir_set.add(cfg->schema_commitlog_directory());
dirs.emplace(cfg->developer_mode());
dirs->create_and_verify(std::move(dir_set)).get();
auto hints_dir_initializer = db::hints::directory_initializer::make(*dirs, cfg->hints_directory()).get();
auto view_hints_dir_initializer = db::hints::directory_initializer::make(*dirs, cfg->view_hints_directory()).get();
if (!hinted_handoff_enabled.is_disabled_for_all()) {
hints_dir_initializer.ensure_created_and_verified().get();
}
view_hints_dir_initializer.ensure_created_and_verified().get();
auto get_tm_cfg = sharded_parameter([&] {
return tasks::task_manager::config {
.task_ttl = cfg->task_ttl_seconds,
};
});
task_manager.start(std::move(get_tm_cfg), std::ref(stop_signal.as_sharded_abort_source())).get();
auto stop_task_manager = defer_verbose_shutdown("task_manager", [&task_manager] {
task_manager.stop().get();
});
// Note: changed from using a move here, because we want the config object intact.
replica::database_config dbcfg;
dbcfg.compaction_scheduling_group = make_sched_group("compaction", 1000);
dbcfg.memory_compaction_scheduling_group = make_sched_group("mem_compaction", 1000);
dbcfg.streaming_scheduling_group = maintenance_scheduling_group;
dbcfg.statement_scheduling_group = make_sched_group("statement", 1000);
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
dbcfg.gossip_scheduling_group = make_sched_group("gossip", 1000);
dbcfg.commitlog_scheduling_group = make_sched_group("commitlog", 1000);
dbcfg.available_memory = memory::stats().total_memory();
supervisor::notify("starting compaction_manager");
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)
auto get_cm_cfg = sharded_parameter([&] {
return compaction_manager::config {
.compaction_sched_group = compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group},
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group},
.available_memory = dbcfg.available_memory,
.static_shares = cfg->compaction_static_shares,
.throughput_mb_per_sec = cfg->compaction_throughput_mb_per_sec,
};
});
cm.start(std::move(get_cm_cfg), std::ref(stop_signal.as_sharded_abort_source()), std::ref(task_manager)).get();
auto stop_cm = defer_verbose_shutdown("compaction_manager", [&cm] {
cm.stop().get();
});
sstm.start(std::ref(*cfg)).get();
auto stop_sstm = defer_verbose_shutdown("sstables storage manager", [&sstm] {
sstm.stop().get();
});
supervisor::notify("starting database");
debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the shareded<database> pointers alive.
db.invoke_on_all(&replica::database::stop).get();
});
// We need to init commitlog on shard0 before it is inited on other shards
// because it obtains the list of pre-existing segments for replay, which must
// not include reserve segments created by active commitlogs.
db.local().init_commitlog().get();
db.invoke_on_all(&replica::database::start).get();
smp::invoke_on_all([blocked_reactor_notify_ms] {
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
}).get();
debug::the_storage_proxy = &proxy;
supervisor::notify("starting storage proxy");
service::storage_proxy::config spcfg {
.hints_directory_initializer = hints_dir_initializer,
};
spcfg.hinted_handoff_enabled = hinted_handoff_enabled;
spcfg.available_memory = memory::stats().total_memory();
smp_service_group_config storage_proxy_smp_service_group_config;
// Assuming less than 1kB per queued request, this limits storage_proxy submit_to() queues to 5MB or less
storage_proxy_smp_service_group_config.max_nonlocal_requests = 5000;
spcfg.read_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.hints_write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.write_ack_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
static db::view::node_update_backlog node_backlog(smp::count, 10ms);
scheduling_group_key_config storage_proxy_stats_cfg =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
storage_proxy_stats_cfg.constructor = [plain_constructor = storage_proxy_stats_cfg.constructor] (void* ptr) {
plain_constructor(ptr);
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_stats();
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_split_metrics_local();
};
storage_proxy_stats_cfg.rename = [] (void* ptr) {
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_stats();
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_split_metrics_local();
};
proxy.start(std::ref(db), spcfg, std::ref(node_backlog),
scheduling_group_key_create(storage_proxy_stats_cfg).get0(),
std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory)).get();
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting lifecycle notifier");
lifecycle_notifier.start().get();
// storage_service references this notifier and is not stopped yet
@@ -915,18 +1041,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
api::set_server_config(ctx, *cfg).get();
// Note: changed from using a move here, because we want the config object intact.
replica::database_config dbcfg;
dbcfg.compaction_scheduling_group = make_sched_group("compaction", 1000);
dbcfg.memory_compaction_scheduling_group = make_sched_group("mem_compaction", 1000);
dbcfg.streaming_scheduling_group = maintenance_scheduling_group;
dbcfg.statement_scheduling_group = make_sched_group("statement", 1000);
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
dbcfg.gossip_scheduling_group = make_sched_group("gossip", 1000);
dbcfg.commitlog_scheduling_group = make_sched_group("commitlog", 1000);
dbcfg.available_memory = memory::stats().total_memory();
netw::messaging_service::config mscfg;
mscfg.ip = utils::resolve(cfg->listen_address, family).get0();
@@ -1089,88 +1203,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ss.stop().get();
});
supervisor::notify("starting per-shard database core");
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer_verbose_shutdown("sst_dir_semaphore", [&sst_dir_semaphore] {
sst_dir_semaphore.stop().get();
});
service_memory_limiter.start(memory::stats().total_memory()).get();
auto stop_mem_limiter = defer_verbose_shutdown("service_memory_limiter", [] {
// Uncomment this once services release all the memory on stop
// service_memory_limiter.stop().get();
});
supervisor::notify("creating and verifying directories");
utils::directories::set dir_set;
dir_set.add(cfg->data_file_directories());
dir_set.add(cfg->commitlog_directory());
dir_set.add(cfg->schema_commitlog_directory());
dirs.emplace(cfg->developer_mode());
dirs->create_and_verify(std::move(dir_set)).get();
auto hints_dir_initializer = db::hints::directory_initializer::make(*dirs, cfg->hints_directory()).get();
auto view_hints_dir_initializer = db::hints::directory_initializer::make(*dirs, cfg->view_hints_directory()).get();
if (!hinted_handoff_enabled.is_disabled_for_all()) {
hints_dir_initializer.ensure_created_and_verified().get();
}
view_hints_dir_initializer.ensure_created_and_verified().get();
std::optional<wasm::startup_context> wasm_ctx;
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
wasm_ctx.emplace(*cfg, dbcfg);
}
auto get_tm_cfg = sharded_parameter([&] {
return tasks::task_manager::config {
.task_ttl = cfg->task_ttl_seconds,
};
});
task_manager.start(std::move(get_tm_cfg), std::ref(stop_signal.as_sharded_abort_source())).get();
auto stop_task_manager = defer_verbose_shutdown("task_manager", [&task_manager] {
task_manager.stop().get();
});
supervisor::notify("starting compaction_manager");
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)
auto get_cm_cfg = sharded_parameter([&] {
return compaction_manager::config {
.compaction_sched_group = compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group},
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group},
.available_memory = dbcfg.available_memory,
.static_shares = cfg->compaction_static_shares,
.throughput_mb_per_sec = cfg->compaction_throughput_mb_per_sec,
};
});
cm.start(std::move(get_cm_cfg), std::ref(stop_signal.as_sharded_abort_source()), std::ref(task_manager)).get();
auto stop_cm = defer_verbose_shutdown("compaction_manager", [&cm] {
cm.stop().get();
});
sstm.start(std::ref(*cfg)).get();
auto stop_sstm = defer_verbose_shutdown("sstables storage manager", [&sstm] {
sstm.stop().get();
});
supervisor::notify("starting database");
debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the shareded<database> pointers alive.
db.invoke_on_all(&replica::database::stop).get();
});
// We need to init commitlog on shard0 before it is inited on other shards
// because it obtains the list of pre-existing segments for replay, which must
// not include reserve segments created by active commitlogs.
db.local().init_commitlog().get();
db.invoke_on_all(&replica::database::start).get();
// Initialization of a keyspace is done by shard 0 only. For system
// keyspace, the procedure will go through the hardcoded column
// families, and in each of them, it will load the sstables for all
@@ -1180,40 +1217,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// described here: https://github.com/scylladb/scylla/issues/1014
supervisor::notify("loading system sstables");
replica::distributed_loader::init_system_keyspace(sys_ks, db, ss, gossiper, raft_gr, *cfg, system_table_load_phase::phase1).get();
smp::invoke_on_all([blocked_reactor_notify_ms] {
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
}).get();
debug::the_storage_proxy = &proxy;
supervisor::notify("starting storage proxy");
service::storage_proxy::config spcfg {
.hints_directory_initializer = hints_dir_initializer,
};
spcfg.hinted_handoff_enabled = hinted_handoff_enabled;
spcfg.available_memory = memory::stats().total_memory();
smp_service_group_config storage_proxy_smp_service_group_config;
// Assuming less than 1kB per queued request, this limits storage_proxy submit_to() queues to 5MB or less
storage_proxy_smp_service_group_config.max_nonlocal_requests = 5000;
spcfg.read_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.hints_write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.write_ack_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
static db::view::node_update_backlog node_backlog(smp::count, 10ms);
scheduling_group_key_config storage_proxy_stats_cfg =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
storage_proxy_stats_cfg.constructor = [plain_constructor = storage_proxy_stats_cfg.constructor] (void* ptr) {
plain_constructor(ptr);
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_stats();
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_split_metrics_local();
};
storage_proxy_stats_cfg.rename = [] (void* ptr) {
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_stats();
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_split_metrics_local();
};
proxy.start(std::ref(db), std::ref(gossiper), spcfg, std::ref(node_backlog),
scheduling_group_key_create(storage_proxy_stats_cfg).get0(),
std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory), std::ref(messaging)).get();
supervisor::notify("starting forward service");
forward_service.start(std::ref(messaging), std::ref(proxy), std::ref(db), std::ref(token_metadata)).get();
auto stop_forward_service_handlers = defer_verbose_shutdown("forward service", [&forward_service] {
@@ -1228,9 +1231,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
tablet_allocator.stop().get();
});
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");
debug::the_migration_manager = &mm;
mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
@@ -1452,11 +1452,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
mm.init_messaging_service();
}).get();
supervisor::notify("initializing storage proxy RPC verbs");
proxy.invoke_on_all([&mm] (service::storage_proxy& proxy) {
proxy.init_messaging_service(&mm.local());
proxy.invoke_on_all([&messaging, &gossiper, &mm] (service::storage_proxy& proxy) {
proxy.start_remote(messaging.local(), gossiper.local(), mm.local());
}).get();
auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] {
proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get();
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});
debug::the_stream_manager = &stream_manager;

View File

@@ -960,6 +960,7 @@ public:
}
void set_hit_rate(gms::inet_address addr, cache_temperature rate);
cache_hit_rate get_my_hit_rate() const;
cache_hit_rate get_hit_rate(const gms::gossiper& g, gms::inet_address addr);
void drop_hit_rate(gms::inet_address addr);

View File

@@ -2190,9 +2190,13 @@ void table::set_hit_rate(gms::inet_address addr, cache_temperature rate) {
e.last_updated = lowres_clock::now();
}
table::cache_hit_rate table::get_my_hit_rate() const {
return cache_hit_rate { _global_cache_hit_rate, lowres_clock::now()};
}
table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, gms::inet_address addr) {
if (utils::fb_utilities::get_broadcast_address() == addr) {
return cache_hit_rate { _global_cache_hit_rate, lowres_clock::now()};
return get_my_hit_rate();
}
auto it = _cluster_cache_hit_rates.find(addr);
if (it == _cluster_cache_hit_rates.end()) {

View File

@@ -142,6 +142,10 @@ static future<rpc::tuple<Elements..., replica::exception_variant>> encode_replic
return make_exception_future<final_tuple_type>(std::move(eptr));
}
static bool only_me(const inet_address_vector_replica_set& replicas) {
return replicas.size() == 1 && replicas[0] == utils::fb_utilities::get_broadcast_address();
}
// This class handles all communication with other nodes in `storage_proxy`:
// sending and receiving RPCs, checking the state of other nodes (e.g. by accessing gossiper state), fetching schema.
//
@@ -151,26 +155,24 @@ static future<rpc::tuple<Elements..., replica::exception_variant>> encode_replic
// Without it only local queries are available.
class storage_proxy::remote {
storage_proxy& _sp;
migration_manager* _mm;
netw::messaging_service& _ms;
const gms::gossiper& _gossiper;
migration_manager& _mm;
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
bool _stopped{false};
public:
remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g)
: _sp(sp), _ms(ms), _gossiper(g)
remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm)
, _connection_dropped(std::bind_front(&remote::connection_dropped, this))
, _condrop_registration(_ms.when_connection_drops(_connection_dropped))
{}
void init_messaging_service(migration_manager* mm, storage_proxy* sp) {
_mm = mm;
{
ser::storage_proxy_rpc_verbs::register_counter_mutation(&_ms, std::bind_front(&remote::handle_counter_mutation, this));
ser::storage_proxy_rpc_verbs::register_mutation(&_ms, std::bind_front(&remote::receive_mutation_handler, this, sp->_write_smp_service_group));
ser::storage_proxy_rpc_verbs::register_hint_mutation(&_ms, [this, sp] <typename... Args>(Args&&... args) { return receive_mutation_handler(sp->_hints_write_smp_service_group, std::forward<Args>(args)..., std::monostate()); });
ser::storage_proxy_rpc_verbs::register_mutation(&_ms, std::bind_front(&remote::receive_mutation_handler, this, _sp._write_smp_service_group));
ser::storage_proxy_rpc_verbs::register_hint_mutation(&_ms, [this] <typename... Args>(Args&&... args) { return receive_mutation_handler(_sp._hints_write_smp_service_group, std::forward<Args>(args)..., std::monostate()); });
ser::storage_proxy_rpc_verbs::register_paxos_learn(&_ms, std::bind_front(&remote::handle_paxos_learn, this));
ser::storage_proxy_rpc_verbs::register_mutation_done(&_ms, std::bind_front(&remote::handle_mutation_done, this));
ser::storage_proxy_rpc_verbs::register_mutation_failed(&_ms, std::bind_front(&remote::handle_mutation_failed, this));
@@ -184,9 +186,14 @@ public:
ser::storage_proxy_rpc_verbs::register_paxos_prune(&_ms, std::bind_front(&remote::handle_paxos_prune, this));
}
future<> uninit_messaging_service() {
~remote() {
assert(_stopped);
}
// Must call before destroying the `remote` object.
future<> stop() {
co_await ser::storage_proxy_rpc_verbs::unregister(&_ms);
_mm = nullptr;
_stopped = true;
}
const gms::gossiper& gossiper() const {
@@ -197,6 +204,13 @@ public:
return _gossiper.is_alive(ep);
}
// Note: none of the `send_*` functions use `remote` after yielding - by the first yield,
// control is delegated to another service (messaging_service). Thus unfinished `send`s
// do not make it unsafe to destroy the `remote` object.
//
// Running handlers prevent the object from being destroyed,
// assuming `stop()` is called before destruction.
future<> send_mutation(
netw::msg_addr addr, storage_proxy::clock_type::time_point timeout, const std::optional<tracing::trace_info>& trace_info,
const frozen_mutation& m, const inet_address_vector_replica_set& forward, gms::inet_address reply_to, unsigned shard,
@@ -361,12 +375,12 @@ public:
private:
future<schema_ptr> get_schema_for_read(table_schema_version v, netw::msg_addr from, clock_type::time_point timeout) {
abort_on_expiry aoe(timeout);
co_return co_await _mm->get_schema_for_read(std::move(v), std::move(from), _ms, &aoe.abort_source());
co_return co_await _mm.get_schema_for_read(std::move(v), std::move(from), _ms, &aoe.abort_source());
}
future<schema_ptr> get_schema_for_write(table_schema_version v, netw::msg_addr from, clock_type::time_point timeout) {
abort_on_expiry aoe(timeout);
co_return co_await _mm->get_schema_for_write(std::move(v), std::move(from), _ms, &aoe.abort_source());
co_return co_await _mm.get_schema_for_write(std::move(v), std::move(from), _ms, &aoe.abort_source());
}
future<> handle_counter_mutation(
@@ -2702,9 +2716,12 @@ inline std::ostream& operator<<(std::ostream& os, const read_repair_mutation& m)
using namespace std::literals::chrono_literals;
storage_proxy::~storage_proxy() {}
storage_proxy::storage_proxy(distributed<replica::database>& db, gms::gossiper& gossiper, storage_proxy::config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory, netw::messaging_service& ms)
storage_proxy::~storage_proxy() {
assert(!_remote);
}
storage_proxy::storage_proxy(distributed<replica::database>& db, storage_proxy::config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory)
: _db(db)
, _shared_token_metadata(stm)
, _erm_factory(erm_factory)
@@ -2719,7 +2736,6 @@ storage_proxy::storage_proxy(distributed<replica::database>& db, gms::gossiper&
, _hints_for_views_manager(_db.local().get_config().view_hints_directory(), {}, _db.local().get_config().max_hint_window_in_ms(), _hints_resource_manager, _db)
, _stats_key(stats_key)
, _features(feat)
, _remote(std::make_unique<struct remote>(*this, ms, gossiper))
, _background_write_throttle_threahsold(cfg.available_memory / 10)
, _mutate_stage{"storage_proxy_mutate", &storage_proxy::do_mutate}
, _max_view_update_backlog(max_view_update_backlog)
@@ -2736,7 +2752,29 @@ storage_proxy::storage_proxy(distributed<replica::database>& db, gms::gossiper&
}
struct storage_proxy::remote& storage_proxy::remote() {
return *_remote;
return const_cast<struct remote&>(const_cast<const storage_proxy*>(this)->remote());
}
const struct storage_proxy::remote& storage_proxy::remote() const {
if (_remote) {
return *_remote;
}
// This error should not appear because the user should not be able to send queries
// before `remote` is initialized, and user queries should be drained before `remote`
// is destroyed; Scylla code should take care not to perform cluster queries outside
// the lifetime of `remote` (it can still perform queries to local tables during
// the entire lifetime of `storage_proxy`, which is larger than `remote`).
//
// If there's a bug though, fail the query.
//
// In the future we may want to introduce a 'recovery mode' in which Scylla starts
// without contacting the cluster and allows the user to perform local queries (say,
// to system tables), then this code path would be expected to happen if the user
// tries a remote query in this recovery mode, in which case we should change it
// from `on_internal_error` to a regular exception.
on_internal_error(slogger,
"attempted to perform remote query when `storage_proxy::remote` is unavailable");
}
const data_dictionary::database
@@ -3368,7 +3406,9 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
.then(utils::result_into_future<result<>>);
}
static inet_address_vector_replica_set endpoint_filter(const gms::gossiper& g, const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
static inet_address_vector_replica_set endpoint_filter(
const noncopyable_function<bool(const gms::inet_address&)>& is_alive,
const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
// special case for single-node data centers
if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) {
return boost::copy_range<inet_address_vector_replica_set>(endpoints.begin()->second);
@@ -3377,9 +3417,9 @@ static inet_address_vector_replica_set endpoint_filter(const gms::gossiper& g, c
// strip out dead endpoints and localhost
std::unordered_multimap<sstring, gms::inet_address> validated;
auto is_valid = [&g] (gms::inet_address input) {
auto is_valid = [&is_alive] (gms::inet_address input) {
return input != utils::fb_utilities::get_broadcast_address()
&& g.is_alive(input);
&& is_alive(input);
};
for (auto& e : endpoints) {
@@ -3473,8 +3513,8 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
auto local_dc = topology.get_datacenter();
auto& local_endpoints = topology.get_datacenter_racks().at(local_dc);
auto local_rack = topology.get_rack();
auto& gossiper = _p._remote->gossiper();
auto chosen_endpoints = endpoint_filter(gossiper, local_rack, local_endpoints);
auto chosen_endpoints = endpoint_filter(std::bind_front(&storage_proxy::is_alive, &_p),
local_rack, local_endpoints);
if (chosen_endpoints.empty()) {
if (_cl == db::consistency_level::ANY) {
@@ -5106,9 +5146,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address();
auto cf = _db.local().find_column_family(schema).shared_from_this();
auto& gossiper = _remote->gossiper();
inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, *erm, all_replicas, preferred_endpoints, repair_decision,
gossiper,
inet_address_vector_replica_set target_replicas = filter_replicas_for_read(cl, *erm, all_replicas, preferred_endpoints, repair_decision,
retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica,
_db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr);
@@ -5409,12 +5447,11 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
// get stuck on 0 and never increased too much if the number of results remains small.
concurrency_factor = std::max(size_t(1), ranges.size());
auto& gossiper = _remote->gossiper();
while (i != ranges.end()) {
dht::partition_range& range = *i;
inet_address_vector_replica_set live_endpoints = get_endpoints_for_reading(schema->ks_name(), *erm, end_token(range));
inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, *erm, live_endpoints, merged_preferred_replicas, gossiper, pcf);
inet_address_vector_replica_set filtered_endpoints = filter_replicas_for_read(cl, *erm, live_endpoints, merged_preferred_replicas, pcf);
std::vector<dht::token_range> merged_ranges{to_token_range(range)};
++i;
@@ -5426,7 +5463,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
dht::partition_range& next_range = *i;
inet_address_vector_replica_set next_endpoints = get_endpoints_for_reading(schema->ks_name(), *erm, end_token(next_range));
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, *erm, next_endpoints, current_range_preferred_replicas, gossiper, pcf);
inet_address_vector_replica_set next_filtered_endpoints = filter_replicas_for_read(cl, *erm, next_endpoints, current_range_preferred_replicas, pcf);
// Origin has this to say here:
// * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
@@ -5471,21 +5508,30 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
break;
}
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, *erm, merged, current_merged_preferred_replicas, gossiper, pcf);
inet_address_vector_replica_set filtered_merged = filter_replicas_for_read(cl, *erm, merged, current_merged_preferred_replicas, pcf);
// Estimate whether merging will be a win or not
if (!is_worth_merging_for_range_query(erm->get_topology(), filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
break;
} else if (pcf) {
// check that merged set hit rate is not to low
auto find_min = [&g = _remote->gossiper(), pcf] (const inet_address_vector_replica_set& range) {
auto find_min = [this, pcf] (const inet_address_vector_replica_set& range) {
if (only_me(range)) {
// The `min_element` call below would return the same thing, but thanks to this branch
// we avoid having to access `remote` - so we can perform local queries without `remote`.
return float(pcf->get_my_hit_rate().rate);
}
// There are nodes other than us in `range`.
struct {
const gms::gossiper& g;
replica::column_family* cf = nullptr;
float operator()(const gms::inet_address& ep) const {
return float(cf->get_hit_rate(g, ep).rate);
}
} ep_to_hr{g, pcf};
} ep_to_hr{remote().gossiper(), pcf};
assert (!range.empty());
return *boost::range::min_element(range | boost::adaptors::transformed(ep_to_hr));
};
auto merged = find_min(filtered_merged) * 1.2; // give merged set 20% boost
@@ -6025,8 +6071,40 @@ inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const s
return std::move(*endpoints);
}
// `live_endpoints` must already contain only replicas for this query; the function only filters out some of them.
inet_address_vector_replica_set
storage_proxy::filter_replicas_for_read(
db::consistency_level cl,
const locator::effective_replication_map& erm,
inet_address_vector_replica_set live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
db::read_repair_decision repair_decision,
std::optional<gms::inet_address>* extra,
replica::column_family* cf) const {
if (live_endpoints.empty() || only_me(live_endpoints)) {
// `db::filter_for_query` would return the same thing, but thanks to this branch we avoid having
// to access `remote` - so we can perform local queries without the need of `remote`.
return live_endpoints;
}
// There are nodes other than us in `live_endpoints`.
auto& gossiper = remote().gossiper();
return db::filter_for_query(cl, erm, std::move(live_endpoints), preferred_endpoints, repair_decision, gossiper, extra, cf);
}
inet_address_vector_replica_set
storage_proxy::filter_replicas_for_read(
db::consistency_level cl,
const locator::effective_replication_map& erm,
const inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
replica::column_family* cf) const {
return filter_replicas_for_read(cl, erm, live_endpoints, preferred_endpoints, db::read_repair_decision::NONE, nullptr, cf);
}
bool storage_proxy::is_alive(const gms::inet_address& ep) const {
return _remote->is_alive(ep);
return _remote ? _remote->is_alive(ep) : (ep == utils::fb_utilities::get_broadcast_address());
}
inet_address_vector_replica_set storage_proxy::intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2) {
@@ -6050,12 +6128,13 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std:
return remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms);
}
void storage_proxy::init_messaging_service(migration_manager* mm) {
_remote->init_messaging_service(mm, this);
void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm) {
_remote = std::make_unique<struct remote>(*this, ms, g, mm);
}
future<> storage_proxy::uninit_messaging_service() {
return _remote->uninit_messaging_service();
future<> storage_proxy::stop_remote() {
co_await _remote->stop();
_remote = nullptr;
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>>

View File

@@ -330,6 +330,9 @@ private:
db::hints::manager& hints_manager_for(db::write_type type);
void sort_endpoints_by_proximity(const locator::topology& topo, inet_address_vector_replica_set& eps) const;
inet_address_vector_replica_set get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const;
inet_address_vector_replica_set filter_replicas_for_read(db::consistency_level, const locator::effective_replication_map&, inet_address_vector_replica_set live_endpoints, const inet_address_vector_replica_set& preferred_endpoints, db::read_repair_decision, std::optional<gms::inet_address>* extra, replica::column_family*) const;
// As above with read_repair_decision=NONE, extra=nullptr.
inet_address_vector_replica_set filter_replicas_for_read(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set& live_endpoints, const inet_address_vector_replica_set& preferred_endpoints, replica::column_family*) const;
bool is_alive(const gms::inet_address&) const;
db::read_repair_decision new_read_repair_decision(const schema& s);
result<::shared_ptr<abstract_read_executor>> get_read_executor(lw_shared_ptr<query::read_command> cmd,
@@ -445,12 +448,10 @@ private:
inet_address_vector_replica_set& l2) const;
public:
storage_proxy(distributed<replica::database>& db, gms::gossiper& gossiper, config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory, netw::messaging_service& ms);
storage_proxy(distributed<replica::database>& db, config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory);
~storage_proxy();
remote& remote();
const distributed<replica::database>& get_db() const {
return _db;
}
@@ -481,10 +482,16 @@ public:
}
return next;
}
void init_messaging_service(migration_manager*);
future<> uninit_messaging_service();
// Start/stop the remote part of `storage_proxy` that is required for performing distributed queries.
void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&);
future<> stop_remote();
private:
// Throws an error if remote is not initialized.
const struct remote& remote() const;
struct remote& remote();
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info);

View File

@@ -573,6 +573,10 @@ public:
auto stop_configurables = defer([&] { notify_set.notify_all(configurable::system_state::stopped).get(); });
gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features);
feature_service.start(fcfg).get();
auto stop_feature_service = defer([&] { feature_service.stop().get(); });
sharded<locator::snitch_ptr> snitch;
snitch.start(locator::snitch_config{}).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
@@ -594,6 +598,76 @@ public:
mm_notif.start().get();
auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
sharded<sstables::directory_semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer([&sst_dir_semaphore] {
sst_dir_semaphore.stop().get();
});
replica::database_config dbcfg;
if (cfg_in.dbcfg) {
dbcfg = std::move(*cfg_in.dbcfg);
} else {
dbcfg.available_memory = memory::stats().total_memory();
}
dbcfg.compaction_scheduling_group = scheduling_groups.compaction_scheduling_group;
dbcfg.memory_compaction_scheduling_group = scheduling_groups.memory_compaction_scheduling_group;
dbcfg.streaming_scheduling_group = scheduling_groups.streaming_scheduling_group;
dbcfg.statement_scheduling_group = scheduling_groups.statement_scheduling_group;
dbcfg.memtable_scheduling_group = scheduling_groups.memtable_scheduling_group;
dbcfg.memtable_to_cache_scheduling_group = scheduling_groups.memtable_to_cache_scheduling_group;
dbcfg.gossip_scheduling_group = scheduling_groups.gossip_scheduling_group;
dbcfg.sstables_format = sstables::version_from_string(cfg->sstable_format());
auto get_tm_cfg = sharded_parameter([&] {
return tasks::task_manager::config {
.task_ttl = cfg->task_ttl_seconds,
};
});
task_manager.start(std::move(get_tm_cfg), std::ref(abort_sources)).get();
auto stop_task_manager = defer([&task_manager] {
task_manager.stop().get();
});
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)
auto get_cm_cfg = sharded_parameter([&] {
return compaction_manager::config {
.compaction_sched_group = compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group},
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group},
.available_memory = dbcfg.available_memory,
.static_shares = cfg->compaction_static_shares,
.throughput_mb_per_sec = cfg->compaction_throughput_mb_per_sec,
};
});
cm.start(std::move(get_cm_cfg), std::ref(abort_sources), std::ref(task_manager)).get();
auto stop_cm = deferred_stop(cm);
sstm.start(std::ref(*cfg)).get();
auto stop_sstm = deferred_stop(sstm);
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(cm), std::ref(sstm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
auto stop_db = defer([&db] {
db.stop().get();
});
db.invoke_on_all(&replica::database::start).get();
smp::invoke_on_all([blocked_reactor_notify_ms] {
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
}).get();
service::storage_proxy::config spcfg {
.hints_directory_initializer = db::hints::directory_initializer::make_dummy(),
};
spcfg.available_memory = memory::stats().total_memory();
db::view::node_update_backlog b(smp::count, 10ms);
scheduling_group_key_config sg_conf =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
proxy.start(std::ref(db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory)).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });
sharded<service::endpoint_lifecycle_notifier> elc_notif;
elc_notif.start().get();
auto stop_elc_notif = defer([&elc_notif] { elc_notif.stop().get(); });
@@ -625,10 +699,6 @@ public:
auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); });
gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features);
feature_service.start(fcfg).get();
auto stop_feature_service = defer([&] { feature_service.stop().get(); });
sharded<gms::gossiper> gossiper;
// Init gossiper
@@ -696,80 +766,10 @@ public:
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer([&stream_manager] { stream_manager.stop().get(); });
sharded<sstables::directory_semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer([&sst_dir_semaphore] {
sst_dir_semaphore.stop().get();
});
replica::database_config dbcfg;
if (cfg_in.dbcfg) {
dbcfg = std::move(*cfg_in.dbcfg);
} else {
dbcfg.available_memory = memory::stats().total_memory();
}
dbcfg.compaction_scheduling_group = scheduling_groups.compaction_scheduling_group;
dbcfg.memory_compaction_scheduling_group = scheduling_groups.memory_compaction_scheduling_group;
dbcfg.streaming_scheduling_group = scheduling_groups.streaming_scheduling_group;
dbcfg.statement_scheduling_group = scheduling_groups.statement_scheduling_group;
dbcfg.memtable_scheduling_group = scheduling_groups.memtable_scheduling_group;
dbcfg.memtable_to_cache_scheduling_group = scheduling_groups.memtable_to_cache_scheduling_group;
dbcfg.gossip_scheduling_group = scheduling_groups.gossip_scheduling_group;
dbcfg.sstables_format = sstables::version_from_string(cfg->sstable_format());
auto get_tm_cfg = sharded_parameter([&] {
return tasks::task_manager::config {
.task_ttl = cfg->task_ttl_seconds,
};
});
task_manager.start(std::move(get_tm_cfg), std::ref(abort_sources)).get();
auto stop_task_manager = defer([&task_manager] {
task_manager.stop().get();
});
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)
auto get_cm_cfg = sharded_parameter([&] {
return compaction_manager::config {
.compaction_sched_group = compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group},
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group},
.available_memory = dbcfg.available_memory,
.static_shares = cfg->compaction_static_shares,
.throughput_mb_per_sec = cfg->compaction_throughput_mb_per_sec,
};
});
cm.start(std::move(get_cm_cfg), std::ref(abort_sources), std::ref(task_manager)).get();
auto stop_cm = deferred_stop(cm);
sstm.start(std::ref(*cfg)).get();
auto stop_sstm = deferred_stop(sstm);
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(cm), std::ref(sstm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
auto stop_db = defer([&db] {
db.stop().get();
});
db.invoke_on_all(&replica::database::start).get();
feature_service.invoke_on_all([] (auto& fs) {
return fs.enable(fs.supported_feature_set());
}).get();
smp::invoke_on_all([blocked_reactor_notify_ms] {
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
}).get();
service::storage_proxy::config spcfg {
.hints_directory_initializer = db::hints::directory_initializer::make_dummy(),
};
spcfg.available_memory = memory::stats().total_memory();
db::view::node_update_backlog b(smp::count, 10ms);
scheduling_group_key_config sg_conf =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
proxy.start(std::ref(db), std::ref(gossiper), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory), std::ref(ms)).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });
forward_service.start(std::ref(ms), std::ref(proxy), std::ref(db), std::ref(token_metadata)).get();
auto stop_forward_service = defer([&forward_service] { forward_service.stop().get(); });