merge: Forbid internal schema changes for distributed tables

Merged patch set from Piotr Sarna:

This series addresses issue #6700 again (it was reopened),
by forbidding all non-local schema changes to be performed
from within the database via CQL interface. These changes
are dangerous since they are not directly propagated to other
nodes.

Tests: unit(dev)
Fixes #6700

Piotr Sarna (4):
  test: make schema changes in query_processor_test global
  cql3: refuse to change schema internally for distributed tables
  test: expand testing internal schema changes
  cql3: add explanatory comments to execute_internal

 cql3/query_processor.hh                      | 13 ++++++++++++-
 cql3/statements/alter_table_statement.cc     |  6 ------
 cql3/statements/schema_altering_statement.cc | 15 +++++++++++++++
 test/boost/cql_query_test.cc                 |  8 ++++++--
 test/boost/query_processor_test.cc           | 16 ++++++++--------
 5 files changed, 41 insertions(+), 17 deletions(-)
This commit is contained in:
Nadav Har'El
2020-07-07 13:58:40 +03:00
committed by Avi Kivity
5 changed files with 41 additions and 17 deletions

View File

@@ -207,6 +207,12 @@ public:
service::query_state& query_state,
query_options& options);
// NOTICE: Internal queries should be used with care, as they are expected
// to be used for local tables (e.g. from the `system` keyspace).
// Data modifications will usually be performed with consistency level ONE
// and schema changes will not be announced to other nodes.
// Because of that, changing global schema state (e.g. modifying non-local tables,
// creating namespaces, etc) is explicitly forbidden via this interface.
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) {
return execute_internal(query_string, db::consistency_level::ONE,
@@ -290,7 +296,12 @@ public:
const sstring& query_string,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)>&& f);
// NOTICE: Internal queries should be used with care, as they are expected
// to be used for local tables (e.g. from the `system` keyspace).
// Data modifications will usually be performed with consistency level ONE
// and schema changes will not be announced to other nodes.
// Because of that, changing global schema state (e.g. modifying non-local tables,
// creating namespaces, etc) is explicitly forbidden via this interface.
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,

View File

