Convert column_family::_sstables to sstable_set
Using sstable_set will allow us to filter sstables during a query before actually creating a reader (this is left to the next patch; here we just convert the users of the _sstables field).
This commit is contained in:
57
database.cc
57
database.cc
@@ -114,7 +114,8 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
|
||||
, _config(std::move(config))
|
||||
, _memtables(_config.enable_disk_writes ? make_memtable_list() : make_memory_only_memtable_list())
|
||||
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
|
||||
, _sstables(make_lw_shared<sstable_list>())
|
||||
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
||||
, _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
|
||||
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
|
||||
, _commitlog(cl)
|
||||
, _compaction_manager(compaction_manager)
|
||||
@@ -129,7 +130,7 @@ partition_presence_checker
|
||||
column_family::make_partition_presence_checker(sstables::shared_sstable exclude_sstable) {
|
||||
return [this, exclude_sstable = std::move(exclude_sstable)] (partition_key_view key) {
|
||||
auto exclude = [e = std::move(exclude_sstable)] (auto s) { return s != e; };
|
||||
for (auto&& s : *_sstables | boost::adaptors::filtered(exclude)) {
|
||||
for (auto&& s : *_sstables->all() | boost::adaptors::filtered(exclude)) {
|
||||
if (s->filter_has_key(*_schema, key)) {
|
||||
return partition_presence_checker_result::maybe_exists;
|
||||
}
|
||||
@@ -171,7 +172,7 @@ bool belongs_to_current_shard(const streamed_mutation& m) {
|
||||
|
||||
class range_sstable_reader final : public mutation_reader::impl {
|
||||
const query::partition_range& _pr;
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
mutation_reader _reader;
|
||||
// Use a pointer instead of copying, so we don't need to regenerate the reader if
|
||||
// the priority changes.
|
||||
@@ -179,7 +180,7 @@ class range_sstable_reader final : public mutation_reader::impl {
|
||||
query::clustering_key_filtering_context _ck_filtering;
|
||||
public:
|
||||
range_sstable_reader(schema_ptr s,
|
||||
lw_shared_ptr<sstable_list> sstables,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const query::partition_range& pr,
|
||||
query::clustering_key_filtering_context ck_filtering,
|
||||
const io_priority_class& pc)
|
||||
@@ -189,7 +190,7 @@ public:
|
||||
, _ck_filtering(ck_filtering)
|
||||
{
|
||||
std::vector<mutation_reader> readers;
|
||||
for (const lw_shared_ptr<sstables::sstable>& sst : *_sstables) {
|
||||
for (const lw_shared_ptr<sstables::sstable>& sst : *_sstables->all()) {
|
||||
// FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
|
||||
mutation_reader reader =
|
||||
make_mutation_reader<sstable_range_wrapping_reader>(sst, s, pr, _ck_filtering, _pc);
|
||||
@@ -213,14 +214,14 @@ class single_key_sstable_reader final : public mutation_reader::impl {
|
||||
sstables::key _key;
|
||||
std::vector<streamed_mutation> _mutations;
|
||||
bool _done = false;
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
// Use a pointer instead of copying, so we don't need to regenerate the reader if
|
||||
// the priority changes.
|
||||
const io_priority_class& _pc;
|
||||
query::clustering_key_filtering_context _ck_filtering;
|
||||
public:
|
||||
single_key_sstable_reader(schema_ptr schema,
|
||||
lw_shared_ptr<sstable_list> sstables,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const partition_key& key,
|
||||
query::clustering_key_filtering_context ck_filtering,
|
||||
const io_priority_class& pc)
|
||||
@@ -235,7 +236,7 @@ public:
|
||||
if (_done) {
|
||||
return make_ready_future<streamed_mutation_opt>();
|
||||
}
|
||||
return parallel_for_each(*_sstables,
|
||||
return parallel_for_each(*_sstables->all(),
|
||||
[this](const lw_shared_ptr<sstables::sstable>& sstable) {
|
||||
return sstable->read_row(_schema, _key, _ck_filtering, _pc).then([this](auto smo) {
|
||||
if (smo) {
|
||||
@@ -281,8 +282,8 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
key_source column_family::sstables_as_key_source() const {
|
||||
return key_source([this] (const query::partition_range& range, const io_priority_class& pc) {
|
||||
std::vector<key_reader> readers;
|
||||
readers.reserve(_sstables->size());
|
||||
std::transform(_sstables->begin(), _sstables->end(), std::back_inserter(readers), [&] (auto&& sst) {
|
||||
readers.reserve(_sstables->all()->size());
|
||||
std::transform(_sstables->all()->begin(), _sstables->all()->end(), std::back_inserter(readers), [&] (auto&& sst) {
|
||||
auto rd = sstables::make_key_reader(_schema, sst, range, pc);
|
||||
if (sst->is_shared()) {
|
||||
rd = make_filtering_reader(std::move(rd), [] (const dht::decorated_key& dk) {
|
||||
@@ -344,7 +345,7 @@ column_family::make_reader(schema_ptr s,
|
||||
}
|
||||
|
||||
std::vector<mutation_reader> readers;
|
||||
readers.reserve(_memtables->size() + _sstables->size());
|
||||
readers.reserve(_memtables->size() + _sstables->all()->size());
|
||||
|
||||
// We're assuming that cache and memtables are both read atomically
|
||||
// for single-key queries, so we don't need to special case memtable
|
||||
@@ -587,8 +588,8 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
update_sstables_known_generation(comps.generation);
|
||||
|
||||
{
|
||||
auto i = boost::range::find_if(*_sstables, [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; });
|
||||
if (i != _sstables->end()) {
|
||||
auto i = boost::range::find_if(*_sstables->all(), [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; });
|
||||
if (i != _sstables->all()->end()) {
|
||||
auto new_toc = sstdir + "/" + fname;
|
||||
throw std::runtime_error(sprint("Attempted to add sstable generation %d twice: new=%s existing=%s",
|
||||
comps.generation, new_toc, (*i)->toc_filename()));
|
||||
@@ -624,7 +625,7 @@ void column_family::add_sstable(sstables::sstable&& sstable) {
|
||||
|
||||
void column_family::add_sstable(lw_shared_ptr<sstables::sstable> sstable) {
|
||||
// allow in-progress reads to continue using old list
|
||||
_sstables = make_lw_shared<sstable_list>(*_sstables);
|
||||
_sstables = make_lw_shared(*_sstables);
|
||||
update_stats_for_new_sstable(sstable->bytes_on_disk());
|
||||
_sstables->insert(std::move(sstable));
|
||||
}
|
||||
@@ -962,7 +963,7 @@ void column_family::rebuild_statistics() {
|
||||
// this might seem dangerous, but "move" here just avoids constness,
|
||||
// making the two ranges compatible when compiling with boost 1.55.
|
||||
// Noone is actually moving anything...
|
||||
std::move(*_sstables))) {
|
||||
std::move(*_sstables->all()))) {
|
||||
update_stats_for_new_sstable(tab->data_size());
|
||||
}
|
||||
}
|
||||
@@ -981,7 +982,7 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// if the deletion fails (note deletion of shared sstables can take
|
||||
// unbounded time, because all shards must agree on the deletion).
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = make_lw_shared<sstable_list>();
|
||||
auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema);
|
||||
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
|
||||
|
||||
|
||||
@@ -993,15 +994,15 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// this might seem dangerous, but "move" here just avoids constness,
|
||||
// making the two ranges compatible when compiling with boost 1.55.
|
||||
// Noone is actually moving anything...
|
||||
for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables))) {
|
||||
for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) {
|
||||
// Checks if oldtab is a sstable not being compacted.
|
||||
if (!s.count(tab)) {
|
||||
new_sstable_list->insert(tab);
|
||||
new_sstable_list.insert(tab);
|
||||
} else {
|
||||
new_compacted_but_not_deleted.push_back(tab);
|
||||
}
|
||||
}
|
||||
_sstables = std::move(new_sstable_list);
|
||||
_sstables = make_lw_shared(std::move(new_sstable_list));
|
||||
_sstables_compacted_but_not_deleted = std::move(new_compacted_but_not_deleted);
|
||||
|
||||
rebuild_statistics();
|
||||
@@ -1108,8 +1109,8 @@ column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tab
|
||||
future<>
|
||||
column_family::compact_all_sstables() {
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(_sstables->size());
|
||||
for (auto&& sst : *_sstables) {
|
||||
sstables.reserve(_sstables->all()->size());
|
||||
for (auto&& sst : *_sstables->all()) {
|
||||
sstables.push_back(sst);
|
||||
}
|
||||
// FIXME: check if the lower bound min_compaction_threshold() from schema
|
||||
@@ -1143,7 +1144,7 @@ void column_family::set_compaction_strategy(sstables::compaction_strategy_type s
|
||||
}
|
||||
|
||||
size_t column_family::sstables_count() {
|
||||
return _sstables->size();
|
||||
return _sstables->all()->size();
|
||||
}
|
||||
|
||||
int64_t column_family::get_unleveled_sstables() const {
|
||||
@@ -1154,7 +1155,7 @@ int64_t column_family::get_unleveled_sstables() const {
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstable_list> column_family::get_sstables() {
|
||||
return _sstables;
|
||||
return _sstables->all();
|
||||
}
|
||||
|
||||
// Gets the list of all sstables in the column family, including ones that are
|
||||
@@ -1166,9 +1167,9 @@ lw_shared_ptr<sstable_list> column_family::get_sstables() {
|
||||
// successfully deleted.
|
||||
lw_shared_ptr<sstable_list> column_family::get_sstables_including_compacted_undeleted() {
|
||||
if (_sstables_compacted_but_not_deleted.empty()) {
|
||||
return _sstables;
|
||||
return get_sstables();
|
||||
}
|
||||
auto ret = make_lw_shared(*_sstables);
|
||||
auto ret = make_lw_shared(*_sstables->all());
|
||||
for (auto&& s : _sstables_compacted_but_not_deleted) {
|
||||
ret->insert(s);
|
||||
}
|
||||
@@ -2475,7 +2476,7 @@ seal_snapshot(sstring jsondir) {
|
||||
|
||||
future<> column_family::snapshot(sstring name) {
|
||||
return flush().then([this, name = std::move(name)]() {
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables);
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables->all());
|
||||
return do_with(std::move(tables), [this, name](std::vector<sstables::shared_sstable> & tables) {
|
||||
auto jsondir = _config.datadir + "/snapshots/" + name;
|
||||
|
||||
@@ -2738,10 +2739,10 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
db::replay_position rp;
|
||||
auto gc_trunc = to_gc_clock(truncated_at);
|
||||
|
||||
auto pruned = make_lw_shared<sstable_list>();
|
||||
auto pruned = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));
|
||||
std::vector<sstables::shared_sstable> remove;
|
||||
|
||||
for (auto&p : *_sstables) {
|
||||
for (auto&p : *_sstables->all()) {
|
||||
if (p->max_data_age() <= gc_trunc) {
|
||||
rp = std::max(p->get_stats_metadata().position, rp);
|
||||
remove.emplace_back(p);
|
||||
|
||||
Reference in New Issue
Block a user