Merge 'scylla-sstable: add support for loading schema of views and indexes' from Botond Dénes

Loading the schemas of views and indexes was not supported, either with `--schema-file` or when loading the schema from schema sstables.
This PR addresses both:
* When loading schema from CQL (file), `CREATE MATERIALIZED VIEW` and `CREATE INDEX` statements are now also processed correctly.
* When loading schema from schema tables, `system_schema.views` is also processed, when the table has no corresponding entry in `system_schema.tables`.

Tests are also added.

Fixes: #16492

Closes scylladb/scylladb#16517

* github.com:scylladb/scylladb:
  test/cql-pytest: test_tools.py: add schema-loading tests for MV/SI
  test/cql-pytest: test_tools.py: extract some fixture logic to functions
  test/cql-pytest: test_tools.py: extract common schema-loading facilities into base-class
  tools/schema_loader: load_schema_from_schema_tables(): add support for MV/SI schemas
  tools/schema_loader: load_one_schema_from_file(): add support for view/index schemas
  test/boost/schema_loader_test: add test for mvs and indexes
  tools/schema_loader: load_schemas(): implement parsing views/indexes from CQL
  replica/database: extract existing_index_names and get_available_index_name
  tools/schema_loader: make real_db.tables the only source of truth on existing tables
  tools/schema_loader: table(): store const keyspace&
  tools/schema_loader: make database,keyspace,table non-movable
  cql3/statements/create_index_statement: build_index_schema(): include index metadata in returned value
  cql3/statements/create_index_statement: make build_index_schema() public
  cql3/statements/create_index_statement: relax some method's dependence on qp
  cql3/statements/create_view_statement: make prepare_view() public
This commit is contained in:
Nadav Har'El
2024-01-24 23:36:54 +02:00
9 changed files with 545 additions and 100 deletions

View File

