tools/scylla-nodetool: implement tablestats

Refs #15588

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2024-02-18 11:11:50 +08:00
parent a7a2cf64cc
commit c627d9134e
2 changed files with 1152 additions and 1 deletions

View File

@@ -0,0 +1,510 @@
#
# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import json
import math
import pytest
import random
import re
import statistics
import yaml
from collections import defaultdict
from textwrap import indent
from typing import NamedTuple
from rest_api_mock import expected_request
class Table(NamedTuple):
ks: str
cf: str
type: str
def response_from_list(keys, values):
return [dict(zip(keys, value)) for value in values]
def histogram(count=0, sum_=0, min_=0, max_=0, variance=0, mean=0, sample=[]):
assert count == len(sample)
result = {
'count': count,
'sum': sum_,
'min': min_,
'max': max_,
'variance': variance,
'mean': mean
}
if count == 0:
return result
result['sample'] = sample
return result
def make_random_histogram(count):
lower = 100
upper = 100
#upper = 3000
samples = [random.randint(lower, upper) for _ in range(count)]
return histogram(count,
sum(samples),
min(samples),
max(samples),
statistics.variance(samples),
statistics.mean(samples),
samples)
def make_moving_avg_and_histogram(count):
# 1, 5, 15 minutes rates
rates = [random.random() for _ in (1, 5, 15)]
# the mean rate from startup
mean_rate = random.random()
hist = make_random_histogram(count)
return {
'meter': {
'rates': rates,
'mean_rate': mean_rate,
'count': count,
},
'hist': hist,
}
class table_stats:
def __init__(self, ks, cf):
self.ks = ks
self.cf = cf
self.read = 351
self.read_latency_hist = make_moving_avg_and_histogram(self.read)
self.write = 278
self.write_latency_hist = make_moving_avg_and_histogram(self.write)
self.live_ss_table_count = 16
self.sst_per_level = [16]
self.live_disk_space_used = 1146924
self.total_disk_space_used = 1146924
self.snapshots_size = 0
self.memtable_off_heap_size = 0
self.bloom_filter_off_heap_memory_used = 304
self.index_summary_off_heap_memory_used = 37248
self.compression_metadata_off_heap_memory_used = 0
self.compression_ratio = 0
self.estimated_row_count = 100
self.memtable_columns_count = 0
self.memtable_live_data_size = 0
self.memtable_switch_count = 16
self.read_latency = 198
self.write_latency = 152
self.pending_flushes = 0
# it's but a dummy value, scylla does not support it.
self.percent_repaired = 0.0
self.bloom_filter_false_positives = 0
self.recent_bloom_filter_false_ratio = 0.0
self.bloom_filter_disk_space_used = 304
self.min_row_size = 216
self.max_row_size = 258
self.mean_row_size = 258
self.live_scanned_histogram = histogram()
self.tombstone_scanned_histogram = histogram()
# it's but a dummy value, scylla does not support it.
self.dropped_mutations = 0
@property
def table_name(self):
return f'{self.ks}:{self.cf}'
@property
def local_read_count(self):
return self.read_latency_hist['hist']['count']
@property
def local_read_latency(self):
return self.read_latency_hist['hist']['mean']
@property
def local_write_count(self):
return self.write_latency_hist['hist']['count']
@property
def local_write_latency(self):
return self.write_latency_hist['hist']['mean']
@property
def total_off_heap_memory_used(self):
return (self.memtable_off_heap_size +
self.bloom_filter_off_heap_memory_used +
self.index_summary_off_heap_memory_used +
self.compression_metadata_off_heap_memory_used)
def sstable_count_in_each_level(self):
# scylla hardwire FANOUT_SIZE to 10
sstable_fanout_size = 10
for level, count in enumerate(self.sst_per_level):
if level == 0:
max_count = 4
else:
max_count = int(max.pow(sstable_fanout_size, level))
if count > max_count:
yield f'{count}/{max_count}'
else:
yield f'{count}'
@property
def sstables_in_each_level(self):
return ', '.join(self.sstable_count_in_each_level())
@property
def avg_live_cells_per_slice(self):
return self.live_scanned_histogram['mean']
@property
def max_live_cells_per_slice(self):
return self.live_scanned_histogram['max']
@property
def avg_tombstones_per_slice(self):
return self.tombstone_scanned_histogram['mean']
@property
def max_tombstones_per_slice(self):
return self.tombstone_scanned_histogram['max']
def req(self, name, response, **kwargs):
return expected_request('GET', f'/column_family/metrics/{name}/{self.table_name}',
response=response, **kwargs)
def hist(self, name, response):
return expected_request('GET', f'/column_family/metrics/{name}/moving_average_histogram/{self.table_name}',
multiple=expected_request.ANY,
response=response)
def ssttables_per_level(self):
return expected_request('GET', f'/column_family/sstables/per_level/{self.table_name}',
response=self.sst_per_level)
def expected_summary_requests(self, is_scylla):
if is_scylla:
return [
self.hist('write_latency', self.write_latency_hist),
self.req('write_latency', self.write_latency),
self.hist('read_latency', self.read_latency_hist),
self.req('read_latency', self.read_latency),
self.req('pending_flushes', self.pending_flushes)]
else:
return [
self.hist('write_latency', self.write_latency_hist),
self.hist('read_latency', self.read_latency_hist),
self.req('read_latency', self.read_latency),
self.req('write_latency', self.write_latency),
self.req('pending_flushes', self.pending_flushes)]
def expected_details_requests(self, is_scylla):
if is_scylla:
return [
# scylla only requests for this metric for plain text output
self.req('live_ss_table_count', self.live_ss_table_count,
multiple=expected_request.ANY),
self.ssttables_per_level(),
self.req('live_disk_space_used', self.live_disk_space_used),
self.req('total_disk_space_used', self.total_disk_space_used),
self.req('snapshots_size', self.snapshots_size),
self.req('memtable_off_heap_size', self.memtable_off_heap_size),
self.req('bloom_filter_off_heap_memory_used', self.bloom_filter_off_heap_memory_used),
self.req('index_summary_off_heap_memory_used', self.index_summary_off_heap_memory_used),
self.req('compression_metadata_off_heap_memory_used', self.compression_metadata_off_heap_memory_used),
self.req('compression_ratio', self.compression_ratio),
self.req('estimated_row_count', self.estimated_row_count),
self.req('memtable_columns_count', self.memtable_columns_count),
self.req('memtable_live_data_size', self.memtable_live_data_size),
self.req('memtable_switch_count', self.memtable_switch_count),
self.hist('read_latency', self.read_latency_hist),
self.hist('write_latency', self.write_latency_hist),
self.req('pending_flushes', self.pending_flushes),
self.req('bloom_filter_false_positives', self.bloom_filter_false_positives),
self.req('recent_bloom_filter_false_ratio', self.recent_bloom_filter_false_ratio),
self.req('bloom_filter_disk_space_used', self.bloom_filter_disk_space_used),
self.req('min_row_size', self.min_row_size),
self.req('max_row_size', self.max_row_size),
self.req('mean_row_size', self.mean_row_size),
self.req('live_scanned_histogram', self.live_scanned_histogram),
self.req('tombstone_scanned_histogram', self.tombstone_scanned_histogram),
]
else:
return [
self.req('live_ss_table_count', self.live_ss_table_count),
self.ssttables_per_level(),
self.req('memtable_off_heap_size', self.memtable_off_heap_size),
self.req('bloom_filter_off_heap_memory_used', self.bloom_filter_off_heap_memory_used),
self.req('index_summary_off_heap_memory_used', self.index_summary_off_heap_memory_used),
self.req('compression_metadata_off_heap_memory_used', self.compression_metadata_off_heap_memory_used),
self.req('live_disk_space_used', self.live_disk_space_used),
self.req('total_disk_space_used', self.total_disk_space_used),
self.req('snapshots_size', self.snapshots_size),
self.req('compression_ratio', self.compression_ratio),
self.req('estimated_row_count', self.estimated_row_count),
self.req('memtable_columns_count', self.memtable_columns_count),
self.req('memtable_live_data_size', self.memtable_live_data_size),
self.req('memtable_switch_count', self.memtable_switch_count),
self.hist('read_latency', self.read_latency_hist),
self.hist('write_latency', self.write_latency_hist),
self.req('pending_flushes', self.pending_flushes),
self.req('bloom_filter_false_positives', self.bloom_filter_false_positives),
self.req('recent_bloom_filter_false_ratio', self.recent_bloom_filter_false_ratio),
self.req('bloom_filter_disk_space_used', self.bloom_filter_disk_space_used),
self.req('min_row_size', self.min_row_size),
self.req('max_row_size', self.max_row_size),
self.req('mean_row_size', self.mean_row_size),
self.req('live_scanned_histogram', self.live_scanned_histogram),
self.req('tombstone_scanned_histogram', self.tombstone_scanned_histogram),
]
def format(self):
return f'''\
Table: {self.cf}
SSTable count: {self.live_ss_table_count}
SSTables in each level: [{self.sstables_in_each_level}]
Space used (live): {self.live_disk_space_used}
Space used (total): {self.total_disk_space_used}
Space used by snapshots (total): {self.snapshots_size}
Off heap memory used (total): {self.total_off_heap_memory_used}
SSTable Compression Ratio: {self.compression_ratio:.1f}
Number of partitions (estimate): {self.estimated_row_count}
Memtable cell count: {self.memtable_columns_count}
Memtable data size: {self.memtable_live_data_size}
Memtable off heap memory used: {self.memtable_off_heap_size}
Memtable switch count: {self.memtable_switch_count}
Local read count: {self.read}
Local read latency: {self.local_read_latency / 1000:.3f} ms
Local write count: {self.write}
Local write latency: {self.local_write_latency / 1000:.3f} ms
Pending flushes: {self.pending_flushes}
Percent repaired: {self.percent_repaired}
Bloom filter false positives: {self.bloom_filter_false_positives}
Bloom filter false ratio: {self.recent_bloom_filter_false_ratio:.5f}
Bloom filter space used: {self.bloom_filter_disk_space_used}
Bloom filter off heap memory used: {self.bloom_filter_off_heap_memory_used}
Index summary off heap memory used: {self.index_summary_off_heap_memory_used}
Compression metadata off heap memory used: {self.compression_metadata_off_heap_memory_used}
Compacted partition minimum bytes: {self.min_row_size}
Compacted partition maximum bytes: {self.max_row_size}
Compacted partition mean bytes: {self.mean_row_size}
Average live cells per slice (last five minutes): {self.avg_live_cells_per_slice:.1f}
Maximum live cells per slice (last five minutes): {self.max_live_cells_per_slice}
Average tombstones per slice (last five minutes): {self.avg_tombstones_per_slice:.1f}
Maximum tombstones per slice (last five minutes): {self.max_tombstones_per_slice}
Dropped Mutations: {self.dropped_mutations}
'''
def to_map(self):
return {
'sstables_in_each_level': list(self.sstable_count_in_each_level()),
'space_used_live': f'{self.live_disk_space_used}',
'space_used_total': f'{self.total_disk_space_used}',
'space_used_by_snapshots_total': f'{self.snapshots_size}',
'off_heap_memory_used_total': f'{self.total_off_heap_memory_used}',
'sstable_compression_ratio': self.compression_ratio,
'number_of_partitions_estimate': self.estimated_row_count,
'memtable_cell_count': self.memtable_columns_count,
'memtable_data_size': f'{self.memtable_live_data_size}',
'memtable_off_heap_memory_used': f'{self.memtable_off_heap_size}',
'memtable_switch_count': self.memtable_switch_count,
'local_read_count': self.read,
'local_read_latency_ms': f'{self.local_read_latency / 1000:.3f}',
'local_write_count': self.write,
'local_write_latency_ms': f'{self.local_write_latency / 1000:.3f}',
'pending_flushes': self.pending_flushes,
'percent_repaired': self.percent_repaired,
'bloom_filter_false_positives': self.bloom_filter_false_positives,
'bloom_filter_false_ratio': f'{self.recent_bloom_filter_false_ratio:01.5f}',
'bloom_filter_space_used': f'{self.bloom_filter_disk_space_used}',
'bloom_filter_off_heap_memory_used': f'{self.bloom_filter_off_heap_memory_used}',
'index_summary_off_heap_memory_used': f'{self.index_summary_off_heap_memory_used}',
'compression_metadata_off_heap_memory_used': f'{self.compression_metadata_off_heap_memory_used}',
'compacted_partition_minimum_bytes': self.min_row_size,
'compacted_partition_maximum_bytes': self.max_row_size,
'compacted_partition_mean_bytes': self.mean_row_size,
'average_live_cells_per_slice_last_five_minutes': self.avg_live_cells_per_slice,
'maximum_live_cells_per_slice_last_five_minutes': self.max_live_cells_per_slice,
'average_tombstones_per_slice_last_five_minutes': self.avg_tombstones_per_slice,
'maximum_tombstones_per_slice_last_five_minutes': self.max_live_cells_per_slice,
'dropped_mutations': f'{self.dropped_mutations}',
}
class scientific_notation:
# Python and {fmt} prints a float like "1.234E-04",
# while Java prints like "1.23E-4"
def __init__(self, value, is_scylla):
self.value = value
self.is_scylla = is_scylla
def __format__(self, _):
if self.is_scylla:
if math.isnan(self.value):
return 'nan'
return f'{self.value:.15E}'
else:
if math.isnan(self.value):
return 'NaN'
matched = re.match(r'(\d.\d+)E(\+|\-)(\d+)', f'{self.value:.15E}')
assert matched
m, sign, e = matched.group(1), matched.group(2), int(matched.group(3))
return f'{m}E{sign}{e}'
class keyspace_stats:
def __init__(self, is_scylla):
self.tables = []
self.read_count = 0
self.total_read_time = 0
self.write_count = 0
self.total_write_time = 0
self.pending_flushes = 0
self.is_scylla = is_scylla
def add_table(self, table):
self.tables.append(table)
if table.read > 0:
self.read_count += table.local_read_count
self.total_read_time += table.read_latency
if table.local_write_count > 0:
self.write_count = table.local_write_count
self.total_write_time += table.write_latency
self.pending_flushes += table.pending_flushes
@property
def read_latency(self):
if self.read_count == 0:
v = math.nan
else:
v = self.total_read_time / self.read_count / 1000
return scientific_notation(v, self.is_scylla)
@property
def write_latency(self):
if self.write_count == 0:
v = math.nan
else:
v = self.total_write_time / self.write_count / 1000
return scientific_notation(v, self.is_scylla)
def to_map(self, is_scylla):
m = {
'read_count': self.read_count,
'read_latency_ms': self.read_latency.value,
'write_count': self.write_count,
'write_latency_ms': self.write_latency.value,
'pending_flushes': self.pending_flushes,
}
if not is_scylla:
# cassandra nodetool has a duplicated item
m['read_latency'] = self.read_latency.value
return m
@pytest.mark.parametrize('command', ['tablestats', 'cfstats'])
def test_plain_text_output(request, nodetool, command):
is_scylla = request.config.getoption("nodetool") == 'scylla'
tables = [Table('keyspace1', 'standard1', 'ColumnFamilies'),
Table('system', 'local', 'ColumnFamilies')]
expected_requests = [expected_request(
'GET', '/column_family/',
multiple=expected_request.MULTIPLE,
response=response_from_list(['ks', 'cf', 'type'], tables))]
keyspaces = defaultdict(lambda: keyspace_stats(is_scylla))
for table in tables:
stats = table_stats(table.ks, table.cf)
if not is_scylla:
# scylla tallies the table stats afterwards
expected_requests += stats.expected_summary_requests(is_scylla)
keyspaces[table.ks].add_table(stats)
if not is_scylla:
expected_requests.append(expected_request(
'GET', '/storage_service/keyspaces',
response=sorted(keyspaces.keys())))
total_nr_tables = sum(len(ks.tables) for ks in keyspaces.values())
expected_output = f'''\
Total number of tables: {total_nr_tables}
----------------
'''
for ks_name in sorted(keyspaces.keys()):
keyspace = keyspaces[ks_name]
if is_scylla:
for table in keyspace.tables:
# cassandra nodetool tallies the table stats in the loop above
expected_requests += table.expected_summary_requests(is_scylla)
expected_output += f'''\
Keyspace : {ks_name}
\tRead Count: {keyspace.read_count}
\tRead Latency: {keyspace.read_latency:.15E} ms
\tWrite Count: {keyspace.write_count}
\tWrite Latency: {keyspace.write_latency:.15E} ms
\tPending Flushes: {keyspace.pending_flushes}
'''
for table in keyspace.tables:
expected_requests.extend(table.expected_details_requests(is_scylla))
expected_output += indent(table.format(), '\t\t')
expected_output += '----------------\n'
actual_output = nodetool(command, expected_requests=expected_requests)
assert actual_output == expected_output
@pytest.mark.parametrize('output_format', ['json', 'yaml'])
def test_output_format(request, nodetool, output_format):
is_scylla = request.config.getoption("nodetool") == 'scylla'
tables = [Table('keyspace1', 'standard1', 'ColumnFamilies'),
Table('system', 'local', 'ColumnFamilies')]
expected_requests = [expected_request(
'GET', '/column_family/',
multiple=expected_request.MULTIPLE,
response=response_from_list(['ks', 'cf', 'type'], tables))]
keyspaces = defaultdict(lambda: keyspace_stats(is_scylla))
for table in tables:
stats = table_stats(table.ks, table.cf)
if not is_scylla:
# scylla tallies the table stats afterwards
expected_requests += stats.expected_summary_requests(is_scylla)
keyspaces[table.ks].add_table(stats)
if not is_scylla:
expected_requests.append(expected_request(
'GET', '/storage_service/keyspaces',
response=sorted(keyspaces.keys())))
total_nr_tables = sum(len(ks.tables) for ks in keyspaces.values())
expected_dict = {'total_number_of_tables': total_nr_tables}
for ks_name in sorted(keyspaces.keys()):
keyspace = keyspaces[ks_name]
if is_scylla:
for table in keyspace.tables:
# cassandra nodetool tallies the table stats in the loop above
expected_requests += table.expected_summary_requests(is_scylla)
expected_dict[ks_name] = keyspace.to_map(is_scylla)
expected_tables = {}
for table in keyspace.tables:
expected_requests.extend(table.expected_details_requests(is_scylla))
expected_tables[table.cf] = table.to_map()
expected_dict[ks_name]['tables'] = expected_tables
parsers = {
'yaml': lambda m: yaml.load(m, Loader=yaml.Loader),
'json': json.loads
}
actual_output = nodetool('tablestats', '--format', output_format,
expected_requests=expected_requests)
actual_dict = parsers[output_format](actual_output)
assert actual_dict == expected_dict

