Merge "nodetool toppartitions" from Rafi & Avi

Implementation of nodetool toppartiotion query, which samples most frequest PKs in read/write
operation over a period of time.

Content:
- data_listener classes: mechanism that interfaces with mutation readers in database and table classes,
- toppartition_query and toppartition_data_listener classes to implement toppartition-specific query (this
  interfaces with data_listeners and the REST api),
- REST api for toppartitions query.

Uses Top-k structure for handling stream summary statistics (based on implementation in C*, see #2811).

What's still missing:
- JMX interface to nodetool (interface customization may be required),
- Querying #rows and #bytes (currently, only #partitions is supported).

Fixes #2811

* https://github.com/avikivity/scylla rafie_toppartitions_v7.1:
  top_k: whitespace and minor fixes
  top_k: map template arguments
  top_k: std::list -> chunked_vector
  top_k: support for appending top_k results
  nodetool toppartitions: refactor table::config constructor
  nodetool toppartitions: data listeners
  nodetool toppartitions: add data_listeners to database/table
  nodetool toppartitions: fully_qualified_cf_name
  nodetool toppartitions: Toppartitions query implementation
  nodetool toppartitions: Toppartitions query REST API
  nodetool toppartitions: nodetool-toppartitions script
This commit is contained in:
Tomasz Grabiec
2018-12-28 16:31:21 +01:00
15 changed files with 756 additions and 62 deletions

View File

@@ -611,6 +611,54 @@
}
]
},
{
"path":"/column_family/toppartitions/{name}",
"operations":[
{
"method":"GET",
"summary":"Toppartitions query",
"type":"toppartitions_query_results",
"nickname":"toppartitions",
"produces":[
"application/json"
],
"parameters":[
{
"name":"name",
"description":"The column family name in keyspace:name format",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"duration",
"description":"Duration (in milliseconds) of monitoring operation",
"required":true,
"allowMultiple":false,
"type":"int",
"paramType":"query"
},
{
"name":"list_size",
"description":"number of the top partitions to list",
"required":false,
"allowMultiple":false,
"type":"int",
"paramType":"query"
},
{
"name":"capacity",
"description":"capacity of stream summary: determines amount of resources used in query processing",
"required":false,
"allowMultiple":false,
"type":"int",
"paramType":"query"
}
]
}
]
},
{
"path":"/column_family/metrics/memtable_columns_count/",
"operations":[
@@ -2816,6 +2864,44 @@
"description":"The column family type"
}
}
},
"toppartitions_record":{
"id":"toppartitions_record",
"description":"nodetool toppartitions query record",
"properties":{
"partition":{
"type":"string",
"description":"Partition key"
},
"count":{
"type":"long",
"description":"Number of read/write operations"
},
"error":{
"type":"long",
"description":"Indication of inaccuracy in counting PKs"
}
}
},
"toppartitions_query_results":{
"id":"toppartitions_query_results",
"description":"nodetool toppartitions query results",
"properties":{
"read":{
"type":"array",
"items":{
"type":"toppartitions_record"
},
"description":"Read results"
},
"write":{
"type":"array",
"items":{
"type":"toppartitions_record"
},
"description":"Write results"
}
}
}
}
}

View File

