sstables: restore indentation
Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -125,195 +125,195 @@ static void delete_sstables_for_interrupted_compaction(std::vector<shared_sstabl
|
||||
future<std::vector<shared_sstable>>
|
||||
compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable()> creator,
|
||||
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup) {
|
||||
return seastar::async([sstables = std::move(sstables), &cf, creator = std::move(creator), max_sstable_size, sstable_level, cleanup] () mutable {
|
||||
std::vector<::mutation_reader> readers;
|
||||
uint64_t estimated_partitions = 0;
|
||||
auto ancestors = make_lw_shared<std::vector<unsigned long>>();
|
||||
auto info = make_lw_shared<compaction_info>();
|
||||
auto& cm = cf.get_compaction_manager();
|
||||
sstring sstable_logger_msg = "[";
|
||||
return seastar::async([sstables = std::move(sstables), &cf, creator = std::move(creator), max_sstable_size, sstable_level, cleanup] () mutable {
|
||||
std::vector<::mutation_reader> readers;
|
||||
uint64_t estimated_partitions = 0;
|
||||
auto ancestors = make_lw_shared<std::vector<unsigned long>>();
|
||||
auto info = make_lw_shared<compaction_info>();
|
||||
auto& cm = cf.get_compaction_manager();
|
||||
sstring sstable_logger_msg = "[";
|
||||
|
||||
info->type = (cleanup) ? compaction_type::Cleanup : compaction_type::Compaction;
|
||||
// register compaction_stats of starting compaction into compaction manager
|
||||
cm.register_compaction(info);
|
||||
info->type = (cleanup) ? compaction_type::Cleanup : compaction_type::Compaction;
|
||||
// register compaction_stats of starting compaction into compaction manager
|
||||
cm.register_compaction(info);
|
||||
|
||||
assert(sstables.size() > 0);
|
||||
assert(sstables.size() > 0);
|
||||
|
||||
db::replay_position rp;
|
||||
db::replay_position rp;
|
||||
|
||||
auto all_sstables = cf.get_sstables_including_compacted_undeleted();
|
||||
std::sort(sstables.begin(), sstables.end(), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
std::vector<shared_sstable> not_compacted_sstables;
|
||||
boost::set_difference(*all_sstables | boost::adaptors::map_values, sstables,
|
||||
std::back_inserter(not_compacted_sstables), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
auto all_sstables = cf.get_sstables_including_compacted_undeleted();
|
||||
std::sort(sstables.begin(), sstables.end(), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
std::vector<shared_sstable> not_compacted_sstables;
|
||||
boost::set_difference(*all_sstables | boost::adaptors::map_values, sstables,
|
||||
std::back_inserter(not_compacted_sstables), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
|
||||
auto schema = cf.schema();
|
||||
for (auto sst : sstables) {
|
||||
// We also capture the sstable, so we keep it alive while the read isn't done
|
||||
readers.emplace_back(make_mutation_reader<sstable_reader>(sst, schema));
|
||||
// FIXME: If the sstables have cardinality estimation bitmaps, use that
|
||||
// for a better estimate for the number of partitions in the merged
|
||||
// sstable than just adding up the lengths of individual sstables.
|
||||
estimated_partitions += sst->get_estimated_key_count();
|
||||
info->total_partitions += sst->get_estimated_key_count();
|
||||
// Compacted sstable keeps track of its ancestors.
|
||||
ancestors->push_back(sst->generation());
|
||||
sstable_logger_msg += sprint("%s:level=%d, ", sst->get_filename(), sst->get_sstable_level());
|
||||
info->start_size += sst->data_size();
|
||||
// TODO:
|
||||
// Note that this is not fully correct. Since we might be merging sstables that originated on
|
||||
// another shard (#cpu changed), we might be comparing RP:s with differing shard ids,
|
||||
// which might vary in "comparable" size quite a bit. However, since the worst that happens
|
||||
// is that we might miss a high water mark for the commit log replayer,
|
||||
// this is kind of ok, esp. since we will hopefully not be trying to recover based on
|
||||
// compacted sstables anyway (CL should be clean by then).
|
||||
rp = std::max(rp, sst->get_stats_metadata().position);
|
||||
}
|
||||
auto schema = cf.schema();
|
||||
for (auto sst : sstables) {
|
||||
// We also capture the sstable, so we keep it alive while the read isn't done
|
||||
readers.emplace_back(make_mutation_reader<sstable_reader>(sst, schema));
|
||||
// FIXME: If the sstables have cardinality estimation bitmaps, use that
|
||||
// for a better estimate for the number of partitions in the merged
|
||||
// sstable than just adding up the lengths of individual sstables.
|
||||
estimated_partitions += sst->get_estimated_key_count();
|
||||
info->total_partitions += sst->get_estimated_key_count();
|
||||
// Compacted sstable keeps track of its ancestors.
|
||||
ancestors->push_back(sst->generation());
|
||||
sstable_logger_msg += sprint("%s:level=%d, ", sst->get_filename(), sst->get_sstable_level());
|
||||
info->start_size += sst->data_size();
|
||||
// TODO:
|
||||
// Note that this is not fully correct. Since we might be merging sstables that originated on
|
||||
// another shard (#cpu changed), we might be comparing RP:s with differing shard ids,
|
||||
// which might vary in "comparable" size quite a bit. However, since the worst that happens
|
||||
// is that we might miss a high water mark for the commit log replayer,
|
||||
// this is kind of ok, esp. since we will hopefully not be trying to recover based on
|
||||
// compacted sstables anyway (CL should be clean by then).
|
||||
rp = std::max(rp, sst->get_stats_metadata().position);
|
||||
}
|
||||
|
||||
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(info->start_size) / max_sstable_size)));
|
||||
uint64_t partitions_per_sstable = ceil(double(estimated_partitions) / estimated_sstables);
|
||||
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(info->start_size) / max_sstable_size)));
|
||||
uint64_t partitions_per_sstable = ceil(double(estimated_partitions) / estimated_sstables);
|
||||
|
||||
sstable_logger_msg += "]";
|
||||
info->sstables = sstables.size();
|
||||
info->ks = schema->ks_name();
|
||||
info->cf = schema->cf_name();
|
||||
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
|
||||
sstable_logger_msg += "]";
|
||||
info->sstables = sstables.size();
|
||||
info->ks = schema->ks_name();
|
||||
info->cf = schema->cf_name();
|
||||
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
|
||||
|
||||
class compacting_reader final : public ::mutation_reader::impl {
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
::mutation_reader _reader;
|
||||
std::vector<shared_sstable> _not_compacted_sstables;
|
||||
gc_clock::time_point _now;
|
||||
std::vector<range<dht::token>> _sorted_owned_ranges;
|
||||
bool _cleanup;
|
||||
public:
|
||||
compacting_reader(schema_ptr schema, std::vector<::mutation_reader> readers, std::vector<shared_sstable> not_compacted_sstables,
|
||||
std::vector<range<dht::token>> sorted_owned_ranges, bool cleanup)
|
||||
: _schema(std::move(schema))
|
||||
, _reader(make_combined_reader(std::move(readers)))
|
||||
, _not_compacted_sstables(std::move(not_compacted_sstables))
|
||||
, _now(gc_clock::now())
|
||||
, _sorted_owned_ranges(std::move(sorted_owned_ranges))
|
||||
, _cleanup(cleanup)
|
||||
{ }
|
||||
class compacting_reader final : public ::mutation_reader::impl {
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
::mutation_reader _reader;
|
||||
std::vector<shared_sstable> _not_compacted_sstables;
|
||||
gc_clock::time_point _now;
|
||||
std::vector<range<dht::token>> _sorted_owned_ranges;
|
||||
bool _cleanup;
|
||||
public:
|
||||
compacting_reader(schema_ptr schema, std::vector<::mutation_reader> readers, std::vector<shared_sstable> not_compacted_sstables,
|
||||
std::vector<range<dht::token>> sorted_owned_ranges, bool cleanup)
|
||||
: _schema(std::move(schema))
|
||||
, _reader(make_combined_reader(std::move(readers)))
|
||||
, _not_compacted_sstables(std::move(not_compacted_sstables))
|
||||
, _now(gc_clock::now())
|
||||
, _sorted_owned_ranges(std::move(sorted_owned_ranges))
|
||||
, _cleanup(cleanup)
|
||||
{ }
|
||||
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return _reader().then([] (auto sm) {
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return _reader().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([this] (mutation_opt m) {
|
||||
if (!bool(m)) {
|
||||
return make_ready_future<streamed_mutation_opt>();
|
||||
}
|
||||
// Filter out mutation that doesn't belong to current shard.
|
||||
if (dht::shard_of(m->token()) != engine().cpu_id()) {
|
||||
return operator()();
|
||||
}
|
||||
if (_cleanup && !belongs_to_current_node(m->token(), _sorted_owned_ranges)) {
|
||||
return operator()();
|
||||
}
|
||||
auto max_purgeable = get_max_purgeable_timestamp(_schema, _not_compacted_sstables, m->decorated_key());
|
||||
m->partition().compact_for_compaction(*_schema, max_purgeable, _now);
|
||||
if (!m->partition().empty()) {
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*m)));
|
||||
}
|
||||
return operator()();
|
||||
});
|
||||
}
|
||||
};
|
||||
std::vector<range<dht::token>> owned_ranges;
|
||||
if (cleanup) {
|
||||
owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
|
||||
}
|
||||
auto reader = make_mutation_reader<compacting_reader>(schema, std::move(readers), std::move(not_compacted_sstables),
|
||||
std::move(owned_ranges), cleanup);
|
||||
|
||||
auto start_time = db_clock::now();
|
||||
|
||||
// We use a fixed-sized pipe between the producer fiber (which reads the
|
||||
// individual sstables and merges them) and the consumer fiber (which
|
||||
// only writes to the sstable). Things would have worked without this
|
||||
// pipe (the writing fiber would have also performed the reads), but we
|
||||
// prefer to do less work in the writer (which is a seastar::thread),
|
||||
// and also want the extra buffer to ensure we do fewer context switches
|
||||
// to that seastar::thread.
|
||||
// TODO: better tuning for the size of the pipe. Perhaps should take into
|
||||
// account the size of the individual mutations?
|
||||
seastar::pipe<mutation> output{16};
|
||||
auto output_reader = make_lw_shared<seastar::pipe_reader<mutation>>(std::move(output.reader));
|
||||
auto output_writer = make_lw_shared<seastar::pipe_writer<mutation>>(std::move(output.writer));
|
||||
|
||||
future<> read_done = repeat([output_writer, reader = std::move(reader), info] () mutable {
|
||||
if (info->is_stop_requested()) {
|
||||
// Compaction manager will catch this exception and re-schedule the compaction.
|
||||
throw compaction_stop_exception(info->ks, info->cf, info->stop_requested);
|
||||
}
|
||||
return reader().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([this] (mutation_opt m) {
|
||||
if (!bool(m)) {
|
||||
return make_ready_future<streamed_mutation_opt>();
|
||||
}).then([output_writer, info] (auto mopt) {
|
||||
if (mopt) {
|
||||
info->total_keys_written++;
|
||||
return output_writer->write(std::move(*mopt)).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
// Filter out mutation that doesn't belong to current shard.
|
||||
if (dht::shard_of(m->token()) != engine().cpu_id()) {
|
||||
return operator()();
|
||||
}
|
||||
if (_cleanup && !belongs_to_current_node(m->token(), _sorted_owned_ranges)) {
|
||||
return operator()();
|
||||
}
|
||||
auto max_purgeable = get_max_purgeable_timestamp(_schema, _not_compacted_sstables, m->decorated_key());
|
||||
m->partition().compact_for_compaction(*_schema, max_purgeable, _now);
|
||||
if (!m->partition().empty()) {
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*m)));
|
||||
}
|
||||
return operator()();
|
||||
});
|
||||
}
|
||||
};
|
||||
std::vector<range<dht::token>> owned_ranges;
|
||||
if (cleanup) {
|
||||
owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
|
||||
}
|
||||
auto reader = make_mutation_reader<compacting_reader>(schema, std::move(readers), std::move(not_compacted_sstables),
|
||||
std::move(owned_ranges), cleanup);
|
||||
}).then([output_writer] {});
|
||||
|
||||
auto start_time = db_clock::now();
|
||||
|
||||
// We use a fixed-sized pipe between the producer fiber (which reads the
|
||||
// individual sstables and merges them) and the consumer fiber (which
|
||||
// only writes to the sstable). Things would have worked without this
|
||||
// pipe (the writing fiber would have also performed the reads), but we
|
||||
// prefer to do less work in the writer (which is a seastar::thread),
|
||||
// and also want the extra buffer to ensure we do fewer context switches
|
||||
// to that seastar::thread.
|
||||
// TODO: better tuning for the size of the pipe. Perhaps should take into
|
||||
// account the size of the individual mutations?
|
||||
seastar::pipe<mutation> output{16};
|
||||
auto output_reader = make_lw_shared<seastar::pipe_reader<mutation>>(std::move(output.reader));
|
||||
auto output_writer = make_lw_shared<seastar::pipe_writer<mutation>>(std::move(output.writer));
|
||||
|
||||
future<> read_done = repeat([output_writer, reader = std::move(reader), info] () mutable {
|
||||
if (info->is_stop_requested()) {
|
||||
// Compaction manager will catch this exception and re-schedule the compaction.
|
||||
throw compaction_stop_exception(info->ks, info->cf, info->stop_requested);
|
||||
}
|
||||
return reader().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([output_writer, info] (auto mopt) {
|
||||
if (mopt) {
|
||||
info->total_keys_written++;
|
||||
return output_writer->write(std::move(*mopt)).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
struct queue_reader final : public ::mutation_reader::impl {
|
||||
lw_shared_ptr<seastar::pipe_reader<mutation>> pr;
|
||||
queue_reader(lw_shared_ptr<seastar::pipe_reader<mutation>> pr) : pr(std::move(pr)) {}
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return pr->read().then([] (std::experimental::optional<mutation> m) mutable {
|
||||
if (!m) {
|
||||
return streamed_mutation_opt();
|
||||
}
|
||||
return streamed_mutation_opt(streamed_mutation_from_mutation(std::move(*m)));
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).then([output_writer] {});
|
||||
};
|
||||
|
||||
struct queue_reader final : public ::mutation_reader::impl {
|
||||
lw_shared_ptr<seastar::pipe_reader<mutation>> pr;
|
||||
queue_reader(lw_shared_ptr<seastar::pipe_reader<mutation>> pr) : pr(std::move(pr)) {}
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return pr->read().then([] (std::experimental::optional<mutation> m) mutable {
|
||||
if (!m) {
|
||||
return streamed_mutation_opt();
|
||||
// If there is a maximum size for a sstable, it's possible that more than
|
||||
// one sstable will be generated for all partitions to be written.
|
||||
future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] {
|
||||
return output_reader->read().then(
|
||||
[creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] (auto mut) {
|
||||
// Check if mutation is available from the pipe for a new sstable to be written. If not, just stop writing.
|
||||
if (!mut) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return streamed_mutation_opt(streamed_mutation_from_mutation(std::move(*m)));
|
||||
});
|
||||
}
|
||||
};
|
||||
// If a mutation is available, we must unread it for write_components to read it afterwards.
|
||||
output_reader->unread(std::move(*mut));
|
||||
|
||||
// If there is a maximum size for a sstable, it's possible that more than
|
||||
// one sstable will be generated for all partitions to be written.
|
||||
future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] {
|
||||
return output_reader->read().then(
|
||||
[creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] (auto mut) {
|
||||
// Check if mutation is available from the pipe for a new sstable to be written. If not, just stop writing.
|
||||
if (!mut) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
// If a mutation is available, we must unread it for write_components to read it afterwards.
|
||||
output_reader->unread(std::move(*mut));
|
||||
auto newtab = creator();
|
||||
info->new_sstables.push_back(newtab);
|
||||
newtab->get_metadata_collector().set_replay_position(rp);
|
||||
newtab->get_metadata_collector().sstable_level(sstable_level);
|
||||
for (auto ancestor : *ancestors) {
|
||||
newtab->add_ancestor(ancestor);
|
||||
}
|
||||
|
||||
auto newtab = creator();
|
||||
info->new_sstables.push_back(newtab);
|
||||
newtab->get_metadata_collector().set_replay_position(rp);
|
||||
newtab->get_metadata_collector().sstable_level(sstable_level);
|
||||
for (auto ancestor : *ancestors) {
|
||||
newtab->add_ancestor(ancestor);
|
||||
}
|
||||
::mutation_reader mutation_queue_reader = make_mutation_reader<queue_reader>(output_reader);
|
||||
|
||||
::mutation_reader mutation_queue_reader = make_mutation_reader<queue_reader>(output_reader);
|
||||
|
||||
auto&& priority = service::get_local_compaction_priority();
|
||||
return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size, false, priority).then([newtab, info] {
|
||||
return newtab->open_data().then([newtab, info] {
|
||||
info->end_size += newtab->data_size();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
auto&& priority = service::get_local_compaction_priority();
|
||||
return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size, false, priority).then([newtab, info] {
|
||||
return newtab->open_data().then([newtab, info] {
|
||||
info->end_size += newtab->data_size();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
}).handle_exception([sst = newtab] (auto ep) {
|
||||
logger.error("Compaction found an exception when writing sstable {} : {}",
|
||||
sst->get_filename(), ep);
|
||||
return make_exception_future<stop_iteration>(ep);
|
||||
});
|
||||
}).handle_exception([sst = newtab] (auto ep) {
|
||||
logger.error("Compaction found an exception when writing sstable {} : {}",
|
||||
sst->get_filename(), ep);
|
||||
return make_exception_future<stop_iteration>(ep);
|
||||
});
|
||||
});
|
||||
}).then([output_reader] {});
|
||||
}).then([output_reader] {});
|
||||
|
||||
sstring ex;
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user