service: Introduce tablet_allocator
Currently, responsible for injecting mutations of system.tablets to schema changes. Note that not all migrations are handled currently. Dependant view or cdc table drops are not handled.
This commit is contained in:
@@ -844,6 +844,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'validation.cc',
|
||||
'service/priority_manager.cc',
|
||||
'service/migration_manager.cc',
|
||||
'service/tablet_allocator.cc',
|
||||
'service/storage_proxy.cc',
|
||||
'query_ranges_to_vnodes.cc',
|
||||
'service/forward_service.cc',
|
||||
|
||||
9
main.cc
9
main.cc
@@ -25,6 +25,7 @@
|
||||
#include "db/legacy_schema_migrator.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "service/load_meter.hh"
|
||||
#include "service/view_update_backlog_broker.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
@@ -1112,6 +1113,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
forward_service.stop().get();
|
||||
});
|
||||
|
||||
distributed<service::tablet_allocator> tablet_allocator;
|
||||
if (cfg->check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
tablet_allocator.start(std::ref(mm_notifier), std::ref(db)).get();
|
||||
}
|
||||
auto stop_tablet_allocator = defer_verbose_shutdown("tablet allocator", [&tablet_allocator] {
|
||||
tablet_allocator.stop().get();
|
||||
});
|
||||
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&proxy] { return proxy.stop(); });
|
||||
|
||||
|
||||
90
service/tablet_allocator.cc
Normal file
90
service/tablet_allocator.cc
Normal file
@@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "locator/tablets.hh"
|
||||
#include "replica/tablets.hh"
|
||||
#include "locator/tablet_replication_strategy.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
|
||||
using namespace locator;
|
||||
using namespace replica;
|
||||
|
||||
namespace service {
|
||||
|
||||
class tablet_allocator_impl : public tablet_allocator::impl
|
||||
, public service::migration_listener::empty_listener {
|
||||
service::migration_notifier& _migration_notifier;
|
||||
replica::database& _db;
|
||||
bool _stopped = false;
|
||||
public:
|
||||
tablet_allocator_impl(service::migration_notifier& mn, replica::database& db)
|
||||
: _migration_notifier(mn)
|
||||
, _db(db) {
|
||||
_migration_notifier.register_listener(this);
|
||||
}
|
||||
|
||||
tablet_allocator_impl(tablet_allocator_impl&&) = delete; // "this" captured.
|
||||
|
||||
~tablet_allocator_impl() {
|
||||
assert(_stopped);
|
||||
}
|
||||
|
||||
future<> stop() {
|
||||
co_await _migration_notifier.unregister_listener(this);
|
||||
_stopped = true;
|
||||
}
|
||||
|
||||
void on_before_create_column_family(const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
|
||||
keyspace& ks = _db.find_keyspace(s.ks_name());
|
||||
auto&& rs = ks.get_replication_strategy();
|
||||
if (auto&& tablet_rs = rs.maybe_as_tablet_aware()) {
|
||||
auto tm = _db.get_shared_token_metadata().get();
|
||||
auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm).get0();
|
||||
muts.emplace_back(tablet_map_to_mutation(map, s.id(), s.keypace_name(), s.cf_name(), ts).get0());
|
||||
}
|
||||
}
|
||||
|
||||
void on_before_drop_column_family(const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
|
||||
keyspace& ks = _db.find_keyspace(s.ks_name());
|
||||
auto&& rs = ks.get_replication_strategy();
|
||||
std::vector<mutation> result;
|
||||
if (rs.uses_tablets()) {
|
||||
auto tm = _db.get_shared_token_metadata().get();
|
||||
muts.emplace_back(make_drop_tablet_map_mutation(s.keypace_name(), s.id(), ts));
|
||||
}
|
||||
}
|
||||
|
||||
void on_before_drop_keyspace(const sstring& keyspace_name, std::vector<mutation>& muts, api::timestamp_type ts) override {
|
||||
keyspace& ks = _db.find_keyspace(keyspace_name);
|
||||
auto&& rs = ks.get_replication_strategy();
|
||||
if (rs.uses_tablets()) {
|
||||
auto tm = _db.get_shared_token_metadata().get();
|
||||
for (auto&& [name, s] : ks.metadata()->cf_meta_data()) {
|
||||
muts.emplace_back(make_drop_tablet_map_mutation(keyspace_name, s->id(), ts));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: Handle materialized views.
|
||||
};
|
||||
|
||||
tablet_allocator::tablet_allocator(service::migration_notifier& mn, replica::database& db)
|
||||
: _impl(std::make_unique<tablet_allocator_impl>(mn, db)) {
|
||||
}
|
||||
|
||||
future<> tablet_allocator::stop() {
|
||||
return impl().stop();
|
||||
}
|
||||
|
||||
tablet_allocator_impl& tablet_allocator::impl() {
|
||||
return static_cast<tablet_allocator_impl&>(*_impl);
|
||||
}
|
||||
|
||||
}
|
||||
34
service/tablet_allocator.hh
Normal file
34
service/tablet_allocator.hh
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include <any>
|
||||
|
||||
namespace service {
|
||||
|
||||
class tablet_allocator_impl;
|
||||
|
||||
class tablet_allocator {
|
||||
public:
|
||||
class impl {
|
||||
public:
|
||||
virtual ~impl() = default;
|
||||
};
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
tablet_allocator_impl& impl();
|
||||
public:
|
||||
tablet_allocator(service::migration_notifier& mn, replica::database& db);
|
||||
public:
|
||||
future<> stop();
|
||||
};
|
||||
|
||||
}
|
||||
@@ -29,6 +29,7 @@
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "service/raft/raft_address_map.hh"
|
||||
@@ -758,6 +759,12 @@ public:
|
||||
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
|
||||
auto stop_mm = defer([&mm] { mm.stop().get(); });
|
||||
|
||||
distributed<service::tablet_allocator> the_tablet_allocator;
|
||||
the_tablet_allocator.start(std::ref(mm_notif), std::ref(db)).get();
|
||||
auto stop_tablet_allocator = defer([&] {
|
||||
the_tablet_allocator.stop().get();
|
||||
});
|
||||
|
||||
cql3::query_processor::memory_config qp_mcfg;
|
||||
if (cfg_in.qp_mcfg) {
|
||||
qp_mcfg = *cfg_in.qp_mcfg;
|
||||
|
||||
Reference in New Issue
Block a user