@@ -41,6 +41,8 @@
#include "system.hh"
#include "api/config.hh"
logging::logger apilog("api");
namespace api {
static std::unique_ptr<reply> exception_reply(std::exception_ptr eptr) {

View File

@@ -25,6 +25,7 @@
#include <boost/lexical_cast.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/units/detail/utility.hpp>
#include "api/api-doc/utils.json.hh"
#include "utils/histogram.hh"
#include <seastar/http/exception.hh>
@@ -216,4 +217,27 @@ std::vector<T> concat(std::vector<T> a, std::vector<T>&& b) {
return a;
}
template <class T, class Base = T>
class req_param {
public:
sstring name;
sstring param;
T value;
req_param(const request& req, sstring name, T default_val) : name(name) {
param = req.get_query_param(name);
if (param.empty()) {
value = default_val;
return;
}
try {
value = T{boost::lexical_cast<Base>(param)};
} catch (boost::bad_lexical_cast&) {
throw bad_param_exception(format("{} ({}): type error - should be {}", name, param, boost::units::detail::demangle(typeid(Base).name())));
}
}
operator T() const { return value; }
};
}

View File

@@ -27,6 +27,10 @@
#include "utils/estimated_histogram.hh"
#include <algorithm>
#include "db/data_listeners.hh"
extern logging::logger apilog;
namespace api {
using namespace httpd;
@@ -34,7 +38,7 @@ using namespace std;
using namespace json;
namespace cf = httpd::column_family_json;
const utils::UUID& get_uuid(const sstring& name, const database& db) {
std::tuple<sstring, sstring> parse_fully_qualified_cf_name(sstring name) {
auto pos = name.find("%3A");
size_t end;
if (pos == sstring::npos) {
@@ -46,11 +50,15 @@ const utils::UUID& get_uuid(const sstring& name, const database& db) {
} else {
end = pos + 3;
}
return std::make_tuple(name.substr(0, pos), name.substr(end));
}
const utils::UUID& get_uuid(const sstring& name, const database& db) {
auto [ks, cf] = parse_fully_qualified_cf_name(name);
try {
return db.find_uuid(name.substr(0, pos), name.substr(end));
return db.find_uuid(ks, cf);
} catch (std::out_of_range& e) {
throw bad_param_exception("Column family '" + name.substr(0, pos) + ":"
+ name.substr(end) + "' not found");
throw bad_param_exception(format("Column family '{}:{}' not found", ks, cf));
}
}
@@ -920,5 +928,45 @@ void set_column_family(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(container_to_vec(res));
});
});
cf::toppartitions.set(r, [&ctx] (std::unique_ptr<request> req) {
auto name_param = req->param["name"];
auto [ks, cf] = parse_fully_qualified_cf_name(name_param);
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
api::req_param<unsigned> capacity(*req, "capacity", 256);
api::req_param<unsigned> list_size(*req, "list_size", 10);
apilog.info("toppartitions query: name={} duration={} list_size={} capacity={}",
name_param, duration.param, list_size.param, capacity.param);
return seastar::do_with(db::toppartitions_query(ctx.db, ks, cf, duration.value, list_size, capacity), [&ctx](auto& q) {
return q.scatter().then([&q] {
return sleep(q.duration()).then([&q] {
return q.gather(q.capacity()).then([&q] (auto topk_results) {
apilog.debug("toppartitions query: processing results");
cf::toppartitions_query_results results;
for (auto& d: topk_results.read.top(q.list_size())) {
cf::toppartitions_record r;
r.partition = sstring(d.item);
r.count = d.count;
r.error = d.error;
results.read.push(r);
}
for (auto& d: topk_results.write.top(q.list_size())) {
cf::toppartitions_record r;
r.partition = sstring(d.item);
r.count = d.count;
r.error = d.error;
results.write.push(r);
}
return make_ready_future<json::json_return_type>(results);
});
});
});
});
});
}
}

View File