View File

@@ -7,6 +7,10 @@
*/
#include <algorithm>
#include <chrono>
#include <concepts>
#include <limits>
#include <numeric>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/join.hpp>
@@ -14,7 +18,6 @@
#include <boost/lexical_cast.hpp>
#include <boost/make_shared.hpp>
#include <boost/range/adaptor/map.hpp>
#include <chrono>
#include <fmt/chrono.h>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
@@ -50,11 +53,20 @@ using namespace tools::utils;
// mimic the behavior of FileUtils::stringifyFileSize
struct file_size_printer {
uint64_t value;
bool human_readable;
file_size_printer(uint64_t value, bool human_readable = true)
: value{value}
, human_readable{human_readable}
{}
};
template <>
struct fmt::formatter<file_size_printer> : fmt::formatter<std::string_view> {
auto format(file_size_printer size, auto& ctx) const {
if (!size.human_readable) {
return fmt::format_to(ctx.out(), "{}", size.value);
}
using unit_t = std::pair<uint64_t, std::string_view>;
const unit_t units[] = {
{1UL << 40, "TB"},
@@ -1429,6 +1441,616 @@ void stop_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
client.post("/compaction_manager/stop_compaction", {{"type", compaction_type}});
}
std::map<sstring, std::vector<sstring>> get_ks_to_cfs(scylla_rest_client& client) {
auto res = client.get("/column_family/");
std::map<sstring, std::vector<sstring>> keyspaces;
for (auto& element : res.GetArray()) {
const auto& cf_info = element.GetObject();
auto ks = rjson::to_string_view(cf_info["ks"]);
auto cf = rjson::to_string_view(cf_info["cf"]);
keyspaces[sstring(ks)].push_back(sstring(cf));
}
return keyspaces;
}
// inspired by http://wg21.link/p2098
template<typename T, template<typename...> class C>
struct is_specialization_of : std::false_type {};
template<template<typename...> class C, typename... Args>
struct is_specialization_of<C<Args...>, C> : std::true_type {};
template<typename T, template<typename...> class C>
inline constexpr bool is_specialization_of_v = is_specialization_of<T, C>::value;
template<typename T>
concept is_vector = is_specialization_of_v<T, std::vector>;
using metrics_value = std::variant<bool,
double,
int64_t,
uint64_t,
std::string,
std::vector<std::string>>;
void dump_map_to_yaml(YAML::Emitter& emitter,
const std::map<std::string, metrics_value>& map) {
for (auto& [key, value] : map) {
emitter << YAML::Key << key;
emitter << YAML::Value;
std::visit([&] (auto&& v) {
using T = std::decay_t<decltype(v)>;
if constexpr (std::convertible_to<T, std::string>) {
emitter << YAML::SingleQuoted << std::string(v);
} else if constexpr (is_vector<T>) {
emitter << YAML::BeginSeq;
for (auto& element : v) {
emitter << element;
}
emitter << YAML::EndSeq;
} else {
emitter << v;
}
}, value);
}
}
void dump_map_to_json(rjson::streaming_writer& writer,
const std::map<std::string, metrics_value>& map) {
for (auto& [key, value] : map) {
writer.Key(key);
std::visit([&] (auto&& v) {
using T = std::decay_t<decltype(v)>;
if constexpr (is_vector<T>) {
writer.StartArray();
for (auto& element : v) {
writer.Write(element);
}
writer.EndArray();
} else {
writer.Write(v);
}
}, value);
}
}
class table_metrics {
scylla_rest_client& _client;
std::string _ks_name;
std::string _cf_name;
template<typename T>
T get(std::string_view metric_name) {
auto res = _client.get(fmt::format("/column_family/metrics/{}/{}:{}",
metric_name, _ks_name, _cf_name));
return res.Get<T>();
}
struct histogram {
uint64_t count;
uint64_t sum;
uint64_t min;
uint64_t max;
double variance;
double mean;
};
histogram get_histogram(std::string_view metric_name) {
auto res = _client.get(fmt::format("/column_family/metrics/{}/{}:{}",
metric_name, _ks_name, _cf_name));
const auto& obj = res.GetObject();
// we don't use sample here
return histogram(obj["count"].GetUint64(),
obj["sum"].GetUint64(),
obj["min"].GetUint64(),
obj["max"].GetUint64(),
obj["variance"].GetDouble(),
obj["mean"].GetDouble());
}
public:
table_metrics(scylla_rest_client& client,
std::string_view keyspace_name,
std::string_view table_name)
: _client{client}
, _ks_name{keyspace_name}
, _cf_name{table_name}
{}
const std::string& name() const {
return _cf_name;
}
uint64_t live_ss_table_count() {
return get<uint64_t>("live_ss_table_count");
}
std::vector<int64_t> ss_table_count_per_level() {
auto res = _client.get(fmt::format("/column_family/sstables/per_level/{}:{}",
_ks_name, _cf_name));
std::vector<int64_t> levels;
for (auto& element : res.GetArray()) {
levels.push_back(element.GetInt64());
}
return levels;
}
uint64_t space_used_live() {
return get<uint64_t>("live_disk_space_used");
}
uint64_t space_used_total() {
return get<uint64_t>("total_disk_space_used");
}
uint64_t space_used_by_snapshots_total() {
return get<uint64_t>("snapshots_size");
}
uint64_t memtable_off_heap_size() {
return get<uint64_t>("memtable_off_heap_size");
}
uint64_t bloom_filter_off_heap_memory_used() {
return get<uint64_t>("bloom_filter_off_heap_memory_used");
}
uint64_t index_summary_off_heap_memory_used() {
return get<uint64_t>("index_summary_off_heap_memory_used");
}
uint64_t compression_metadata_off_heap_memory_used() {
return get<uint64_t>("compression_metadata_off_heap_memory_used");
}
double compression_ratio() {
return get<double>("compression_ratio");
}
uint64_t estimated_row_count() {
return get<uint64_t>("estimated_row_count");
}
uint64_t memtable_columns_count() {
return get<uint64_t>("memtable_columns_count");
}
uint64_t memtable_live_data_size() {
return get<uint64_t>("memtable_live_data_size");
}
uint64_t memtable_switch_count() {
return get<uint64_t>("memtable_switch_count");
}
uint64_t read() {
return get<uint64_t>("read");
}
uint64_t read_latency() {
return get<uint64_t>("read_latency");
}
uint64_t write() {
return get<uint64_t>("write");
}
uint64_t write_latency() {
return get<uint64_t>("write_latency");
}
uint64_t pending_flushes() {
return get<uint64_t>("pending_flushes");
}
uint64_t bloom_filter_false_positives() {
return get<uint64_t>("bloom_filter_false_positives");
}
double recent_bloom_filter_false_ratio() {
return get<double>("recent_bloom_filter_false_ratio");
}
uint64_t bloom_filter_disk_space_used() {
return get<uint64_t>("bloom_filter_disk_space_used");
}
uint64_t min_row_size() {
return get<uint64_t>("min_row_size");
}
uint64_t max_row_size() {
return get<uint64_t>("max_row_size");
}
uint64_t mean_row_size() {
return get<uint64_t>("mean_row_size");
}
histogram live_scanned_histogram() {
return get_histogram("live_scanned_histogram");
}
histogram tombstone_scanned_histogram() {
return get_histogram("tombstone_scanned_histogram");
}
struct latency_hist {
int64_t count;
float latency;
};
latency_hist latency_histogram(std::string_view name) {
auto res = _client.get(fmt::format("/column_family/metrics/{}/moving_average_histogram/{}:{}",
name, _ks_name, _cf_name));
const auto& moving_avg_and_hist = res.GetObject();
const auto& hist = moving_avg_and_hist["hist"].GetObject();
return latency_hist{
hist["count"].GetInt64(),
hist["mean"].GetDouble() / 1000
};
}
static std::vector<std::string> print_ss_table_levels(const std::vector<int64_t>& counts) {
constexpr int LEVEL_FANOUT_SIZE = 10;
int64_t max_count;
std::vector<std::string> levels;
for (unsigned level = 0; level < counts.size(); level++) {
std::string fmt_sstable;
auto out = std::back_inserter(fmt_sstable);
auto count = counts[level];
if (level == 0) {
max_count = 4;
} else if (level == 1) {
max_count = LEVEL_FANOUT_SIZE;
} else {
max_count *= LEVEL_FANOUT_SIZE;
}
out = fmt::format_to(out, "{}", count);
if (count > max_count) {
fmt::format_to(out, "/{}", max_count);
}
levels.push_back(std::move(fmt_sstable));
}
return levels;
}
static std::pair<double, double> mean_max(const std::vector<double> v) {
if (v.empty()) {
return {0, 0};
}
auto sum = std::accumulate(v.begin(), v.end(), .0);
auto max_element = std::ranges::max_element(v);
return {sum / v.size(), *max_element};
}
std::map<std::string, metrics_value> to_map(bool human_readable) {
std::map<std::string, metrics_value> map;
if (auto levels = ss_table_count_per_level(); !levels.empty()) {
map.emplace("sstables_in_each_level", print_ss_table_levels(levels));
}
map.emplace("space_used_live",
fmt::to_string(file_size_printer(space_used_live(), human_readable)));
map.emplace("space_used_total",
fmt::to_string(file_size_printer(space_used_total(), human_readable)));
map.emplace("space_used_by_snapshots_total",
fmt::to_string(file_size_printer(space_used_by_snapshots_total(), human_readable)));
auto memtable_off_heap_mem_size = memtable_off_heap_size();
auto bloom_filter_off_heap_mem_size = bloom_filter_off_heap_memory_used();
auto index_summary_off_heap_mem_size = index_summary_off_heap_memory_used();
auto compression_metadata_off_heap_mem_size = compression_metadata_off_heap_memory_used();
auto total_off_heap_size = (memtable_off_heap_mem_size +
bloom_filter_off_heap_mem_size +
index_summary_off_heap_mem_size +
compression_metadata_off_heap_mem_size);
map.emplace("off_heap_memory_used_total",
fmt::to_string(file_size_printer(total_off_heap_size, human_readable)));
map.emplace("sstable_compression_ratio", compression_ratio());
map.emplace("number_of_partitions_estimate", estimated_row_count());
map.emplace("memtable_cell_count", memtable_columns_count());
map.emplace("memtable_data_size",
fmt::to_string(file_size_printer(memtable_live_data_size(), human_readable)));
map.emplace("memtable_off_heap_memory_used",
fmt::to_string(file_size_printer(memtable_off_heap_mem_size, human_readable)));
map.emplace("memtable_switch_count", memtable_switch_count());
auto local_reads = latency_histogram("read_latency");
map.emplace("local_read_count", local_reads.count);
map.emplace("local_read_latency_ms",
fmt::format("{:.3f}", local_reads.latency));
auto local_writes = latency_histogram("write_latency");
map.emplace("local_write_count", local_writes.count);
map.emplace("local_write_latency_ms",
fmt::format("{:.3f}", local_writes.latency));
map.emplace("pending_flushes", pending_flushes());
// scylla does not support it.
map.emplace("percent_repaired", 0.0);
map.emplace("bloom_filter_false_positives", bloom_filter_false_positives());
map.emplace("bloom_filter_false_ratio",
fmt::format("{:01.5f}", recent_bloom_filter_false_ratio()));
map.emplace("bloom_filter_space_used",
fmt::to_string(file_size_printer(bloom_filter_disk_space_used(), human_readable)));
map.emplace("bloom_filter_off_heap_memory_used",
fmt::to_string(file_size_printer(bloom_filter_off_heap_mem_size, human_readable)));
map.emplace("index_summary_off_heap_memory_used",
fmt::to_string(file_size_printer(index_summary_off_heap_mem_size, human_readable)));
map.emplace("compression_metadata_off_heap_memory_used",
fmt::to_string(file_size_printer(compression_metadata_off_heap_mem_size, human_readable)));
map.emplace("compacted_partition_minimum_bytes", min_row_size());
map.emplace("compacted_partition_maximum_bytes", max_row_size());
map.emplace("compacted_partition_mean_bytes", mean_row_size());
auto live_cells_per_slice = live_scanned_histogram();
map.emplace("average_live_cells_per_slice_last_five_minutes", live_cells_per_slice.mean);
map.emplace("maximum_live_cells_per_slice_last_five_minutes", live_cells_per_slice.max);
auto tombstones_per_slice = tombstone_scanned_histogram();
map.emplace("average_tombstones_per_slice_last_five_minutes", tombstones_per_slice.mean);
map.emplace("maximum_tombstones_per_slice_last_five_minutes", live_cells_per_slice.max);
// scylla does not support it.
map.emplace("dropped_mutations", fmt::to_string(0));
return map;
}
void print(bool human_readable) {
fmt::print("\t\tTable: {}\n", name());
fmt::print("\t\tSSTable count: {}\n", live_ss_table_count());
if (auto levels = ss_table_count_per_level(); !levels.empty()) {
fmt::print("\t\tSSTables in each level: [{}]\n",
fmt::join(print_ss_table_levels(levels), ", "));
}
fmt::print("\t\tSpace used (live): {}\n",
file_size_printer(space_used_live(), human_readable));
fmt::print("\t\tSpace used (total): {}\n",
file_size_printer(space_used_total(), human_readable));
fmt::print("\t\tSpace used by snapshots (total): {}\n",
file_size_printer(space_used_by_snapshots_total(), human_readable));
auto memtable_off_heap_mem_size = memtable_off_heap_size();
auto bloom_filter_off_heap_mem_size = bloom_filter_off_heap_memory_used();
auto index_summary_off_heap_mem_size = index_summary_off_heap_memory_used();
auto compression_metadata_off_heap_mem_size = compression_metadata_off_heap_memory_used();
auto total_off_heap_size = (memtable_off_heap_mem_size +
bloom_filter_off_heap_mem_size +
index_summary_off_heap_mem_size +
compression_metadata_off_heap_mem_size);
fmt::print("\t\tOff heap memory used (total): {}\n", file_size_printer(total_off_heap_size, human_readable));
fmt::print("\t\tSSTable Compression Ratio: {:.1f}\n", compression_ratio());
fmt::print("\t\tNumber of partitions (estimate): {}\n", estimated_row_count());
fmt::print("\t\tMemtable cell count: {}\n", memtable_columns_count());
fmt::print("\t\tMemtable data size: {}\n", memtable_live_data_size());
fmt::print("\t\tMemtable off heap memory used: {}\n", file_size_printer(memtable_off_heap_mem_size, human_readable));
fmt::print("\t\tMemtable switch count: {}\n", memtable_switch_count());
auto local_reads = latency_histogram("read_latency");
fmt::print("\t\tLocal read count: {}\n", local_reads.count);
fmt::print("\t\tLocal read latency: {:.3f} ms\n", local_reads.latency);
auto local_writes = latency_histogram("write_latency");
fmt::print("\t\tLocal write count: {}\n", local_writes.count);
fmt::print("\t\tLocal write latency: {:.3f} ms\n", local_writes.latency);
fmt::print("\t\tPending flushes: {}\n", pending_flushes());
// scylla does not support it.
fmt::print("\t\tPercent repaired: {:.1f}\n", 0.0);
fmt::print("\t\tBloom filter false positives: {}\n", bloom_filter_false_positives());
fmt::print("\t\tBloom filter false ratio: {:01.5f}\n", recent_bloom_filter_false_ratio());
fmt::print("\t\tBloom filter space used: {}\n",
file_size_printer(bloom_filter_disk_space_used(), human_readable));
fmt::print("\t\tBloom filter off heap memory used: {}\n",
file_size_printer(bloom_filter_off_heap_mem_size, human_readable));
fmt::print("\t\tIndex summary off heap memory used: {}\n",
file_size_printer(index_summary_off_heap_mem_size, human_readable));
fmt::print("\t\tCompression metadata off heap memory used: {}\n",
file_size_printer(compression_metadata_off_heap_mem_size, human_readable));
fmt::print("\t\tCompacted partition minimum bytes: {}\n", min_row_size());
fmt::print("\t\tCompacted partition maximum bytes: {}\n", max_row_size());
fmt::print("\t\tCompacted partition mean bytes: {}\n", mean_row_size());
auto live_cells_per_slice = live_scanned_histogram();
fmt::print("\t\tAverage live cells per slice (last five minutes): {:.1f}\n",
live_cells_per_slice.mean);
fmt::print("\t\tMaximum live cells per slice (last five minutes): {}\n",
live_cells_per_slice.max);
auto tombstones_per_slice = tombstone_scanned_histogram();
fmt::print("\t\tAverage tombstones per slice (last five minutes): {:.1f}\n",
tombstones_per_slice.mean);
fmt::print("\t\tMaximum tombstones per slice (last five minutes): {}\n",
tombstones_per_slice.max);
// scylla does not support it.
fmt::print("\t\tDropped Mutations: {}\n\n", 0);
}
};
struct keyspace_stats {
sstring name;
uint64_t read_count = 0;
double total_read_time = .0;
uint64_t write_count = 0;
double total_write_time = .0;
uint64_t pending_flushes = 0;
std::vector<table_metrics> tables;
void add(table_metrics&& metrics, bool is_included) {
auto write_hist = metrics.latency_histogram("write_latency");
if (write_hist.count > 0) {
write_count += write_hist.count;
total_write_time += metrics.write_latency();
}
auto read_hist = metrics.latency_histogram("read_latency");
if (read_hist.count > 0) {
read_count += read_hist.count;
total_read_time += metrics.read_latency();
}
pending_flushes += metrics.pending_flushes();
if (is_included) {
tables.push_back(std::move(metrics));
}
}
double read_latency() const {
if (read_count == 0) {
return std::numeric_limits<double>::quiet_NaN();
}
return total_read_time / read_count / 1000;
}
double write_latency() const {
if (write_count == 0) {
return std::numeric_limits<double>::quiet_NaN();
}
return total_write_time / write_count / 1000;
}
std::map<std::string, metrics_value> to_map() {
std::map<std::string, metrics_value> map;
map.emplace("read_count", read_count);
map.emplace("read_latency_ms", read_latency());
map.emplace("write_count", write_count);
map.emplace("write_latency_ms", write_latency());
map.emplace("pending_flushes", pending_flushes);
return map;
}
};
class table_filter {
enum class mode {
inclusive,
exclusive,
};
mode _mode;
std::map<sstring, std::set<sstring>> _filter;
static std::map<sstring, std::set<sstring>> init_filter(const std::vector<sstring>& specs) {
std::map<sstring, std::set<sstring>> filter;
for (auto& s : specs) {
// Usually, keyspace name and table is are separated by a
// dot, but to allow names which themselves contain a dot
// (this is allowed in Alternator), also allow to separate
// the two parts with a slash instead:
// Allow the syntax "keyspace.name/" to represent a
// keyspace with a dot in its name.
auto delim = s.find('/');
if (delim == s.npos) {
delim = s.find('.');
}
auto first = s.substr(0, delim);
auto [ks, inserted] = filter.try_emplace(first);
auto second = s.substr(delim);
if (!second.empty()) {
ks->second.insert(second);
}
}
return filter;
}
bool belongs_to_filter(const sstring& ks, const sstring& cf) const {
if (_filter.empty()) {
// an empty _filter implies the universal set
return true;
}
auto found = _filter.find(ks);
if (found == _filter.end()) {
return false;
}
auto& tables = found->second;
if (tables.empty()) {
// this implies all tables in this ks
return true;
}
return tables.contains(cf);
}
public:
table_filter(bool ignore_specified, const std::vector<sstring>& specs)
: _mode(ignore_specified ? mode::exclusive : mode::inclusive)
, _filter(init_filter(specs))
{}
bool operator()(const sstring& ks, const sstring& cf) const {
if (belongs_to_filter(ks, cf)) {
return _mode == mode::inclusive;
} else {
return _mode == mode::exclusive;
}
}
};
void table_stats_print_plain(scylla_rest_client& client,
const table_filter& is_included,
bool human_readable) {
fmt::print("Total number of tables: {}\n", client.get("/column_family/").GetArray().Size());
fmt::print("----------------\n");
for (auto& [keyspace_name, table_names] : get_ks_to_cfs(client)) {
keyspace_stats keyspace;
for (auto& table_name : table_names) {
keyspace.add(table_metrics(client, keyspace_name, table_name),
is_included(keyspace_name, table_name));
}
fmt::print("Keyspace : {}\n", keyspace_name);
fmt::print("\tRead Count: {}\n", keyspace.read_count);
fmt::print("\tRead Latency: {:.15E} ms\n", keyspace.read_latency());
fmt::print("\tWrite Count: {}\n", keyspace.write_count);
fmt::print("\tWrite Latency: {:.15E} ms\n", keyspace.write_latency());
fmt::print("\tPending Flushes: {}\n", keyspace.pending_flushes);
for (auto& table : keyspace.tables) {
table.print(human_readable);
}
fmt::print("----------------\n");
}
}
void table_stats_print_json(scylla_rest_client& client,
const table_filter& is_included,
bool human_readable) {
// {
// 'total_number_of_tables': ...,
// 'keyspace1' : {
// 'read_latency': ...,
// # ...
// 'tables': {
// 'cf1': {
// 'sstables_in_each_level': ...
// # ...
// },
// # ...
// }
// }
// # ...
// }
rjson::streaming_writer writer;
writer.StartObject();
writer.Key("total_number_of_tables");
writer.Write(client.get("/column_family/").GetArray().Size());
for (auto& [keyspace_name, table_names] : get_ks_to_cfs(client)) {
keyspace_stats keyspace;
for (auto& table_name : table_names) {
keyspace.add(table_metrics(client, keyspace_name, table_name),
is_included(keyspace_name, table_name));
}
writer.Key(keyspace_name);
writer.StartObject();
dump_map_to_json(writer, keyspace.to_map());
writer.Key("tables");
writer.StartObject();
for (auto& table : keyspace.tables) {
writer.Key(table.name());
writer.StartObject();
dump_map_to_json(writer, table.to_map(human_readable));
writer.EndObject();
}
writer.EndObject();
writer.EndObject();
}
writer.EndObject();
}
void table_stats_print_yaml(scylla_rest_client& client,
const table_filter& is_included,
bool human_readable) {
// the structure of generated yaml is identical to the one generated by
// table_stats_print_json()
YAML::Emitter emitter(std::cout);
emitter << YAML::BeginMap;
emitter << YAML::Key << "total_number_of_tables";
emitter << YAML::Value << client.get("/column_family/").GetArray().Size();
for (auto& [keyspace_name, table_names] : get_ks_to_cfs(client)) {
keyspace_stats keyspace;
for (auto& table_name : table_names) {
keyspace.add(table_metrics(client, keyspace_name, table_name),
is_included(keyspace_name, table_name));
}
emitter << YAML::Key << keyspace_name;
emitter << YAML::BeginMap;
dump_map_to_yaml(emitter, keyspace.to_map());
emitter << YAML::Key << "tables";
emitter << YAML::BeginMap;
for (auto& table : keyspace.tables) {
emitter << YAML::Key << table.name();
emitter << YAML::BeginMap;
dump_map_to_yaml(emitter, table.to_map(human_readable));
emitter << YAML::EndMap;
}
emitter << YAML::EndMap;
emitter << YAML::EndMap;
}
emitter << YAML::EndMap;
}
void table_stats_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
std::vector<sstring> match_set;
if (vm.contains("tables")) {
match_set = vm["tables"].as<std::vector<sstring>>();
}
table_filter is_included(vm["ignore"].as<bool>(), match_set);
const auto format = vm["format"].as<sstring>();
const auto human_readable = vm["human-readable"].as<bool>();
if (format == "json") {
table_stats_print_json(client, is_included, human_readable);
} else if (format == "yaml") {
table_stats_print_yaml(client, is_included, human_readable);
} else {
table_stats_print_plain(client, is_included, human_readable);
}
}
void toppartitions_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
// sanity check the arguments
auto list_size = vm["size"].as<int>();
@@ -2252,6 +2874,25 @@ Fore more information, see: https://opensource.docs.scylladb.com/stable/operatin
},
stop_operation
},
{
{
"tablestats",
{"cfstats"},
"Print statistics on tables",
R"(
Fore more information, see: https://opensource.docs.scylladb.com/stable/operating-scylla/nodetool-commands/tablestats.html
)",
{
typed_option<bool>("ignore,i", false, "Ignore the list of tables and display the remaining tables"),
typed_option<bool>("human-readable,H", false, "Display bytes in human readable form, i.e. KiB, MiB, GiB, TiB"),
typed_option<sstring>("format,F", "plain", "Output format (json, yaml)"),
},
{
typed_option<std::vector<sstring>>("tables", "List of tables (or keyspace) names", -1),
},
},
table_stats_operation
},
{
{
"toppartitions",