alternator: move create_table() to raft

This commit is contained in:
Gleb Natapov
2021-12-09 13:01:24 +02:00
parent 0cd6d283ad
commit 1491cc2906
2 changed files with 77 additions and 56 deletions

View File

@@ -67,6 +67,8 @@ logging::logger elogger("alternator-executor");
namespace alternator {
// Forward declaration; the definition appears further down in this file,
// after the table-creation code that calls it.
// Resolves to the schema mutations for creating `keyspace_name`. It does not
// announce them itself: the caller passes the result to
// migration_manager::announce().
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
return t;
@@ -838,13 +840,12 @@ static void verify_billing_mode(const rjson::value& request) {
}
}
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
static future<executor::request_return_type> create_table_on_shard0(tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper) {
assert(this_shard_id() == 0);
std::string table_name = get_table_name(request);
if (table_name.find(INTERNAL_TABLE_PREFIX) == 0) {
return make_ready_future<request_return_type>(api_error::validation(
format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
if (table_name.find(executor::INTERNAL_TABLE_PREFIX) == 0) {
co_return api_error::validation(format("Prefix {} is reserved for accessing internal tables", executor::INTERNAL_TABLE_PREFIX));
}
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
const rjson::value& attribute_definitions = request["AttributeDefinitions"];
@@ -857,10 +858,12 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (!range_key.empty()) {
add_column(builder, range_key, attribute_definitions, column_kind::clustering_key);
}
builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
builder.with_column(bytes(executor::ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
verify_billing_mode(request);
co_await mm.schema_read_barrier();
schema_ptr partial_schema = builder.build();
// Parse GlobalSecondaryIndexes parameters before creating the base
@@ -871,12 +874,12 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::vector<sstring> where_clauses;
if (gsi) {
if (!gsi->IsArray()) {
return make_ready_future<request_return_type>(api_error::validation("GlobalSecondaryIndexes must be an array."));
co_return api_error::validation("GlobalSecondaryIndexes must be an array.");
}
for (const rjson::value& g : gsi->GetArray()) {
const rjson::value* index_name = rjson::find(g, "IndexName");
if (!index_name || !index_name->IsString()) {
return make_ready_future<request_return_type>(api_error::validation("GlobalSecondaryIndexes IndexName must be a string."));
co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string.");
}
std::string vname(view_name(table_name, index_name->GetString()));
elogger.trace("Adding GSI {}", index_name->GetString());
@@ -934,23 +937,21 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string vname(lsi_name(table_name, index_name->GetString()));
elogger.trace("Adding LSI {}", index_name->GetString());
if (range_key.empty()) {
return make_ready_future<request_return_type>(api_error::validation(
"LocalSecondaryIndex requires that the base table have a range key"));
co_return api_error::validation("LocalSecondaryIndex requires that the base table have a range key");
}
// FIXME: read and handle "Projection" parameter. This will
// require the MV code to copy just parts of the attrs map.
schema_builder view_builder(keyspace_name, vname);
auto [view_hash_key, view_range_key] = parse_key_schema(l);
if (view_hash_key != hash_key) {
return make_ready_future<request_return_type>(api_error::validation(
"LocalSecondaryIndex hash key must match the base table hash key"));
co_return api_error::validation("LocalSecondaryIndex hash key must match the base table hash key");
}
add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key);
if (view_range_key.empty()) {
return make_ready_future<request_return_type>(api_error::validation("LocalSecondaryIndex must specify a sort key"));
co_return api_error::validation("LocalSecondaryIndex must specify a sort key");
}
if (view_range_key == hash_key) {
return make_ready_future<request_return_type>(api_error::validation("LocalSecondaryIndex sort key cannot be the same as hash key"));
co_return api_error::validation("LocalSecondaryIndex sort key cannot be the same as hash key");
}
if (view_range_key != range_key) {
add_column(builder, view_range_key, attribute_definitions, column_kind::regular_column);
@@ -962,7 +963,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (!range_key.empty() && view_range_key != range_key) {
add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key);
}
view_builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
view_builder.with_column(bytes(executor::ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
// Note above we don't need to add virtual columns, as all
// base columns were copied to view. TODO: reconsider the need
// for virtual columns when we support Projection.
@@ -982,17 +983,17 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (sse_specification && sse_specification->IsObject()) {
rjson::value* enabled = rjson::find(*sse_specification, "Enabled");
if (!enabled || !enabled->IsBool()) {
return make_ready_future<request_return_type>(api_error("ValidationException", "SSESpecification needs boolean Enabled"));
co_return api_error("ValidationException", "SSESpecification needs boolean Enabled");
}
if (enabled->GetBool()) {
// TODO: full support for SSESpecification
return make_ready_future<request_return_type>(api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported."));
co_return api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported.");
}
}
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
add_stream_options(*stream_specification, builder, _proxy);
executor::add_stream_options(*stream_specification, builder, sp);
}
// Parse the "Tags" parameter early, so we can avoid creating the table
@@ -1021,30 +1022,52 @@ future<executor::request_return_type> executor::create_table(client_state& clien
++where_clause_it;
}
return create_keyspace(keyspace_name).handle_exception_type([] (exceptions::already_exists_exception&) {
// Ignore the fact that the keyspace may already exist. See discussion in #6340
}).then([this, table_name, request = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
return futurize_invoke([&] { return _mm.announce_new_column_family(schema); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
return parallel_for_each(std::move(view_builders), [this, schema] (schema_builder builder) {
return _mm.announce_new_view(view_ptr(builder.build()));
}).then([this, table_info = std::move(table_info), schema, tags_map = std::move(tags_map)] () mutable {
future<> f = make_ready_future<>();
if (!tags_map.empty()) {
f = update_tags(_mm, schema, std::move(tags_map));
}
return f.then([this] {
return wait_for_schema_agreement(_mm, db::timeout_clock::now() + 10s);
}).then([this, table_info = std::move(table_info), schema] () mutable {
rjson::value status = rjson::empty_object();
supplement_table_info(table_info, *schema, _proxy);
rjson::add(status, "TableDescription", std::move(table_info));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(status)));
});
});
}).handle_exception_type([table_name = std::move(table_name)] (exceptions::already_exists_exception&) {
return make_exception_future<executor::request_return_type>(
api_error::resource_in_use(format("Table {} already exists", table_name)));
try {
co_await mm.announce(co_await create_keyspace(keyspace_name, mm, gossiper));
} catch (exceptions::already_exists_exception&) {
// Ignore the fact that the keyspace may already exist. See discussion in #6340
}
try {
// The code should be rewritten in a way that allows creating mutations
// for all the changes in a single mutation array before announcing.
// See https://github.com/scylladb/scylla/issues/9868
co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema));
std::vector<mutation> m;
co_await parallel_for_each(std::move(view_builders), [&mm, schema, &m] (schema_builder builder) -> future<> {
auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build()));
std::move(vm.begin(), vm.end(), std::back_inserter(m));
});
co_await mm.announce(std::move(m));
if (!tags_map.empty()) {
schema_builder builder(schema);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), std::nullopt));
}
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s);
rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request));
co_return make_jsonable(std::move(status));
} catch(exceptions::already_exists_exception&) {
co_return api_error::resource_in_use(format("Table {} already exists", table_name));
}
}
// Entry point for the CreateTable operation.
//
// Bumps the create_table API counter, trace-logs the request, and then
// forwards the request to shard 0, where create_table_on_shard0() does the
// actual work (that function asserts it is running on shard 0).
//
// Parameters:
//   client_state - not used by this function.
//   trace_state  - tracing handle, wrapped and handed to shard 0.
//   permit       - not used by this function.
//   request      - JSON request body; moved to shard 0 and consumed there.
//
// Returns whatever create_table_on_shard0() resolves to.
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
// The lambda runs on shard 0 and receives that shard's migration_manager.
// - `tr` wraps trace_state in a global_trace_state_ptr (presumably so the
//   handle can be used from another shard -- TODO confirm).
// - `sp` and `g` refer to the sharded containers; .local() is evaluated
//   inside the lambda, i.e. on the shard that executes it.
// - `request` is moved into the lambda, and `mutable` lets it be moved out
//   again into create_table_on_shard0().
// NOTE(review): nothing in the lambda body appears to rely on the default
// by-reference capture (`&`); every name used is explicitly captured or is
// the lambda's own parameter.
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container()]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
co_return co_await create_table_on_shard0(tr, std::move(request), sp.local(), mm, g.local());
});
}
@@ -4063,19 +4086,19 @@ static std::map<sstring, sstring> get_network_topology_options(gms::gossiper& go
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
future<> executor::create_keyspace(std::string_view keyspace_name) {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper) {
sstring keyspace_name_str(keyspace_name);
return gms::get_all_endpoint_count(_gossiper).then([this, keyspace_name_str = std::move(keyspace_name_str)] (int endpoint_count) {
int rf = 3;
if (endpoint_count < rf) {
rf = 1;
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
keyspace_name_str, rf, endpoint_count);
}
auto opts = get_network_topology_options(_gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
return _mm.announce_new_keyspace(ksm);
});
int endpoint_count = co_await gms::get_all_endpoint_count(gossiper);
int rf = 3;
if (endpoint_count < rf) {
rf = 1;
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
keyspace_name_str, rf, endpoint_count);
}
auto opts = get_network_topology_options(gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
co_return mm.prepare_new_keyspace_announcement(ksm);
}
future<> executor::start() {

View File

@@ -198,8 +198,6 @@ public:
future<> start();
future<> stop() { return make_ready_future<>(); }
future<> create_keyspace(std::string_view keyspace_name);
static sstring table_name(const schema&);
static db::timeout_clock::time_point default_timeout();
static void set_default_timeout(db::timeout_clock::duration timeout);