@@ -362,6 +362,7 @@ scylla_tests = [
'tests/top_k_test',
'tests/utf8_test',
'tests/small_vector_test',
'tests/data_listeners_test',
]
perf_tests = [
@@ -581,6 +582,7 @@ scylla_core = (['database.cc',
'db/commitlog/commitlog.cc',
'db/commitlog/commitlog_replayer.cc',
'db/commitlog/commitlog_entry.cc',
'db/data_listeners.cc',
'db/hints/manager.cc',
'db/hints/resource_manager.cc',
'db/config.cc',

View File

@@ -83,6 +83,8 @@
#include "db/timeout_clock.hh"
#include "db/data_listeners.hh"
using namespace std::chrono_literals;
logging::logger dblog("database");
@@ -700,7 +702,12 @@ table::make_reader(schema_ptr s,
readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
return make_combined_reader(s, std::move(readers), fwd, fwd_mr);
auto comb_reader = make_combined_reader(s, std::move(readers), fwd, fwd_mr);
if (_config.data_listeners && !_config.data_listeners->empty()) {
return _config.data_listeners->on_read(s, range, slice, std::move(comb_reader));
} else {
return comb_reader;
}
}
sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional<sstring> subdir) {
@@ -2228,6 +2235,7 @@ database::database(const db::config& cfg, database_config dbcfg)
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
, _large_partition_handler(std::make_unique<db::cql_table_large_partition_handler>(_cfg->compaction_large_partition_warning_threshold_mb()*1024*1024))
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>(*this))
{
local_schema_registry().init(*this); // TODO: we're never unbound.
setup_metrics();
@@ -2776,7 +2784,7 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family:
future<> database::add_column_family_and_make_directory(schema_ptr schema) {
auto& ks = find_keyspace(schema->ks_name());
add_column_family(ks, schema, ks.make_column_family_config(*schema, get_config(), get_large_partition_handler()));
add_column_family(ks, schema, ks.make_column_family_config(*schema, *this));
find_column_family(schema).get_index_manager().reload();
return ks.make_directory_for_column_family(schema->cf_name(), schema->id());
}
@@ -2948,8 +2956,10 @@ void keyspace::update_from(::lw_shared_ptr<keyspace_metadata> ksm) {
}
column_family::config
keyspace::make_column_family_config(const schema& s, const db::config& db_config, db::large_partition_handler* lp_handler) const {
keyspace::make_column_family_config(const schema& s, const database& db) const {
column_family::config cfg;
const db::config& db_config = db.get_config();
for (auto& extra : _config.all_datadirs) {
cfg.all_datadirs.push_back(column_family_directory(extra, s.cf_name(), s.id()));
}
@@ -2972,9 +2982,10 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.streaming_scheduling_group = _config.streaming_scheduling_group;
cfg.statement_scheduling_group = _config.statement_scheduling_group;
cfg.enable_metrics_reporting = db_config.enable_keyspace_column_family_metrics();
cfg.large_partition_handler = lp_handler;
cfg.large_partition_handler = db.get_large_partition_handler();
cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore;
cfg.view_update_concurrency_semaphore_limit = _config.view_update_concurrency_semaphore_limit;
cfg.data_listeners = &db.data_listeners();
return cfg;
}
@@ -3604,6 +3615,9 @@ void dirty_memory_manager::start_reclaiming() noexcept {
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
auto& cf = find_column_family(m.column_family_id());
data_listeners().on_write(m_schema, m);
return cf.dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h)]() mutable {
try {
auto& cf = find_column_family(m.column_family_id());

View File

@@ -127,6 +127,7 @@ class commitlog;
class config;
class extensions;
class rp_handle;
class data_listeners;
namespace system_keyspace {
void make(database& db, bool durable, bool volatile_testing_only);
@@ -318,6 +319,7 @@ public:
db::large_partition_handler* large_partition_handler;
db::timeout_semaphore* view_update_concurrency_semaphore;
size_t view_update_concurrency_semaphore_limit;
db::data_listeners* data_listeners = nullptr;
};
struct no_commitlog {};
struct stats {
@@ -870,7 +872,7 @@ public:
return _index_manager;
}
db::large_partition_handler* get_large_partition_handler() {
db::large_partition_handler* get_large_partition_handler() const {
assert(_config.large_partition_handler);
return _config.large_partition_handler;
}
@@ -1107,7 +1109,7 @@ public:
*/
locator::abstract_replication_strategy& get_replication_strategy();
const locator::abstract_replication_strategy& get_replication_strategy() const;
column_family::config make_column_family_config(const schema& s, const db::config& db_config, db::large_partition_handler* lp_handler) const;
column_family::config make_column_family_config(const schema& s, const database& db) const;
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
void add_or_update_column_family(const schema_ptr& s) {
_metadata->add_or_update_column_family(s);
@@ -1254,6 +1256,11 @@ private:
std::unique_ptr<db::large_partition_handler> _large_partition_handler;
query::result_memory_limiter _result_memory_limiter;
friend db::data_listeners;
std::unique_ptr<db::data_listeners> _data_listeners;
future<> init_commitlog();
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);
future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout);
@@ -1270,8 +1277,6 @@ private:
future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, db::timeout_clock::time_point timeout);
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
query::result_memory_limiter _result_memory_limiter;
future<mutation> do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,
tracing::trace_state_ptr trace_state);
@@ -1397,7 +1402,7 @@ public:
}
const db::extensions& extensions() const;
db::large_partition_handler* get_large_partition_handler() {
db::large_partition_handler* get_large_partition_handler() const {
return _large_partition_handler.get();
}
@@ -1458,6 +1463,10 @@ public:
}
friend class distributed_loader;
db::data_listeners& data_listeners() const {
return *_data_listeners;
}
};
// Creates a streaming reader that reads from all shards.

139
db/data_listeners.cc Executable file
View File

@@ -0,0 +1,139 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "db/data_listeners.hh"
#include "database.hh"
#include "db_clock.hh"
#include <tuple>
extern logging::logger dblog;
namespace db {
void data_listeners::install(data_listener* listener) {
_listeners.emplace(listener);
dblog.debug("data_listeners: install listener {}", listener);
}
void data_listeners::uninstall(data_listener* listener) {
dblog.debug("data_listeners: uninstall listener {}", listener);
_listeners.erase(listener);
}
bool data_listeners::exists(data_listener* listener) const {
return _listeners.count(listener) != 0;
}
flat_mutation_reader data_listeners::on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) {
for (auto&& li : _listeners) {
rd = li->on_read(s, range, slice, std::move(rd));
}
return std::move(rd);
}
void data_listeners::on_write(const schema_ptr& s, const frozen_mutation& m) {
for (auto&& li : _listeners) {
li->on_write(s, m);
}
}
toppartitons_item_key::operator sstring() const {
std::ostringstream oss;
oss << key.key().with_schema(*schema);
return oss.str();
}
toppartitions_data_listener::toppartitions_data_listener(database& db, sstring ks, sstring cf) : _db(db), _ks(ks), _cf(cf) {
dblog.debug("toppartitions_data_listener: installing {}", this);
_db.data_listeners().install(this);
}
toppartitions_data_listener::~toppartitions_data_listener() {
dblog.debug("toppartitions_data_listener: uninstalling {}", this);
_db.data_listeners().uninstall(this);
}
future<> toppartitions_data_listener::stop() {
dblog.debug("toppartitions_data_listener: stopping {}", this);
return make_ready_future<>();
}
flat_mutation_reader toppartitions_data_listener::on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) {
if (s->ks_name() != _ks || s->cf_name() != _cf) {
return std::move(rd);
}
dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
return make_filtering_reader(std::move(rd), [this, &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
_top_k_read.append(toppartitons_item_key{s, dk});
return true;
});
}
void toppartitions_data_listener::on_write(const schema_ptr& s, const frozen_mutation& m) {
if (s->ks_name() != _ks || s->cf_name() != _cf) {
return;
}
dblog.trace("toppartitions_data_listener::on_write: {}.{}", _ks, _cf);
_top_k_write.append(toppartitons_item_key{s, m.decorated_key(*s)});
}
toppartitions_query::toppartitions_query(distributed<database>& xdb, sstring ks, sstring cf,
std::chrono::milliseconds duration, size_t list_size, size_t capacity)
: _xdb(xdb), _ks(ks), _cf(cf), _duration(duration), _list_size(list_size), _capacity(capacity) {
dblog.debug("toppartitions_query on {}.{}", _ks, _cf);
}
future<> toppartitions_query::scatter() {
return _query.start(std::ref(_xdb), _ks, _cf);
}
using top_t = toppartitions_data_listener::top_k::results;
future<toppartitions_query::results> toppartitions_query::gather(unsigned res_size) {
dblog.debug("toppartitions_query::gather");
auto map = [res_size, this] (toppartitions_data_listener& listener) {
dblog.trace("toppartitions_query::map_reduce with listener {}", &listener);
top_t rd = listener._top_k_read.top(res_size);
top_t wr = listener._top_k_write.top(res_size);
return std::tuple<top_t, top_t>{std::move(rd), std::move(wr)};
};
auto reduce = [this] (results res, std::tuple<top_t, top_t> rd_wr) {
res.read.append(std::get<0>(rd_wr));
res.write.append(std::get<1>(rd_wr));
return std::move(res);
};
return _query.map_reduce0(map, results{res_size}, reduce)
.handle_exception([] (auto ep) {
dblog.error("toppartitions_query::gather: {}", ep);
return make_exception_future<results>(ep);
}).finally([this] () {
dblog.debug("toppartitions_query::gather: stopping query");
return _query.stop().then([this] {
dblog.debug("toppartitions_query::gather: query stopped");
});
});
}
} // namespace db