@@ -73,8 +73,7 @@ create_index_statement::validate(query_processor& qp, const service::client_stat
_properties->validate();
}
std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_executing(query_processor& qp) const {
auto db = qp.db();
std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_executing(data_dictionary::database db) const {
auto schema = validation::validate_column_family(db, keyspace(), column_family());
if (schema->is_counter()) {
@@ -327,10 +326,9 @@ void create_index_statement::validate_targets_for_multi_column_index(std::vector
}
}
schema_ptr create_index_statement::build_index_schema(query_processor& qp) const {
auto targets = validate_while_executing(qp);
std::optional<create_index_statement::base_schema_with_new_index> create_index_statement::build_index_schema(data_dictionary::database db) const {
auto targets = validate_while_executing(db);
data_dictionary::database db = qp.db();
auto schema = db.find_schema(keyspace(), column_family());
sstring accepted_name = _index_name;
@@ -355,7 +353,7 @@ schema_ptr create_index_statement::build_index_schema(query_processor& qp) const
auto existing_index = schema->find_index_noname(index);
if (existing_index) {
if (_if_not_exists) {
return schema_ptr();
return {};
} else {
throw exceptions::invalid_request_exception(
format("Index {} is a duplicate of existing index {}", index.name(), existing_index.value().name()));
@@ -372,19 +370,19 @@ schema_ptr create_index_statement::build_index_schema(query_processor& qp) const
schema_builder builder{schema};
builder.with_index(index);
return builder.build();
return base_schema_with_new_index{builder.build(), index};
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
create_index_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
using namespace cql_transport;
auto schema = build_index_schema(qp);
auto res = build_index_schema(qp.db());
::shared_ptr<event::schema_change> ret;
std::vector<mutation> m;
if (schema) {
m = co_await service::prepare_column_family_update_announcement(qp.proxy(), std::move(schema), false, {}, ts);
if (res) {
m = co_await service::prepare_column_family_update_announcement(qp.proxy(), std::move(res->schema), false, {}, ts);
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,

View File

@@ -48,6 +48,12 @@ public:
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
/// Result of build_index_schema(): the base table's schema, rebuilt with the
/// new index added, together with the metadata of that index.
struct base_schema_with_new_index {
/// Schema of the base table, built with the new index included.
schema_ptr schema;
/// Metadata of the index that was added to the base table.
index_metadata index;
};
/// Validates the statement against `db` and builds the base-table schema that
/// includes the new index.
/// Returns a disengaged optional if an equivalent index already exists and the
/// statement was issued with IF NOT EXISTS; without IF NOT EXISTS a duplicate
/// index results in an exceptions::invalid_request_exception.
std::optional<base_schema_with_new_index> build_index_schema(data_dictionary::database db) const;
private:
void validate_for_local_index(const schema& schema) const;
void validate_for_frozen_collection(const index_target& target) const;
@@ -62,8 +68,7 @@ private:
const sstring& name,
index_metadata_kind kind,
const index_options_map& options);
std::vector<::shared_ptr<index_target>> validate_while_executing(query_processor& qp) const;
schema_ptr build_index_schema(query_processor& qp) const;
std::vector<::shared_ptr<index_target>> validate_while_executing(data_dictionary::database db) const;
};
}

View File

@@ -38,8 +38,6 @@ private:
cf_properties _properties;
bool _if_not_exists;
std::pair<view_ptr, cql3::cql_warnings_vec> prepare_view(data_dictionary::database db) const;
public:
create_view_statement(
cf_name view_name,
@@ -50,6 +48,8 @@ public:
std::vector<::shared_ptr<cql3::column_identifier::raw>> clustering_keys,
bool if_not_exists);
std::pair<view_ptr, cql3::cql_warnings_vec> prepare_view(data_dictionary::database db) const;
/// Returns a mutable reference to the cf_properties of this statement.
auto& properties() {
return _properties;
}

View File

@@ -163,6 +163,39 @@ sstring index_name_from_table_name(const sstring& table_name) {
return table_name.substr(0, table_name.size() - 6); // remove the _index suffix from an index name;
}
std::set<sstring>
existing_index_names(const std::vector<schema_ptr>& tables, std::string_view cf_to_exclude) {
std::set<sstring> names;
for (auto& schema : tables) {
if (!cf_to_exclude.empty() && schema->cf_name() == cf_to_exclude) {
continue;
}
for (const auto& index_name : schema->index_names()) {
names.emplace(index_name);
}
}
return names;
}
/// Picks the first index name that is not yet in use.
///
/// The default index name (derived from `cf_name` and the optional
/// `index_name_root`) is tried first, then the same name with "_1", "_2", ...
/// appended. A candidate is considered taken if `has_schema` reports a table
/// named like the candidate's index table in `ks_name`, or if `existing_names`
/// already contains the candidate.
sstring get_available_index_name(
        std::string_view ks_name,
        std::string_view cf_name,
        std::optional<sstring> index_name_root,
        const std::set<sstring>& existing_names,
        std::function<bool(std::string_view, std::string_view)> has_schema) {
    const sstring default_name = index_metadata::get_default_index_name(sstring(cf_name), index_name_root);

    const auto is_taken = [&] (const sstring& candidate) {
        const auto backing_table_name = secondary_index::index_table_name(candidate);
        return has_schema(ks_name, backing_table_name) || existing_names.contains(candidate);
    };

    sstring candidate = default_name;
    for (int suffix = 1; is_taken(candidate); ++suffix) {
        candidate = default_name + "_" + std::to_string(suffix);
    }
    return candidate;
}
static bytes get_available_column_name(const schema& schema, const bytes& root) {
bytes accepted_name = root;
int i = 0;

View File

@@ -34,6 +34,21 @@ sstring index_table_name(const sstring& index_name);
*/
sstring index_name_from_table_name(const sstring& table_name);
/// Given a list of table schemas, return the names of all their secondary indexes, skipping the table named cf_to_exclude (if it is not empty).
std::set<sstring>
existing_index_names(const std::vector<schema_ptr>& tables, std::string_view cf_to_exclude);
/// Given a base-table keyspace and table name, return the first available index
/// name (containing index_name_root if specified).
/// If the name is already taken (existing_names contains it, or has_schema
/// reports a table with the matching index-table name), a running counter is
/// appended to the index name.
sstring get_available_index_name(
std::string_view ks_name,
std::string_view cf_name,
std::optional<sstring> index_name_root,
const std::set<sstring>& existing_names,
std::function<bool(std::string_view, std::string_view)> has_schema);
class index {
index_metadata _im;
cql3::statements::index_target::target_type _target_type;

View File

@@ -1360,16 +1360,7 @@ database::drop_caches() const {
std::set<sstring>
database::existing_index_names(const sstring& ks_name, const sstring& cf_to_exclude) const {
std::set<sstring> names;
for (auto& schema : find_keyspace(ks_name).metadata()->tables()) {
if (!cf_to_exclude.empty() && schema->cf_name() == cf_to_exclude) {
continue;
}
for (const auto& index_name : schema->index_names()) {
names.emplace(index_name);
}
}
return names;
return secondary_index::existing_index_names(find_keyspace(ks_name).metadata()->tables(), cf_to_exclude);
}
namespace {
@@ -2201,18 +2192,8 @@ namespace replica {
sstring database::get_available_index_name(const sstring &ks_name, const sstring &cf_name,
std::optional<sstring> index_name_root) const
{
auto existing_names = existing_index_names(ks_name);
auto base_name = index_metadata::get_default_index_name(cf_name, index_name_root);
sstring accepted_name = base_name;
int i = 0;
auto name_accepted = [&] {
auto index_table_name = secondary_index::index_table_name(accepted_name);
return !has_schema(ks_name, index_table_name) && !existing_names.contains(accepted_name);
};
while (!name_accepted()) {
accepted_name = base_name + "_" + std::to_string(++i);
}
return accepted_name;
return secondary_index::get_available_index_name(ks_name, cf_name, index_name_root, existing_index_names(ks_name),
[this] (std::string_view ks, std::string_view cf) { return has_schema(ks, cf); });
}
schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& index_name) const {

View File

@@ -6,10 +6,13 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/log.hh"
#include "test/lib/scylla_test_case.hh"
#include "db/config.hh"
#include "index/secondary_index_manager.hh"
#include "tools/schema_loader.hh"
#include "view_info.hh"
SEASTAR_THREAD_TEST_CASE(test_empty) {
db::config dbcfg;
@@ -126,3 +129,151 @@ SEASTAR_THREAD_TEST_CASE(test_dropped_columns) {
"INSERT INTO system_schema.dropped_columns (keyspace_name, table_name, column_name, dropped_time, type) VALUES ('ks', 'unknown_cf', 'v2', 1631011979170675, 'int'); "
).get(), std::exception);
}
/// Check that:
/// * schemas[0] is a base schema (it is not a view)
/// * schemas[1]..schemas.back() are views
/// * schemas[0] is the base of each view
/// * the type of schemas[i] matches views_type[i - 1]
/// Kind of view a schema is expected to be, as passed to check_views().
enum class view_type {
// The view is expected to back a secondary index of the base table.
index,
// A view for which check_views() performs no index check.
view
};
/// Verifies that `schemas` holds one base table followed by its views.
///
/// Requirements checked:
/// * `schemas` has exactly one more element than `views_type`;
/// * `schemas[0]` is not a view;
/// * every following schema is a view whose base id is the id of `schemas[0]`;
/// * for every entry of `views_type` that is `view_type::index`, the base
///   table has an index named after the corresponding view's table name.
///
/// `sl` is only used to log where the check was invoked from.
void check_views(std::vector<schema_ptr> schemas, std::vector<view_type> views_type, std::source_location sl = std::source_location::current()) {
    testlog.info("Checking views built at {}:{}", sl.file_name(), sl.line());

    BOOST_REQUIRE_EQUAL(schemas.size(), views_type.size() + 1);

    const auto base_schema = schemas.front();
    testlog.info("Base table is {}.{}", base_schema->ks_name(), base_schema->cf_name());
    BOOST_REQUIRE(!base_schema->is_view());

    // Walk the views and their expected types in lock-step.
    auto view_it = schemas.begin() + 1;
    auto expected_it = views_type.begin();
    for (; view_it != schemas.end() && expected_it != views_type.end(); ++view_it, ++expected_it) {
        const auto& view_schema = *view_it;
        const bool expect_index = *expected_it == view_type::index;
        testlog.info("Checking view {}.{} is_index={}", view_schema->ks_name(), view_schema->cf_name(), expect_index);

        BOOST_REQUIRE(view_schema->is_view());
        BOOST_REQUIRE_EQUAL(base_schema->id(), view_schema->view_info()->base_id());

        if (expect_index) {
            BOOST_REQUIRE(base_schema->has_index(secondary_index::index_name_from_table_name(view_schema->cf_name())));
        }
    }

    // Both sequences must have been consumed completely.
    BOOST_REQUIRE(view_it == schemas.end());
    BOOST_REQUIRE(expected_it == views_type.end());
}
/// Checks that CREATE MATERIALIZED VIEW statements are turned into view
/// schemas attached to their base table when loading schemas from CQL.
SEASTAR_THREAD_TEST_CASE(test_materialized_view) {
    db::config dbcfg;

    // A single view on the base table.
    check_views(
            tools::load_schemas(
                    dbcfg,
                    "CREATE TABLE ks.cf (pk int PRIMARY KEY, v int); "
                    "CREATE MATERIALIZED VIEW ks.cf_by_v AS"
                    "    SELECT * FROM ks.cf"
                    "    WHERE v IS NOT NULL"
                    "    PRIMARY KEY (v, pk);").get(),
            {view_type::view});

    // Two different views on the same base table.
    check_views(
            tools::load_schemas(
                    dbcfg,
                    "CREATE TABLE ks.cf (pk int PRIMARY KEY, v1 int, v2 int); "
                    "CREATE MATERIALIZED VIEW ks.cf_by_v1 AS"
                    "    SELECT * FROM ks.cf"
                    "    WHERE v1 IS NOT NULL"
                    "    PRIMARY KEY (v1, pk);"
                    "CREATE MATERIALIZED VIEW ks.cf_by_v2 AS"
                    "    SELECT * FROM ks.cf"
                    "    WHERE v2 IS NOT NULL"
                    "    PRIMARY KEY (v2, pk);").get(),
            {view_type::view, view_type::view});

    // The same view created twice with IF NOT EXISTS: only one view schema is expected.
    check_views(
            tools::load_schemas(
                    dbcfg,
                    "CREATE TABLE ks.cf (pk int PRIMARY KEY, v int); "
                    "CREATE MATERIALIZED VIEW IF NOT EXISTS ks.cf_by_v AS"
                    "    SELECT * FROM ks.cf"
                    "    WHERE v IS NOT NULL"
                    "    PRIMARY KEY (v, pk);"
                    "CREATE MATERIALIZED VIEW IF NOT EXISTS ks.cf_by_v AS"
                    "    SELECT * FROM ks.cf"
                    "    WHERE v IS NOT NULL"
                    "    PRIMARY KEY (v, pk);").get(),
            {view_type::view});
}
/// Checks that CREATE INDEX statements yield index-backing view schemas
/// attached to their base table when loading schemas from CQL.
SEASTAR_THREAD_TEST_CASE(test_index) {
    db::config dbcfg;

    // Loads the schemas described by `cql` using the config above.
    const auto load = [&dbcfg] (const char* cql) {
        return tools::load_schemas(dbcfg, cql).get();
    };

    // One explicitly named index.
    check_views(
            load("CREATE TABLE ks.cf (pk int PRIMARY KEY, v int); "
                 "CREATE INDEX cf_by_v ON ks.cf (v);"),
            {view_type::index});

    // One index without an explicit name.
    check_views(
            load("CREATE TABLE ks.cf (pk int PRIMARY KEY, v int); "
                 "CREATE INDEX ON ks.cf (v);"),
            {view_type::index});

    // Two explicitly named indexes on different columns.
    check_views(
            load("CREATE TABLE ks.cf (pk int PRIMARY KEY, v1 int, v2 int); "
                 "CREATE INDEX cf_by_v1 ON ks.cf (v1);"
                 "CREATE INDEX cf_by_v2 ON ks.cf (v2);"),
            {view_type::index, view_type::index});

    // Two unnamed indexes on different columns.
    check_views(
            load("CREATE TABLE ks.cf (pk int PRIMARY KEY, v1 int, v2 int); "
                 "CREATE INDEX ON ks.cf (v1);"
                 "CREATE INDEX ON ks.cf (v2);"),
            {view_type::index, view_type::index});

    // The same named index created twice with IF NOT EXISTS: one index expected.
    check_views(
            load("CREATE TABLE ks.cf (pk int PRIMARY KEY, v int); "
                 "CREATE INDEX IF NOT EXISTS cf_by_v ON ks.cf (v);"
                 "CREATE INDEX IF NOT EXISTS cf_by_v ON ks.cf (v);"),
            {view_type::index});

    // The same unnamed index created twice with IF NOT EXISTS: one index expected.
    check_views(
            load("CREATE TABLE ks.cf (pk int PRIMARY KEY, v int); "
                 "CREATE INDEX IF NOT EXISTS ON ks.cf (v);"
                 "CREATE INDEX IF NOT EXISTS ON ks.cf (v);"),
            {view_type::index});
}
/// Checks schema loading when materialized views and indexes on the same
/// base table are declared interleaved.
SEASTAR_THREAD_TEST_CASE(test_mv_index) {
    db::config dbcfg;

    // View, index, view, index - all on ks.cf.
    const char* const cql =
            "CREATE TABLE ks.cf (pk int PRIMARY KEY, v1 int, v2 int); "
            "CREATE MATERIALIZED VIEW ks.cf_by_v1 AS"
            "    SELECT * FROM ks.cf"
            "    WHERE v1 IS NOT NULL"
            "    PRIMARY KEY (v1, pk);"
            "CREATE INDEX ON ks.cf (v2);"
            "CREATE MATERIALIZED VIEW ks.cf_by_v2 AS"
            "    SELECT * FROM ks.cf"
            "    WHERE v2 IS NOT NULL"
            "    PRIMARY KEY (v2, pk);"
            "CREATE INDEX ON ks.cf (v1);";

    // The views are expected in declaration order.
    const std::vector<view_type> expected_types{view_type::view, view_type::index, view_type::view, view_type::index};

    check_views(tools::load_schemas(dbcfg, cql).get(), expected_types);
}

View File

@@ -506,24 +506,63 @@ def test_scylla_sstable_script(cql, test_keyspace, scylla_path, scylla_data_dir,
assert slice_lua_json == cxx_json
@pytest.fixture(scope="class")
def system_scylla_local_sstable_prepared(cql, scylla_data_dir):
""" Prepares the system.scylla_local table for the needs of the schema loading tests.
class TestScyllaSsstableSchemaLoadingBase:
    """ Helpers shared by the schema-loading test classes.

    All helpers drive `scylla sstable dump-data` as a subprocess.
    """

    def check(self, scylla_path, extra_args, sstable, dump_reference, cwd=None, env=None):
        """ Dump `sstable` as json, passing `extra_args`, and assert that the
        dump of the first sstable in the output equals `dump_reference`.
        """
        base_command = [scylla_path, "sstable", "dump-data", "--output-format", "json", "--logger-log-level", "scylla-sstable=debug:schema_loader=trace"]
        command = base_command + extra_args + [sstable]
        raw_output = subprocess.check_output(command, cwd=cwd, env=env)
        per_sstable_dumps = json.loads(raw_output)["sstables"]
        dump = list(per_sstable_dumps.values())[0]
        assert dump == dump_reference

    def check_fail(self, scylla_path, extra_args, sstable, error_msg=None):
        """ Run dump-data on `sstable` with `extra_args` and assert that it
        fails, with `error_msg` as the second-to-last element of stderr split
        on newlines. A generic schema-autodetect failure message is expected
        when `error_msg` is None.
        """
        base_command = [scylla_path, "sstable", "dump-data", "--logger-log-level", "scylla-sstable=debug:schema_loader=trace"]
        res = subprocess.run(base_command + extra_args + [sstable], capture_output=True, text=True)
        print(res.stderr)
        expected_msg = error_msg
        if expected_msg is None:
            expected_msg = "Failed to autodetect and load schema, try again with --logger-log-level scylla-sstable=debug to learn more or provide the schema source manually"
        stderr_lines = res.stderr.split('\n')
        assert stderr_lines[-2] == expected_msg
        assert res.returncode != 0

    def copy_sstable_to_external_dir(self, system_scylla_local_sstable_prepared, temp_workdir):
        """ Copy all components of the given sstable into `temp_workdir` and
        return the path of a Data.db component found there.
        """
        table_data_dir, sstable_filename = os.path.split(system_scylla_local_sstable_prepared)
        # Everything up to the last "-" is common to all components of the sstable.
        component_prefix, _, _ = sstable_filename.rpartition("-")
        for component in glob.glob(os.path.join(table_data_dir, component_prefix + "*")):
            shutil.copy(component, temp_workdir)
        copied_data_files = glob.glob(os.path.join(temp_workdir, "*-Data.db"))
        return copied_data_files[0]
@contextlib.contextmanager
def _prepare_sstable(cql, scylla_data_dir, table, write_fun=None):
""" Prepares the table for the needs of the schema loading tests.
Namely:
* Disable auto-compaction for the system-schema keyspace and system.scylla_local table.
* Disable auto-compaction for the system-schema keyspace and table's keyspace.
* Flushes said keyspaces.
* Locates an sstable belonging to system.scylla_local and returns it.
* Locates an sstable belonging to the table and returns it.
"""
with nodetool.no_autocompaction_context(cql, "system.scylla_local", "system_schema"):
keyspace_name, table_name = table.split(".")
with nodetool.no_autocompaction_context(cql, keyspace_name, "system_schema"):
if write_fun is not None:
write_fun()
# Need to flush system keyspaces whose sstables we want to meddle
# with, to make sure they are actually on disk.
nodetool.flush_keyspace(cql, "system_schema")
nodetool.flush_keyspace(cql, "system")
sstables = glob.glob(os.path.join(scylla_data_dir, "system", "scylla_local-*", "*-Data.db"))
nodetool.flush_keyspace(cql, keyspace_name)
sstables = glob.glob(os.path.join(scylla_data_dir, keyspace_name, table_name + "-*", "*-Data.db"))
yield sstables[0]
@pytest.fixture(scope="class")
def system_scylla_local_sstable_prepared(cql, scylla_data_dir):
    """ Yields the sstable that _prepare_sstable() produces for the system.scylla_local table. """
    with _prepare_sstable(cql, scylla_data_dir, "system.scylla_local") as sst:
        yield sst
@pytest.fixture(scope="class")
def system_scylla_local_schema_file():
""" Prepares a schema.cql with the schema of system.scylla_local. """
@@ -554,24 +593,26 @@ def scylla_home_dir(scylla_data_dir):
yield scylla_home
@pytest.fixture(scope="class")
def system_scylla_local_reference_dump(scylla_path, system_scylla_local_sstable_prepared):
""" Produce a reference json dump of the system.scylla_local sstable. """
def _produce_reference_dump(scylla_path, schema_args, sstable):
""" Produce a json dump, to be used as a reference, of the specified sstable. """
dump_reference = subprocess.check_output([
scylla_path,
"sstable",
"dump-data",
"--output-format", "json",
"--logger-log-level", "scylla-sstable=debug",
"--system-schema",
"--keyspace", "system",
"--table", "scylla_local",
system_scylla_local_sstable_prepared])
] + schema_args + [sstable])
dump_reference = json.loads(dump_reference)["sstables"]
return list(dump_reference.values())[0]
class TestScyllaSsstableSchemaLoading:
@pytest.fixture(scope="class")
def system_scylla_local_reference_dump(scylla_path, system_scylla_local_sstable_prepared):
    """ Produce a reference json dump of the system.scylla_local sstable, using --system-schema. """
    return _produce_reference_dump(scylla_path, ["--system-schema", "--keyspace", "system", "--table", "scylla_local"],
                                   system_scylla_local_sstable_prepared)
class TestScyllaSsstableSchemaLoading(TestScyllaSsstableSchemaLoadingBase):
""" Test class containing all the schema loader tests.
Helps in providing a natural scope of all the specialized fixtures shared by
@@ -580,31 +621,6 @@ class TestScyllaSsstableSchemaLoading:
keyspace = "system"
table = "scylla_local"
def check(self, scylla_path, extra_args, sstable, dump_reference, cwd=None, env=None):
dump_common_args = [scylla_path, "sstable", "dump-data", "--output-format", "json", "--logger-log-level", "scylla-sstable=debug"]
dump = json.loads(subprocess.check_output(dump_common_args + extra_args + [sstable], cwd=cwd, env=env))["sstables"]
dump = list(dump.values())[0]
assert dump == dump_reference
def check_fail(self, scylla_path, extra_args, sstable, error_msg=None):
common_args = [scylla_path, "sstable", "dump-data", "--logger-log-level", "scylla-sstable=debug:schema_loader=trace"]
res = subprocess.run(common_args + extra_args + [sstable], capture_output=True, text=True)
print(res.stderr)
if error_msg is None:
error_msg = "Failed to autodetect and load schema, try again with --logger-log-level scylla-sstable=debug to learn more or provide the schema source manually"
assert res.stderr.split('\n')[-2] == error_msg
assert res.returncode != 0
def copy_sstable_to_external_dir(self, system_scylla_local_sstable_prepared, temp_workdir):
table_data_dir, sstable_filename = os.path.split(system_scylla_local_sstable_prepared)
sstable_glob = "-".join(sstable_filename.split("-")[:-1]) + "*"
sstable_components = glob.glob(os.path.join(table_data_dir, sstable_glob))
for c in sstable_components:
shutil.copy(c, temp_workdir)
return glob.glob(os.path.join(temp_workdir, "*-Data.db"))[0]
def test_table_dir_system_schema(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump):
self.check(
scylla_path,
@@ -754,6 +770,190 @@ class TestScyllaSsstableSchemaLoading:
error_msg="error processing arguments: could not load schema via schema-tables: std::runtime_error (Failed to find system.non-existent-table in schema tables)")
@pytest.fixture(scope="class")
def schema_test_base_table(cql, test_keyspace):
    """ Creates the base table used by the view/index schema-loading tests and yields its name. """
    table_schema = "pk int, v1 text, v2 text, PRIMARY KEY (pk)"
    with util.new_test_table(cql, test_keyspace, table_schema) as base_table:
        yield base_table
@pytest.fixture(scope="class")
def schema_test_mv(cql, schema_test_base_table):
    """ Creates a materialized view on the base table, keyed by (v1, pk), and yields what util.new_materialized_view() provides. """
    selected_columns = '*'
    primary_key = 'v1, pk'
    where_clause = 'v1 is not null and pk is not null'
    with util.new_materialized_view(cql, schema_test_base_table, selected_columns, primary_key, where_clause) as view:
        yield view
@pytest.fixture(scope="class")
def schema_test_si(cql, schema_test_base_table):
    """ Creates a secondary index on column v2 of the base table.

    Yields the value provided by util.new_secondary_index() with "_index"
    appended.
    """
    _, base_table_name = schema_test_base_table.split(".")
    index_name = f"{base_table_name}_by_v2"
    with util.new_secondary_index(cql, schema_test_base_table, "v2", name=index_name) as index:
        yield index + "_index"
@pytest.fixture(scope="class")
def schema_test_mv_sstable_prepared(cql, test_keyspace, schema_test_base_table, schema_test_mv, scylla_data_dir):
    """ Writes a few rows into the base table and yields the sstable that _prepare_sstable() locates for the materialized view. """
    inserts = (
        f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (0, 'v1-0', 'v2-0')",
        f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (1, 'v1-1', 'v2-1')",
        f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (2, 'v1-1', 'v2-2')",
    )

    def write():
        for statement in inserts:
            cql.execute(statement)

    with _prepare_sstable(cql, scylla_data_dir, schema_test_mv, write) as sst:
        yield sst
@pytest.fixture(scope="class")
def schema_test_si_sstable_prepared(cql, test_keyspace, schema_test_base_table, schema_test_si, scylla_data_dir):
    """ Writes a few rows into the base table and yields the sstable that _prepare_sstable() locates for the secondary index. """
    inserts = (
        f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (0, 'v1-0', 'v2-0')",
        f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (1, 'v1-1', 'v2-1')",
        f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (2, 'v1-1', 'v2-2')",
    )

    def write():
        for statement in inserts:
            cql.execute(statement)

    with _prepare_sstable(cql, scylla_data_dir, schema_test_si, write) as sst:
        yield sst
@pytest.fixture(scope="class")
def schema_test_mv_schema_file(schema_test_base_table, schema_test_mv):
    """ Prepares a schema.cql containing the base table and the materialized view, matching the view used by the `schema_test_mv_sstable_prepared` fixture.

    Yields the path of the file, which only exists while the fixture is alive.
    """
    statements = [
        f"CREATE TABLE {schema_test_base_table} (pk int, v1 text, v2 text, PRIMARY KEY (pk));",
        f"CREATE MATERIALIZED VIEW {schema_test_mv} AS",
        f" SELECT * FROM {schema_test_base_table} WHERE v1 IS NOT NULL AND pk IS NOT NULL",
        " PRIMARY KEY (v1, pk);",
    ]
    with tempfile.NamedTemporaryFile("w+t") as schema_file:
        schema_file.writelines(statements)
        # Make sure the content is on disk before the path is handed out.
        schema_file.flush()
        yield schema_file.name
@pytest.fixture(scope="class")
def schema_test_si_schema_file(schema_test_base_table, schema_test_si):
    """ Prepares a schema.cql containing the base table and the secondary index, matching the index used by the `schema_test_si_sstable_prepared` fixture.

    Yields the path of the file, which only exists while the fixture is alive.
    """
    _, base_table_name = schema_test_base_table.split(".")
    statements = [
        f"CREATE TABLE {schema_test_base_table} (pk int, v1 text, v2 text, PRIMARY KEY (pk));",
        f"CREATE INDEX {base_table_name}_by_v2 ON {schema_test_base_table}(v2);",
    ]
    with tempfile.NamedTemporaryFile("w+t") as schema_file:
        schema_file.writelines(statements)
        # Make sure the content is on disk before the path is handed out.
        schema_file.flush()
        yield schema_file.name
@pytest.fixture(scope="class")
def schema_test_mv_reference_dump(scylla_path, schema_test_mv, schema_test_mv_sstable_prepared):
    """ Produces the reference dump of the view's sstable.

    The view is described to the tool as a regular table with the view's
    columns and primary key, via a temporary schema file.
    """
    table_schema = f"CREATE TABLE {schema_test_mv} (v1 text, pk int, v2 text, PRIMARY KEY (v1, pk))"
    with tempfile.NamedTemporaryFile("w+t") as schema_file:
        schema_file.write(table_schema)
        schema_file.flush()
        schema_args = ["--schema-file", schema_file.name]
        return _produce_reference_dump(scylla_path, schema_args, schema_test_mv_sstable_prepared)
@pytest.fixture(scope="class")
def schema_test_si_reference_dump(scylla_path, schema_test_si, schema_test_si_sstable_prepared):
    """ Produces the reference dump of the index's sstable.

    The index is described to the tool as a regular table with the columns
    v2, idx_token and pk, via a temporary schema file.
    """
    table_schema = f"CREATE TABLE {schema_test_si} (v2 text, idx_token bigint, pk int, PRIMARY KEY (v2, idx_token, pk))"
    with tempfile.NamedTemporaryFile("w+t") as schema_file:
        schema_file.write(table_schema)
        schema_file.flush()
        schema_args = ["--schema-file", schema_file.name]
        return _produce_reference_dump(scylla_path, schema_args, schema_test_si_sstable_prepared)
class TestScyllaSsstableViewSchemaLoading(TestScyllaSsstableSchemaLoadingBase):
    """ Schema-loading tests for materialized views and secondary indexes.

    Mirrors TestScyllaSsstableSchemaLoading, but concentrates on verifying that
    view and index schemas can be loaded with every supported method.
    Data directory discovery is not tested exhaustively here, that is
    covered by TestScyllaSsstableSchemaLoading.
    """

    # Materialized view tests

    def test_mv_table_dir_schema_file(self, scylla_path, schema_test_mv_sstable_prepared,
                                      schema_test_mv_reference_dump, schema_test_mv_schema_file):
        schema_args = ["--schema-file", schema_test_mv_schema_file]
        self.check(scylla_path, extra_args=schema_args, sstable=schema_test_mv_sstable_prepared,
                   dump_reference=schema_test_mv_reference_dump)

    def test_mv_external_dir_schema_file(self, scylla_path, schema_test_mv_sstable_prepared,
                                         schema_test_mv_reference_dump, schema_test_mv_schema_file, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_mv_sstable_prepared, temp_workdir)
        schema_args = ["--schema-file", schema_test_mv_schema_file]
        self.check(scylla_path, extra_args=schema_args, sstable=ext_sstable,
                   dump_reference=schema_test_mv_reference_dump)

    def test_mv_table_dir_autodeduced(self, scylla_path, schema_test_mv, schema_test_mv_sstable_prepared,
                                      schema_test_mv_reference_dump, scylla_home_dir):
        # No schema arguments: the tool has to find the schema on its own.
        self.check(scylla_path, extra_args=[], sstable=schema_test_mv_sstable_prepared,
                   dump_reference=schema_test_mv_reference_dump)

    def test_mv_table_dir_scylla_yaml(self, scylla_path, schema_test_mv, schema_test_mv_sstable_prepared,
                                      schema_test_mv_reference_dump, scylla_home_dir):
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_mv.split(".")
        schema_args = ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table]
        self.check(scylla_path, extra_args=schema_args, sstable=schema_test_mv_sstable_prepared,
                   dump_reference=schema_test_mv_reference_dump)

    def test_mv_external_dir_scylla_yaml(self, scylla_path, schema_test_mv, schema_test_mv_sstable_prepared,
                                         schema_test_mv_reference_dump, scylla_home_dir, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_mv_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_mv.split(".")
        schema_args = ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table]
        self.check(scylla_path, extra_args=schema_args, sstable=ext_sstable,
                   dump_reference=schema_test_mv_reference_dump)

    # Secondary index tests

    def test_si_table_dir_schema_file(self, scylla_path, schema_test_si_sstable_prepared,
                                      schema_test_si_reference_dump, schema_test_si_schema_file):
        schema_args = ["--schema-file", schema_test_si_schema_file]
        self.check(scylla_path, extra_args=schema_args, sstable=schema_test_si_sstable_prepared,
                   dump_reference=schema_test_si_reference_dump)

    def test_si_external_dir_schema_file(self, scylla_path, schema_test_si_sstable_prepared,
                                         schema_test_si_reference_dump, schema_test_si_schema_file, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_si_sstable_prepared, temp_workdir)
        schema_args = ["--schema-file", schema_test_si_schema_file]
        self.check(scylla_path, extra_args=schema_args, sstable=ext_sstable,
                   dump_reference=schema_test_si_reference_dump)

    def test_si_table_dir_autodeduced(self, scylla_path, schema_test_si, schema_test_si_sstable_prepared,
                                      schema_test_si_reference_dump, scylla_home_dir):
        # No schema arguments: the tool has to find the schema on its own.
        self.check(scylla_path, extra_args=[], sstable=schema_test_si_sstable_prepared,
                   dump_reference=schema_test_si_reference_dump)

    def test_si_table_dir_scylla_yaml(self, scylla_path, schema_test_si, schema_test_si_sstable_prepared,
                                      schema_test_si_reference_dump, scylla_home_dir):
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_si.split(".")
        schema_args = ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table]
        self.check(scylla_path, extra_args=schema_args, sstable=schema_test_si_sstable_prepared,
                   dump_reference=schema_test_si_reference_dump)

    def test_si_external_dir_scylla_yaml(self, scylla_path, schema_test_si, schema_test_si_sstable_prepared,
                                         schema_test_si_reference_dump, scylla_home_dir, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_si_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_si.split(".")
        schema_args = ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table]
        self.check(scylla_path, extra_args=schema_args, sstable=ext_sstable,
                   dump_reference=schema_test_si_reference_dump)
@pytest.fixture(scope="module")
def scrub_workdir():
"""A root temporary directory to be shared by all the scrub tests"""

View File

@@ -19,6 +19,8 @@
#include "cql3/statements/create_keyspace_statement.hh"
#include "cql3/statements/create_table_statement.hh"
#include "cql3/statements/create_type_statement.hh"
#include "cql3/statements/create_view_statement.hh"
#include "cql3/statements/create_index_statement.hh"
#include "cql3/statements/update_statement.hh"
#include "db/cql_type_parser.hh"
#include "db/config.hh"
@@ -38,6 +40,7 @@
#include "gms/feature_service.hh"
#include "locator/abstract_replication_strategy.hh"
#include "tools/schema_loader.hh"
#include "view_info.hh"
namespace {
@@ -56,6 +59,7 @@ struct database {
database(const db::config& cfg, gms::feature_service& features) : cfg(cfg), features(features)
{ }
database(database&&) = delete;
};
struct keyspace {
@@ -63,14 +67,17 @@ struct keyspace {
explicit keyspace(lw_shared_ptr<keyspace_metadata> metadata) : metadata(std::move(metadata))
{ }
keyspace(keyspace&&) = delete;
};
struct table {
keyspace& ks;
const keyspace& ks;
schema_ptr schema;
secondary_index::secondary_index_manager secondary_idx_man;
bool user;
table(data_dictionary_impl& impl, keyspace& ks, schema_ptr schema);
table(data_dictionary_impl& impl, const keyspace& ks, schema_ptr schema, bool user);
table(table&&) = delete;
};
class data_dictionary_impl : public data_dictionary::impl {
@@ -157,12 +164,21 @@ private:
static const std::vector<view_ptr> empty;
return empty;
}
virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view table_name,
virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view cf_name,
std::optional<sstring> index_name_root) const override {
throw std::bad_function_call();
auto has_schema = [&] (std::string_view ks_name, std::string_view table_name) {
const auto& tables = unwrap(db).tables;
return std::find_if(tables.begin(), tables.end(), [&] (const table& t) {
return t.schema->ks_name() == ks_name && t.schema->cf_name() == table_name;
}) != tables.end();
};
return secondary_index::get_available_index_name(ks_name, cf_name, index_name_root, existing_index_names(db, ks_name), has_schema);
}
virtual std::set<sstring> existing_index_names(data_dictionary::database db, std::string_view ks_name, std::string_view cf_to_exclude = {}) const override {
return {};
auto tables = boost::copy_range<std::vector<schema_ptr>>(unwrap(db).tables
| boost::adaptors::filtered([ks_name] (const table& t) { return t.schema->ks_name() == ks_name; })
| boost::adaptors::transformed([] (const table& t) { return t.schema; }));
return secondary_index::existing_index_names(tables, cf_to_exclude);
}
virtual schema_ptr find_indexed_table(data_dictionary::database db, std::string_view ks_name, std::string_view index_name) const override {
return {};
@@ -199,8 +215,8 @@ public:
}
};
table::table(data_dictionary_impl& impl, keyspace& ks, schema_ptr schema) :
ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this))
table::table(data_dictionary_impl& impl, const keyspace& ks, schema_ptr schema, bool user) :
ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this)), user(user)
{ }
sstring read_file(std::filesystem::path path) {
@@ -235,9 +251,7 @@ std::vector<schema_ptr> do_load_schemas(const db::config& cfg, std::string_view
std::map<sstring, sstring>{},
std::nullopt,
false));
real_db.tables.emplace_back(dd_impl, real_db.keyspaces.back(), db::schema_tables::dropped_columns());
std::vector<schema_ptr> schemas;
real_db.tables.emplace_back(dd_impl, real_db.keyspaces.back(), db::schema_tables::dropped_columns(), false);
auto find_or_create_keyspace = [&] (const sstring& name) -> data_dictionary::keyspace {
try {
@@ -276,14 +290,37 @@ std::vector<schema_ptr> do_load_schemas(const db::config& cfg, std::string_view
} else if (auto p = dynamic_cast<cql3::statements::create_type_statement*>(statement)) {
dd_impl.unwrap(ks).metadata->add_user_type(p->create_type(db));
} else if (auto p = dynamic_cast<cql3::statements::create_table_statement*>(statement)) {
schemas.push_back(p->get_cf_meta_data(db));
auto schema = p->get_cf_meta_data(db);
// CDC tables use a custom partitioner, which is not reflected when
// dumping the schema to schema.cql, so we have to manually set it here.
if (cdc::is_log_name(schemas.back()->cf_name())) {
schema_builder b(std::move(schemas.back()));
if (cdc::is_log_name(schema->cf_name())) {
schema_builder b(std::move(schema));
b.with_partitioner(cdc::cdc_partitioner::classname);
schemas.back() = b.build();
schema = b.build();
}
real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), std::move(schema), true);
} else if (auto p = dynamic_cast<cql3::statements::create_view_statement*>(statement)) {
auto&& [view, warnings] = p->prepare_view(db);
auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { return t.schema->ks_name() == view->ks_name() && t.schema->cf_name() == view->cf_name(); });
if (it != real_db.tables.end()) {
continue; // view already exists
}
real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), view, true);
} else if (auto p = dynamic_cast<cql3::statements::create_index_statement*>(statement)) {
auto res = p->build_index_schema(db);
if (!res) {
continue; // index already exists
}
auto [new_base_schema, index] = *res;
auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { return t.schema->id() == new_base_schema->id(); });
if (it == real_db.tables.end()) { // shouldn't happen but let's handle it
throw std::runtime_error(fmt::format("tools::do_load_schemas(): failed to look up base table {}.{}, while creating index on it", new_base_schema->ks_name(), new_base_schema->cf_name()));
}
it->schema = std::move(new_base_schema);
it->secondary_idx_man.reload();
const bool new_token_column_computation = db.features().correct_idx_token_in_secondary_index;
auto view = it->secondary_idx_man.create_view_for_index(index, new_token_column_computation);
real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), view, true);
} else if (auto p = dynamic_cast<cql3::statements::update_statement*>(statement)) {
if (p->keyspace() != db::schema_tables::NAME && p->column_family() != db::schema_tables::DROPPED_COLUMNS) {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected modification statement to be against {}.{}, but it is against {}.{}",
@@ -308,17 +345,18 @@ std::vector<schema_ptr> do_load_schemas(const db::config& cfg, std::string_view
for (auto& row : rs.rows()) {
const auto keyspace_name = row.get_nonnull<sstring>("keyspace_name");
const auto table_name = row.get_nonnull<sstring>("table_name");
auto it = std::find_if(schemas.begin(), schemas.end(), [&] (schema_ptr s) {
auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) {
auto& s = t.schema;
return s->ks_name() == keyspace_name && s->cf_name() == table_name;
});
if (it == schemas.end()) {
if (it == real_db.tables.end()) {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): failed applying update to {}.{}, the table it applies to is not found: {}.{}",
db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, keyspace_name, table_name));
}
auto name = row.get_nonnull<sstring>("column_name");
auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull<sstring>("type"), user_types_storage(real_db));
auto time = row.get_nonnull<db_clock::time_point>("dropped_time");
*it = schema_builder(*it).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build();
it->schema = schema_builder(std::move(it->schema)).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build();
}
} else {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected statement to be one of (create keyspace, create type, create table), got: {}",
@@ -326,7 +364,10 @@ std::vector<schema_ptr> do_load_schemas(const db::config& cfg, std::string_view
}
}
return schemas;
return boost::copy_range<std::vector<schema_ptr>>(
real_db.tables |
boost::adaptors::filtered([] (const table& t) { return t.user; }) |
boost::adaptors::transformed([] (const table& t) { return t.schema; }));
}
struct sstable_manager_service {
@@ -474,6 +515,7 @@ std::unordered_map<schema_ptr, std::string> get_schema_table_directories(std::fi
const std::vector<schema_ptr> schemas{
db::schema_tables::types(),
db::schema_tables::tables(),
db::schema_tables::views(),
db::schema_tables::columns(),
db::schema_tables::view_virtual_columns(),
db::schema_tables::computed_columns(),
@@ -524,6 +566,9 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
auto schema_table_table_dir = get_schema_table_directories(scylla_data_path);
auto schema_tables_path = scylla_data_path / db::schema_tables::NAME;
auto empty = [] (const mutation_opt& mopt) {
return !mopt || !mopt->partition().row_count();
};
auto do_load = [&] (std::function<const schema_ptr()> schema_factory) {
auto s = schema_factory();
return read_schema_table_mutation(
@@ -535,6 +580,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
{table});
};
mutation_opt tables = do_load(db::schema_tables::tables);
mutation_opt views = do_load(db::schema_tables::views);
mutation_opt columns = do_load(db::schema_tables::columns);
mutation_opt view_virtual_columns = do_load(db::schema_tables::view_virtual_columns);
mutation_opt computed_columns = do_load(db::schema_tables::computed_columns);
@@ -542,7 +588,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
mutation_opt dropped_columns = do_load(db::schema_tables::dropped_columns);
mutation_opt scylla_tables = do_load([] () { return db::schema_tables::scylla_tables(); });
if ((!tables || !tables->partition().row_count()) || (!columns || !columns->partition().row_count())) {
if ((empty(tables) && empty(views)) || empty(columns)) {
throw std::runtime_error(fmt::format("Failed to find {}.{} in schema tables", keyspace, table));
}
@@ -583,9 +629,17 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
gms::feature_service features(gms::feature_config_from_db_config(dbcfg));
db::schema_ctxt ctxt(dbcfg, user_type_storage, features);
if (empty(tables)) {
tables = std::move(views);
}
schema_mutations muts(std::move(*tables), std::move(*columns), std::move(view_virtual_columns), std::move(computed_columns), std::move(indexes),
std::move(dropped_columns), std::move(scylla_tables));
return db::schema_tables::create_table_from_mutations(ctxt, muts);
if (muts.is_view()) {
return db::schema_tables::create_view_from_mutations(ctxt, muts);
} else {
return db::schema_tables::create_table_from_mutations(ctxt, muts);
}
}
} // anonymous namespace
@@ -601,10 +655,18 @@ future<std::vector<schema_ptr>> load_schemas(const db::config& dbcfg, std::strin
// Loads the schema(s) described by the file at `path` and resolves to a single schema.
// The file contents are read with read_file() and parsed by do_load_schemas(); the work
// runs inside async(). Outcomes visible in the branch logic below:
//  - exactly one schema loaded: that schema is returned;
//  - exactly two schemas loaded, the first a non-view and the second a view whose
//    base id equals the first one's id: the second (the view/index) is returned;
//  - anything else: std::runtime_error is thrown.
// NOTE(review): the `schemas.size() != 1` check and the bare
// `return std::move(schemas.front());` further down look like removed lines from a
// diff rendering interleaved with the added ones — confirm against the real file.
future<schema_ptr> load_one_schema_from_file(const db::config& dbcfg, std::filesystem::path path) {
return async([&dbcfg, path] () mutable {
auto schemas = do_load_schemas(dbcfg, read_file(path));
if (schemas.size() != 1) {
throw std::runtime_error(fmt::format("Schema file {} expected to contain exactly 1 schema, actually has {}", path.native(), schemas.size()));
if (schemas.size() == 1) {
return std::move(schemas.front());
} else if (schemas.size() == 2) {
// We expect a base table at index 0 and a view/index on it at index 1
if (!schemas[0]->is_view() && schemas[1]->is_view() && schemas[0]->id() == schemas[1]->view_info()->base_id()) {
return std::move(schemas[1]);
}
}
return std::move(schemas.front());
// Reached when neither accepted shape matched (including a two-schema file whose
// entries are not a base table followed by its view).
throw std::runtime_error(fmt::format(
"Schema file {} expected to contain exactly 1 schema or 2 schemas (base table and view), actually has {} non-related schemas",
path.native(),
schemas.size()));
});
}