query_processor: Add query-count metrics
... with labels for each consistency level. Fixes https://github.com/scylladb/scylla/issues/4309 ("add counters breaking up cql requests based on consistency_level"). Tests: unit (dev) Signed-off-by: Dejan Mircevski <dejan@scylladb.com> Message-Id: <1554127055-17705-1-git-send-email-dejan@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
be6905da84
commit
a66a5d423a
@@ -103,14 +103,23 @@ query_processor::query_processor(service::storage_proxy& proxy, database& db, qu
|
||||
std::chrono::milliseconds(_db.get_config().permissions_update_interval_in_ms()),
|
||||
mcfg.authorized_prepared_cache_size, authorized_prepared_statements_cache_log) {
|
||||
namespace sm = seastar::metrics;
|
||||
using clevel = db::consistency_level;
|
||||
sm::label cl_label("consistency_level");
|
||||
|
||||
_metrics.add_group(
|
||||
"query_processor",
|
||||
{
|
||||
sm::make_derive(
|
||||
"statements_prepared",
|
||||
_stats.prepare_invocations,
|
||||
sm::description("Counts a total number of parsed CQL requests."))});
|
||||
std::vector<sm::metric_definition> qp_group;
|
||||
qp_group.push_back(sm::make_derive(
|
||||
"statements_prepared",
|
||||
_stats.prepare_invocations,
|
||||
sm::description("Counts a total number of parsed CQL requests.")));
|
||||
for (auto cl = size_t(clevel::MIN_VALUE); cl <= size_t(clevel::MAX_VALUE); ++cl) {
|
||||
qp_group.push_back(
|
||||
sm::make_derive(
|
||||
"queries",
|
||||
_stats.queries_by_cl[cl],
|
||||
sm::description("Counts queries by consistency level."),
|
||||
{cl_label(clevel(cl))}));
|
||||
}
|
||||
_metrics.add_group("query_processor", qp_group);
|
||||
|
||||
_metrics.add_group(
|
||||
"cql",
|
||||
@@ -325,6 +334,8 @@ future<::shared_ptr<result_message>>
|
||||
query_processor::process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options) {
|
||||
auto& client_state = query_state.get_client_state();
|
||||
|
||||
++_stats.queries_by_cl[size_t(options.get_consistency())];
|
||||
|
||||
statement->validate(_proxy, client_state);
|
||||
|
||||
auto fut = statement->execute(_proxy, query_state, options);
|
||||
@@ -670,6 +681,7 @@ query_processor::process_batch(
|
||||
}).then([this, &query_state, &options, batch] {
|
||||
batch->validate();
|
||||
batch->validate(_proxy, query_state.get_client_state());
|
||||
_stats.queries_by_cl[size_t(options.get_consistency())] += batch->get_statements().size();
|
||||
return batch->execute(_proxy, query_state, options);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -112,6 +112,7 @@ private:
|
||||
|
||||
struct stats {
|
||||
uint64_t prepare_invocations = 0;
|
||||
uint64_t queries_by_cl[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
|
||||
} _stats;
|
||||
|
||||
cql_stats _cql_stats;
|
||||
|
||||
@@ -45,8 +45,11 @@
|
||||
|
||||
namespace db {
|
||||
|
||||
/// CQL consistency levels.
|
||||
///
|
||||
/// Values are guaranteed to be dense and in the tight range [MIN_VALUE, MAX_VALUE].
|
||||
enum class consistency_level {
|
||||
ANY,
|
||||
ANY, MIN_VALUE = ANY,
|
||||
ONE,
|
||||
TWO,
|
||||
THREE,
|
||||
@@ -56,7 +59,7 @@ enum class consistency_level {
|
||||
EACH_QUORUM,
|
||||
SERIAL,
|
||||
LOCAL_SERIAL,
|
||||
LOCAL_ONE
|
||||
LOCAL_ONE, MAX_VALUE = LOCAL_ONE
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, consistency_level cl);
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/range/algorithm/transform.hpp>
|
||||
#include <iterator>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <sstables/sstables.hh>
|
||||
@@ -26,6 +28,7 @@
|
||||
#include "cql_test_env.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include "utils/UUID_gen.hh"
|
||||
@@ -172,7 +175,8 @@ public:
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared(
|
||||
cql3::prepared_cache_key_type id,
|
||||
std::vector<cql3::raw_value> values) override
|
||||
std::vector<cql3::raw_value> values,
|
||||
db::consistency_level cl) override
|
||||
{
|
||||
auto prepared = local_qp().get_prepared(id);
|
||||
if (!prepared) {
|
||||
@@ -181,7 +185,7 @@ public:
|
||||
auto stmt = prepared->statement;
|
||||
assert(stmt->get_bound_terms() == values.size());
|
||||
|
||||
auto options = ::make_shared<cql3::query_options>(std::move(values));
|
||||
auto options = ::make_shared<cql3::query_options>(cl, infinite_timeout_config, std::move(values));
|
||||
options->prepare(prepared->bound_names);
|
||||
|
||||
auto qs = make_query_state();
|
||||
@@ -483,6 +487,31 @@ public:
|
||||
func(env).get();
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute_batch(
|
||||
const std::vector<sstring_view>& queries, std::unique_ptr<cql3::query_options> qo) override {
|
||||
using cql3::statements::batch_statement;
|
||||
using cql3::statements::modification_statement;
|
||||
std::vector<batch_statement::single_statement> modifications;
|
||||
boost::transform(queries, back_inserter(modifications), [this](const auto& query) {
|
||||
auto stmt = local_qp().get_statement(query, _core_local.local().client_state);
|
||||
if (!dynamic_cast<modification_statement*>(stmt->statement.get())) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
|
||||
}
|
||||
return batch_statement::single_statement(static_pointer_cast<modification_statement>(stmt->statement));
|
||||
});
|
||||
auto batch = ::make_shared<batch_statement>(
|
||||
batch_statement::type::UNLOGGED,
|
||||
std::move(modifications),
|
||||
cql3::attributes::none(),
|
||||
local_qp().get_cql_stats());
|
||||
auto qs = make_query_state();
|
||||
auto& lqo = *qo;
|
||||
return local_qp().process_batch(batch, *qs, lqo, {}).finally([qs, batch, qo = std::move(qo), this] {
|
||||
_core_local.local().client_state.merge(qs->get_client_state());
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const char* single_node_cql_env::ks_name = "ks";
|
||||
|
||||
@@ -81,10 +81,16 @@ public:
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(
|
||||
const sstring& text, std::unique_ptr<cql3::query_options> qo) = 0;
|
||||
|
||||
/// Processes queries (which must be modifying queries) as a batch.
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_batch(
|
||||
const std::vector<sstring_view>& queries, std::unique_ptr<cql3::query_options> qo) = 0;
|
||||
|
||||
virtual future<cql3::prepared_cache_key_type> prepare(sstring query) = 0;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared(
|
||||
cql3::prepared_cache_key_type id, std::vector<cql3::raw_value> values) = 0;
|
||||
cql3::prepared_cache_key_type id,
|
||||
std::vector<cql3::raw_value> values,
|
||||
db::consistency_level cl = db::consistency_level::ONE) = 0;
|
||||
|
||||
virtual future<> create_table(std::function<schema(const sstring&)> schema_maker) = 0;
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <iterator>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
@@ -31,6 +32,7 @@
|
||||
#include "tests/cql_assertions.hh"
|
||||
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/metrics_api.hh>
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
@@ -173,3 +175,121 @@ SEASTAR_TEST_CASE(test_querying_with_consumer) {
|
||||
BOOST_CHECK_EQUAL(counter, 1010);
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
using clevel = db::consistency_level;
|
||||
using seastar::metrics::impl::value_vector;
|
||||
|
||||
constexpr auto level_count = size_t(clevel::MAX_VALUE) - size_t(clevel::MIN_VALUE) + 1;
|
||||
|
||||
/// Retrieves query processor's query metrics as a map from each label to its value.
|
||||
std::unordered_map<sstring, uint64_t> get_query_metrics() {
|
||||
auto all_metrics = seastar::metrics::impl::get_values();
|
||||
const auto& all_metadata = *all_metrics->metadata;
|
||||
const auto qp_group = find_if(cbegin(all_metadata), cend(all_metadata),
|
||||
[](const auto& x) { return x.mf.name == "query_processor_queries"; });
|
||||
BOOST_REQUIRE(qp_group != cend(all_metadata));
|
||||
const auto values = all_metrics->values[distance(cbegin(all_metadata), qp_group)];
|
||||
std::vector<sstring> labels;
|
||||
for (const auto& metric : qp_group->metrics) {
|
||||
const auto found = metric.id.labels().find("consistency_level");
|
||||
BOOST_REQUIRE(found != metric.id.labels().cend());
|
||||
labels.push_back(found->second);
|
||||
}
|
||||
BOOST_REQUIRE(values.size() == level_count);
|
||||
BOOST_REQUIRE(labels.size() == level_count);
|
||||
std::unordered_map<sstring, uint64_t> label_to_value;
|
||||
for (size_t i = 0; i < labels.size(); ++i) {
|
||||
label_to_value[labels[i]] = values[i].ui();
|
||||
}
|
||||
return label_to_value;
|
||||
}
|
||||
|
||||
/// Creates query_options with cl, infinite timeout, and no named values.
|
||||
auto make_options(clevel cl) {
|
||||
return std::make_unique<cql3::query_options>(
|
||||
cl, infinite_timeout_config, std::vector<cql3::raw_value>());
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
SEASTAR_TEST_CASE(test_query_counters) {
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
// Executes a query and waits for it to complete.
|
||||
auto process_query = [&e](const sstring& query, clevel cl) mutable {
|
||||
e.execute_cql(query, make_options(cl)).get();
|
||||
};
|
||||
|
||||
// Executes a prepared statement and waits for it to complete.
|
||||
auto process_prepared = [&e](const sstring& query, clevel cl) mutable {
|
||||
e.prepare(query).then([&e, cl](const auto& id) {
|
||||
return e.execute_prepared(id, {}, cl);})
|
||||
.get();
|
||||
};
|
||||
|
||||
// Executes a batch of (modifying) statements and waits for it to complete.
|
||||
auto process_batch = [&e](const std::vector<sstring_view>& queries, clevel cl) mutable {
|
||||
e.execute_batch(queries, make_options(cl)).get();
|
||||
};
|
||||
|
||||
auto expected = get_query_metrics();
|
||||
|
||||
process_query("create table ks.cf (k text, v int, primary key (k))", clevel::ANY);
|
||||
++expected["ANY"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_query("select * from ks.cf", clevel::QUORUM);
|
||||
++expected["QUORUM"];
|
||||
process_query("select * from ks.cf", clevel::QUORUM);
|
||||
++expected["QUORUM"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_query("select * from ks.cf", clevel::ONE);
|
||||
++expected["ONE"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_query("select * from ks.cf", clevel::ALL);
|
||||
++expected["ALL"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
process_prepared("select * from ks.cf", clevel::ALL);
|
||||
++expected["ALL"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_query("select * from ks.cf", clevel::LOCAL_QUORUM);
|
||||
++expected["LOCAL_QUORUM"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_prepared("insert into ks.cf (k, v) values ('0', 0)", clevel::EACH_QUORUM);
|
||||
++expected["EACH_QUORUM"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_query("select * from ks.cf", clevel::SERIAL);
|
||||
++expected["SERIAL"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
process_prepared("select * from ks.cf", clevel::SERIAL);
|
||||
++expected["SERIAL"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
process_query("select * from ks.cf", clevel::SERIAL);
|
||||
++expected["SERIAL"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_query("select * from ks.cf", clevel::LOCAL_SERIAL);
|
||||
++expected["LOCAL_SERIAL"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_prepared("select * from ks.cf", clevel::LOCAL_ONE);
|
||||
++expected["LOCAL_ONE"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_batch({"insert into ks.cf (k, v) values ('1', 1)"}, clevel::EACH_QUORUM);
|
||||
++expected["EACH_QUORUM"];
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
|
||||
process_batch(
|
||||
{"insert into ks.cf (k, v) values ('2', 2)", "insert into ks.cf (k, v) values ('3', 3)"},
|
||||
clevel::ANY);
|
||||
expected["ANY"] += 2;
|
||||
BOOST_CHECK_EQUAL(expected, get_query_metrics());
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user