153
db/data_listeners.hh Executable file
View File

@@ -0,0 +1,153 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/distributed.hh>
#include <seastar/core/future.hh>
#include <seastar/core/distributed.hh>
#include "schema.hh"
#include "flat_mutation_reader.hh"
#include "mutation_reader.hh"
#include "frozen_mutation.hh"
#include "utils/top_k.hh"
#include <vector>
#include <set>
namespace db {
class data_listener {
public:
// Invoked for each write, with partition granularity.
// The schema_ptr passed is the one which corresponds to the incoming mutation, not the current schema of the table.
virtual void on_write(const schema_ptr&, const frozen_mutation&) { }
// Invoked for each query (both data query and mutation query) when a mutation reader is created.
// Paging queries may invoke this once for a page, or less often, depending on whether they hit in the querier cache or not.
//
// The flat_mutation_reader passed to this method is the reader from which the query results are built (uncompacted).
// This method replaces that reader with the one returned from this method.
// This allows the listener to install on-the-fly processing for the mutation stream.
//
// The schema_ptr passed is the one which corresponds to the reader, not the current schema of the table.
virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) {
return std::move(rd);
}
};
class data_listeners {
database& _db;
std::set<data_listener*> _listeners;
public:
data_listeners(database& db) : _db(db) {}
void install(data_listener* listener);
void uninstall(data_listener* listener);
flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd);
void on_write(const schema_ptr& s, const frozen_mutation& m);
bool exists(data_listener* listener) const;
bool empty() const { return _listeners.empty(); }
};
struct toppartitons_item_key {
schema_ptr schema;
dht::decorated_key key;
toppartitons_item_key(const schema_ptr& schema, const dht::decorated_key& key) : schema(schema), key(key) {}
toppartitons_item_key(const toppartitons_item_key& key) noexcept : schema(key.schema), key(key.key) {}
struct hash {
size_t operator()(const toppartitons_item_key& k) const {
return std::hash<dht::token>()(k.key.token());
}
};
struct comp {
bool operator()(const toppartitons_item_key& k1, const toppartitons_item_key& k2) const {
return k1.schema == k2.schema && k1.key.equal(*k2.schema, k2.key);
}
};
explicit operator sstring() const;
};
class toppartitions_data_listener : public data_listener {
friend class toppartitions_query;
database& _db;
sstring _ks;
sstring _cf;
public:
using top_k = utils::space_saving_top_k<toppartitons_item_key, toppartitons_item_key::hash, toppartitons_item_key::comp>;
private:
top_k _top_k_read;
top_k _top_k_write;
public:
toppartitions_data_listener(database& db, sstring ks, sstring cf);
~toppartitions_data_listener();
virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) override;
virtual void on_write(const schema_ptr& s, const frozen_mutation& m) override;
future<> stop();
};
class toppartitions_query {
distributed<database>& _xdb;
sstring _ks;
sstring _cf;
std::chrono::milliseconds _duration;
size_t _list_size;
size_t _capacity;
sharded<toppartitions_data_listener> _query;
public:
toppartitions_query(seastar::distributed<database>& xdb, sstring ks, sstring cf,
std::chrono::milliseconds duration, size_t list_size, size_t capacity);
struct results {
toppartitions_data_listener::top_k read;
toppartitions_data_listener::top_k write;
results(size_t capacity) : read(capacity), write(capacity) {}
};
std::chrono::milliseconds duration() const { return _duration; }
size_t list_size() const { return _list_size; }
size_t capacity() const { return _capacity; }
future<> scatter();
future<results> gather(unsigned results_size = 256);
};
} // namespace db

