Merge 'sstables: writer: don't block topology changes while writing sstables' from Avi Kivity

The sstable writer held the effective_replication_map_ptr while writing
sstables, which is both a layering violation and slows down tablet load
balancing. It was needed in order to ensure the sharder was stable. But
it turns out that sharding metadata is unnecessary for tablets, so just
skip the whole thing when writing an sstable for tablets.

Closes scylladb/scylladb#16953

* github.com:scylladb/scylladb:
  sstables: writer: don't require effective_replication_map for sharding metadata
  schema: provide method to get sharder, iff it is static
This commit is contained in:
Botond Dénes
2024-01-25 12:12:01 +02:00
7 changed files with 24 additions and 15 deletions

View File

@@ -1121,7 +1121,6 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtabl
try {
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
cfg.backup = incremental_backups_enabled();
cfg.erm = _erm;
auto newtab = make_sstable();
newtabs.push_back(newtab);
@@ -3078,7 +3077,6 @@ public:
}
sstables::sstable_writer_config configure_writer(sstring origin) const override {
auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
cfg.erm = _t.get_effective_replication_map();
return cfg;
}
api::timestamp_type min_memtable_timestamp() const override {

View File

@@ -151,13 +151,22 @@ const dht::i_partitioner& schema::get_partitioner() const {
return _raw._partitioner.get();
}
const dht::sharder& schema::get_sharder() const {
const dht::sharder* schema::try_get_static_sharder() const {
auto t = maybe_table();
if (t && !t->uses_static_sharding()) {
// Use table()->get_effective_replication_map()->get_sharder() instead.
return nullptr;
}
return &_raw._sharder.get();
}
const dht::sharder& schema::get_sharder() const {
auto* s = try_get_static_sharder();
if (!s) {
// Use table()->get_effective_replication_map()->get_sharder() instead.
on_internal_error(dblog, format("Attempted to obtain static sharder for table {}.{}", ks_name(), cf_name()));
}
return _raw._sharder.get();
return *s;
}
replica::table* schema::maybe_table() const {

View File

@@ -821,6 +821,10 @@ public:
// To obtain a sharder which is valid for all kinds of tables, use table::get_effective_replication_map()->get_sharder()
const dht::sharder& get_sharder() const;
// Returns a sharder for this table, but only if it is a static sharder (token->shard mappings
// don't change while the node is up)
const dht::sharder* try_get_static_sharder() const;
// Returns a pointer to the table if the local database has a table which this object references by id().
// The table pointer is not guaranteed to be stable, schema_ptr doesn't keep the table alive.
replica::table* maybe_table() const;

View File

@@ -1472,9 +1472,7 @@ void writer::consume_end_of_stream() {
{ large_data_type::elements_in_collection, std::move(_elements_in_collection_entry) },
}
});
const dht::sharder& sharder = _cfg.erm ? _cfg.erm->get_sharder(_schema)
: _schema.get_sharder(); // Used in tests
_sst.write_scylla_metadata(_shard, sharder, std::move(features), std::move(identifier), std::move(ld_stats), _cfg.origin);
_sst.write_scylla_metadata(_shard, std::move(features), std::move(identifier), std::move(ld_stats), _cfg.origin);
_sst.seal_sstable(_cfg.backup).get();
}

View File

@@ -1629,16 +1629,17 @@ sstable::read_scylla_metadata() noexcept {
}
void
sstable::write_scylla_metadata(shard_id shard, const dht::sharder& sharder, sstable_enabled_features features, struct run_identifier identifier,
sstable::write_scylla_metadata(shard_id shard, sstable_enabled_features features, struct run_identifier identifier,
std::optional<scylla_metadata::large_data_stats> ld_stats, sstring origin) {
auto&& first_key = get_first_decorated_key();
auto&& last_key = get_last_decorated_key();
auto sm = create_sharding_metadata(_schema, sharder, first_key, last_key, shard);
auto* sharder_opt = _schema->try_get_static_sharder();
auto sm = sharder_opt ? std::optional(create_sharding_metadata(_schema, *sharder_opt, first_key, last_key, shard)) : std::nullopt;
// sstable write may fail to generate empty metadata if mutation source has only data from other shard.
// see https://github.com/scylladb/scylla/issues/2932 for details on how it can happen.
if (sm.token_ranges.elements.empty()) {
if (sm && sm->token_ranges.elements.empty()) {
throw std::runtime_error(format("Failed to generate sharding metadata for {}", get_filename()));
}
@@ -1646,7 +1647,9 @@ sstable::write_scylla_metadata(shard_id shard, const dht::sharder& sharder, ssta
_components->scylla_metadata.emplace();
}
_components->scylla_metadata->data.set<scylla_metadata_type::Sharding>(std::move(sm));
if (sm) {
_components->scylla_metadata->data.set<scylla_metadata_type::Sharding>(std::move(*sm));
}
_components->scylla_metadata->data.set<scylla_metadata_type::Features>(std::move(features));
_components->scylla_metadata->data.set<scylla_metadata_type::RunIdentifier>(std::move(identifier));
if (ld_stats) {

View File

@@ -39,7 +39,7 @@
#include "readers/flat_mutation_reader_fwd.hh"
#include "tracing/trace_state.hh"
#include "utils/updateable_value.hh"
#include "locator/abstract_replication_strategy.hh"
#include "dht/decorated_key.hh"
#include <seastar/util/optimized_optional.hh>
@@ -111,7 +111,6 @@ struct sstable_writer_config {
run_id run_identifier = run_id::create_random_id();
size_t summary_byte_cost;
sstring origin;
locator::effective_replication_map_ptr erm;
private:
explicit sstable_writer_config() {}
@@ -642,7 +641,6 @@ private:
future<> read_scylla_metadata() noexcept;
void write_scylla_metadata(shard_id shard,
const dht::sharder& sharder,
sstable_enabled_features features,
run_identifier identifier,
std::optional<scylla_metadata::large_data_stats> ld_stats,

View File

@@ -58,7 +58,6 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
schema_ptr s = reader.schema();
auto cfg = cf->get_sstables_manager().configure_writer(origin);
cfg.erm = cf->get_effective_replication_map();
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cfg, encoding_stats{}).then([sst] {
return sst->open_data();