system keyspace: record partitions with too many rows

Add "rows" field to system.large_partitions.  Add partitions to the
table when they are too large or have too many rows.

Fixes #9506

Signed-off-by: Michael Livshin <michael.livshin@scylladb.com>

Closes #9577
This commit is contained in:
Michael Livshin
2021-11-04 13:17:40 +02:00
committed by Avi Kivity
parent 98ec98ba36
commit a7511cf600
7 changed files with 61 additions and 75 deletions

View File

@@ -46,17 +46,20 @@ large_data_handler::large_data_handler(uint64_t partition_threshold_bytes, uint6
partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes, rows_count_threshold);
}
future<bool> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) {
future<large_data_handler::partition_above_threshold> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows) {
assert(running());
if (partition_size > _partition_threshold_bytes) {
partition_above_threshold above_threshold{partition_size > _partition_threshold_bytes, rows > _rows_count_threshold};
if (above_threshold.size) [[unlikely]] {
++_stats.partitions_bigger_than_threshold;
return with_sem([&sst, &key, partition_size, this] {
return record_large_partitions(sst, key, partition_size);
}).then([] {
return true;
}
if (above_threshold.size || above_threshold.rows) [[unlikely]] {
return with_sem([&sst, &key, partition_size, rows, this] {
return record_large_partitions(sst, key, partition_size, rows);
}).then([above_threshold] {
return above_threshold;
});
}
return make_ready_future<bool>(false);
return make_ready_future<partition_above_threshold>();
}
void large_data_handler::start() {
@@ -82,24 +85,26 @@ future<> large_data_handler::maybe_delete_large_data_entries(sstables::shared_ss
auto schema = sst->get_schema();
auto filename = sst->get_filename();
auto data_size = sst->data_size();
using ldt = sstables::large_data_type;
auto above_threshold = [sst] (ldt type) -> bool {
auto entry = sst->get_large_data_stat(type);
return entry && entry->above_threshold;
};
future<> large_partitions = make_ready_future<>();
auto entry = sst->get_large_data_stat(sstables::large_data_type::partition_size);
if (entry && entry->above_threshold) {
if (above_threshold(ldt::partition_size) || above_threshold(ldt::rows_in_partition)) {
large_partitions = with_sem([schema, filename, this] () mutable {
return delete_large_data_entries(*schema, std::move(filename), db::system_keyspace::LARGE_PARTITIONS);
});
}
future<> large_rows = make_ready_future<>();
entry = sst->get_large_data_stat(sstables::large_data_type::row_size);
if (entry && entry->above_threshold) {
if (above_threshold(ldt::row_size)) {
large_rows = with_sem([schema, filename, this] () mutable {
return delete_large_data_entries(*schema, std::move(filename), db::system_keyspace::LARGE_ROWS);
});
}
future<> large_cells = make_ready_future<>();
entry = sst->get_large_data_stat(sstables::large_data_type::cell_size);
if (entry && entry->above_threshold) {
if (above_threshold(ldt::cell_size)) {
large_cells = with_sem([schema, filename, this] () mutable {
return delete_large_data_entries(*schema, std::move(filename), db::system_keyspace::LARGE_CELLS);
});
@@ -139,17 +144,8 @@ static future<> try_record(std::string_view large_table, const sstables::sstable
});
}
future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const {
return try_record("partition", sst, key, int64_t(partition_size), "partition", "", {});
}
void cql_table_large_data_handler::log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key,
uint64_t rows_count) const {
const schema& s = *sst.get_schema();
const auto sstable_name = sst.get_filename();
large_data_logger.warn("Writing a partition with too many rows [{}/{}:{}] ({} rows) to {}",
s.ks_name(), s.cf_name(), partition_key.to_partition_key(s).with_schema(s),
rows_count, sstable_name);
future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows) const {
return try_record("partition", sst, key, int64_t(partition_size), "partition", "", {"rows"}, data_value((int64_t)rows));
}
future<> cql_table_large_data_handler::record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,

View File

@@ -78,14 +78,6 @@ public:
void start();
future<> stop();
// Compares a partition's row count with the configured rows-count threshold.
// When the count is strictly greater than the threshold, log_too_many_rows()
// is invoked for the partition.
//
// Returns true if the threshold was exceeded, false otherwise.
bool maybe_log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) {
    const bool too_many = rows_count > _rows_count_threshold;
    // Exceeding the threshold is hinted to the compiler as the uncommon case.
    if (__builtin_expect(too_many, false)) {
        log_too_many_rows(sst, partition_key, rows_count);
    }
    return too_many;
}
future<bool> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) {
assert(running());
@@ -99,7 +91,11 @@ public:
return make_ready_future<bool>(false);
}
future<bool> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size);
// Result type of maybe_record_large_partitions(): tells the caller which of
// the two partition-level thresholds were exceeded.  Both flags start out
// false, so a default-constructed value means "nothing exceeded".
struct partition_above_threshold {
    // True when the partition's size in bytes is above the size threshold.
    bool size{false};
    // True when the partition's row count is above the rows-count threshold.
    bool rows{false};
};
future<partition_above_threshold> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows);
future<bool> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) {
@@ -132,12 +128,11 @@ public:
}
protected:
virtual void log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) const = 0;
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const = 0;
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const = 0;
virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const = 0;
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const = 0;
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows) const = 0;
};
class cql_table_large_data_handler : public large_data_handler {
@@ -146,8 +141,7 @@ public:
: large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes, rows_count_threshold) {}
protected:
virtual void log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) const override;
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override;
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows) const override;
virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const override;
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override;
@@ -157,11 +151,7 @@ protected:
class nop_large_data_handler : public large_data_handler {
public:
nop_large_data_handler();
// No-op implementation: this handler deliberately does nothing when a
// partition has too many rows.
virtual void log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) const override {
}
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override {
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows) const override {
return make_ready_future<>();
}