@@ -291,12 +291,6 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
throw exceptions::invalid_request_exception("Cannot use ALTER TABLE on Materialized View");
}
const auto& ks = db.find_keyspace(keyspace());
auto replication_type = ks.get_replication_strategy().get_type();
if (is_local_only && replication_type != locator::replication_strategy_type::local) {
throw std::logic_error(format("Internal queries should not try to alter table schema for non-local tables, because it leads to inconsistencies: {}.{}",
s->ks_name(), s->cf_name()));
}
auto cfm = schema_builder(s);
if (_properties->get_id()) {

View File

@@ -40,6 +40,8 @@
*/
#include "cql3/statements/schema_altering_statement.hh"
#include "locator/abstract_replication_strategy.hh"
#include "database.hh"
#include "transport/messages/result_message.hh"
@@ -110,6 +112,19 @@ schema_altering_statement::execute0(service::storage_proxy& proxy, service::quer
future<::shared_ptr<messages::result_message>>
schema_altering_statement::execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) const {
bool internal = state.get_client_state().is_internal();
if (internal) {
auto replication_type = locator::replication_strategy_type::everywhere_topology;
database& db = proxy.get_db().local();
if (_cf_name && _cf_name->has_keyspace()) {
const auto& ks = db.find_keyspace(_cf_name->get_keyspace());
replication_type = ks.get_replication_strategy().get_type();
}
if (replication_type != locator::replication_strategy_type::local) {
sstring info = _cf_name ? _cf_name->to_string() : "schema";
throw std::logic_error(format("Attempted to modify {} via internal query: such schema changes are not propagated and thus illegal", info));
}
}
return execute0(proxy, state, options, internal).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
auto permissions_granted_fut = internal
? make_ready_future<>()

View File

@@ -4575,12 +4575,16 @@ SEASTAR_TEST_CASE(ck_slice_with_null_is_forbidden) {
});
}
SEASTAR_TEST_CASE(test_internal_alter_table_on_a_distributed_table) {
SEASTAR_TEST_CASE(test_internal_schema_changes_on_a_distributed_table) {
return do_with_cql_env([](cql_test_env& e) {
return seastar::async([&e] {
cquery_nofail(e, "create table t (p int primary key, v int)");
const auto local_err = exception_predicate::message_contains("local");
const auto local_err = exception_predicate::message_contains("internal query");
BOOST_REQUIRE_EXCEPTION(db::execute_cql("alter table ks.t add col abcd").get(), std::logic_error, local_err);
BOOST_REQUIRE_EXCEPTION(db::execute_cql("create table ks.t2 (id int primary key)").get(), std::logic_error, local_err);
BOOST_REQUIRE_EXCEPTION(db::execute_cql("create index on ks.t(v)").get(), std::logic_error, local_err);
BOOST_REQUIRE_EXCEPTION(db::execute_cql("drop table ks.t").get(), std::logic_error, local_err);
BOOST_REQUIRE_EXCEPTION(db::execute_cql("drop keyspace ks").get(), std::logic_error, local_err);
});
});
}

View File

@@ -41,8 +41,8 @@
SEASTAR_TEST_CASE(test_execute_internal_insert) {
return do_with_cql_env([] (auto& e) {
auto& qp = e.local_qp();
return qp.execute_internal("create table ks.cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").then([](auto rs) {
BOOST_REQUIRE(rs->empty());
return e.execute_cql("create table ks.cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").then([](auto rs) {
BOOST_REQUIRE(dynamic_pointer_cast<cql_transport::messages::result_message::schema_change>(rs));
}).then([&qp] {
return qp.execute_internal("insert into ks.cf (p1, c1, r1) values (?, ?, ?);", { sstring("key1"), 1, 100 }).then([](auto rs) {
BOOST_REQUIRE(rs->empty());
@@ -69,8 +69,8 @@ SEASTAR_TEST_CASE(test_execute_internal_insert) {
SEASTAR_TEST_CASE(test_execute_internal_delete) {
return do_with_cql_env([] (auto& e) {
auto& qp = e.local_qp();
return qp.execute_internal("create table ks.cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").then([](auto rs) {
BOOST_REQUIRE(rs->empty());
return e.execute_cql("create table ks.cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").then([](auto rs) {
BOOST_REQUIRE(dynamic_pointer_cast<cql_transport::messages::result_message::schema_change>(rs));
}).then([&qp] {
return qp.execute_internal("insert into ks.cf (p1, c1, r1) values (?, ?, ?);", { sstring("key1"), 1, 100 }).then([](auto rs) {
BOOST_REQUIRE(rs->empty());
@@ -90,8 +90,8 @@ SEASTAR_TEST_CASE(test_execute_internal_delete) {
SEASTAR_TEST_CASE(test_execute_internal_update) {
return do_with_cql_env([] (auto& e) {
auto& qp = e.local_qp();
return qp.execute_internal("create table ks.cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").then([](auto rs) {
BOOST_REQUIRE(rs->empty());
return e.execute_cql("create table ks.cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").then([](auto rs) {
BOOST_REQUIRE(dynamic_pointer_cast<cql_transport::messages::result_message::schema_change>(rs));
}).then([&qp] {
return qp.execute_internal("insert into ks.cf (p1, c1, r1) values (?, ?, ?);", { sstring("key1"), 1, 100 }).then([](auto rs) {
BOOST_REQUIRE(rs->empty());
@@ -298,8 +298,8 @@ SEASTAR_TEST_CASE(test_query_counters) {
SEASTAR_TEST_CASE(test_select_full_scan_metrics) {
return do_with_cql_env_thread([](cql_test_env& e) {
auto& qp = e.local_qp();
qp.execute_internal("create table ks.fsm (pk int, ck int, c1 int, c2 int, PRIMARY KEY(pk, ck));").get();
qp.execute_internal("create index on ks.fsm (c1);").get();
cquery_nofail(e, "create table ks.fsm (pk int, ck int, c1 int, c2 int, PRIMARY KEY(pk, ck));");
cquery_nofail(e, "create index on ks.fsm (c1);");
qp.execute_internal("insert into ks.fsm (pk, ck, c1, c2) values (?,?,?,?);", { 1, 1, 1, 1 }).get();
auto stat_bc1 = qp.get_cql_stats().select_bypass_caches;
qp.execute_internal("select * from ks.fsm bypass cache;").get();