View File

@@ -1631,7 +1631,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
db.add_keyspace(ks_name, std::move(_ks));
}
auto& ks = db.find_keyspace(ks_name);
auto cfg = ks.make_column_family_config(*table, db.get_config(), db.get_large_partition_handler());
auto cfg = ks.make_column_family_config(*table, db);
if (maybe_write_in_user_memory(table, db)) {
cfg.dirty_memory_manager = &db._dirty_memory_manager;
} else {

75
scripts/nodetool-toppartitions Executable file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
#
# Copyright (C) 2018 ScyllaDB
#
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
import os
import json
from functools import reduce
import requests
def scylla_api_get(item, params={}):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
}
response = requests.get('http://127.0.0.1:10000/{}'.format(item), headers=headers, params=params)
return response
def toppartitions(kscf, duration, list_size, capacity):
r = scylla_api_get('column_family/toppartitions/{}'.format(kscf), {
'duration': str(duration),
'list_size': str(list_size),
'capacity': str(capacity)})
return json.loads(r.text)
def print_pks(title, map):
print(title)
w = reduce(lambda n, r: max(len(r['partition']), n), map, len('Partition')+3)
print((" %-{}s%s".format(w)) % ('Partition', 'Count'))
for r in map:
print((" %-{}s%s".format(w)) % (r['partition'], r['count']))
print()
ap = argparse.ArgumentParser(description='Samples database reads and writes and reports the most active partitions in a specified table')
ap.add_argument('-k', type=int,
default=10, dest='list_size',
help='The number of the top partitions to list (default: 10)')
ap.add_argument('-s', type=int,
default=256, dest='capacity',
help='The capacity of stream summary (default: 256)')
ap.add_argument('keyspace',
help='Name of keyspace')
ap.add_argument('table',
help='Name of column family')
ap.add_argument('duration', type=int,
help='Query duration in milliseconds')
args = ap.parse_args()
res = toppartitions("{}:{}".format(args.keyspace, args.table), args.duration, args.list_size, args.capacity)
if res == {}:
print("(nothing reported)")
exit(1)
print_pks("READ", res["read"])
print_pks("WRITE", res["write"])
exit(0)