View File

@@ -565,7 +565,10 @@ schema_ptr system_keyspace::size_estimates() {
{"partition_key", utf8_type}
}, // CLUSTERING ORDER BY (partition_size DESC)
// regular columns
{{"compaction_time", timestamp_type}},
{
{"rows", long_type},
{"compaction_time", timestamp_type}
},
// static columns
{},
// regular column name type

View File

@@ -6,11 +6,12 @@ This section describes layouts and usage of system.* tables.
Scylla performs better if partitions, rows, or cells are not too
large. To help diagnose cases where these grow too large, scylla keeps
3 tables that record large partitions, rows, and cells, respectively.
3 tables that record large partitions (including those with too many
rows), rows, and cells, respectively.
The meaning of an entry in each of these tables is similar. It means
that there is a particular sstable with a large partition, row, or
cell. In particular, this implies that:
that there is a particular sstable with a large partition, row, cell,
or a partition with too many rows. In particular, this implies that:
* There is no entry until compaction aggregates enough data in a
single sstable.
@@ -20,7 +21,8 @@ In addition, the entries also have a TTL of 30 days.
## system.large\_partitions
Large partition table can be used to trace largest partitions in a cluster.
The large partitions table can be used to trace the largest partitions
in a cluster. Partitions with too many rows are also recorded there.
Schema:
~~~
@@ -30,6 +32,7 @@ CREATE TABLE system.large_partitions (
sstable_name text,
partition_size bigint,
partition_key text,
rows bigint,
compaction_time timestamp,
PRIMARY KEY ((keyspace_name, table_name), sstable_name, partition_size, partition_key)
) WITH CLUSTERING ORDER BY (sstable_name ASC, partition_size DESC, partition_key ASC);

View File

@@ -689,8 +689,7 @@ private:
std::optional<gc_clock::time_point> local_deletion_time;
};
void maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size);
void maybe_record_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count);
void maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows);
void maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const uint64_t row_size);
void maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
@@ -1040,23 +1039,18 @@ void writer::consume(tombstone t) {
}
}
void writer::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) {
auto& entry = _large_data_stats.map.at(large_data_type::partition_size);
if (entry.max_value < partition_size) {
entry.max_value = partition_size;
void writer::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
uint64_t partition_size, uint64_t rows) {
auto& size_entry = _large_data_stats.map.at(large_data_type::partition_size);
auto& row_count_entry = _large_data_stats.map.at(large_data_type::rows_in_partition);
size_entry.max_value = std::max(size_entry.max_value, partition_size);
row_count_entry.max_value = std::max(row_count_entry.max_value, rows);
auto ret = _sst.get_large_data_handler().maybe_record_large_partitions(sst, partition_key, partition_size, rows).get0();
if (ret.size) [[unlikely]] {
size_entry.above_threshold++;
}
if (_sst.get_large_data_handler().maybe_record_large_partitions(sst, partition_key, partition_size).get0()) {
entry.above_threshold++;
};
}
void writer::maybe_record_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) {
auto& entry = _large_data_stats.map.at(large_data_type::rows_in_partition);
if (entry.max_value < rows_count) {
entry.max_value = rows_count;
}
if (_sst.get_large_data_handler().maybe_log_too_many_rows(sst, partition_key, rows_count)) {
entry.above_threshold++;
if (ret.rows) [[unlikely]] {
row_count_entry.above_threshold++;
}
}
@@ -1462,8 +1456,7 @@ stop_iteration writer::consume_end_of_partition() {
// compute size of the current row.
_c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset;
maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size);
maybe_record_too_many_rows(_sst, *_partition_key, _c_stats.rows_count);
maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size, _c_stats.rows_count);
// update is about merging column_stats with the data being stored by collector.

View File

@@ -52,6 +52,12 @@ SEASTAR_TEST_CASE(test_large_partitions) {
return do_with_cql_env([](cql_test_env& e) { return make_ready_future<>(); }, cfg);
}
// Brings up a CQL test environment whose configuration has
// compaction_rows_count_warning_threshold set to 0.  The environment callback
// itself does nothing and completes immediately.
// NOTE(review): presumably a zero threshold makes every non-empty partition
// written during environment setup count as "too many rows" — confirm.
SEASTAR_TEST_CASE(test_large_row_count) {
    auto zero_threshold_cfg = make_shared<db::config>();
    zero_threshold_cfg->compaction_rows_count_warning_threshold(0);
    auto do_nothing = [](cql_test_env&) { return make_ready_future<>(); };
    return do_with_cql_env(do_nothing, zero_threshold_cfg);
}
static void flush(cql_test_env& e) {
e.db().invoke_on_all([](database& dbi) {
return dbi.flush_all_memtables();

View File

@@ -5151,13 +5151,6 @@ struct large_row_handler : public db::large_data_handler {
start();
}
// Forwards a "too many rows" notification to the test callback, passing the
// sstable's schema, the partition key, a null clustering key and the row
// count.
virtual void log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key,
        uint64_t rows_count) const override {
    const schema_ptr table_schema = sst.get_schema();
    const schema& schema_ref = *table_schema;
    callback(schema_ref, partition_key, nullptr, rows_count);
}
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) const override {
const schema_ptr s = sst.get_schema();
@@ -5171,7 +5164,9 @@ struct large_row_handler : public db::large_data_handler {
}
virtual future<> record_large_partitions(const sstables::sstable& sst,
const sstables::key& partition_key, uint64_t partition_size) const override {
const sstables::key& partition_key, uint64_t partition_size, uint64_t rows_count) const override {
const schema_ptr s = sst.get_schema();
callback(*s, partition_key, nullptr, rows_count);
return make_ready_future<>();
}