system_keyspace: Added infrastructure for table `system.clients'

I used the following as a reference:
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/virtual/ClientsTable.java
At this moment there is only info about IP, clients outgoing port,
client 'type' (i.e. CQL/thrift/alternator), shard ID and username.
Column `request_count' is NOT present and CK consists of
(`port', `client_type'), contrary to what C*'s has: (`port').

Code that notifies `system.clients` about new connections goes
to top-level files `connection_notifier.*`. Currently only CQL
clients are observed, but enum `client_type` can be used in future
to notify about connections with other protocols.
This commit is contained in:
Juliusz Stasiewicz
2019-11-20 17:42:11 +01:00
parent 9ec98324ed
commit 7fdc8563bf
8 changed files with 208 additions and 1 deletions

View File

@@ -476,6 +476,7 @@ scylla_core = (['database.cc',
'table.cc',
'atomic_cell.cc',
'collection_mutation.cc',
'connection_notifier.cc',
'hashers.cc',
'schema.cc',
'frozen_schema.cc',

71
connection_notifier.cc Normal file
View File

@@ -0,0 +1,71 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "connection_notifier.hh"
#include "db/query_context.hh"
#include "cql3/constants.hh"
#include "database.hh"
#include "service/storage_proxy.hh"
#include <stdexcept>
namespace db::system_keyspace {
extern const char *const CLIENTS;
}
static sstring to_string(client_type ct) {
switch (ct) {
case client_type::cql: return "cql";
case client_type::thrift: return "thrift";
case client_type::alternator: return "alternator";
default: throw std::runtime_error("Invalid client_type");
}
}
future<> notify_new_client(client_data cd) {
// FIXME: consider prepared statement
const static sstring req
= format("INSERT INTO system.{} (address, port, client_type, shard_id, protocol_version, username) "
"VALUES (?, ?, ?, ?, ?, ?);", db::system_keyspace::CLIENTS);
return db::execute_cql(req,
std::move(cd.ip), cd.port, to_string(cd.ct), cd.shard_id,
cd.protocol_version.has_value() ? data_value(*cd.protocol_version) : data_value::make_null(int32_type),
cd.username.value_or("anonymous")).discard_result();
}
future<> notify_disconnected_client(gms::inet_address addr, client_type ct, int port) {
// FIXME: consider prepared statement
const static sstring req
= format("DELETE FROM system.{} where address=? AND port=? AND client_type=?;",
db::system_keyspace::CLIENTS);
return db::execute_cql(req, addr.addr(), port, to_string(ct)).discard_result();
}
future<> clear_clientlist() {
auto& db_local = service::get_storage_proxy().local().get_db().local();
return db_local.truncate(
db_local.find_keyspace(db::system_keyspace_name()),
db_local.find_column_family(db::system_keyspace_name(),
db::system_keyspace::CLIENTS),
[] { return make_ready_future<db_clock::time_point>(db_clock::now()); },
false /* with_snapshot */);
}

57
connection_notifier.hh Normal file
View File

@@ -0,0 +1,57 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "gms/inet_address.hh"
#include <seastar/core/sstring.hh>
#include <optional>
enum class client_type {
cql = 0,
thrift,
alternator,
};
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
struct client_data {
gms::inet_address ip;
int32_t port;
client_type ct;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
// `optional' column means that it's nullable (possibly because it's
// unimplemented yet). If you want to fill ("implement") any of them,
// remember to update the query in `notify_new_client()'.
std::optional<sstring> connection_stage;
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<sstring> hostname;
std::optional<int32_t> protocol_version;
std::optional<sstring> ssl_cipher_suite;
std::optional<bool> ssl_enabled;
std::optional<sstring> ssl_protocol;
std::optional<sstring> username;
};
future<> notify_new_client(client_data cd);
future<> notify_disconnected_client(gms::inet_address addr, client_type ct, int port);
future<> clear_clientlist();

View File

@@ -553,6 +553,44 @@ static schema_ptr large_cells() {
return scylla_local;
}
/** Layout based on C*-4.0.0 with extra columns `shard_id' and `client_type'
* but without `request_count'. Also CK is different: C* has only (`port'). */
static schema_ptr clients() {
thread_local auto clients = [] {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(NAME, CLIENTS), NAME, CLIENTS,
// partition key
{{"address", inet_addr_type}},
// clustering key
{{"port", int32_type}, {"client_type", utf8_type}},
// regular columns
{
{"shard_id", int32_type},
{"connection_stage", utf8_type},
{"driver_name", utf8_type},
{"driver_version", utf8_type},
{"hostname", utf8_type},
{"protocol_version", int32_type},
{"ssl_cipher_suite", utf8_type},
{"ssl_enabled", boolean_type},
{"ssl_protocol", utf8_type},
{"username", utf8_type}
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"list of connected clients"
)));
builder.set_gc_grace_seconds(0);
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::no);
}();
return clients;
}
const char *const CLIENTS = "clients";
namespace v3 {
schema_ptr batches() {
@@ -1719,7 +1757,7 @@ std::vector<schema_ptr> all_tables() {
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
peers(), peer_events(), range_xfers(),
compactions_in_progress(), compaction_history(),
sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
sstable_activity(), clients(), size_estimates(), large_partitions(), large_rows(), large_cells(),
scylla_local(), v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),

View File

@@ -93,6 +93,7 @@ static constexpr auto LARGE_PARTITIONS = "large_partitions";
static constexpr auto LARGE_ROWS = "large_rows";
static constexpr auto LARGE_CELLS = "large_cells";
static constexpr auto SCYLLA_LOCAL = "scylla_local";
extern const char *const CLIENTS;
namespace v3 {
static constexpr auto BATCHES = "batches";

View File

@@ -167,6 +167,10 @@ public:
gms::inet_address get_client_address() const {
return gms::inet_address(_remote_address);
}
::in_port_t get_client_port() const {
return _remote_address.port();
}
client_state(internal_tag)
: _keyspace("system")

View File

@@ -45,6 +45,7 @@
#include "service/query_state.hh"
#include "service/client_state.hh"
#include "exceptions/exceptions.hh"
#include "connection_notifier.hh"
#include "auth/authenticator.hh"
@@ -254,6 +255,21 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
});
}
future<>
cql_server::advertise_new_connection(shared_ptr<connection> conn) {
client_data cd = conn->make_client_data();
clogger.trace("Advertising new connection from CQL client {}:{}", cd.ip, cd.port);
return notify_new_client(std::move(cd));
}
future<>
cql_server::unadvertise_connection(shared_ptr<connection> conn) {
const auto ip = conn->get_client_state().get_client_address().addr();
const auto port = conn->get_client_state().get_client_port();
clogger.trace("Advertising disconnection of CQL client {}:{}", ip, port);
return notify_disconnected_client(ip, client_type::cql, port);
}
unsigned
cql_server::connection::frame_size() const {
if (_version < 3) {
@@ -515,6 +531,19 @@ future<> cql_server::connection::shutdown()
return make_ready_future<>();
}
client_data cql_server::connection::make_client_data() const {
client_data cd;
cd.ip = _client_state.get_client_address().addr();
cd.port = _client_state.get_client_port();
cd.ct = client_type::cql;
cd.shard_id = engine().cpu_id();
cd.protocol_version = _version;
if (const auto user_ptr = _client_state.user(); user_ptr) {
cd.username = user_ptr->name;
}
return cd;
}
thread_local cql_server::connection::execution_stage_type
cql_server::connection::_process_request_stage{"transport", &connection::process_request_one};

View File

@@ -47,6 +47,7 @@ class registrations;
}
class database;
struct client_data;
namespace cql_transport {
@@ -181,6 +182,8 @@ private:
future<> process();
future<> process_request();
future<> shutdown();
client_data make_client_data() const;
const service::client_state& get_client_state() const { return _client_state; }
private:
const ::timeout_config& timeout_config() const { return _server.timeout_config(); }
friend class process_request_executor;
@@ -237,6 +240,9 @@ private:
uint64_t _current_connections = 0;
uint64_t _connections_being_accepted = 0;
private:
future<> advertise_new_connection(shared_ptr<connection> conn);
future<> unadvertise_connection(shared_ptr<connection> conn);
void maybe_idle() {
if (_stopping && !_connections_being_accepted && !_current_connections) {
_all_connections_stopped.set_value();