View File

@@ -123,6 +123,7 @@ boost_tests = [
'top_k_test',
'utf8_test',
'small_vector_test',
'data_listeners_test',
]
other_tests = [

129
tests/data_listeners_test.cc Executable file
View File

@@ -0,0 +1,129 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/test/unit_test.hpp>
#include "tests/test-utils.hh"
#include "tests/cql_test_env.hh"
#include "tests/cql_assertions.hh"
#include "cql3/query_processor.hh"
#include "db/data_listeners.hh"
using namespace std;
using namespace std::chrono_literals;
logging::logger testlog("test");
class table_listener : public db::data_listener {
sstring _cf_name;
public:
table_listener(sstring cf_name) : _cf_name(cf_name) {}
virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) {
if (s->cf_name() == _cf_name) {
return make_filtering_reader(std::move(rd), [this, &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
testlog.info("listener {}: read {}", this, dk);
++read;
return true;
});
}
return std::move(rd);
}
virtual void on_write(const schema_ptr& s, const frozen_mutation& m) override {
if (s->cf_name() == _cf_name) {
++write;
}
}
unsigned read = 0;
unsigned write = 0;
};
struct results {
unsigned read = 0;
unsigned write = 0;
};
//---------------------------------------------------------------------------------------------
results test_data_listeners(cql_test_env& e, sstring cf_name) {
testlog.info("starting test_data_listeners");
std::vector<std::unique_ptr<table_listener>> listeners;
e.db().invoke_on_all([&listeners, &cf_name] (database& db) {
auto listener = std::make_unique<table_listener>(cf_name);
db.data_listeners().install(&*listener);
testlog.info("installed listener {}", &*listener);
listeners.push_back(std::move(listener));
}).get();
e.execute_cql("CREATE TABLE t1 (k int, c int, PRIMARY KEY (k, c));").get();
e.execute_cql("INSERT INTO t1 (k, c) VALUES (1, 1);").get();
e.execute_cql("INSERT INTO t1 (k, c) VALUES (2, 2);").get();
e.execute_cql("INSERT INTO t1 (k, c) VALUES (3, 3);").get();
e.execute_cql("SELECT k, c FROM t1;").get();
auto res = e.db().map_reduce0(
[&listeners] (database& db) {
for (auto& listener: listeners) {
auto li = &*listener;
if (!db.data_listeners().exists(li)) {
continue;
}
results res{li->read, li->write};
testlog.info("uninstalled listener {}: rd={} wr={}", li, li->read, li->write);
db.data_listeners().uninstall(li);
return std::move(res);
}
return std::move(results{});
},
results{},
[] (results res, results li_res) {
res.read += li_res.read;
res.write += li_res.write;
return std::move(res);
}).get0();
testlog.info("test_data_listeners: rd={} wr={}", res.read, res.write);
return res;
}
SEASTAR_TEST_CASE(test_dlistener_t1) {
return do_with_cql_env_thread([] (auto& e) {
auto res = test_data_listeners(e, "t1");
BOOST_REQUIRE_EQUAL(3, res.read);
BOOST_REQUIRE_EQUAL(3, res.write);
});
}
SEASTAR_TEST_CASE(test_dlistener_t2) {
return do_with_cql_env_thread([] (auto& e) {
auto res = test_data_listeners(e, "t2");
BOOST_REQUIRE_EQUAL(0, res.read);
BOOST_REQUIRE_EQUAL(0, res.write);
});
}

