diff --git a/configure.py b/configure.py index 778cb57433..ca1c75089c 100755 --- a/configure.py +++ b/configure.py @@ -421,6 +421,7 @@ scylla_core = (['database.cc', 'db/consistency_level.cc', 'db/system_keyspace.cc', 'db/schema_tables.cc', + 'db/legacy_schema_migrator.cc', 'db/commitlog/commitlog.cc', 'db/commitlog/commitlog_replayer.cc', 'db/commitlog/commitlog_entry.cc', diff --git a/db/legacy_schema_migrator.cc b/db/legacy_schema_migrator.cc new file mode 100644 index 0000000000..75f70a2996 --- /dev/null +++ b/db/legacy_schema_migrator.cc @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Modified by ScyllaDB + * Copyright (C) 2017 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see <http://www.gnu.org/licenses/>. + */ + +#include +#include +#include +#include +#include + +#include "database.hh" +#include "legacy_schema_migrator.hh" +#include "system_keyspace.hh" +#include "schema_tables.hh" +#include "schema_builder.hh" +#include "json.hh" +#include "cql3/query_processor.hh" + +static seastar::logger logger("legacy_schema_migrator"); + +namespace db { +namespace legacy_schema_migrator { + +// local data carriers + +class migrator { +public: + migrator(cql3::query_processor& qp) + : _qp(qp) { + } + migrator(migrator&&) = default; + + typedef db_clock::time_point time_point; + + // TODO: we don't support triggers. + // This is a placeholder. + struct trigger { + time_point timestamp; + sstring name; + std::unordered_map options; + }; + + struct table { + time_point timestamp; + schema_ptr metadata; + std::vector triggers; + }; + + struct type { + time_point timestamp; + user_type metadata; + }; + + struct function { + time_point timestamp; + sstring ks_name; + sstring fn_name; + std::vector arg_names; + std::vector arg_types; + sstring return_type; + bool called_on_null_input; + sstring language; + sstring body; + }; + + struct aggregate { + time_point timestamp; + sstring ks_name; + sstring fn_name; + std::vector arg_names; + std::vector arg_types; + sstring return_type; + sstring final_func; + sstring initcond; + sstring state_func; + sstring state_type; + }; + + struct keyspace { + time_point timestamp; + sstring name; + bool durable_writes; + std::map replication_params; + + std::vector tables; + std::vector types; + std::vector functions; + std::vector aggregates; + }; + + class unsupported_feature : public std::runtime_error { + public: + using runtime_error::runtime_error; + }; + + static sstring fmt_query(const char* fmt, const char* table) { + return sprint(fmt, db::system_keyspace::NAME, table); + } + + typedef ::shared_ptr result_set_type; + typedef const 
cql3::untyped_result_set::row row_type; + + future<> read_table(keyspace& dst, sstring cf_name, time_point timestamp) { + auto fmt = "SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?"; + auto tq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNFAMILIES); + auto cq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNS); + auto zq = fmt_query(fmt, db::system_keyspace::legacy::TRIGGERS); + + typedef std::tuple, future, future> result_tuple; + + return when_all(_qp.execute_internal(tq, { dst.name, cf_name }), + _qp.execute_internal(cq, { dst.name, cf_name }), + _qp.execute_internal(zq, { dst.name, cf_name })).then([this, &dst, cf_name, timestamp](result_tuple&& t) { + + result_set_type tables = std::get<0>(t).get0(); + result_set_type columns = std::get<1>(t).get0(); + result_set_type triggers = std::get<2>(t).get0(); + + row_type& td = tables->one(); + + auto ks_name = td.get_as("keyspace_name"); + auto cf_name = td.get_as("columnfamily_name"); + auto id = td.get_or("cf_id", generate_legacy_id(ks_name, cf_name)); + + schema_builder builder(dst.name, cf_name, id); + + cf_type cf = sstring_to_cf_type(td.get_or("type", sstring("standard"))); + if (cf == cf_type::super) { + fail(unimplemented::cause::SUPER); + } + + auto comparator = td.get_as("comparator"); + bool is_compound = cell_comparator::check_compound(comparator); + builder.set_is_compound(is_compound); + cell_comparator::read_collections(builder, comparator); + + bool filter_sparse = false; + + data_type default_validator = {}; + if (td.has("default_validator")) { + default_validator = db::schema_tables::parse_type(td.get_as("default_validator")); + if (default_validator->is_counter()) { + builder.set_is_counter(true); + } + } + + /* + * Determine whether or not the table is *really* dense + * We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively), + * but we can trust is_dense value of false. 
+ */ + auto is_dense = td.get_opt("is_dense"); + if (is_dense && !*is_dense) { + builder.set_is_dense(false); + } else { + auto calulated_is_dense = [&] { + /* + * As said above, this method is only here because we need to deal with thrift upgrades. + * Once a CF has been "upgraded", i.e. we've rebuilt and saved its CQL3 metadata at least once, + * then we'll have saved the "is_dense" value and will be good to go. + * + * But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need + * to infer that information without relying on it in that case. And for the most part this is + * easy: a CF that has at least one REGULAR definition is not dense. But the subtlety is that not + * having a REGULAR definition may not mean dense because of CQL3 definitions that have only the + * PRIMARY KEY defined. + * + * So we need to recognize those special-case CQL3 tables with only a primary key. If we have some + * clustering columns, we're fine as said above. So the only problem is that we cannot decide for + * sure if a CF with neither REGULAR nor CLUSTERING_COLUMN definitions is meant to be dense, or if it + * has been created in CQL3 by say: + * CREATE TABLE test (k int PRIMARY KEY) + * in which case it should not be dense. However, we can limit our margin of error by assuming we are + * in the latter case only if the comparator is exactly CompositeType(UTF8Type). 
+ */ + stdx::optional max_cl_idx; + const cql3::untyped_result_set::row * regular = nullptr; + for (auto& row : *columns) { + auto kind_str = row.get_as("type"); + if (kind_str == "compact_value") { + continue; + } + + auto kind = db::schema_tables::deserialize_kind(kind_str); + + if (kind == column_kind::regular_column) { + if (regular != nullptr) { + return false; + } + regular = &row; + continue; + } + if (kind == column_kind::clustering_key) { + max_cl_idx = std::max(column_id(row.get_or("component_index", 0)), max_cl_idx.value_or(column_id())); + } + } + + auto is_cql3_only_pk_comparator = [](const sstring& comparator) { + if (!cell_comparator::check_compound(comparator)) { + return false; + } + // CMH. We don't have composite types, nor a parser for them. This is a simple way of + // checking the same thing. NOTE(review): off is the index of '(' itself, so the compared range starts with '(' -- confirm this can ever equal utf8_type->name(). + auto comma = comparator.find(','); + if (comma != sstring::npos) { + return false; + } + auto off = comparator.find('('); + auto end = comparator.find(')'); + + return comparator.compare(off, end - off, utf8_type->name()) == 0; + }; + + if (regular) { + auto name = regular->get_or("column_name", bytes()); + // This is a crude heuristic for determining if this was in fact a compact_value column + if (!max_cl_idx || (!name.empty() && name != to_bytes("value")) + || db::schema_tables::parse_type(regular->get_as("type")) != default_validator) { + return false; + } + // Ok, we will assume this was in fact a (scylla-created) compact value. + } + + if (max_cl_idx) { + auto n = std::count(comparator.begin(), comparator.end(), ','); // num comp - 1 + return *max_cl_idx == n; + } + + return !is_cql3_only_pk_comparator(comparator); + + }(); + + builder.set_is_dense(calulated_is_dense); + + // now, if switched to sparse, remove redundant compact_value column and the last clustering column, + // directly copying CASSANDRA-11502 logic. See CASSANDRA-11315. 
+ + filter_sparse = !calulated_is_dense && is_dense.value_or(true); + } + + for (auto& row : *columns) { + auto kind_str = row.get_as("type"); + auto kind = db::schema_tables::deserialize_kind(kind_str); + auto component_index = kind > column_kind::clustering_key ? 0 : column_id(row.get_or("component_index", 0)); + auto name = row.get_or("column_name", bytes()); + auto validator = db::schema_tables::parse_type(row.get_as("validator")); + + if (filter_sparse) { + if (kind_str == "compact_value") { + continue; + } + if (kind == column_kind::clustering_key) { + if (cf == cf_type::super && component_index != 0) { + continue; + } + if (cf != cf_type::super && !is_compound) { + continue; + } + } + } + + stdx::optional index_kind; + sstring index_name; + index_options_map options; + if (row.has("index_type")) { + index_kind = schema_tables::deserialize_index_kind(row.get_as("index_type")); + } + if (row.has("index_name")) { + index_name = row.get_as("index_name"); + } + if (row.has("index_options")) { + options = json::to_map(row.get_as("index_options"), index_options_map()); + sstring type; + auto i = options.find("index_keys"); + if (i != options.end()) { + options.erase(i); + type = "KEYS"; + } + i = options.find("index_keys_and_values"); + if (i != options.end()) { + options.erase(i); + type = "KEYS_AND_VALUES"; + } + if (type.empty()) { + if (validator->is_collection() && validator->is_multi_cell()) { + type = "FULL"; + } else { + type = "VALUES"; + } + } + auto column = maybe_quote(utf8_type->to_string(name)); + options["target"] = validator->is_collection() + ? 
type + "(" + column + ")" + : column; + } + if (index_kind) { + // Origin assumes index_name is always set, so let's do the same + builder.with_index(index_metadata(index_name, options, *index_kind)); + } + + builder.with_column(std::move(name), std::move(validator), kind, component_index); + } + + if (td.has("read_repair_chance")) { + builder.set_read_repair_chance(td.get_as("read_repair_chance")); + } + if (td.has("local_read_repair_chance")) { + builder.set_dc_local_read_repair_chance(td.get_as("local_read_repair_chance")); + } + if (td.has("gc_grace_seconds")) { + builder.set_gc_grace_seconds(td.get_as("gc_grace_seconds")); + } + if (td.has("min_compaction_threshold")) { + builder.set_min_compaction_threshold(td.get_as("min_compaction_threshold")); + } + if (td.has("max_compaction_threshold")) { + builder.set_max_compaction_threshold(td.get_as("max_compaction_threshold")); + } + if (td.has("comment")) { + builder.set_comment(td.get_as("comment")); + } + if (td.has("memtable_flush_period_in_ms")) { + builder.set_memtable_flush_period(td.get_as("memtable_flush_period_in_ms")); + } + if (td.has("caching")) { + builder.set_caching_options(caching_options::from_sstring(td.get_as("caching"))); + } + if (td.has("default_time_to_live")) { + builder.set_default_time_to_live(gc_clock::duration(td.get_as("default_time_to_live"))); + } + if (td.has("speculative_retry")) { + builder.set_speculative_retry(td.get_as("speculative_retry")); + } + if (td.has("compaction_strategy_class")) { + auto strategy = td.get_as("compaction_strategy_class"); + try { + builder.set_compaction_strategy(sstables::compaction_strategy::type(strategy)); + } catch (const exceptions::configuration_exception& e) { + // If compaction strategy class isn't supported, fallback to size tiered. 
+ logger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what()); + builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered); + } + } + if (td.has("compaction_strategy_options")) { + builder.set_compaction_strategy_options(json::to_map(td.get_as("compaction_strategy_options"))); + } + auto comp_param = td.get_as("compression_parameters"); + compression_parameters cp(json::to_map(comp_param)); + builder.set_compressor_params(cp); + + if (td.has("min_index_interval")) { + builder.set_min_index_interval(td.get_as("min_index_interval")); + } else if (td.has("index_interval")) { // compatibility + builder.set_min_index_interval(td.get_as("index_interval")); + } + if (td.has("max_index_interval")) { + builder.set_max_index_interval(td.get_as("max_index_interval")); + } + if (td.has("bloom_filter_fp_chance")) { + builder.set_bloom_filter_fp_chance(td.get_as("bloom_filter_fp_chance")); + } else { + builder.set_bloom_filter_fp_chance(builder.get_bloom_filter_fp_chance()); + } + if (td.has("dropped_columns")) { + auto map = td.get_map("dropped_columns"); + for (auto&& e : map) { + builder.without_column(e.first, api::timestamp_type(e.second)); + }; + } + + // ignore version. we're transient + if (!triggers->empty()) { + throw unsupported_feature("triggers"); + } + + // TODO: table upgrades as in origin converter. 
+ + dst.tables.emplace_back(table{timestamp, builder.build() }); + }); + } + + future<> read_tables(keyspace& dst) { + auto query = fmt_query("SELECT columnfamily_name, writeTime(type) AS timestamp FROM %s.%s WHERE keyspace_name = ?", + db::system_keyspace::legacy::COLUMNFAMILIES); + return _qp.execute_internal(query, {dst.name}).then([this, &dst](result_set_type result) { + return parallel_for_each(*result, [this, &dst](row_type& row) { + return read_table(dst, row.get_as("columnfamily_name"), row.get_as("timestamp")); + }).finally([result] {}); + }); + } + + future read_type_timestamp(keyspace& dst, sstring type_name) { + // TODO: Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot + // use the writeTime() CQL function, and must resort to a lower level. + // Origin digs up the actual cells of the target partition and gets the timestamp from there. + // We should do the same, but that is messy. For now, return the keyspace's timestamp as a placeholder. + return make_ready_future(dst.timestamp); + } + + future<> read_types(keyspace& dst) { + auto query = fmt_query("SELECT * FROM %s.%s WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES); + return _qp.execute_internal(query, {dst.name}).then([this, &dst](result_set_type result) { + return parallel_for_each(*result, [this, &dst](row_type& row) { + auto name = row.get_blob("type_name"); + auto columns = row.get_list("field_names"); + auto types = row.get_list("field_types"); + std::vector field_types; + for (auto&& value : types) { + field_types.emplace_back(db::schema_tables::parse_type(value)); + } + auto ut = user_type_impl::get_instance(dst.name, name, columns, field_types); + return read_type_timestamp(dst, value_cast(utf8_type->deserialize(name))).then([ut = std::move(ut), &dst](time_point timestamp) { + dst.types.emplace_back(type{timestamp, ut}); + }); + }).finally([result] {}); + }); + } + + future<> read_functions(keyspace& dst) { + auto query = fmt_query("SELECT * FROM 
%s.%s WHERE keyspace_name = ?", db::system_keyspace::legacy::FUNCTIONS); + return _qp.execute_internal(query, {dst.name}).then([this, &dst](result_set_type result) { + if (!result->empty()) { + throw unsupported_feature("functions"); + } + }); + } + + future<> read_aggregates(keyspace& dst) { + auto query = fmt_query("SELECT * FROM %s.%s WHERE keyspace_name = ?", db::system_keyspace::legacy::AGGREGATES); + return _qp.execute_internal(query, {dst.name}).then([this, &dst](result_set_type result) { + if (!result->empty()) { + throw unsupported_feature("aggregates"); + } + }); + } + + future read_keyspace(sstring ks_name, bool durable_writes, sstring strategy_class, sstring strategy_options, time_point timestamp) { + auto map = json::to_map(strategy_options); + map.emplace("class", std::move(strategy_class)); + auto ks = ::make_lw_shared(keyspace{timestamp, std::move(ks_name), durable_writes, std::move(map) }); + + return read_tables(*ks).then([this, ks] { + //Collection types = readTypes(keyspaceName); + return read_types(*ks); + }).then([this, ks] { + return read_functions(*ks); + }).then([this, ks] { + return read_aggregates(*ks); + }).then([this, ks] { + return make_ready_future(std::move(*ks)); + }); + } + + future<> read_all_keyspaces() { + static auto ks_filter = [](row_type& row) { + auto ks_name = row.get_as("keyspace_name"); + return ks_name != db::system_keyspace::NAME && ks_name != db::schema_tables::v3::NAME; + }; + + auto query = fmt_query("SELECT keyspace_name, durable_writes, strategy_options, strategy_class, writeTime(durable_writes) AS timestamp FROM %s.%s", + db::system_keyspace::legacy::KEYSPACES); + + return _qp.execute_internal(query).then([this](result_set_type result) { + auto i = boost::make_filter_iterator(ks_filter, result->begin(), result->end()); + auto e = boost::make_filter_iterator(ks_filter, result->end(), result->end()); + return parallel_for_each(i, e, [this](row_type& row) { + return read_keyspace(row.get_as("keyspace_name") + , 
row.get_as("durable_writes") + , row.get_as("strategy_class") + , row.get_as("strategy_options") + , row.get_as("timestamp") + ).then([this](keyspace ks) { + _keyspaces.emplace_back(std::move(ks)); + }); + }).finally([result] {}); + }); + } + + future<> unload_legacy_tables() { + return make_ready_future(); + } + + future<> truncate_legacy_tables() { + logger.info("Truncating legacy schema tables"); + return make_ready_future(); + } + + future<> store_keyspaces_in_new_schema_tables() { + logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})", + _keyspaces.size(), db::schema_tables::v3::NAME); + + std::vector mutations; + + for (auto& ks : _keyspaces) { + auto ksm = ::make_lw_shared(ks.name + , ks.replication_params["class"] // TODO, make ksm like c3? + , ks.replication_params + , ks.durable_writes); + + // we want separate time stamps for tables/types, so cannot bulk them into the ksm. + for (auto&& m : db::schema_tables::make_create_keyspace_mutations(ksm, ks.timestamp.time_since_epoch().count(), false)) { + mutations.emplace_back(std::move(m)); + } + for (auto& t : ks.tables) { + db::schema_tables::add_table_or_view_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), true, mutations); + } + for (auto& t : ks.types) { + db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations); + } + } + return _qp.proxy().local().mutate_locally(std::move(mutations)); + } + + future<> migrate_indexes() { + return parallel_for_each(_keyspaces, [](const keyspace& ks) { + return parallel_for_each(ks.tables, [&](const table& t) { + return parallel_for_each(t.metadata->indices(), [&](const index_metadata& index) { + return system_keyspace::is_index_built(ks.name, t.metadata->cf_name() + "." 
+ index.name()).then([&](bool built) { + if (!built) { + return make_ready_future(); + } + return system_keyspace::set_index_built(ks.name, index.name()).then([&] { + return system_keyspace::set_index_removed(ks.name, t.metadata->cf_name() + "." + index.name()); + }); + }); + }); + }); + }); + } + future<> flush_schemas() { + return _qp.proxy().local().get_db().invoke_on_all([this] (database& db) { + return parallel_for_each(db::schema_tables::ALL, [this, &db](const sstring& cf_name) { + auto& cf = db.find_column_family(db::schema_tables::NAME, cf_name); + return cf.flush(); + }); + }); + } + + future<> migrate() { + return read_all_keyspaces().then([this]() { + if (_keyspaces.empty()) { + return unload_legacy_tables(); + } + // write metadata to the new schema tables + return store_keyspaces_in_new_schema_tables().then(std::bind(&migrator::migrate_indexes, this)) + .then(std::bind(&migrator::flush_schemas, this)) + .then(std::bind(&migrator::truncate_legacy_tables, this)) + .then(std::bind(&migrator::unload_legacy_tables, this)) + .then([] { logger.info("Completed migration of legacy schema tables"); }); + }); + } + + cql3::query_processor& _qp; + std::vector _keyspaces; +}; + +} +} + +future<> +db::legacy_schema_migrator::migrate(cql3::query_processor& qp) { + return do_with(migrator(qp), std::bind(&migrator::migrate, std::placeholders::_1)); +} + diff --git a/db/legacy_schema_migrator.hh b/db/legacy_schema_migrator.hh new file mode 100644 index 0000000000..919c175f38 --- /dev/null +++ b/db/legacy_schema_migrator.hh @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Modified by ScyllaDB + * Copyright (C) 2017 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include + +class database; + +namespace cql3 { +class query_processor; +} + +namespace db { +namespace legacy_schema_migrator { + +future<> migrate(cql3::query_processor&); + +} +}