Merge 'Relax cql_test_env services maintenance' from Pavel Emelyanov
To add a sharded service to the cql_test_env one needs to patch it in 5 or 6 places - add cql_test_env reference - add cql_test_env constructor argument - initialize the reference in initializer list - add service variable to do_with method - pass the variable to cql_test_env constructor - (optionally) export it via cql_test_env public method Steps 1 through 5 are annoying, things get much simpler if look like - add cql_test_env variable - (optionally) export it via cql_test_env public method This is what this PR does refs: #2795 Closes #15028 * github.com:scylladb/scylladb: cql_test_env: Drop local *this reference cql_test_env: Drop local references cql_test_env: Move most of the stuff in run_in_thread() cql_test_env: Open-code env start/stop and remove both cql_test_env: Keep other services as class variables cql_test_env: Keep services as class variables cql_test_env: Construct env early cql_test_env: De-static fdpinger variable cql_test_env: Define all services' variables early cql_test_env: Keep group0_client pointer
This commit is contained in:
@@ -130,23 +130,44 @@ public:
|
||||
static constexpr std::string_view ks_name = "ks";
|
||||
static std::atomic<bool> active;
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
sharded<gms::feature_service>& _feature_service;
|
||||
sharded<sstables::storage_manager>& _sstm;
|
||||
sharded<service::storage_proxy>& _proxy;
|
||||
sharded<cql3::query_processor>& _qp;
|
||||
sharded<auth::service>& _auth_service;
|
||||
sharded<db::view::view_builder>& _view_builder;
|
||||
sharded<db::view::view_update_generator>& _view_update_generator;
|
||||
sharded<service::migration_notifier>& _mnotifier;
|
||||
sharded<qos::service_level_controller>& _sl_controller;
|
||||
sharded<service::migration_manager>& _mm;
|
||||
sharded<db::batchlog_manager>& _batchlog_manager;
|
||||
sharded<gms::gossiper>& _gossiper;
|
||||
service::raft_group0_client& _group0_client;
|
||||
sharded<service::raft_group_registry>& _group0_registry;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
sharded<service::tablet_allocator>& _tablet_allocator;
|
||||
sharded<replica::database> _db;
|
||||
sharded<gms::feature_service> _feature_service;
|
||||
sharded<sstables::storage_manager> _sstm;
|
||||
sharded<service::storage_proxy> _proxy;
|
||||
sharded<cql3::query_processor> _qp;
|
||||
sharded<auth::service> _auth_service;
|
||||
sharded<db::view::view_builder> _view_builder;
|
||||
sharded<db::view::view_update_generator> _view_update_generator;
|
||||
sharded<service::migration_notifier> _mnotifier;
|
||||
sharded<qos::service_level_controller> _sl_controller;
|
||||
sharded<service::migration_manager> _mm;
|
||||
sharded<db::batchlog_manager> _batchlog_manager;
|
||||
sharded<gms::gossiper> _gossiper;
|
||||
sharded<service::raft_group_registry> _group0_registry;
|
||||
sharded<db::system_keyspace> _sys_ks;
|
||||
sharded<service::tablet_allocator> _tablet_allocator;
|
||||
sharded<db::system_distributed_keyspace> _sys_dist_ks;
|
||||
sharded<locator::snitch_ptr> _snitch;
|
||||
sharded<compaction_manager> _cm;
|
||||
sharded<tasks::task_manager> _task_manager;
|
||||
sharded<netw::messaging_service> _ms;
|
||||
sharded<service::storage_service> _ss;
|
||||
sharded<locator::shared_token_metadata> _token_metadata;
|
||||
sharded<locator::effective_replication_map_factory> _erm_factory;
|
||||
sharded<sstables::directory_semaphore> _sst_dir_semaphore;
|
||||
sharded<wasm::manager> _wasm;
|
||||
sharded<cql3::cql_config> _cql_config;
|
||||
sharded<service::endpoint_lifecycle_notifier> _elc_notif;
|
||||
sharded<cdc::generation_service> _cdc_generation_service;
|
||||
sharded<repair_service> _repair;
|
||||
sharded<streaming::stream_manager> _stream_manager;
|
||||
sharded<service::forward_service> _forward_service;
|
||||
sharded<direct_failure_detector::failure_detector> _fd;
|
||||
sharded<service::raft_address_map> _raft_address_map;
|
||||
sharded<service::direct_fd_pinger> _fd_pinger;
|
||||
sharded<cdc::cdc_service> _cdc;
|
||||
|
||||
service::raft_group0_client* _group0_client;
|
||||
|
||||
private:
|
||||
struct core_local_state {
|
||||
@@ -187,41 +208,7 @@ private:
|
||||
}
|
||||
}
|
||||
public:
|
||||
single_node_cql_env(
|
||||
sharded<replica::database>& db,
|
||||
sharded<gms::feature_service>& feature_service,
|
||||
sharded<sstables::storage_manager>& sstm,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
sharded<auth::service>& auth_service,
|
||||
sharded<db::view::view_builder>& view_builder,
|
||||
sharded<db::view::view_update_generator>& view_update_generator,
|
||||
sharded<service::migration_notifier>& mnotifier,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<qos::service_level_controller> &sl_controller,
|
||||
sharded<db::batchlog_manager>& batchlog_manager,
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
service::raft_group0_client& client,
|
||||
sharded<service::raft_group_registry>& group0_registry,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<service::tablet_allocator>& tablet_allocator)
|
||||
: _db(db)
|
||||
, _feature_service(feature_service)
|
||||
, _sstm(sstm)
|
||||
, _proxy(proxy)
|
||||
, _qp(qp)
|
||||
, _auth_service(auth_service)
|
||||
, _view_builder(view_builder)
|
||||
, _view_update_generator(view_update_generator)
|
||||
, _mnotifier(mnotifier)
|
||||
, _sl_controller(sl_controller)
|
||||
, _mm(mm)
|
||||
, _batchlog_manager(batchlog_manager)
|
||||
, _gossiper(gossiper)
|
||||
, _group0_client(client)
|
||||
, _group0_registry(group0_registry)
|
||||
, _sys_ks(sys_ks)
|
||||
, _tablet_allocator(tablet_allocator)
|
||||
single_node_cql_env()
|
||||
{
|
||||
adjust_rlimit();
|
||||
}
|
||||
@@ -411,7 +398,7 @@ public:
|
||||
}
|
||||
|
||||
virtual service::raft_group0_client& get_raft_group0_client() override {
|
||||
return _group0_client;
|
||||
return *_group0_client;
|
||||
}
|
||||
|
||||
virtual sharded<service::raft_group_registry>& get_raft_group_registry() override {
|
||||
@@ -444,29 +431,13 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
future<> start() {
|
||||
return _core_local.start(std::ref(_auth_service), std::ref(_sl_controller));
|
||||
}
|
||||
|
||||
future<> stop() override {
|
||||
return _core_local.stop();
|
||||
}
|
||||
|
||||
future<> create_keyspace(std::string_view name) {
|
||||
auto query = format("create keyspace {} with replication = {{ 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'replication_factor' : 1 }};", name);
|
||||
return execute_cql(query).discard_result();
|
||||
}
|
||||
|
||||
static future<> do_with(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
|
||||
using namespace std::filesystem;
|
||||
|
||||
return seastar::async([cfg_in = std::move(cfg_in), init_configurables = std::move(init_configurables), func] {
|
||||
// disable reactor stall detection during startup
|
||||
auto blocked_reactor_notify_ms = engine().get_blocked_reactor_notify_ms();
|
||||
smp::invoke_on_all([] {
|
||||
engine().update_blocked_reactor_notify_ms(std::chrono::milliseconds(1000000));
|
||||
}).get();
|
||||
|
||||
logalloc::prime_segment_pool(memory::stats().total_memory(), memory::min_free_memory()).get();
|
||||
bool old_active = false;
|
||||
if (!active.compare_exchange_strong(old_active, true)) {
|
||||
@@ -488,15 +459,27 @@ public:
|
||||
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
|
||||
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
|
||||
|
||||
single_node_cql_env env;
|
||||
env.run_in_thread(std::move(func), std::move(cfg_in), std::move(init_configurables));
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
void run_in_thread(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
|
||||
using namespace std::filesystem;
|
||||
|
||||
// disable reactor stall detection during startup
|
||||
auto blocked_reactor_notify_ms = engine().get_blocked_reactor_notify_ms();
|
||||
smp::invoke_on_all([] {
|
||||
engine().update_blocked_reactor_notify_ms(std::chrono::milliseconds(1000000));
|
||||
}).get();
|
||||
|
||||
sharded<abort_source> abort_sources;
|
||||
abort_sources.start().get();
|
||||
// FIXME: handle signals (SIGINT, SIGTERM) - request aborts
|
||||
auto stop_abort_sources = defer([&] { abort_sources.stop().get(); });
|
||||
sharded<compaction_manager> cm;
|
||||
sharded<sstables::storage_manager> sstm;
|
||||
sharded<tasks::task_manager> task_manager;
|
||||
sharded<replica::database> db;
|
||||
debug::the_database = &db;
|
||||
|
||||
debug::the_database = &_db;
|
||||
auto reset_db_ptr = defer([] {
|
||||
debug::the_database = nullptr;
|
||||
});
|
||||
@@ -541,17 +524,9 @@ public:
|
||||
|
||||
auto scheduling_groups = get_scheduling_groups().get();
|
||||
|
||||
sharded<cql3::query_processor> qp;
|
||||
sharded<gms::feature_service> feature_service;
|
||||
sharded<netw::messaging_service> ms;
|
||||
distributed<service::migration_manager> mm;
|
||||
sharded<service::storage_service> ss;
|
||||
distributed<db::batchlog_manager> bm;
|
||||
sharded<service::storage_proxy> proxy;
|
||||
|
||||
auto notify_set = init_configurables
|
||||
? configurable::init_all(*cfg, init_configurables->extensions, service_set(
|
||||
db, ss, mm, proxy, feature_service, ms, qp, bm
|
||||
_db, _ss, _mm, _proxy, _feature_service, _ms, _qp, _batchlog_manager
|
||||
)).get0()
|
||||
: configurable::notify_set{}
|
||||
;
|
||||
@@ -559,33 +534,28 @@ public:
|
||||
auto stop_configurables = defer([&] { notify_set.notify_all(configurable::system_state::stopped).get(); });
|
||||
|
||||
gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features);
|
||||
feature_service.start(fcfg).get();
|
||||
auto stop_feature_service = defer([&] { feature_service.stop().get(); });
|
||||
_feature_service.start(fcfg).get();
|
||||
auto stop_feature_service = defer([this] { _feature_service.stop().get(); });
|
||||
|
||||
sharded<locator::snitch_ptr> snitch;
|
||||
snitch.start(locator::snitch_config{}).get();
|
||||
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
||||
snitch.invoke_on_all(&locator::snitch_ptr::start).get();
|
||||
_snitch.start(locator::snitch_config{}).get();
|
||||
auto stop_snitch = defer([this] { _snitch.stop().get(); });
|
||||
_snitch.invoke_on_all(&locator::snitch_ptr::start).get();
|
||||
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
locator::token_metadata::config tm_cfg;
|
||||
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
|
||||
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
||||
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||
auto stop_token_metadata = defer([&token_metadata] { token_metadata.stop().get(); });
|
||||
tm_cfg.topo_cfg.local_dc_rack = { _snitch.local()->get_datacenter(), _snitch.local()->get_rack() };
|
||||
_token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||
auto stop_token_metadata = defer([this] { _token_metadata.stop().get(); });
|
||||
|
||||
sharded<locator::effective_replication_map_factory> erm_factory;
|
||||
erm_factory.start().get();
|
||||
auto stop_erm_factory = deferred_stop(erm_factory);
|
||||
_erm_factory.start().get();
|
||||
auto stop_erm_factory = deferred_stop(_erm_factory);
|
||||
|
||||
sharded<service::migration_notifier> mm_notif;
|
||||
mm_notif.start().get();
|
||||
auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
|
||||
_mnotifier.start().get();
|
||||
auto stop_mm_notify = defer([this] { _mnotifier.stop().get(); });
|
||||
|
||||
sharded<sstables::directory_semaphore> sst_dir_semaphore;
|
||||
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
|
||||
auto stop_sst_dir_sem = defer([&sst_dir_semaphore] {
|
||||
sst_dir_semaphore.stop().get();
|
||||
_sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
|
||||
auto stop_sst_dir_sem = defer([this] {
|
||||
_sst_dir_semaphore.stop().get();
|
||||
});
|
||||
|
||||
replica::database_config dbcfg;
|
||||
@@ -609,9 +579,9 @@ public:
|
||||
.task_ttl = cfg->task_ttl_seconds,
|
||||
};
|
||||
});
|
||||
task_manager.start(std::move(get_tm_cfg), std::ref(abort_sources)).get();
|
||||
auto stop_task_manager = defer([&task_manager] {
|
||||
task_manager.stop().get();
|
||||
_task_manager.start(std::move(get_tm_cfg), std::ref(abort_sources)).get();
|
||||
auto stop_task_manager = defer([this] {
|
||||
_task_manager.stop().get();
|
||||
});
|
||||
|
||||
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
|
||||
@@ -625,28 +595,27 @@ public:
|
||||
.throughput_mb_per_sec = cfg->compaction_throughput_mb_per_sec,
|
||||
};
|
||||
});
|
||||
cm.start(std::move(get_cm_cfg), std::ref(abort_sources), std::ref(task_manager)).get();
|
||||
auto stop_cm = deferred_stop(cm);
|
||||
_cm.start(std::move(get_cm_cfg), std::ref(abort_sources), std::ref(_task_manager)).get();
|
||||
auto stop_cm = deferred_stop(_cm);
|
||||
|
||||
sstm.start(std::ref(*cfg)).get();
|
||||
auto stop_sstm = deferred_stop(sstm);
|
||||
_sstm.start(std::ref(*cfg)).get();
|
||||
auto stop_sstm = deferred_stop(_sstm);
|
||||
|
||||
std::optional<wasm::startup_context> wasm_ctx;
|
||||
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
|
||||
wasm_ctx.emplace(*cfg, dbcfg);
|
||||
}
|
||||
|
||||
sharded<wasm::manager> wasm;
|
||||
wasm.start(std::ref(wasm_ctx)).get();
|
||||
auto stop_wasm = defer([&wasm] { wasm.stop().get(); });
|
||||
_wasm.start(std::ref(wasm_ctx)).get();
|
||||
auto stop_wasm = defer([this] { _wasm.stop().get(); });
|
||||
|
||||
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(cm), std::ref(sstm), std::ref(wasm), std::ref(sst_dir_semaphore), utils::cross_shard_barrier()).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_wasm), std::ref(_sst_dir_semaphore), utils::cross_shard_barrier()).get();
|
||||
auto stop_db = defer([this] {
|
||||
_db.stop().get();
|
||||
});
|
||||
|
||||
db.invoke_on_all(&replica::database::start).get();
|
||||
_db.invoke_on_all(&replica::database::start).get();
|
||||
|
||||
smp::invoke_on_all([blocked_reactor_notify_ms] {
|
||||
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
|
||||
@@ -659,12 +628,11 @@ public:
|
||||
db::view::node_update_backlog b(smp::count, 10ms);
|
||||
scheduling_group_key_config sg_conf =
|
||||
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
|
||||
proxy.start(std::ref(db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory)).get();
|
||||
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });
|
||||
_proxy.start(std::ref(_db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_erm_factory)).get();
|
||||
auto stop_proxy = defer([this] { _proxy.stop().get(); });
|
||||
|
||||
sharded<cql3::cql_config> cql_config;
|
||||
cql_config.start(cql3::cql_config::default_tag{}).get();
|
||||
auto stop_cql_config = defer([&] { cql_config.stop().get(); });
|
||||
_cql_config.start(cql3::cql_config::default_tag{}).get();
|
||||
auto stop_cql_config = defer([this] { _cql_config.stop().get(); });
|
||||
|
||||
cql3::query_processor::memory_config qp_mcfg;
|
||||
if (cfg_in.qp_mcfg) {
|
||||
@@ -672,7 +640,7 @@ public:
|
||||
} else {
|
||||
qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
|
||||
}
|
||||
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db));
|
||||
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(_db));
|
||||
|
||||
utils::loading_cache_config auth_prep_cache_config;
|
||||
auth_prep_cache_config.max_size = qp_mcfg.authorized_prepared_cache_size;
|
||||
@@ -680,48 +648,40 @@ public:
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(cql3::prepared_statements_cache::entry_expiry));
|
||||
auth_prep_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());
|
||||
|
||||
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), qp_mcfg, std::ref(cql_config), auth_prep_cache_config, std::ref(wasm)).get();
|
||||
auto stop_qp = defer([&qp] { qp.stop().get(); });
|
||||
_qp.start(std::ref(_proxy), std::move(local_data_dict), std::ref(_mnotifier), qp_mcfg, std::ref(_cql_config), auth_prep_cache_config, std::ref(_wasm)).get();
|
||||
auto stop_qp = defer([this] { _qp.stop().get(); });
|
||||
|
||||
sharded<service::endpoint_lifecycle_notifier> elc_notif;
|
||||
elc_notif.start().get();
|
||||
auto stop_elc_notif = defer([&elc_notif] { elc_notif.stop().get(); });
|
||||
|
||||
sharded<auth::service> auth_service;
|
||||
_elc_notif.start().get();
|
||||
auto stop_elc_notif = defer([this] { _elc_notif.stop().get(); });
|
||||
|
||||
set_abort_on_internal_error(true);
|
||||
const gms::inet_address listen("127.0.0.1");
|
||||
auto sys_dist_ks = seastar::sharded<db::system_distributed_keyspace>();
|
||||
auto sl_controller = sharded<qos::service_level_controller>();
|
||||
sl_controller.start(std::ref(auth_service), qos::service_level_options{}).get();
|
||||
auto stop_sl_controller = defer([&sl_controller] { sl_controller.stop().get(); });
|
||||
sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
|
||||
_sl_controller.start(std::ref(_auth_service), qos::service_level_options{}).get();
|
||||
auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); });
|
||||
_sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
|
||||
|
||||
auto sys_ks = seastar::sharded<db::system_keyspace>();
|
||||
sys_ks.start(std::ref(qp), std::ref(db), std::ref(snitch)).get();
|
||||
auto stop_sys_kd = defer([&sys_ks] { sys_ks.stop().get(); });
|
||||
_sys_ks.start(std::ref(_qp), std::ref(_db), std::ref(_snitch)).get();
|
||||
auto stop_sys_kd = defer([this] { _sys_ks.stop().get(); });
|
||||
for (const auto p: all_system_table_load_phases) {
|
||||
replica::distributed_loader::init_system_keyspace(sys_ks, erm_factory, db, *cfg, p).get();
|
||||
replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db, *cfg, p).get();
|
||||
}
|
||||
|
||||
if (!cfg->host_id) {
|
||||
cfg->host_id = sys_ks.local().load_local_host_id().get0();
|
||||
cfg->host_id = _sys_ks.local().load_local_host_id().get0();
|
||||
}
|
||||
|
||||
// don't start listening so tests can be run in parallel
|
||||
ms.start(cfg->host_id, listen, std::move(7000)).get();
|
||||
auto stop_ms = defer([&ms] { ms.stop().get(); });
|
||||
_ms.start(cfg->host_id, listen, std::move(7000)).get();
|
||||
auto stop_ms = defer([this] { _ms.stop().get(); });
|
||||
|
||||
// Normally the auth server is already stopped in here,
|
||||
// but if there is an initialization failure we have to
|
||||
// make sure to stop it now or ~sharded will assert.
|
||||
auto stop_auth_server = defer([&auth_service] {
|
||||
auth_service.stop().get();
|
||||
auto stop_auth_server = defer([this] {
|
||||
_auth_service.stop().get();
|
||||
});
|
||||
|
||||
auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); });
|
||||
|
||||
sharded<gms::gossiper> gossiper;
|
||||
auto stop_sys_dist_ks = defer([this] { _sys_dist_ks.stop().get(); });
|
||||
|
||||
// Init gossiper
|
||||
std::set<gms::inet_address> seeds;
|
||||
@@ -743,159 +703,147 @@ public:
|
||||
gcfg.cluster_name = "Test Cluster";
|
||||
gcfg.seeds = std::move(seeds);
|
||||
gcfg.skip_wait_for_gossip_to_settle = 0;
|
||||
gossiper.start(std::ref(abort_sources), std::ref(token_metadata), std::ref(ms), std::ref(*cfg), std::move(gcfg)).get();
|
||||
auto stop_ms_fd_gossiper = defer([&gossiper] {
|
||||
gossiper.stop().get();
|
||||
_gossiper.start(std::ref(abort_sources), std::ref(_token_metadata), std::ref(_ms), std::ref(*cfg), std::move(gcfg)).get();
|
||||
auto stop_ms_fd_gossiper = defer([this] {
|
||||
_gossiper.stop().get();
|
||||
});
|
||||
gossiper.invoke_on_all(&gms::gossiper::start).get();
|
||||
_gossiper.invoke_on_all(&gms::gossiper::start).get();
|
||||
|
||||
sharded<db::view::view_update_generator> view_update_generator;
|
||||
sharded<cdc::generation_service> cdc_generation_service;
|
||||
sharded<repair_service> repair;
|
||||
sharded<service::raft_group_registry> raft_gr;
|
||||
sharded<streaming::stream_manager> stream_manager;
|
||||
sharded<service::forward_service> forward_service;
|
||||
sharded<direct_failure_detector::failure_detector> fd;
|
||||
sharded<service::raft_address_map> raft_address_map;
|
||||
|
||||
raft_address_map.start().get();
|
||||
auto stop_address_map = defer([&raft_address_map] {
|
||||
raft_address_map.stop().get();
|
||||
_raft_address_map.start().get();
|
||||
auto stop_address_map = defer([this] {
|
||||
_raft_address_map.stop().get();
|
||||
});
|
||||
|
||||
|
||||
static sharded<service::direct_fd_pinger> fd_pinger;
|
||||
fd_pinger.start(std::ref(ms), std::ref(raft_address_map)).get();
|
||||
auto stop_fd_pinger = defer([] { fd_pinger.stop().get(); });
|
||||
_fd_pinger.start(std::ref(_ms), std::ref(_raft_address_map)).get();
|
||||
auto stop_fd_pinger = defer([this] { _fd_pinger.stop().get(); });
|
||||
|
||||
service::direct_fd_clock fd_clock;
|
||||
fd.start(
|
||||
std::ref(fd_pinger), std::ref(fd_clock),
|
||||
_fd.start(
|
||||
std::ref(_fd_pinger), std::ref(fd_clock),
|
||||
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get();
|
||||
|
||||
auto stop_fd = defer([&fd] {
|
||||
fd.stop().get();
|
||||
auto stop_fd = defer([this] {
|
||||
_fd.stop().get();
|
||||
});
|
||||
|
||||
raft_gr.start(cfg->consistent_cluster_management(),
|
||||
_group0_registry.start(cfg->consistent_cluster_management(),
|
||||
raft::server_id{cfg->host_id.id},
|
||||
std::ref(raft_address_map),
|
||||
std::ref(ms), std::ref(gossiper), std::ref(fd)).get();
|
||||
auto stop_raft_gr = deferred_stop(raft_gr);
|
||||
std::ref(_raft_address_map),
|
||||
std::ref(_ms), std::ref(_gossiper), std::ref(_fd)).get();
|
||||
auto stop_raft_gr = deferred_stop(_group0_registry);
|
||||
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper), scheduling_groups.streaming_scheduling_group).get();
|
||||
auto stop_streaming = defer([&stream_manager] { stream_manager.stop().get(); });
|
||||
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
|
||||
auto stop_streaming = defer([this] { _stream_manager.stop().get(); });
|
||||
|
||||
feature_service.invoke_on_all([] (auto& fs) {
|
||||
_feature_service.invoke_on_all([] (auto& fs) {
|
||||
return fs.enable(fs.supported_feature_set());
|
||||
}).get();
|
||||
|
||||
forward_service.start(std::ref(ms), std::ref(proxy), std::ref(db), std::ref(token_metadata)).get();
|
||||
auto stop_forward_service = defer([&forward_service] { forward_service.stop().get(); });
|
||||
_forward_service.start(std::ref(_ms), std::ref(_proxy), std::ref(_db), std::ref(_token_metadata)).get();
|
||||
auto stop_forward_service = defer([this] { _forward_service.stop().get(); });
|
||||
|
||||
// gropu0 client exists only on shard 0
|
||||
service::raft_group0_client group0_client(raft_gr.local(), sys_ks.local());
|
||||
service::raft_group0_client group0_client(_group0_registry.local(), _sys_ks.local());
|
||||
|
||||
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
|
||||
auto stop_mm = defer([&mm] { mm.stop().get(); });
|
||||
_mm.start(std::ref(_mnotifier), std::ref(_feature_service), std::ref(_ms), std::ref(_proxy), std::ref(_gossiper), std::ref(group0_client), std::ref(_sys_ks)).get();
|
||||
auto stop_mm = defer([this] { _mm.stop().get(); });
|
||||
|
||||
distributed<service::tablet_allocator> the_tablet_allocator;
|
||||
the_tablet_allocator.start(std::ref(mm_notif), std::ref(db)).get();
|
||||
auto stop_tablet_allocator = defer([&] {
|
||||
the_tablet_allocator.stop().get();
|
||||
_tablet_allocator.start(std::ref(_mnotifier), std::ref(_db)).get();
|
||||
auto stop_tablet_allocator = defer([this] {
|
||||
_tablet_allocator.stop().get();
|
||||
});
|
||||
|
||||
qp.invoke_on_all([&mm, &forward_service, &group0_client] (cql3::query_processor& qp) {
|
||||
qp.start_remote(mm.local(), forward_service.local(), group0_client);
|
||||
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
|
||||
qp.start_remote(_mm.local(), _forward_service.local(), group0_client);
|
||||
}).get();
|
||||
auto stop_qp_remote = defer([&qp] {
|
||||
qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
|
||||
auto stop_qp_remote = defer([this] {
|
||||
_qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
|
||||
});
|
||||
|
||||
db::batchlog_manager_config bmcfg;
|
||||
bmcfg.replay_rate = 100000000;
|
||||
bmcfg.write_request_timeout = 2s;
|
||||
bm.start(std::ref(qp), std::ref(sys_ks), bmcfg).get();
|
||||
auto stop_bm = defer([&bm] {
|
||||
bm.stop().get();
|
||||
_batchlog_manager.start(std::ref(_qp), std::ref(_sys_ks), bmcfg).get();
|
||||
auto stop_bm = defer([this] {
|
||||
_batchlog_manager.stop().get();
|
||||
});
|
||||
|
||||
service::raft_group0 group0_service{
|
||||
abort_sources.local(), raft_gr.local(), ms,
|
||||
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client};
|
||||
abort_sources.local(), _group0_registry.local(), _ms,
|
||||
_gossiper.local(), _feature_service.local(), _sys_ks.local(), group0_client};
|
||||
|
||||
ss.start(std::ref(abort_sources), std::ref(db),
|
||||
std::ref(gossiper),
|
||||
std::ref(sys_ks),
|
||||
std::ref(feature_service), std::ref(mm),
|
||||
std::ref(token_metadata), std::ref(erm_factory), std::ref(ms),
|
||||
std::ref(repair),
|
||||
std::ref(stream_manager),
|
||||
std::ref(elc_notif),
|
||||
std::ref(bm),
|
||||
std::ref(snitch),
|
||||
std::ref(the_tablet_allocator)).get();
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
_ss.start(std::ref(abort_sources), std::ref(_db),
|
||||
std::ref(_gossiper),
|
||||
std::ref(_sys_ks),
|
||||
std::ref(_feature_service), std::ref(_mm),
|
||||
std::ref(_token_metadata), std::ref(_erm_factory), std::ref(_ms),
|
||||
std::ref(_repair),
|
||||
std::ref(_stream_manager),
|
||||
std::ref(_elc_notif),
|
||||
std::ref(_batchlog_manager),
|
||||
std::ref(_snitch),
|
||||
std::ref(_tablet_allocator)).get();
|
||||
auto stop_storage_service = defer([this] { _ss.stop().get(); });
|
||||
|
||||
ss.invoke_on_all([&] (service::storage_service& ss) {
|
||||
ss.set_query_processor(qp.local());
|
||||
_ss.invoke_on_all([this] (service::storage_service& ss) {
|
||||
ss.set_query_processor(_qp.local());
|
||||
}).get();
|
||||
sys_ks.invoke_on_all([&db, &ss, &gossiper, &raft_gr, &cfg] (db::system_keyspace& sys_ks) {
|
||||
return sys_ks.initialize_virtual_tables(db, ss, gossiper, raft_gr, *cfg);
|
||||
_sys_ks.invoke_on_all([this, &cfg] (db::system_keyspace& sys_ks) {
|
||||
return sys_ks.initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, *cfg);
|
||||
}).get();
|
||||
|
||||
replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get();
|
||||
replica::distributed_loader::init_non_system_keyspaces(_db, _proxy, _sys_ks).get();
|
||||
|
||||
db.invoke_on_all([] (replica::database& db) {
|
||||
_db.invoke_on_all([] (replica::database& db) {
|
||||
db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr<replica::table> table) {
|
||||
replica::table& t = *table;
|
||||
t.enable_auto_compaction();
|
||||
});
|
||||
}).get();
|
||||
|
||||
if (raft_gr.local().is_enabled()) {
|
||||
raft_gr.invoke_on_all([] (service::raft_group_registry& raft_gr) {
|
||||
if (_group0_registry.local().is_enabled()) {
|
||||
_group0_registry.invoke_on_all([] (service::raft_group_registry& raft_gr) {
|
||||
return raft_gr.start();
|
||||
}).get();
|
||||
}
|
||||
|
||||
group0_client.init().get();
|
||||
auto stop_system_keyspace = defer([&sys_ks] {
|
||||
sys_ks.invoke_on_all(&db::system_keyspace::shutdown).get();
|
||||
auto stop_system_keyspace = defer([this] {
|
||||
_sys_ks.invoke_on_all(&db::system_keyspace::shutdown).get();
|
||||
});
|
||||
|
||||
auto shutdown_db = defer([&db] {
|
||||
db.invoke_on_all(&replica::database::shutdown).get();
|
||||
auto shutdown_db = defer([this] {
|
||||
_db.invoke_on_all(&replica::database::shutdown).get();
|
||||
});
|
||||
// XXX: drain_on_shutdown raft before stopping the database and
|
||||
// query processor. Group registry stop raft groups
|
||||
// when stopped, and until then the groups may use
|
||||
// the database and the query processor.
|
||||
auto drain_raft = defer([&raft_gr] {
|
||||
raft_gr.invoke_on_all(&service::raft_group_registry::drain_on_shutdown).get();
|
||||
auto drain_raft = defer([this] {
|
||||
_group0_registry.invoke_on_all(&service::raft_group_registry::drain_on_shutdown).get();
|
||||
});
|
||||
|
||||
view_update_generator.start(std::ref(db), std::ref(proxy)).get();
|
||||
view_update_generator.invoke_on_all(&db::view::view_update_generator::start).get();
|
||||
auto stop_view_update_generator = defer([&view_update_generator] {
|
||||
view_update_generator.stop().get();
|
||||
_view_update_generator.start(std::ref(_db), std::ref(_proxy)).get();
|
||||
_view_update_generator.invoke_on_all(&db::view::view_update_generator::start).get();
|
||||
auto stop_view_update_generator = defer([this] {
|
||||
_view_update_generator.stop().get();
|
||||
});
|
||||
|
||||
sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get();
|
||||
_sys_dist_ks.start(std::ref(_qp), std::ref(_mm), std::ref(_proxy)).get();
|
||||
|
||||
if (cfg_in.need_remote_proxy) {
|
||||
proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(ms), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get();
|
||||
_proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks)).get();
|
||||
}
|
||||
auto stop_proxy_remote = defer([&proxy, need = cfg_in.need_remote_proxy] {
|
||||
auto stop_proxy_remote = defer([this, need = cfg_in.need_remote_proxy] {
|
||||
if (need) {
|
||||
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
|
||||
_proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
|
||||
}
|
||||
});
|
||||
|
||||
sl_controller.invoke_on_all([&sys_dist_ks, &sl_controller] (qos::service_level_controller& service) {
|
||||
_sl_controller.invoke_on_all([this] (qos::service_level_controller& service) {
|
||||
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
|
||||
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
|
||||
make_shared<qos::unit_test_service_levels_accessor>(sl_controller,sys_dist_ks));
|
||||
make_shared<qos::unit_test_service_levels_accessor>(_sl_controller, _sys_dist_ks));
|
||||
return service.set_distributed_data_accessor(std::move(service_level_data_accessor));
|
||||
}).get();
|
||||
|
||||
@@ -908,16 +856,15 @@ public:
|
||||
* and would only slow down tests (by having them wait).
|
||||
*/
|
||||
cdc_config.ring_delay = std::chrono::milliseconds(0);
|
||||
cdc_generation_service.start(std::ref(cdc_config), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
|
||||
auto stop_cdc_generation_service = defer([&cdc_generation_service] {
|
||||
cdc_generation_service.stop().get();
|
||||
_cdc_generation_service.start(std::ref(cdc_config), std::ref(_gossiper), std::ref(_sys_dist_ks), std::ref(_sys_ks), std::ref(abort_sources), std::ref(_token_metadata), std::ref(_feature_service), std::ref(_db)).get();
|
||||
auto stop_cdc_generation_service = defer([this] {
|
||||
_cdc_generation_service.stop().get();
|
||||
});
|
||||
|
||||
sharded<cdc::cdc_service> cdc;
|
||||
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
|
||||
cdc.start(std::ref(proxy), sharded_parameter(get_cdc_metadata, std::ref(cdc_generation_service)), std::ref(mm_notif)).get();
|
||||
auto stop_cdc_service = defer([&] {
|
||||
cdc.stop().get();
|
||||
_cdc.start(std::ref(_proxy), sharded_parameter(get_cdc_metadata, std::ref(_cdc_generation_service)), std::ref(_mnotifier)).get();
|
||||
auto stop_cdc_service = defer([this] {
|
||||
_cdc.stop().get();
|
||||
});
|
||||
|
||||
group0_service.start().get();
|
||||
@@ -928,10 +875,10 @@ public:
|
||||
const bool raft_topology_change_enabled = group0_service.is_raft_enabled()
|
||||
&& cfg->check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES);
|
||||
|
||||
ss.local().set_group0(group0_service, raft_topology_change_enabled);
|
||||
_ss.local().set_group0(group0_service, raft_topology_change_enabled);
|
||||
|
||||
try {
|
||||
ss.local().join_cluster(cdc_generation_service.local(), sys_dist_ks, proxy, qp.local()).get();
|
||||
_ss.local().join_cluster(_cdc_generation_service.local(), _sys_dist_ks, _proxy, _qp.local()).get();
|
||||
} catch (std::exception& e) {
|
||||
// if any of the defers crashes too, we'll never see
|
||||
// the error
|
||||
@@ -953,23 +900,22 @@ public:
|
||||
auth_config.authenticator_java_name = qualified_authenticator_name;
|
||||
auth_config.role_manager_java_name = qualified_role_manager_name;
|
||||
|
||||
auth_service.start(perm_cache_config, std::ref(qp), std::ref(mm_notif), std::ref(mm), auth_config).get();
|
||||
auth_service.invoke_on_all([&mm] (auth::service& auth) {
|
||||
return auth.start(mm.local());
|
||||
_auth_service.start(perm_cache_config, std::ref(_qp), std::ref(_mnotifier), std::ref(_mm), auth_config).get();
|
||||
_auth_service.invoke_on_all([this] (auth::service& auth) {
|
||||
return auth.start(_mm.local());
|
||||
}).get();
|
||||
|
||||
auto deinit_storage_service_server = defer([&auth_service, &gossiper] {
|
||||
gossiper.invoke_on_all(&gms::gossiper::shutdown).get();
|
||||
auth_service.stop().get();
|
||||
auto deinit_storage_service_server = defer([this] {
|
||||
_gossiper.invoke_on_all(&gms::gossiper::shutdown).get();
|
||||
_auth_service.stop().get();
|
||||
});
|
||||
|
||||
sharded<db::view::view_builder> view_builder;
|
||||
view_builder.start(std::ref(db), std::ref(sys_ks), std::ref(sys_dist_ks), std::ref(mm_notif), std::ref(view_update_generator)).get();
|
||||
view_builder.invoke_on_all([&mm] (db::view::view_builder& vb) {
|
||||
return vb.start(mm.local());
|
||||
_view_builder.start(std::ref(_db), std::ref(_sys_ks), std::ref(_sys_dist_ks), std::ref(_mnotifier), std::ref(_view_update_generator)).get();
|
||||
_view_builder.invoke_on_all([this] (db::view::view_builder& vb) {
|
||||
return vb.start(_mm.local());
|
||||
}).get();
|
||||
auto stop_view_builder = defer([&view_builder] {
|
||||
view_builder.stop().get();
|
||||
auto stop_view_builder = defer([this] {
|
||||
_view_builder.stop().get();
|
||||
});
|
||||
|
||||
// Create the testing user.
|
||||
@@ -979,7 +925,7 @@ public:
|
||||
config.can_login = true;
|
||||
|
||||
auth::create_role(
|
||||
auth_service.local(),
|
||||
_auth_service.local(),
|
||||
testing_superuser,
|
||||
config,
|
||||
auth::authentication_options()).get0();
|
||||
@@ -989,20 +935,21 @@ public:
|
||||
|
||||
notify_set.notify_all(configurable::system_state::started).get();
|
||||
|
||||
single_node_cql_env env(db, feature_service, sstm, proxy, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller), bm, gossiper, group0_client, raft_gr, sys_ks, the_tablet_allocator);
|
||||
env.start().get();
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
_group0_client = &group0_client;
|
||||
|
||||
if (!env.local_db().has_keyspace(ks_name)) {
|
||||
env.create_keyspace(ks_name).get();
|
||||
_core_local.start(std::ref(_auth_service), std::ref(_sl_controller)).get();
|
||||
auto stop_core_local = defer([this] { _core_local.stop().get(); });
|
||||
|
||||
if (!local_db().has_keyspace(ks_name)) {
|
||||
create_keyspace(ks_name).get();
|
||||
}
|
||||
|
||||
with_scheduling_group(dbcfg.statement_scheduling_group, [&func, &env] {
|
||||
return func(env);
|
||||
with_scheduling_group(dbcfg.statement_scheduling_group, [&func, this] {
|
||||
return func(*this);
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute_batch(
|
||||
const std::vector<sstring_view>& queries, std::unique_ptr<cql3::query_options> qo) override {
|
||||
using cql3::statements::batch_statement;
|
||||
|
||||
@@ -138,8 +138,6 @@ public:
|
||||
const sstring& column_name,
|
||||
data_value expected) = 0;
|
||||
|
||||
virtual future<> stop() = 0;
|
||||
|
||||
virtual service::client_state& local_client_state() = 0;
|
||||
|
||||
virtual replica::database& local_db() = 0;
|
||||
|
||||
Reference in New Issue
Block a user