View File

@@ -64,7 +64,7 @@ BOOST_AUTO_TEST_CASE(test_top_k_straight_insertion) {
top.append(i);
}
}
vector<unsigned> res = count(top.top(10));
BOOST_REQUIRE_EQUAL(res, exp_results());
}
@@ -87,7 +87,7 @@ BOOST_AUTO_TEST_CASE(test_top_k_interleaved_insertion) {
}
}
} while (!all_0);
auto res{count(top.top(10))};
BOOST_REQUIRE_EQUAL(res, exp_results());
}
@@ -124,15 +124,17 @@ BOOST_AUTO_TEST_CASE(test_top_k_single_value) {
struct bad_boy {
unsigned n;
bad_boy(unsigned n) : n(n) {}
bad_boy(const bad_boy& bb) {
static unsigned x = 0;
if (++x == 100) {
bad_boy(const bad_boy& bb) noexcept = default;
bool operator==(const bad_boy& x) const {
static unsigned zz = 0;
if (++zz == 100) {
throw "explode";
}
return n == x.n;
}
bool operator==(const bad_boy& x) const { return n == x.n; }
operator unsigned() const { return n; }
};
@@ -143,7 +145,7 @@ struct hash<bad_boy>
size_t operator()(const bad_boy& x) const { return hash<unsigned>()(x.n); }
};
}
BOOST_AUTO_TEST_CASE(test_top_k_fail) {
utils::space_saving_top_k<bad_boy> top(32);

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2011 Clearspring Technologies, Inc.
* Copyright (C) 2011 Clearspring Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,19 +40,19 @@
/*
Based on the following implementation ([2]) for the Space-Saving algorithm from [1].
[1] Metwally, A., Agrawal, D., & El Abbadi, A. (2005, January).
Efficient computation of frequent and top-k elements in data streams.
[1] Metwally, A., Agrawal, D., & El Abbadi, A. (2005, January).
Efficient computation of frequent and top-k elements in data streams.
In International Conference on Database Theory (pp. 398-412). Springer, Berlin, Heidelberg.
http://www.cse.ust.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf
[2] https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/StreamSummary.java
The algorithm keeps a map between keys seen and their counts, keeping a bound on the number of tracked keys.
Replacement policy evicts the key with the lowest count while inheriting its count, and recording an estimation
of the error which results from that.
This error estimation can be later used to prove if the distribution we arrived at corresponds to the real top-K,
The algorithm keeps a map between keys seen and their counts, keeping a bound on the number of tracked keys.
Replacement policy evicts the key with the lowest count while inheriting its count, and recording an estimation
of the error which results from that.
This error estimation can be later used to prove if the distribution we arrived at corresponds to the real top-K,
which we can display alongside the results.
Accuracy depends on the number of tracked keys.
Accuracy depends on the number of tracked keys.
*/
@@ -65,12 +65,13 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include "utils/chunked_vector.hh"
namespace utils {
using namespace seastar;
template <class T>
template <class T, class Hash = std::hash<T>, class KeyEqual = std::equal_to<T>>
class space_saving_top_k {
private:
struct bucket;
@@ -81,18 +82,18 @@ private:
T item;
unsigned count = 0;
unsigned error = 0;
counter(T item) : item(item) {}
counter(T item, unsigned count = 0, unsigned error = 0) : item(item), count(count), error(error) {}
};
using counter_ptr = lw_shared_ptr<counter>;
using counters = std::list<counter_ptr>;
using counters_iterator = typename counters::iterator;
using counters_map = std::unordered_map<T, counters_iterator>;
using counters_map = std::unordered_map<T, counters_iterator, Hash, KeyEqual>;
using counters_map_iterator = typename counters_map::iterator;
struct bucket {
std::list<counter_ptr> counters;
unsigned count;
@@ -102,14 +103,14 @@ private:
counters.push_back(ctr);
}
bucket(T item, unsigned count) {
counters.push_back(make_lw_shared<counter>(item));
bucket(T item, unsigned count, unsigned error) {
counters.push_back(make_lw_shared<counter>(item, count, error));
this->count = count;
}
};
using buckets = std::list<bucket>;
size_t _capacity;
counters_map _counters_map;
buckets _buckets; // buckets list in ascending order
@@ -121,7 +122,7 @@ public:
size_t capacity() const { return _capacity; }
size_t size() const {
size_t size() const {
if (!_valid) {
throw std::runtime_error("space_saving_top_k state is invalid");
}
@@ -131,13 +132,17 @@ public:
bool valid() const { return _valid; }
// returns true if item is a new one
bool append(T item, unsigned inc = 1) { return std::get<0>(append_return_all(std::move(item), inc)); }
bool append(T item, unsigned inc = 1, unsigned err = 0) {
return std::get<0>(append_return_all(std::move(item), inc, err));
}
// returns optionally dropped item (due to capacity overflow)
std::optional<T> append_return_dropped(T item, unsigned inc = 1) { return std::get<1>(append_return_all(std::move(item), inc)); }
std::optional<T> append_return_dropped(T item, unsigned inc = 1, unsigned err = 0) {
return std::get<1>(append_return_all(std::move(item), inc, err));
}
// returns whether an element is new and an optionally dropped item (due to capacity overflow)
std::tuple<bool, std::optional<T>> append_return_all(T item, unsigned inc = 1) {
std::tuple<bool, std::optional<T>> append_return_all(T item, unsigned inc = 1, unsigned err = 0) {
if (!_valid) {
return {false, std::optional<T>()};
}
@@ -148,20 +153,19 @@ public:
counters_iterator counter_it;
if (is_new_item) {
if (size() < _capacity) {
_buckets.emplace_front(bucket(item, 0)); // count set to 0, increment_counter() required to complete bucket construction
_buckets.emplace_front(bucket(std::move(item), 0, err)); // inc added later via increment_counter
buckets_iterator new_bucket_it = _buckets.begin();
counter_it = new_bucket_it->counters.begin();
(*counter_it)->bucket_it = new_bucket_it;
} else {
buckets_iterator min_buck = _buckets.begin();
assert(min_buck != _buckets.end());
counter_it = min_buck->counters.begin();
assert(counter_it != min_buck->counters.end());
buckets_iterator min_bucket = _buckets.begin();
assert(min_bucket != _buckets.end());
counter_it = min_bucket->counters.begin();
assert(counter_it != min_bucket->counters.end());
counter_ptr ctr = *counter_it;
dropped_item = ctr->item;
_counters_map.erase(*dropped_item);
ctr->item = item;
ctr->error = min_buck->count;
_counters_map.erase(ctr->item);
dropped_item = std::exchange(ctr->item, std::move(item));
ctr->error = min_bucket->count + err;
}
_counters_map[item] = std::move(counter_it);
} else {
@@ -184,7 +188,7 @@ private:
buckets_iterator old_bucket_it = ctr->bucket_it;
auto& old_buck = *old_bucket_it;
old_buck.counters.erase(counter_it);
ctr->count += inc;
buckets_iterator bi_prev = old_bucket_it;
@@ -205,7 +209,7 @@ private:
if (bi_next == _buckets.end()) {
bucket buck{ctr};
counter_it = std::prev(buck.counters.end());
counter_it = buck.counters.begin();
bi_next = _buckets.insert(std::next(bi_prev), std::move(buck));
}
ctr->bucket_it = bi_next;
@@ -225,9 +229,9 @@ public:
unsigned error;
};
using results = std::list<result>;
results top(unsigned k) const
using results = chunked_vector<result>;
results top(unsigned k) const
{
if (!_valid) {
throw std::runtime_error("space_saving_top_k state is invalid");
@@ -247,16 +251,22 @@ public:
return list;
}
void append(const results& res) {
for (auto& r: res) {
append(r.item, r.count, r.error);
}
}
//-----------------------------------------------------------------------------------------
// Diagnostics
public:
template <class TT>
template <class TT>
friend std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k<TT>::counter& c);
template <class TT>
template <class TT>
friend std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k<TT>::counters_map& counters_map);
template <class TT>
template <class TT>
friend std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k<TT>::buckets& buckets);
template <class TT>
template <class TT>
friend std::ostream& operator<<(std::ostream& out, const space_saving_top_k<TT>& top_k);
};
@@ -273,7 +283,7 @@ std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k<T>
out << "{\n";
for (auto const& [item, counter_i]: counters_map) {
out << item << " => " << **counter_i << "\n";
}
}
out << "}\n";
return out;
}