replica: Introduce concept of storage group
Storage group is the storage of tablets. This new concept is helpful for tablet splitting, where the storage of tablet will be split in multiple compaction groups, where each can be compacted independently. The reason for not going with arena concept is that it added complexity, and it felt much more elegant to keep compaction group unchanged which at the end of the day abstracts the concept of a set of sstables that can be compacted and operated independently. When splitting, the storage group for a tablet may therefore own multiple compaction groups, left, right, and main, where main keeps the data that needs splitting. When splitting completes, only left and right compaction groups will be populated. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -14,7 +14,8 @@
|
||||
#include "compaction/compaction_backlog_manager.hh"
|
||||
#include "compaction/compaction_strategy_state.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "compaction/compaction_fwd.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include <boost/intrusive/list.hpp>
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -51,6 +52,8 @@ class compaction_group {
|
||||
seastar::condition_variable _staging_done_condition;
|
||||
// Gates async operations confined to a single group.
|
||||
seastar::gate _async_gate;
|
||||
using list_hook_t = boost::intrusive::list_member_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
|
||||
list_hook_t _list_hook;
|
||||
private:
|
||||
// Adds new sstable to the set of sstables
|
||||
// Doesn't update the cache. The cache must be synchronized in order for reads to see
|
||||
@@ -71,6 +74,10 @@ private:
|
||||
// it to be moved from its original sstable set (e.g. maintenance) into a new one (e.g. main).
|
||||
future<> delete_unused_sstables(sstables::compaction_completion_desc desc);
|
||||
public:
|
||||
using list_t = boost::intrusive::list<compaction_group,
|
||||
boost::intrusive::member_hook<compaction_group, compaction_group::list_hook_t, &compaction_group::_list_hook>,
|
||||
boost::intrusive::constant_time_size<false>>;
|
||||
|
||||
compaction_group(table& t, size_t gid, dht::token_range token_range);
|
||||
|
||||
size_t group_id() const noexcept {
|
||||
@@ -150,8 +157,30 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using compaction_group_vector = utils::chunked_vector<std::unique_ptr<compaction_group>>;
|
||||
using compaction_group_ptr = std::unique_ptr<compaction_group>;
|
||||
using compaction_group_vector = utils::chunked_vector<compaction_group_ptr>;
|
||||
using compaction_group_list = compaction_group::list_t;
|
||||
|
||||
// Storage group is responsible for storage that belongs to a single tablet.
|
||||
// A storage group can manage 1 or more compaction groups, each of which can be compacted independently.
|
||||
// If a tablet needs splitting, the storage group can be put in splitting mode, allowing the storage
|
||||
// in main compaction groups to be split into two new compaction groups, all of which will be managed
|
||||
// by the same storage group.
|
||||
class storage_group {
|
||||
compaction_group_ptr _main_cg;
|
||||
public:
|
||||
storage_group(compaction_group_ptr cg, compaction_group_list& list);
|
||||
|
||||
const dht::token_range& token_range() const noexcept;
|
||||
|
||||
compaction_group_ptr& main_compaction_group() noexcept;
|
||||
|
||||
utils::small_vector<compaction_group*, 3> compaction_groups() noexcept;
|
||||
};
|
||||
|
||||
using storage_group_vector = utils::chunked_vector<std::unique_ptr<storage_group>>;
|
||||
|
||||
// TODO: will be changed into storage_group_manager. Not doing it now to reduce the change size.
|
||||
class compaction_group_manager {
|
||||
public:
|
||||
virtual ~compaction_group_manager() {}
|
||||
|
||||
@@ -65,6 +65,7 @@
|
||||
#include "locator/tablets.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "compaction/compaction_fwd.hh"
|
||||
#include "compaction_group.hh"
|
||||
|
||||
class cell_locker;
|
||||
class cell_locker_stats;
|
||||
@@ -325,11 +326,6 @@ struct table_stats;
|
||||
using column_family_stats = table_stats;
|
||||
|
||||
class database_sstable_write_monitor;
|
||||
class compaction_group;
|
||||
class compaction_group_manager;
|
||||
using compaction_group_vector = utils::chunked_vector<std::unique_ptr<compaction_group>>;
|
||||
|
||||
using enable_backlog_tracker = bool_class<class enable_backlog_tracker_tag>;
|
||||
|
||||
extern const ssize_t new_reader_base_cost;
|
||||
|
||||
@@ -451,7 +447,10 @@ private:
|
||||
compaction_manager& _compaction_manager;
|
||||
sstables::compaction_strategy _compaction_strategy;
|
||||
std::unique_ptr<compaction_group_manager> _cg_manager;
|
||||
compaction_group_vector _compaction_groups;
|
||||
// The compaction group list is only a helper for accessing the groups managed by the storage groups.
|
||||
// The list entries are unlinked automatically when the storage group, they belong to, is removed.
|
||||
mutable compaction_group_list _compaction_groups;
|
||||
storage_group_vector _storage_groups;
|
||||
// Compound SSTable set for all the compaction groups, which is useful for operations spanning all of them.
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
// Control background fibers waiting for sstables to be deleted
|
||||
@@ -573,20 +572,25 @@ public:
|
||||
};
|
||||
|
||||
private:
|
||||
using compaction_group_ptr = std::unique_ptr<compaction_group>;
|
||||
storage_group_vector make_storage_groups();
|
||||
// Select a compaction group from a given token.
|
||||
size_t storage_group_id_for_token(dht::token token) const noexcept;
|
||||
storage_group* storage_group_for_token(dht::token token) const noexcept;
|
||||
|
||||
std::unique_ptr<compaction_group_manager> make_compaction_group_manager();
|
||||
// Return compaction group if table owns a single one. Otherwise, null is returned.
|
||||
compaction_group* single_compaction_group_if_available() const noexcept;
|
||||
compaction_group* get_compaction_group(size_t id) const noexcept;
|
||||
// Select a compaction group from a given token.
|
||||
compaction_group& compaction_group_for_token(dht::token token) const noexcept;
|
||||
// Return ids of compaction groups, present in this shard, that own a particular token range.
|
||||
std::vector<size_t> compaction_group_ids_for_token_range(dht::token_range tr) const;
|
||||
// Return compaction groups, present in this shard, that own a particular token range.
|
||||
utils::chunked_vector<compaction_group*> compaction_groups_for_token_range(dht::token_range tr) const;
|
||||
// Select a compaction group from a given key.
|
||||
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept;
|
||||
// Select a compaction group from a given sstable based on its token range.
|
||||
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept;
|
||||
// Returns a list of all compaction groups.
|
||||
const compaction_group_vector& compaction_groups() const noexcept;
|
||||
compaction_group_list& compaction_groups() const noexcept;
|
||||
// Safely iterate through compaction groups, while performing async operations on them.
|
||||
future<> parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action);
|
||||
|
||||
|
||||
175
replica/table.cc
175
replica/table.cc
@@ -203,8 +203,8 @@ table::add_memtables_to_reader_list(std::vector<flat_mutation_reader_v2>& reader
|
||||
}
|
||||
reserve_fn(boost::accumulate(compaction_groups() | boost::adaptors::transformed(std::mem_fn(&compaction_group::memtable_count)), uint64_t(0)));
|
||||
// TODO: implement a incremental reader selector for memtable, using existing reader_selector interface for combined_reader.
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
add_memtables_from_cg(*cg);
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
add_memtables_from_cg(cg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -370,8 +370,8 @@ future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db
|
||||
}
|
||||
|
||||
std::vector<memtable*> table::active_memtables() {
|
||||
return boost::copy_range<std::vector<memtable*>>(compaction_groups() | boost::adaptors::transformed([] (const compaction_group_ptr& cg) {
|
||||
return &cg->memtables()->active_memtable();
|
||||
return boost::copy_range<std::vector<memtable*>>(compaction_groups() | boost::adaptors::transformed([] (compaction_group& cg) {
|
||||
return &cg.memtables()->active_memtable();
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -610,6 +610,31 @@ bool table::uses_tablets() const {
|
||||
return _erm && _erm->get_replication_strategy().uses_tablets();
|
||||
}
|
||||
|
||||
storage_group::storage_group(compaction_group_ptr cg, compaction_group_list& list)
|
||||
: _main_cg(std::move(cg)) {
|
||||
list.push_back(*_main_cg);
|
||||
}
|
||||
|
||||
const dht::token_range& storage_group::token_range() const noexcept {
|
||||
return _main_cg->token_range();
|
||||
}
|
||||
|
||||
compaction_group_ptr& storage_group::main_compaction_group() noexcept {
|
||||
return _main_cg;
|
||||
}
|
||||
|
||||
utils::small_vector<compaction_group*, 3> storage_group::compaction_groups() noexcept {
|
||||
return {_main_cg.get()};
|
||||
}
|
||||
|
||||
storage_group_vector table::make_storage_groups() {
|
||||
storage_group_vector sgs;
|
||||
for (compaction_group_ptr& cg : _cg_manager->make_compaction_groups()) {
|
||||
sgs.push_back(std::make_unique<storage_group>(std::move(cg), _compaction_groups));
|
||||
}
|
||||
return sgs;
|
||||
}
|
||||
|
||||
std::unique_ptr<compaction_group_manager> table::make_compaction_group_manager() {
|
||||
if (uses_tablets()) {
|
||||
return std::make_unique<tablet_compaction_group_manager>(*this);
|
||||
@@ -618,37 +643,57 @@ std::unique_ptr<compaction_group_manager> table::make_compaction_group_manager()
|
||||
}
|
||||
|
||||
compaction_group* table::single_compaction_group_if_available() const noexcept {
|
||||
return _compaction_groups.size() == 1 ? &*_compaction_groups[0] : nullptr;
|
||||
return _compaction_groups.size() == 1 ? get_compaction_group(0) : nullptr;
|
||||
}
|
||||
|
||||
compaction_group* table::get_compaction_group(size_t id) const noexcept {
|
||||
return _storage_groups[id]->main_compaction_group().get();
|
||||
}
|
||||
|
||||
size_t table::storage_group_id_for_token(dht::token token) const noexcept {
|
||||
auto idx = _cg_manager->compaction_group_of(token);
|
||||
if (idx >= _storage_groups.size()) {
|
||||
on_fatal_internal_error(tlogger, format("storage_group_for_token: index out of range: idx={} size_log2={} size={} token={}",
|
||||
idx, _cg_manager->log2_compaction_groups(), _storage_groups.size(), token));
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
|
||||
storage_group* table::storage_group_for_token(dht::token token) const noexcept {
|
||||
auto idx = storage_group_id_for_token(token);
|
||||
auto& ret = *_storage_groups[idx];
|
||||
if (token.is_minimum() || token.is_maximum()) {
|
||||
return &ret;
|
||||
}
|
||||
if (!ret.token_range().contains(token, dht::token_comparator())) {
|
||||
on_fatal_internal_error(tlogger, format("storage_group_for_token: storage_group idx={} range={} does not contain token={}",
|
||||
idx, ret.token_range(), token));
|
||||
}
|
||||
return &ret;
|
||||
}
|
||||
|
||||
compaction_group& table::compaction_group_for_token(dht::token token) const noexcept {
|
||||
auto idx = _cg_manager->compaction_group_of(token);
|
||||
if (idx >= _compaction_groups.size()) {
|
||||
on_fatal_internal_error(tlogger, format("compaction_group_for_token: index out of range: idx={} size_log2={} size={} token={}",
|
||||
idx, _cg_manager->log2_compaction_groups(), _compaction_groups.size(), token));
|
||||
}
|
||||
auto& ret = *_compaction_groups[idx];
|
||||
if (token.is_minimum() || token.is_maximum()) {
|
||||
return ret;
|
||||
}
|
||||
if (!ret.token_range().contains(token, dht::token_comparator())) {
|
||||
on_fatal_internal_error(tlogger, format("compaction_group_for_token: compaction_group idx={} range={} does not contain token={}",
|
||||
idx, ret.token_range(), token));
|
||||
}
|
||||
return ret;
|
||||
// TODO: if storage group is in splitting mode, it will have to select the correct group.
|
||||
return *(storage_group_for_token(token)->main_compaction_group());
|
||||
}
|
||||
|
||||
std::vector<size_t> table::compaction_group_ids_for_token_range(dht::token_range tr) const {
|
||||
std::vector<size_t> ret;
|
||||
utils::chunked_vector<compaction_group*> table::compaction_groups_for_token_range(dht::token_range tr) const {
|
||||
utils::chunked_vector<compaction_group*> ret;
|
||||
auto cmp = dht::token_comparator();
|
||||
|
||||
size_t candidate_start = tr.start() ? compaction_group_for_token(tr.start()->value()).group_id() : size_t(0);
|
||||
size_t candidate_end = tr.end() ? compaction_group_for_token(tr.end()->value()).group_id() : (_compaction_groups.size() - 1);
|
||||
size_t candidate_start = tr.start() ? storage_group_id_for_token(tr.start()->value()) : size_t(0);
|
||||
size_t candidate_end = tr.end() ? storage_group_id_for_token(tr.end()->value()) : (_storage_groups.size() - 1);
|
||||
|
||||
while (candidate_start <= candidate_end) {
|
||||
auto& cg = _compaction_groups[candidate_start++];
|
||||
auto& sg = _storage_groups[candidate_start++];
|
||||
if (!sg) {
|
||||
continue;
|
||||
}
|
||||
// FIXME: indentation.
|
||||
for (auto& cg : sg->compaction_groups()) {
|
||||
if (cg && tr.overlaps(cg->token_range(), cmp)) {
|
||||
ret.push_back(cg->group_id());
|
||||
ret.push_back(cg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -668,23 +713,21 @@ compaction_group& table::compaction_group_for_sstable(const sstables::shared_sst
|
||||
return compaction_group_for_token(sst->get_first_decorated_key().token());
|
||||
}
|
||||
|
||||
const compaction_group_vector& table::compaction_groups() const noexcept {
|
||||
compaction_group_list& table::compaction_groups() const noexcept {
|
||||
return _compaction_groups;
|
||||
}
|
||||
|
||||
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
|
||||
// TODO: place a barrier here when we allow dynamic groups.
|
||||
co_await coroutine::parallel_for_each(compaction_groups(), [&] (const compaction_group_ptr& cg) {
|
||||
return action(*cg);
|
||||
co_await coroutine::parallel_for_each(compaction_groups(), [&] (compaction_group& cg) {
|
||||
return action(cg);
|
||||
});
|
||||
}
|
||||
|
||||
future<sstables::sstable_list> table::take_storage_snapshot(dht::token_range tr) {
|
||||
sstables::sstable_list ret;
|
||||
|
||||
for (auto cg_id : compaction_group_ids_for_token_range(tr)) {
|
||||
auto& cg = _compaction_groups[cg_id];
|
||||
|
||||
for (auto& cg : compaction_groups_for_token_range(tr)) {
|
||||
// We don't care about sstables in snapshot being unlinked, as the file
|
||||
// descriptors remain opened until last reference to them are gone.
|
||||
// Also, we should be careful with taking a deletion lock here as a
|
||||
@@ -1086,8 +1129,8 @@ table::stop() {
|
||||
co_await _sstable_deletion_gate.close();
|
||||
co_await std::move(gate_closed_fut);
|
||||
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
cg->clear_sstables();
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
cg.clear_sstables();
|
||||
}
|
||||
_sstables = make_compound_sstable_set();
|
||||
}));
|
||||
@@ -1202,10 +1245,10 @@ void table::rebuild_statistics() {
|
||||
_stats.live_sstable_count = 0;
|
||||
_stats.total_disk_space_used = 0;
|
||||
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
_stats.live_disk_space_used += cg->live_disk_space_used();
|
||||
_stats.total_disk_space_used += cg->total_disk_space_used();
|
||||
_stats.live_sstable_count += cg->live_sstable_count();
|
||||
for (const compaction_group& cg : compaction_groups()) {
|
||||
_stats.live_disk_space_used += cg.live_disk_space_used();
|
||||
_stats.total_disk_space_used += cg.total_disk_space_used();
|
||||
_stats.live_sstable_count += cg.live_sstable_count();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1403,8 +1446,8 @@ void table::start_compaction() {
|
||||
}
|
||||
|
||||
void table::trigger_compaction() {
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
cg->trigger_compaction();
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
cg.trigger_compaction();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1450,7 +1493,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o
|
||||
co_await flush();
|
||||
|
||||
if (_compaction_groups.size() == 1) {
|
||||
auto& cg = *_compaction_groups[0];
|
||||
auto& cg = *get_compaction_group(0);
|
||||
co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg.as_table_state(), info);
|
||||
}
|
||||
|
||||
@@ -1464,7 +1507,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o
|
||||
dht::token_range_vector cg_ranges;
|
||||
std::unordered_map<dht::token_range, compaction::owned_ranges_ptr> cg_ranges_map;
|
||||
for (const auto& cg : _compaction_groups) {
|
||||
const auto& cg_range = cg->token_range();
|
||||
const auto& cg_range = cg.token_range();
|
||||
while (!candidates.empty()) {
|
||||
auto range = std::move(candidates.front());
|
||||
auto trimmed = range.intersection(cg_range, cmp);
|
||||
@@ -1491,8 +1534,8 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o
|
||||
}
|
||||
|
||||
unsigned table::estimate_pending_compactions() const {
|
||||
return boost::accumulate(compaction_groups() | boost::adaptors::transformed([this] (const compaction_group_ptr& cg) {
|
||||
return _compaction_strategy.estimated_pending_compactions(cg->as_table_state());
|
||||
return boost::accumulate(compaction_groups() | boost::adaptors::transformed([this] (const compaction_group& cg) {
|
||||
return _compaction_strategy.estimated_pending_compactions(cg.as_table_state());
|
||||
}), unsigned(0));
|
||||
}
|
||||
|
||||
@@ -1540,8 +1583,8 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
};
|
||||
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
|
||||
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
compaction_group_sstable_set_updater updater(*this, *cg, new_cs);
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
|
||||
updater.prepare(new_cs);
|
||||
cg_sstable_set_updaters.push_back(std::move(updater));
|
||||
}
|
||||
@@ -1614,15 +1657,15 @@ std::vector<sstables::shared_sstable> table::select_sstables(const dht::partitio
|
||||
// garbage-collect a tombstone that covers data in an sstable that may not be
|
||||
// successfully deleted.
|
||||
lw_shared_ptr<const sstable_list> table::get_sstables_including_compacted_undeleted() const {
|
||||
bool no_compacted_undeleted_sstable = std::ranges::all_of(compaction_groups(), [] (const compaction_group_ptr& cg) {
|
||||
return cg->compacted_undeleted_sstables().empty();
|
||||
bool no_compacted_undeleted_sstable = std::ranges::all_of(compaction_groups(), [] (const compaction_group& cg) {
|
||||
return cg.compacted_undeleted_sstables().empty();
|
||||
});
|
||||
if (no_compacted_undeleted_sstable) {
|
||||
return get_sstables();
|
||||
}
|
||||
auto ret = make_lw_shared<sstable_list>(*_sstables->all());
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
for (auto&& s: cg->compacted_undeleted_sstables()) {
|
||||
for (const compaction_group& cg : compaction_groups()) {
|
||||
for (auto&& s: cg.compacted_undeleted_sstables()) {
|
||||
ret->insert(s);
|
||||
}
|
||||
}
|
||||
@@ -1689,7 +1732,7 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
|
||||
, _compaction_manager(compaction_manager)
|
||||
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
||||
, _cg_manager(make_compaction_group_manager())
|
||||
, _compaction_groups(_cg_manager->make_compaction_groups())
|
||||
, _storage_groups(make_storage_groups())
|
||||
, _sstables(make_compound_sstable_set())
|
||||
, _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
|
||||
, _commitlog(nullptr)
|
||||
@@ -1763,8 +1806,8 @@ table::~table() {
|
||||
|
||||
logalloc::occupancy_stats table::occupancy() const {
|
||||
logalloc::occupancy_stats res;
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
for (auto& m : *cg->memtables()) {
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
for (auto& m : *cg.memtables()) {
|
||||
res += m->region().occupancy();
|
||||
}
|
||||
}
|
||||
@@ -2020,8 +2063,8 @@ future<> table::clear() {
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
// if we implement notifications, whatnot.
|
||||
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
|
||||
assert(std::ranges::all_of(compaction_groups(), [this] (const compaction_group_ptr& cg) {
|
||||
return _compaction_manager.compaction_disabled(cg->as_table_state());
|
||||
assert(std::ranges::all_of(compaction_groups(), [this] (const compaction_group& cg) {
|
||||
return _compaction_manager.compaction_disabled(cg.as_table_state());
|
||||
}));
|
||||
|
||||
db::replay_position rp;
|
||||
@@ -2034,7 +2077,7 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
|
||||
|
||||
co_await _cache.invalidate(row_cache::external_updater([this, &rp, &remove, truncated_at] {
|
||||
// FIXME: the following isn't exception safe.
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
auto gc_trunc = to_gc_clock(truncated_at);
|
||||
|
||||
auto pruned = make_lw_shared<sstables::sstable_set>(_compaction_strategy.make_sstable_set(_schema));
|
||||
@@ -2046,17 +2089,17 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
|
||||
pruning->for_each_sstable([&] (const sstables::shared_sstable& p) mutable {
|
||||
if (p->max_data_age() <= gc_trunc) {
|
||||
rp = std::max(p->get_stats_metadata().position, rp);
|
||||
remove.emplace_back(removed_sstable{*cg, p, enable_backlog_tracker});
|
||||
remove.emplace_back(removed_sstable{cg, p, enable_backlog_tracker});
|
||||
return;
|
||||
}
|
||||
pruned->insert(p);
|
||||
});
|
||||
};
|
||||
prune(pruned, cg->main_sstables(), enable_backlog_tracker::yes);
|
||||
prune(maintenance_pruned, cg->maintenance_sstables(), enable_backlog_tracker::no);
|
||||
prune(pruned, cg.main_sstables(), enable_backlog_tracker::yes);
|
||||
prune(maintenance_pruned, cg.maintenance_sstables(), enable_backlog_tracker::no);
|
||||
|
||||
cg->set_main_sstables(std::move(pruned));
|
||||
cg->set_maintenance_sstables(std::move(maintenance_pruned));
|
||||
cg.set_main_sstables(std::move(pruned));
|
||||
cg.set_maintenance_sstables(std::move(maintenance_pruned));
|
||||
}
|
||||
refresh_compound_sstable_set();
|
||||
tlogger.debug("cleaning out row cache");
|
||||
@@ -2099,8 +2142,8 @@ void table::set_schema(schema_ptr s) {
|
||||
tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",
|
||||
_schema->ks_name(), _schema->cf_name(), _schema->id(), _schema->version(), s->version());
|
||||
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
for (auto& m: *cg->memtables()) {
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
for (auto& m: *cg.memtables()) {
|
||||
m->set_schema(s);
|
||||
}
|
||||
}
|
||||
@@ -2987,7 +3030,7 @@ compaction::table_state& compaction_group::as_table_state() const noexcept {
|
||||
|
||||
compaction::table_state& table::as_table_state() const noexcept {
|
||||
// FIXME: kill it once we're done with all remaining users.
|
||||
return _compaction_groups[0]->as_table_state();
|
||||
return get_compaction_group(0)->as_table_state();
|
||||
}
|
||||
|
||||
future<> table::parallel_foreach_table_state(std::function<future<>(table_state&)> action) {
|
||||
@@ -3067,7 +3110,10 @@ future<> compaction_group::cleanup() {
|
||||
future<> table::cleanup_tablet(locator::tablet_id tid) {
|
||||
auto holder = async_gate().hold();
|
||||
|
||||
auto& cg_ptr = _compaction_groups[tid.value()];
|
||||
auto& sg = _storage_groups[tid.value()];
|
||||
|
||||
// FIXME: indentation.
|
||||
for (auto& cg_ptr : sg->compaction_groups()) {
|
||||
|
||||
if (!cg_ptr) {
|
||||
throw std::runtime_error(format("Cannot cleanup tablet {} of table {}.{} because it is not allocated in this shard",
|
||||
@@ -3079,6 +3125,7 @@ future<> table::cleanup_tablet(locator::tablet_id tid) {
|
||||
//co_await _cg.stop();
|
||||
co_await cg_ptr->flush();
|
||||
co_await cg_ptr->cleanup();
|
||||
}
|
||||
|
||||
tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name());
|
||||
|
||||
|
||||
@@ -4410,8 +4410,12 @@ class scylla_memtables(gdb.Command):
|
||||
for table in for_each_table():
|
||||
gdb.write('table %s:\n' % schema_ptr(table['_schema']).table_name())
|
||||
try:
|
||||
for cg_ptr in chunked_vector(table["_compaction_groups"]):
|
||||
scylla_memtables.dump_compaction_group_memtables(std_unique_ptr(cg_ptr).get())
|
||||
try:
|
||||
for cg in intrusive_list(table["_compaction_groups"], link='_list_hook'):
|
||||
scylla_memtables.dump_compaction_group_memtables(cg)
|
||||
except gdb.error:
|
||||
for cg_ptr in chunked_vector(table["_compaction_groups"]):
|
||||
scylla_memtables.dump_compaction_group_memtables(std_unique_ptr(cg_ptr).get())
|
||||
except gdb.error:
|
||||
try:
|
||||
for cg_ptr in std_vector(table["_compaction_groups"]):
|
||||
|
||||
Reference in New Issue
Block a user