audit: Add the audit subsystem
This change introduces a new audit subsystem that allows tracking and logging of database operations for security and compliance purposes. Key features include: - Configurable audit logging to either syslog or a dedicated system table (audit.audit_log) - Selective auditing based on: - Operation categories (QUERY, DML, DDL, DCL, AUTH, ADMIN) - Specific keyspaces - Specific tables - New configuration options: - audit: Controls audit destination (none/syslog/table) - audit_categories: Comma-separated list of operation categories to audit - audit_tables: Specific tables to audit - audit_keyspaces: Specific keyspaces to audit - audit_unix_socket_path: Path for syslog socket - audit_syslog_write_buffer_size: Buffer size for syslog writes The audit logs capture details including: - Operation timestamp - Node and client IP addresses - Operation category and query - Username - Success/failure status - Affected keyspace and table names
This commit is contained in:
@@ -261,6 +261,7 @@ add_custom_target(compiler-training)
|
||||
|
||||
add_subdirectory(api)
|
||||
add_subdirectory(alternator)
|
||||
add_subdirectory(audit)
|
||||
add_subdirectory(db)
|
||||
add_subdirectory(auth)
|
||||
add_subdirectory(cdc)
|
||||
@@ -300,6 +301,7 @@ add_version_library(scylla_version
|
||||
add_executable(scylla
|
||||
main.cc)
|
||||
set(scylla_libs
|
||||
audit
|
||||
scylla-main
|
||||
api
|
||||
auth
|
||||
|
||||
19
audit/CMakeLists.txt
Normal file
19
audit/CMakeLists.txt
Normal file
@@ -0,0 +1,19 @@
|
||||
include(add_whole_archive)
|
||||
|
||||
add_library(scylla_audit STATIC)
|
||||
target_sources(scylla_audit
|
||||
PRIVATE
|
||||
audit.cc
|
||||
audit_cf_storage_helper.cc
|
||||
audit_syslog_storage_helper.cc)
|
||||
target_include_directories(scylla_audit
|
||||
PUBLIC
|
||||
${CMAKE_SOURCE_DIR})
|
||||
target_link_libraries(scylla_audit
|
||||
PUBLIC
|
||||
Seastar::seastar
|
||||
xxHash::xxhash
|
||||
PRIVATE
|
||||
cql3)
|
||||
|
||||
add_whole_archive(audit scylla_audit)
|
||||
263
audit/audit.cc
Normal file
263
audit/audit.cc
Normal file
@@ -0,0 +1,263 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include "audit/audit.hh"
|
||||
#include "db/config.hh"
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
#include "storage_helper.hh"
|
||||
#include "audit.hh"
|
||||
#include "../db/config.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/trim.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
|
||||
|
||||
namespace audit {
|
||||
|
||||
logging::logger logger("audit");
|
||||
|
||||
sstring audit_info::category_string() const {
|
||||
switch (_category) {
|
||||
case statement_category::QUERY: return "QUERY";
|
||||
case statement_category::DML: return "DML";
|
||||
case statement_category::DDL: return "DDL";
|
||||
case statement_category::DCL: return "DCL";
|
||||
case statement_category::AUTH: return "AUTH";
|
||||
case statement_category::ADMIN: return "ADMIN";
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
audit::audit(locator::shared_token_metadata& token_metadata,
|
||||
sstring&& storage_helper_name,
|
||||
std::set<sstring>&& audited_keyspaces,
|
||||
std::map<sstring, std::set<sstring>>&& audited_tables,
|
||||
category_set&& audited_categories)
|
||||
: _token_metadata(token_metadata)
|
||||
, _audited_keyspaces(std::move(audited_keyspaces))
|
||||
, _audited_tables(std::move(audited_tables))
|
||||
, _audited_categories(std::move(audited_categories))
|
||||
, _storage_helper_class_name(std::move(storage_helper_name))
|
||||
{ }
|
||||
|
||||
audit::~audit() = default;
|
||||
|
||||
static category_set parse_audit_categories(const sstring& data) {
|
||||
category_set result;
|
||||
if (!data.empty()) {
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, data, boost::is_any_of(","));
|
||||
for (sstring& category : tokens) {
|
||||
boost::trim(category);
|
||||
if (category == "QUERY") {
|
||||
result.set(statement_category::QUERY);
|
||||
} else if (category == "DML") {
|
||||
result.set(statement_category::DML);
|
||||
} else if (category == "DDL") {
|
||||
result.set(statement_category::DDL);
|
||||
} else if (category == "DCL") {
|
||||
result.set(statement_category::DCL);
|
||||
} else if (category == "AUTH") {
|
||||
result.set(statement_category::AUTH);
|
||||
} else if (category == "ADMIN") {
|
||||
result.set(statement_category::ADMIN);
|
||||
} else {
|
||||
throw audit_exception(fmt::format("Bad configuration: invalid 'audit_categories': {}", data));
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::map<sstring, std::set<sstring>> parse_audit_tables(const sstring& data) {
|
||||
std::map<sstring, std::set<sstring>> result;
|
||||
if (!data.empty()) {
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, data, boost::is_any_of(","));
|
||||
for (sstring& token : tokens) {
|
||||
std::vector<sstring> parts;
|
||||
boost::split(parts, token, boost::is_any_of("."));
|
||||
if (parts.size() != 2) {
|
||||
throw audit_exception(fmt::format("Bad configuration: invalid 'audit_tables': {}", data));
|
||||
}
|
||||
boost::trim(parts[0]);
|
||||
boost::trim(parts[1]);
|
||||
result[parts[0]].insert(std::move(parts[1]));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::set<sstring> parse_audit_keyspaces(const sstring& data) {
|
||||
std::set<sstring> result;
|
||||
if (!data.empty()) {
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, data, boost::is_any_of(","));
|
||||
for (sstring& token : tokens) {
|
||||
boost::trim(token);
|
||||
result.insert(std::move(token));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
future<> audit::create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm) {
|
||||
sstring storage_helper_name;
|
||||
if (cfg.audit() == "table") {
|
||||
storage_helper_name = "audit_cf_storage_helper";
|
||||
} else if (cfg.audit() == "syslog") {
|
||||
storage_helper_name = "audit_syslog_storage_helper";
|
||||
} else if (cfg.audit() == "none") {
|
||||
// Audit is off
|
||||
logger.info("Audit is disabled");
|
||||
|
||||
return make_ready_future<>();
|
||||
} else {
|
||||
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", cfg.audit()));
|
||||
}
|
||||
category_set audited_categories = parse_audit_categories(cfg.audit_categories());
|
||||
if (!audited_categories) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
std::map<sstring, std::set<sstring>> audited_tables = parse_audit_tables(cfg.audit_tables());
|
||||
std::set<sstring> audited_keyspaces = parse_audit_keyspaces(cfg.audit_keyspaces());
|
||||
if (audited_tables.empty()
|
||||
&& audited_keyspaces.empty()
|
||||
&& !audited_categories.contains(statement_category::AUTH)
|
||||
&& !audited_categories.contains(statement_category::ADMIN)
|
||||
&& !audited_categories.contains(statement_category::DCL)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
logger.info("Audit is enabled. Auditing to: \"{}\", with the following categories: \"{}\", keyspaces: \"{}\", and tables: \"{}\"",
|
||||
cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());
|
||||
|
||||
return audit_instance().start(std::ref(stm),
|
||||
std::move(storage_helper_name),
|
||||
std::move(audited_keyspaces),
|
||||
std::move(audited_tables),
|
||||
std::move(audited_categories));
|
||||
}
|
||||
|
||||
future<> audit::start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit_instance().invoke_on_all([&cfg, &qp, &mm] (audit& local_audit) {
|
||||
return local_audit.start(cfg, qp.local(), mm.local());
|
||||
});
|
||||
}
|
||||
|
||||
future<> audit::stop_audit() {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) {
|
||||
return local_audit.shutdown();
|
||||
}).then([] {
|
||||
return audit::audit::audit_instance().stop();
|
||||
});
|
||||
}
|
||||
|
||||
audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table) {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return nullptr;
|
||||
}
|
||||
return std::make_unique<audit_info>(cat, keyspace, table);
|
||||
}
|
||||
|
||||
audit_info_ptr audit::create_no_audit_info() {
|
||||
return audit_info_ptr();
|
||||
}
|
||||
|
||||
future<> audit::start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm) {
|
||||
try {
|
||||
_storage_helper_ptr = create_object<storage_helper>(_storage_helper_class_name, qp, mm);
|
||||
} catch (no_such_class& e) {
|
||||
logger.error("Can't create audit storage helper {}: not supported", _storage_helper_class_name);
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
return _storage_helper_ptr->start(cfg);
|
||||
}
|
||||
|
||||
future<> audit::stop() {
|
||||
return _storage_helper_ptr->stop();
|
||||
}
|
||||
|
||||
future<> audit::shutdown() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> audit::log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error) {
|
||||
const service::client_state& client_state = query_state.get_client_state();
|
||||
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
|
||||
db::consistency_level cl = options.get_consistency();
|
||||
thread_local static sstring no_username("undefined");
|
||||
static const sstring anonymous_username("anonymous");
|
||||
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
|
||||
socket_address client_ip = client_state.get_client_address().addr();
|
||||
return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, audit_info, node_ip, client_ip, cl, username, error)
|
||||
.handle_exception([audit_info, node_ip, client_ip, cl, username, error] (auto ep) {
|
||||
logger.error("Unexpected exception when writing log with: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {} exception {}",
|
||||
node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(),
|
||||
audit_info->query(), client_ip, audit_info->table(),username, ep);
|
||||
});
|
||||
}
|
||||
|
||||
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
|
||||
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
|
||||
return futurize_invoke(std::mem_fn(&storage_helper::write_login), _storage_helper_ptr, username, node_ip, client_ip, error)
|
||||
.handle_exception([username, node_ip, client_ip, error] (auto ep) {
|
||||
logger.error("Unexpected exception when writing login log with: node_ip {} client_ip {} username {} error {} exception {}",
|
||||
node_ip, client_ip, username, error, ep);
|
||||
});
|
||||
}
|
||||
|
||||
future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error) {
|
||||
cql3::statements::batch_statement* batch = dynamic_cast<cql3::statements::batch_statement*>(statement.get());
|
||||
if (batch != nullptr) {
|
||||
return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) {
|
||||
return inspect(m.statement, query_state, options, error);
|
||||
});
|
||||
} else {
|
||||
auto audit_info = statement->get_audit_info();
|
||||
if (bool(audit_info) && audit::local_audit_instance().should_log(audit_info)) {
|
||||
return audit::local_audit_instance().log(audit_info, query_state, options, error);
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> inspect_login(const sstring& username, socket_address client_ip, bool error) {
|
||||
if (!audit::audit_instance().local_is_initialized() || !audit::local_audit_instance().should_log_login()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit::local_audit_instance().log_login(username, client_ip, error);
|
||||
}
|
||||
|
||||
bool audit::should_log_table(const sstring& keyspace, const sstring& name) const {
|
||||
auto keyspace_it = _audited_tables.find(keyspace);
|
||||
return keyspace_it != _audited_tables.cend() && keyspace_it->second.find(name) != keyspace_it->second.cend();
|
||||
}
|
||||
|
||||
bool audit::should_log(const audit_info* audit_info) const {
|
||||
return _audited_categories.contains(audit_info->category())
|
||||
&& (_audited_keyspaces.find(audit_info->keyspace()) != _audited_keyspaces.cend()
|
||||
|| should_log_table(audit_info->keyspace(), audit_info->table())
|
||||
|| audit_info->category() == statement_category::AUTH
|
||||
|| audit_info->category() == statement_category::ADMIN
|
||||
|| audit_info->category() == statement_category::DCL);
|
||||
}
|
||||
|
||||
}
|
||||
141
audit/audit.hh
Normal file
141
audit/audit.hh
Normal file
@@ -0,0 +1,141 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "db/consistency_level.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
|
||||
#include "enum_set.hh"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace db {
|
||||
|
||||
class config;
|
||||
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class cql_statement;
|
||||
class query_processor;
|
||||
class query_options;
|
||||
|
||||
}
|
||||
|
||||
namespace service {
|
||||
|
||||
class migration_manager;
|
||||
class query_state;
|
||||
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
|
||||
class shared_token_metadata;
|
||||
|
||||
}
|
||||
|
||||
namespace audit {
|
||||
|
||||
extern logging::logger logger;
|
||||
|
||||
class audit_exception : public std::exception {
|
||||
sstring _what;
|
||||
public:
|
||||
explicit audit_exception(sstring&& what) : _what(std::move(what)) { }
|
||||
const char* what() const noexcept override {
|
||||
return _what.c_str();
|
||||
}
|
||||
};
|
||||
|
||||
enum class statement_category {
|
||||
QUERY, DML, DDL, DCL, AUTH, ADMIN
|
||||
};
|
||||
|
||||
using category_set = enum_set<super_enum<statement_category, statement_category::QUERY,
|
||||
statement_category::DML,
|
||||
statement_category::DDL,
|
||||
statement_category::DCL,
|
||||
statement_category::AUTH,
|
||||
statement_category::ADMIN>>;
|
||||
|
||||
class audit_info final {
|
||||
statement_category _category;
|
||||
sstring _keyspace;
|
||||
sstring _table;
|
||||
sstring _query;
|
||||
public:
|
||||
audit_info(statement_category cat, sstring keyspace, sstring table)
|
||||
: _category(cat)
|
||||
, _keyspace(std::move(keyspace))
|
||||
, _table(std::move(table))
|
||||
{ }
|
||||
void set_query_string(const std::string_view& query_string) {
|
||||
_query = sstring(query_string);
|
||||
}
|
||||
const sstring& keyspace() const { return _keyspace; }
|
||||
const sstring& table() const { return _table; }
|
||||
const sstring& query() const { return _query; }
|
||||
sstring category_string() const;
|
||||
statement_category category() const { return _category; }
|
||||
};
|
||||
|
||||
using audit_info_ptr = std::unique_ptr<audit_info>;
|
||||
|
||||
class storage_helper;
|
||||
|
||||
class audit final : public seastar::async_sharded_service<audit> {
|
||||
locator::shared_token_metadata& _token_metadata;
|
||||
const std::set<sstring> _audited_keyspaces;
|
||||
// Maps keyspace name to set of table names in that keyspace
|
||||
const std::map<sstring, std::set<sstring>> _audited_tables;
|
||||
const category_set _audited_categories;
|
||||
sstring _storage_helper_class_name;
|
||||
std::unique_ptr<storage_helper> _storage_helper_ptr;
|
||||
|
||||
bool should_log_table(const sstring& keyspace, const sstring& name) const;
|
||||
public:
|
||||
static seastar::sharded<audit>& audit_instance() {
|
||||
// FIXME: leaked intentionally to avoid shutdown problems, see #293
|
||||
static seastar::sharded<audit>* audit_inst = new seastar::sharded<audit>();
|
||||
|
||||
return *audit_inst;
|
||||
}
|
||||
|
||||
static audit& local_audit_instance() {
|
||||
return audit_instance().local();
|
||||
}
|
||||
static future<> create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm);
|
||||
static future<> start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
|
||||
static future<> stop_audit();
|
||||
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
|
||||
static audit_info_ptr create_no_audit_info();
|
||||
audit(locator::shared_token_metadata& stm, sstring&& storage_helper_name,
|
||||
std::set<sstring>&& audited_keyspaces,
|
||||
std::map<sstring, std::set<sstring>>&& audited_tables,
|
||||
category_set&& audited_categories);
|
||||
~audit();
|
||||
future<> start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm);
|
||||
future<> stop();
|
||||
future<> shutdown();
|
||||
bool should_log(const audit_info* audit_info) const;
|
||||
bool should_log_login() const { return _audited_categories.contains(statement_category::AUTH); }
|
||||
future<> log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error);
|
||||
future<> log_login(const sstring& username, socket_address client_ip, bool error) noexcept;
|
||||
};
|
||||
|
||||
future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error);
|
||||
|
||||
future<> inspect_login(const sstring& username, socket_address client_ip, bool error);
|
||||
|
||||
}
|
||||
202
audit/audit_cf_storage_helper.cc
Normal file
202
audit/audit_cf_storage_helper.cc
Normal file
@@ -0,0 +1,202 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "audit/audit_cf_storage_helper.hh"
|
||||
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "data_dictionary/keyspace_metadata.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/statements/ks_prop_defs.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
namespace audit {
|
||||
|
||||
const sstring audit_cf_storage_helper::KEYSPACE_NAME("audit");
|
||||
const sstring audit_cf_storage_helper::TABLE_NAME("audit_log");
|
||||
|
||||
audit_cf_storage_helper::audit_cf_storage_helper(cql3::query_processor& qp, service::migration_manager& mm)
|
||||
: _qp(qp)
|
||||
, _mm(mm)
|
||||
, _table(KEYSPACE_NAME, TABLE_NAME,
|
||||
fmt::format("CREATE TABLE IF NOT EXISTS {}.{} ("
|
||||
"date timestamp, "
|
||||
"node inet, "
|
||||
"event_time timeuuid, "
|
||||
"category text, "
|
||||
"consistency text, "
|
||||
"table_name text, "
|
||||
"keyspace_name text, "
|
||||
"operation text, "
|
||||
"source inet, "
|
||||
"username text, "
|
||||
"error boolean, "
|
||||
"PRIMARY KEY ((date, node), event_time))",
|
||||
KEYSPACE_NAME, TABLE_NAME),
|
||||
fmt::format("INSERT INTO {}.{} ("
|
||||
"date,"
|
||||
"node,"
|
||||
"event_time,"
|
||||
"category,"
|
||||
"consistency,"
|
||||
"table_name,"
|
||||
"keyspace_name,"
|
||||
"operation,"
|
||||
"source,"
|
||||
"username,"
|
||||
"error) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
KEYSPACE_NAME, TABLE_NAME))
|
||||
, _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit())
|
||||
{
|
||||
}
|
||||
|
||||
future<> audit_cf_storage_helper::migrate_audit_table(service::group0_guard group0_guard) {
|
||||
while (true) {
|
||||
auto const ks = _qp.db().try_find_keyspace(KEYSPACE_NAME);
|
||||
if (ks && ks->metadata()->strategy_name() == "org.apache.cassandra.locator.SimpleStrategy") {
|
||||
data_dictionary::database db = _qp.db();
|
||||
cql3::statements::ks_prop_defs old_ks_prop_defs;
|
||||
auto old_ks_metadata = old_ks_prop_defs.as_ks_metadata_update(
|
||||
ks->metadata(), *_qp.proxy().get_token_metadata_ptr(), db.features());
|
||||
std::map<sstring, sstring> strategy_opts;
|
||||
for (const auto &dc: _qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters())
|
||||
strategy_opts[dc] = "3";
|
||||
|
||||
auto new_ks_metadata = keyspace_metadata::new_keyspace(KEYSPACE_NAME,
|
||||
"org.apache.cassandra.locator.NetworkTopologyStrategy",
|
||||
strategy_opts,
|
||||
std::nullopt, // initial_tablets
|
||||
old_ks_metadata->durable_writes(),
|
||||
old_ks_metadata->get_storage_options(),
|
||||
old_ks_metadata->tables());
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
try {
|
||||
co_await _mm.announce(
|
||||
service::prepare_keyspace_update_announcement(db.real_database(), new_ks_metadata, ts),
|
||||
std::move(group0_guard), format("audit: Alter {} keyspace", KEYSPACE_NAME));
|
||||
break;
|
||||
} catch (::service::group0_concurrent_modification &) {
|
||||
logger.info("Concurrent operation is detected while altering {} keyspace, retrying.", KEYSPACE_NAME);
|
||||
}
|
||||
group0_guard = co_await _mm.start_group0_operation();
|
||||
} else {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> audit_cf_storage_helper::start(const db::config &cfg) {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (auto ks = _qp.db().try_find_keyspace(KEYSPACE_NAME);
|
||||
!ks ||
|
||||
ks->metadata()->strategy_name() == "org.apache.cassandra.locator.SimpleStrategy") {
|
||||
|
||||
auto group0_guard = co_await _mm.start_group0_operation();
|
||||
if (ks = _qp.db().try_find_keyspace(KEYSPACE_NAME); !ks) {
|
||||
// releasing, because table_helper::setup_keyspace creates a raft guard of its own
|
||||
service::release_guard(std::move(group0_guard));
|
||||
co_return co_await table_helper::setup_keyspace(_qp, _mm, KEYSPACE_NAME,
|
||||
"org.apache.cassandra.locator.NetworkTopologyStrategy",
|
||||
"3", _dummy_query_state, {&_table});
|
||||
} else if (ks->metadata()->strategy_name() == "org.apache.cassandra.locator.SimpleStrategy") {
|
||||
// We want to migrate the old (pre-Scylla 6.0) SimpleStrategy to a newer one.
|
||||
// The migrate_audit_table() function will do nothing if it races with another strategy change:
|
||||
// - either by another node doing the same thing in parallel,
|
||||
// - or a user manually changing the strategy of the same table.
|
||||
// Note we only check the strategy, not the replication factor.
|
||||
co_return co_await migrate_audit_table(std::move(group0_guard));
|
||||
} else {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> audit_cf_storage_helper::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> audit_cf_storage_helper::write(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error) {
|
||||
return _table.insert(_qp, _mm, _dummy_query_state, make_data, audit_info, node_ip, client_ip, cl, username, error);
|
||||
}
|
||||
|
||||
future<> audit_cf_storage_helper::write_login(const sstring& username,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
bool error) {
|
||||
return _table.insert(_qp, _mm, _dummy_query_state, make_login_data, node_ip, client_ip, username, error);
|
||||
}
|
||||
|
||||
cql3::query_options audit_cf_storage_helper::make_data(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error) {
|
||||
auto time = std::chrono::system_clock::now();
|
||||
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(time.time_since_epoch()).count();
|
||||
auto ticks_per_day = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::hours(24)).count();
|
||||
auto date = millis_since_epoch / ticks_per_day * ticks_per_day;
|
||||
thread_local static int64_t last_nanos = 0;
|
||||
auto time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(last_nanos, time));
|
||||
auto consistency_level = fmt::format("{}", cl);
|
||||
std::vector<cql3::raw_value> values {
|
||||
cql3::raw_value::make_value(timestamp_type->decompose(date)),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(node_ip.addr())),
|
||||
cql3::raw_value::make_value(uuid_type->decompose(time_id)),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(audit_info->category_string())),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(sstring(consistency_level))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(audit_info->table())),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(audit_info->keyspace())),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(audit_info->query())),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(client_ip.addr())),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(username)),
|
||||
cql3::raw_value::make_value(boolean_type->decompose(error)),
|
||||
};
|
||||
return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
|
||||
}
|
||||
|
||||
cql3::query_options audit_cf_storage_helper::make_login_data(socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
const sstring& username,
|
||||
bool error) {
|
||||
auto time = std::chrono::system_clock::now();
|
||||
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(time.time_since_epoch()).count();
|
||||
auto ticks_per_day = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::hours(24)).count();
|
||||
auto date = millis_since_epoch / ticks_per_day * ticks_per_day;
|
||||
thread_local static int64_t last_nanos = 0;
|
||||
auto time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(last_nanos, time));
|
||||
std::vector<cql3::raw_value> values {
|
||||
cql3::raw_value::make_value(timestamp_type->decompose(date)),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(node_ip.addr())),
|
||||
cql3::raw_value::make_value(uuid_type->decompose(time_id)),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(sstring("AUTH"))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(sstring(""))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(sstring(""))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(sstring(""))),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(sstring("LOGIN"))),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(client_ip.addr())),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(username)),
|
||||
cql3::raw_value::make_value(boolean_type->decompose(error)),
|
||||
};
|
||||
return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
|
||||
}
|
||||
|
||||
using registry = class_registrator<storage_helper, audit_cf_storage_helper, cql3::query_processor&, service::migration_manager&>;
|
||||
static registry registrator1("audit_cf_storage_helper");
|
||||
|
||||
}
|
||||
67
audit/audit_cf_storage_helper.hh
Normal file
67
audit/audit_cf_storage_helper.hh
Normal file
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "audit/audit.hh"
|
||||
#include "table_helper.hh"
|
||||
#include "storage_helper.hh"
|
||||
#include "db/config.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class query_processor;
|
||||
|
||||
}
|
||||
|
||||
namespace service {
|
||||
|
||||
class migration_manager;
|
||||
|
||||
}
|
||||
|
||||
namespace audit {
|
||||
|
||||
class audit_cf_storage_helper : public storage_helper {
|
||||
static const sstring KEYSPACE_NAME;
|
||||
static const sstring TABLE_NAME;
|
||||
cql3::query_processor& _qp;
|
||||
service::migration_manager& _mm;
|
||||
table_helper _table;
|
||||
service::query_state _dummy_query_state;
|
||||
static cql3::query_options make_data(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error);
|
||||
static cql3::query_options make_login_data(socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
const sstring& username,
|
||||
bool error);
|
||||
|
||||
future<> migrate_audit_table(service::group0_guard guard);
|
||||
|
||||
public:
|
||||
explicit audit_cf_storage_helper(cql3::query_processor& qp, service::migration_manager& mm);
|
||||
virtual ~audit_cf_storage_helper() {}
|
||||
virtual future<> start(const db::config& cfg) override;
|
||||
virtual future<> stop() override;
|
||||
virtual future<> write(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error) override;
|
||||
virtual future<> write_login(const sstring& username,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
bool error) override;
|
||||
};
|
||||
|
||||
}
|
||||
134
audit/audit_syslog_storage_helper.cc
Normal file
134
audit/audit_syslog_storage_helper.cc
Normal file
@@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "audit/audit_syslog_storage_helper.hh"
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <string.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <syslog.h>
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/net/api.hh>
|
||||
|
||||
#include <fmt/chrono.h>
|
||||
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class query_processor;
|
||||
|
||||
}
|
||||
|
||||
namespace audit {
|
||||
|
||||
namespace {
|
||||
|
||||
future<> syslog_send_helper(net::datagram_channel& sender,
|
||||
const socket_address& address,
|
||||
const sstring& msg) {
|
||||
return sender.send(address, net::packet{msg.data(), msg.size()}).handle_exception([address](auto&& exception_ptr) {
|
||||
auto error_msg = seastar::format(
|
||||
"Syslog audit backend failed (sending a message to {} resulted in {}).",
|
||||
address,
|
||||
exception_ptr
|
||||
);
|
||||
logger.error("{}", error_msg);
|
||||
throw audit_exception(std::move(error_msg));
|
||||
});
|
||||
}
|
||||
|
||||
static auto syslog_address_helper(const db::config& cfg)
|
||||
{
|
||||
return cfg.audit_unix_socket_path.is_set()
|
||||
? unix_domain_addr(cfg.audit_unix_socket_path())
|
||||
: unix_domain_addr(_PATH_LOG);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
audit_syslog_storage_helper::audit_syslog_storage_helper(cql3::query_processor& qp, service::migration_manager&) :
|
||||
_syslog_address(syslog_address_helper(qp.db().get_config())),
|
||||
_sender(make_unbound_datagram_channel(AF_UNIX)) {
|
||||
}
|
||||
|
||||
audit_syslog_storage_helper::~audit_syslog_storage_helper() {
|
||||
}
|
||||
|
||||
/*
|
||||
* We don't use openlog and syslog directly because it's already used by logger.
|
||||
* Audit needs to use different ident so than logger but syslog.h uses a global ident
|
||||
* and it's not possible to use more than one in a program.
|
||||
*
|
||||
* To work around it we directly communicate with the socket.
|
||||
*/
|
||||
future<> audit_syslog_storage_helper::start(const db::config& cfg) {
|
||||
if (this_shard_id() != 0) {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
return syslog_send_helper(_sender, _syslog_address, "Initializing syslog audit backend.");
|
||||
}
|
||||
|
||||
future<> audit_syslog_storage_helper::stop() {
|
||||
_sender.shutdown_output();
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error) {
|
||||
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
tm time;
|
||||
localtime_r(&now, &time);
|
||||
sstring msg = seastar::format("<{}>{:%h %e %T} scylla-audit: \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\"",
|
||||
LOG_NOTICE | LOG_USER,
|
||||
time,
|
||||
node_ip,
|
||||
audit_info->category_string(),
|
||||
cl,
|
||||
(error ? "true" : "false"),
|
||||
audit_info->keyspace(),
|
||||
audit_info->query(),
|
||||
client_ip,
|
||||
audit_info->table(),
|
||||
username);
|
||||
|
||||
return syslog_send_helper(_sender, _syslog_address, msg);
|
||||
}
|
||||
|
||||
future<> audit_syslog_storage_helper::write_login(const sstring& username,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
bool error) {
|
||||
|
||||
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
tm time;
|
||||
localtime_r(&now, &time);
|
||||
sstring msg = seastar::format("<{}>{:%h %e %T} scylla-audit: \"{}\", \"AUTH\", \"\", \"\", \"\", \"\", \"{}\", \"{}\", \"{}\"",
|
||||
LOG_NOTICE | LOG_USER,
|
||||
time,
|
||||
node_ip,
|
||||
client_ip,
|
||||
username,
|
||||
(error ? "true" : "false"));
|
||||
|
||||
co_await syslog_send_helper(_sender, _syslog_address, msg.c_str());
|
||||
}
|
||||
|
||||
using registry = class_registrator<storage_helper, audit_syslog_storage_helper, cql3::query_processor&, service::migration_manager&>;
|
||||
static registry registrator1("audit_syslog_storage_helper");
|
||||
|
||||
}
|
||||
44
audit/audit_syslog_storage_helper.hh
Normal file
44
audit/audit_syslog_storage_helper.hh
Normal file
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <seastar/net/api.hh>
|
||||
|
||||
#include "audit/audit.hh"
|
||||
#include "storage_helper.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
class migration_manager;
|
||||
|
||||
};
|
||||
|
||||
namespace audit {
|
||||
|
||||
class audit_syslog_storage_helper : public storage_helper {
|
||||
socket_address _syslog_address;
|
||||
net::datagram_channel _sender;
|
||||
public:
|
||||
explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&);
|
||||
virtual ~audit_syslog_storage_helper();
|
||||
virtual future<> start(const db::config& cfg) override;
|
||||
virtual future<> stop() override;
|
||||
virtual future<> write(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error) override;
|
||||
virtual future<> write_login(const sstring& username,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
bool error) override;
|
||||
};
|
||||
|
||||
}
|
||||
34
audit/storage_helper.hh
Normal file
34
audit/storage_helper.hh
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "audit/audit.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
namespace audit {
|
||||
|
||||
class storage_helper {
|
||||
public:
|
||||
using ptr_type = std::unique_ptr<storage_helper>;
|
||||
storage_helper() {}
|
||||
virtual ~storage_helper() {}
|
||||
virtual future<> start(const db::config& cfg) = 0;
|
||||
virtual future<> stop() = 0;
|
||||
virtual future<> write(const audit_info* audit_info,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
db::consistency_level cl,
|
||||
const sstring& username,
|
||||
bool error) = 0;
|
||||
virtual future<> write_login(const sstring& username,
|
||||
socket_address node_ip,
|
||||
socket_address client_ip,
|
||||
bool error) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -564,6 +564,28 @@ commitlog_total_space_in_mb: -1
|
||||
# it to 0.0.0.0 to listen on all interfaces.
|
||||
# prometheus_address: 1.2.3.4
|
||||
|
||||
# audit settings
|
||||
# By default, Scylla does not audit anything.
|
||||
# 'audit' config option controls if and where to output audited events:
|
||||
# - "none": auditing is disabled (default)
|
||||
# - "table": save audited events in audit.audit_log column family
|
||||
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
|
||||
# audit: "none"
|
||||
#
|
||||
# List of statement categories that should be audited.
|
||||
# audit_categories: "DCL,DDL,AUTH"
|
||||
#
|
||||
# List of tables that should be audited.
|
||||
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
|
||||
#
|
||||
# List of keyspaces that should be fully audited.
|
||||
# All tables in those keyspaces will be audited
|
||||
# audit_keyspaces: "<keyspace_name>,<keyspace_name>"
|
||||
#
|
||||
# Overrides the Unix socket path used to connect to syslog. If left unset, it'll
|
||||
# use the default on the build system, which is usually "/dev/log"
|
||||
# audit_unix_socket_path: "/dev/log"
|
||||
|
||||
# Distribution of data among cores (shards) within a node
|
||||
#
|
||||
# Scylla distributes data within a node among shards, using a round-robin
|
||||
|
||||
@@ -1124,6 +1124,9 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'tracing/trace_state.cc',
|
||||
'tracing/traced_file.cc',
|
||||
'table_helper.cc',
|
||||
'audit/audit.cc',
|
||||
'audit/audit_cf_storage_helper.cc',
|
||||
'audit/audit_syslog_storage_helper.cc',
|
||||
'tombstone_gc_options.cc',
|
||||
'tombstone_gc.cc',
|
||||
'utils/disk-error-handler.cc',
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "timeout_config.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "audit/audit.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -45,6 +46,7 @@ using cql_warnings_vec = std::vector<sstring>;
|
||||
|
||||
class cql_statement {
|
||||
timeout_config_selector _timeout_config_selector;
|
||||
audit::audit_info_ptr _audit_info;
|
||||
public:
|
||||
// CQL statement text
|
||||
seastar::sstring raw_cql_statement;
|
||||
@@ -55,7 +57,8 @@ public:
|
||||
}
|
||||
|
||||
explicit cql_statement(timeout_config_selector timeout_selector) : _timeout_config_selector(timeout_selector) {}
|
||||
|
||||
cql_statement(cql_statement&& o) = default;
|
||||
cql_statement(const cql_statement& o) : _timeout_config_selector(o._timeout_config_selector), _audit_info(o._audit_info ? std::make_unique<audit::audit_info>(*o._audit_info) : nullptr) { }
|
||||
virtual ~cql_statement()
|
||||
{ }
|
||||
|
||||
@@ -111,6 +114,11 @@ public:
|
||||
virtual bool is_conditional() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
audit::audit_info* get_audit_info() { return _audit_info.get(); }
|
||||
void set_audit_info(audit::audit_info_ptr&& info) { _audit_info = std::move(info); }
|
||||
|
||||
virtual void sanitize_audit_info() {}
|
||||
};
|
||||
|
||||
class cql_statement_no_metadata : public cql_statement {
|
||||
|
||||
@@ -87,6 +87,8 @@ public:
|
||||
std::unique_ptr<attributes> attrs,
|
||||
cql_stats& stats);
|
||||
|
||||
const std::vector<single_statement>& statements() const { return _statements; }
|
||||
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
@@ -28,6 +28,7 @@ class modification_statement;
|
||||
namespace raw {
|
||||
|
||||
class modification_statement : public cf_statement {
|
||||
sstring _raw_cql;
|
||||
protected:
|
||||
const std::unique_ptr<attributes::raw> _attrs;
|
||||
const std::optional<expr::expression> _conditions;
|
||||
@@ -41,6 +42,8 @@ public:
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
::shared_ptr<cql_statement_opt_metadata> prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats);
|
||||
::shared_ptr<cql3::statements::modification_statement> prepare(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) const;
|
||||
void add_raw(sstring&& raw) { _raw_cql = std::move(raw); }
|
||||
const sstring& get_raw_cql() const { return _raw_cql; }
|
||||
protected:
|
||||
virtual ::shared_ptr<cql3::statements::modification_statement> prepare_internal(data_dictionary::database db, schema_ptr schema,
|
||||
prepare_context& ctx, std::unique_ptr<attributes> attrs, cql_stats& stats) const = 0;
|
||||
|
||||
@@ -237,9 +237,10 @@ keyspace_metadata::new_keyspace(std::string_view name,
|
||||
locator::replication_strategy_config_options options,
|
||||
std::optional<unsigned> initial_tablets,
|
||||
bool durables_writes,
|
||||
storage_options storage_opts)
|
||||
storage_options storage_opts,
|
||||
std::vector<schema_ptr> cf_defs)
|
||||
{
|
||||
return ::make_lw_shared<keyspace_metadata>(name, strategy_name, options, initial_tablets, durables_writes, std::vector<schema_ptr>{}, user_types_metadata{}, storage_opts);
|
||||
return ::make_lw_shared<keyspace_metadata>(name, strategy_name, options, initial_tablets, durables_writes, cf_defs, user_types_metadata{}, storage_opts);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
|
||||
@@ -48,7 +48,8 @@ public:
|
||||
locator::replication_strategy_config_options options,
|
||||
std::optional<unsigned> initial_tablets,
|
||||
bool durables_writes = true,
|
||||
storage_options storage_opts = {});
|
||||
storage_options storage_opts = {},
|
||||
std::vector<schema_ptr> cf_defs = {});
|
||||
static lw_shared_ptr<keyspace_metadata>
|
||||
new_keyspace(const keyspace_metadata& ksm);
|
||||
void validate(const gms::feature_service&, const locator::topology&) const;
|
||||
|
||||
13
db/config.cc
13
db/config.cc
@@ -1259,11 +1259,24 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
|
||||
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
|
||||
, service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table")
|
||||
|
||||
, audit(this, "audit", value_status::Used, "none",
|
||||
"Controls the audit feature:\n"
|
||||
"\n"
|
||||
"\tnone : No auditing enabled.\n"
|
||||
"\tsyslog : Audit messages sent to Syslog.\n"
|
||||
"\ttable : Audit messages written to column family named audit.audit_log.\n")
|
||||
, audit_categories(this, "audit_categories", value_status::Used, "DCL,DDL,AUTH", "Comma separated list of operation categories that should be audited.")
|
||||
, audit_tables(this, "audit_tables", value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
|
||||
, audit_keyspaces(this, "audit_keyspaces", value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
|
||||
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writting to syslog. Only applicable when audit is set to syslog.")
|
||||
, audit_syslog_write_buffer_size(this, "audit_syslog_write_buffer_size", value_status::Used, 1048576, "The size (in bytes) of a write buffer used when writting to syslog socket.")
|
||||
, ldap_url_template(this, "ldap_url_template", value_status::Used, "", "LDAP URL template used by LDAPRoleManager for crafting queries.")
|
||||
, ldap_attr_role(this, "ldap_attr_role", value_status::Used, "", "LDAP attribute containing Scylla role.")
|
||||
, ldap_bind_dn(this, "ldap_bind_dn", value_status::Used, "", "Distinguished name used by LDAPRoleManager for binding to LDAP server.")
|
||||
, ldap_bind_passwd(this, "ldap_bind_passwd", value_status::Used, "", "Password used by LDAPRoleManager for binding to LDAP server.")
|
||||
, saslauthd_socket_path(this, "saslauthd_socket_path", value_status::Used, "", "UNIX domain socket on which saslauthd is listening.")
|
||||
|
||||
, error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.")
|
||||
, topology_barrier_stall_detector_threshold_seconds(this, "topology_barrier_stall_detector_threshold_seconds", value_status::Used, 2, "Report sites blocking topology barrier if it takes longer than this.")
|
||||
, enable_tablets(this, "enable_tablets", value_status::Used, false, "Enable tablets for newly created keyspaces.")
|
||||
|
||||
@@ -504,6 +504,13 @@ public:
|
||||
|
||||
named_value<uint32_t> service_levels_interval;
|
||||
|
||||
named_value<sstring> audit;
|
||||
named_value<sstring> audit_categories;
|
||||
named_value<sstring> audit_tables;
|
||||
named_value<sstring> audit_keyspaces;
|
||||
named_value<sstring> audit_unix_socket_path;
|
||||
named_value<size_t> audit_syslog_write_buffer_size;
|
||||
|
||||
named_value<sstring> ldap_url_template;
|
||||
named_value<sstring> ldap_attr_role;
|
||||
named_value<sstring> ldap_bind_dn;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
static logging::logger tlogger("table_helper");
|
||||
|
||||
@@ -139,7 +140,8 @@ future<> table_helper::insert(cql3::query_processor& qp, service::migration_mana
|
||||
co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);
|
||||
}
|
||||
|
||||
future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
|
||||
future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_strategy_name,
|
||||
sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return;
|
||||
}
|
||||
@@ -165,6 +167,15 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
|
||||
if (!db.has_keyspace(keyspace_name)) {
|
||||
std::map<sstring, sstring> opts;
|
||||
if (replication_strategy_name == "org.apache.cassandra.locator.NetworkTopologyStrategy") {
|
||||
for (const auto &dc: qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters())
|
||||
opts[dc] = replication_factor;
|
||||
}
|
||||
else {
|
||||
opts["replication_factor"] = replication_factor;
|
||||
}
|
||||
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, replication_strategy_name, std::move(opts), true);
|
||||
try {
|
||||
co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
|
||||
std::move(group0_guard), seastar::format("table_helper: create {} keyspace", keyspace_name));
|
||||
|
||||
@@ -99,7 +99,8 @@ public:
|
||||
|
||||
future<> insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
|
||||
|
||||
static future<> setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
|
||||
static future<> setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_strategy_name,
|
||||
sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
|
||||
|
||||
/**
|
||||
* Makes a monotonically increasing value in 100ns ("nanos") based on the given time
|
||||
|
||||
@@ -28,6 +28,7 @@ target_link_libraries(test-lib
|
||||
Seastar::seastar
|
||||
xxHash::xxhash
|
||||
PRIVATE
|
||||
audit
|
||||
auth
|
||||
cdc
|
||||
compaction
|
||||
|
||||
@@ -212,7 +212,7 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
future<> trace_keyspace_helper::start(cql3::query_processor& qp, service::migration_manager& mm) {
|
||||
_qp_anchor = &qp;
|
||||
_mm_anchor = &mm;
|
||||
return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
|
||||
return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "org.apache.cassandra.locator.SimpleStrategy", "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
|
||||
}
|
||||
|
||||
gms::inet_address trace_keyspace_helper::my_address() const noexcept {
|
||||
|
||||
Reference in New Issue
Block a user