Merge 'Alternator: implement UpdateTable operation to add or delete GSI' from Nadav Har'El
In this series we implement the UpdateTable operation to add a GSI to an existing table, or remove a GSI from a table. As the individual commit messages will explain, this required changing how Alternator stores materialized view keys - instead of insisting that these keys must be real columns (that is **not** the case when adding a GSI to an existing table), the materialized view can now take as its key any Alternator attribute serialized inside the ":attrs" map holding all non-key attributes. Fixes #11567. We also fix the IndexStatus and Backfilling attributes returned by DescribeTable - as DynamoDB API users use this API to discover when a newly added GSI completed its "backfilling" (what we call "view building") stage. Fixes #11471. This series should not be backported lightly - it's a new feature and required fairly large and intrusive changes that can introduce bugs to use cases that don't even use Alternator or its UpdateTable operations - every user of CQL materialized views or secondary indexes, as well as Alternator GSI or LSI, will use modified code. **It should be backported to 2025.1**, though - this version was actually branched long after this PR was sent, and it provides a feature that was promised for 2025.1.
Closes scylladb/scylladb#21989 * github.com:scylladb/scylladb: alternator: fix view build on oversized GSI key attribute mv: clean up do_delete_old_entry test/alternator: unflake test for IndexStatus test/alternator: work around unrelated bug causing test flakiness docs/alternator: adding a GSI is no longer an unimplemented feature test/alternator: remove xfail from all tests for issue 11567 alternator: overhaul implementation of GSIs and support UpdateTable mv: support regular_column_transformation key columns in view alternator: add new materialized-view computed column for item in map build: in cmake build, schema needs alternator build: build tests with Alternator alternator: add function serialized_value_if_type() mv: introduce regular_column_transformation, a new type of computed column alternator: add IndexStatus/Backfilling in DescribeTable alternator: add "LimitExceededException" error type docs/alternator: document two more unimplemented Alternator features
This commit is contained in:
@@ -88,6 +88,9 @@ public:
|
||||
// Builds an api_error whose error type string is "TableNotFoundException"
// and whose message is the given text.
static api_error table_not_found(std::string msg) {
    api_error err("TableNotFoundException", std::move(msg));
    return err;
}
|
||||
// Builds an api_error whose error type string is "LimitExceededException"
// and whose message is the given text.
static api_error limit_exceeded(std::string msg) {
    api_error err("LimitExceededException", std::move(msg));
    return err;
}
|
||||
// Builds an api_error whose error type string is "InternalServerError",
// carrying the given message and the HTTP internal_server_error status.
static api_error internal(std::string msg) {
    api_error err("InternalServerError", std::move(msg),
            http::reply::status_type::internal_server_error);
    return err;
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "alternator/executor.hh"
|
||||
#include "alternator/consumed_capacity.hh"
|
||||
#include "auth/permission.hh"
|
||||
@@ -55,6 +56,9 @@
|
||||
#include "utils/error_injection.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "alternator/extract_from_attrs.hh"
|
||||
#include "types/types.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -215,7 +219,7 @@ static void validate_table_name(const std::string& name) {
|
||||
// instead of each component individually as DynamoDB does.
|
||||
// The view_name() function assumes the table_name has already been validated
|
||||
// but validates the legality of index_name and the combination of both.
|
||||
static std::string view_name(const std::string& table_name, std::string_view index_name, const std::string& delim = ":") {
|
||||
static std::string view_name(std::string_view table_name, std::string_view index_name, const std::string& delim = ":") {
|
||||
if (index_name.length() < 3) {
|
||||
throw api_error::validation("IndexName must be at least 3 characters long");
|
||||
}
|
||||
@@ -223,7 +227,7 @@ static std::string view_name(const std::string& table_name, std::string_view ind
|
||||
throw api_error::validation(
|
||||
fmt::format("IndexName '{}' must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", index_name));
|
||||
}
|
||||
std::string ret = table_name + delim + std::string(index_name);
|
||||
std::string ret = std::string(table_name) + delim + std::string(index_name);
|
||||
if (ret.length() > max_table_name_length) {
|
||||
throw api_error::validation(
|
||||
fmt::format("The total length of TableName ('{}') and IndexName ('{}') cannot exceed {} characters",
|
||||
@@ -232,7 +236,7 @@ static std::string view_name(const std::string& table_name, std::string_view ind
|
||||
return ret;
|
||||
}
|
||||
|
||||
static std::string lsi_name(const std::string& table_name, std::string_view index_name) {
|
||||
static std::string lsi_name(std::string_view table_name, std::string_view index_name) {
|
||||
return view_name(table_name, index_name, "!:");
|
||||
}
|
||||
|
||||
@@ -469,7 +473,90 @@ static rjson::value generate_arn_for_index(const schema& schema, std::string_vie
|
||||
schema.ks_name(), schema.cf_name(), index_name));
|
||||
}
|
||||
|
||||
static rjson::value fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy const& proxy)
|
||||
// The following function checks if a given view has finished building.
|
||||
// We need this for describe_table() to know if a view is still backfilling,
|
||||
// or active.
|
||||
//
|
||||
// Currently we don't have in view_ptr the knowledge whether a view finished
|
||||
// building long ago - so checking this involves a somewhat inefficient, but
|
||||
// still node-local, process:
|
||||
// We need a table that can accurately tell that all nodes have finished
|
||||
// building this view. system.built_views is not good enough because it only
|
||||
// knows the view building status in the current node. In recent versions,
|
||||
// after PR #19745, we have a local table system.view_build_status_v2 with
|
||||
// global information, replacing the old system_distributed.view_build_status.
|
||||
// In theory, there can be a period during upgrading an old cluster when this
|
||||
// table is not yet available. However, since the IndexStatus is a new feature
|
||||
// too, it is acceptable that it doesn't yet work in the middle of the update.
|
||||
static future<bool> is_view_built(
        view_ptr view,
        service::storage_proxy& proxy,
        service::client_state& client_state,
        tracing::trace_state_ptr trace_state,
        service_permit permit) {
    // Schema of system.view_build_status_v2, the table we read below.
    schema_ptr status_schema = proxy.data_dictionary().find_table(
            "system", db::system_keyspace::VIEW_BUILD_STATUS_V2).schema();

    // That table is partitioned by ("keyspace_name", "view_name"); inside a
    // partition every clustering row is keyed by "host_id" and holds a string
    // "status". All the rows we need therefore live in one partition.
    partition_key view_pk = partition_key::from_exploded(*status_schema,
            {utf8_type->decompose(view->ks_name()),
             utf8_type->decompose(view->cf_name())});
    dht::partition_range_vector ranges{
        dht::partition_range(dht::decorate_key(*status_schema, view_pk))};

    // The wildcard selection is created only to borrow its query options.
    auto wildcard = cql3::selection::selection::wildcard(status_schema);
    query::partition_slice slice(
            {query::clustering_range::make_open_ended_both_sides()},
            {}, // no static columns
            {status_schema->get_column_definition("status")->id}, // the only regular column read
            wildcard->get_query_options());
    auto read_cmd = ::make_lw_shared<query::read_command>(
            status_schema->id(), status_schema->version(), slice,
            proxy.get_max_result_size(slice),
            query::tombstone_limit(proxy.get_tombstone_limit()));

    service::storage_proxy::coordinator_query_result query_res =
        co_await proxy.query(
            status_schema, std::move(read_cmd), std::move(ranges),
            db::consistency_level::LOCAL_ONE,
            service::storage_proxy::coordinator_query_options(
                executor::default_timeout(), std::move(permit), client_state, trace_state));
    query::result_set rows = query::result_set::from_raw_result(
            status_schema, slice, *query_res.query_result);

    // Collect the recorded status of each host; rows lacking either the
    // host id or the status are skipped.
    std::unordered_map<locator::host_id, sstring> status_by_host;
    for (auto&& row : rows.rows()) {
        auto row_host = row.get<utils::UUID>("host_id");
        auto row_status = row.get<sstring>("status");
        if (!row_host || !row_status) {
            continue;
        }
        status_by_host.emplace(locator::host_id(*row_host), *row_status);
    }

    // The view counts as built only when every node currently in the
    // topology has a recorded "SUCCESS". Statuses recorded for hosts that
    // are not in the topology are never consulted, so stale entries of
    // departed nodes do not matter. Nodes in DCs that do not replicate the
    // view are deliberately not special-cased.
    bool every_node_done = true;
    auto tm = proxy.get_token_metadata_ptr();
    tm->get_topology().for_each_node(
        [&] (const locator::node& node) {
            const auto found = status_by_host.find(node.host_id());
            if (found == status_by_host.end()) {
                every_node_done = false;
                return;
            }
            if (found->second != "SUCCESS") {
                every_node_done = false;
            }
        });
    co_return every_node_done;
}
|
||||
|
||||
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
rjson::value table_description = rjson::empty_object();
|
||||
auto tags_ptr = db::get_tags_of_table(schema);
|
||||
@@ -548,7 +635,22 @@ static rjson::value fill_table_description(schema_ptr schema, table_status tbl_s
|
||||
// FIXME: we have to get ProjectionType from the schema when it is added
|
||||
rjson::add(view_entry, "Projection", std::move(projection));
|
||||
// Local secondary indexes are marked by an extra '!' sign occurring before the ':' delimiter
|
||||
rjson::value& index_array = (delim_it > 1 && cf_name[delim_it-1] == '!') ? lsi_array : gsi_array;
|
||||
bool is_lsi = (delim_it > 1 && cf_name[delim_it-1] == '!');
|
||||
// Add IndexStatus and Backfilling flags, but only for GSIs -
|
||||
// LSIs can only be created with the table itself and do not
|
||||
// have a status. Alternator schema operations are synchronous
|
||||
// so only two combinations of these flags are possible: ACTIVE
|
||||
// (for a built view) or CREATING+Backfilling (if view building
|
||||
// is in progress).
|
||||
if (!is_lsi) {
|
||||
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
|
||||
rjson::add(view_entry, "IndexStatus", "ACTIVE");
|
||||
} else {
|
||||
rjson::add(view_entry, "IndexStatus", "CREATING");
|
||||
rjson::add(view_entry, "Backfilling", rjson::value(true));
|
||||
}
|
||||
}
|
||||
rjson::value& index_array = is_lsi ? lsi_array : gsi_array;
|
||||
rjson::push_back(index_array, std::move(view_entry));
|
||||
}
|
||||
if (!lsi_array.Empty()) {
|
||||
@@ -572,7 +674,7 @@ static rjson::value fill_table_description(schema_ptr schema, table_status tbl_s
|
||||
executor::supplement_table_stream_info(table_description, *schema, proxy);
|
||||
|
||||
// FIXME: still missing some response fields (issue #5026)
|
||||
return table_description;
|
||||
co_return table_description;
|
||||
}
|
||||
|
||||
bool is_alternator_keyspace(const sstring& ks_name) {
|
||||
@@ -591,11 +693,11 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
rjson::value table_description = fill_table_description(schema, table_status::active, _proxy);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Table", std::move(table_description));
|
||||
elogger.trace("returning {}", response);
|
||||
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(response)));
|
||||
co_return make_jsonable(std::move(response));
|
||||
}
|
||||
|
||||
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
|
||||
@@ -656,7 +758,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
auto& p = _proxy.container();
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = fill_table_description(schema, table_status::deleting, _proxy);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
// FIXME: the following needs to be in a loop. If mm.announce() below
|
||||
@@ -704,7 +806,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
co_return make_jsonable(std::move(response));
|
||||
}
|
||||
|
||||
static data_type parse_key_type(const std::string& type) {
|
||||
static data_type parse_key_type(std::string_view type) {
|
||||
// Note that keys are only allowed to be string, blob or number (S/B/N).
|
||||
// The other types: boolean and various lists or sets - are not allowed.
|
||||
if (type.length() == 1) {
|
||||
@@ -719,7 +821,7 @@ static data_type parse_key_type(const std::string& type) {
|
||||
}
|
||||
|
||||
|
||||
static void add_column(schema_builder& builder, const std::string& name, const rjson::value& attribute_definitions, column_kind kind) {
|
||||
static void add_column(schema_builder& builder, const std::string& name, const rjson::value& attribute_definitions, column_kind kind, bool computed_column=false) {
|
||||
// FIXME: Currently, the column name ATTRS_COLUMN_NAME is not allowed
|
||||
// because we use it for our untyped attribute map, and we can't have a
|
||||
// second column with the same name. We should fix this, by renaming
|
||||
@@ -731,7 +833,16 @@ static void add_column(schema_builder& builder, const std::string& name, const r
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (attribute_info["AttributeName"].GetString() == name) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
builder.with_column(to_bytes(name), parse_key_type(type), kind);
|
||||
data_type dt = parse_key_type(type);
|
||||
if (computed_column) {
|
||||
// Computed column for GSI (doesn't choose a real column as-is
|
||||
// but rather extracts a single value from the ":attrs" map)
|
||||
alternator_type at = type_info_from_string(type).atype;
|
||||
builder.with_computed_column(to_bytes(name), dt, kind,
|
||||
std::make_unique<extract_from_attrs_column_computation>(to_bytes(name), at));
|
||||
} else {
|
||||
builder.with_column(to_bytes(name), dt, kind);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -1072,6 +1183,87 @@ static std::unordered_set<std::string> validate_attribute_definitions(const rjso
|
||||
return seen_attribute_names;
|
||||
}
|
||||
|
||||
// The following "extract_from_attrs_column_computation" implementation is
|
||||
// what allows Alternator GSIs to use in a materialized view's key a member
|
||||
// from the ":attrs" map instead of a real column in the schema:
|
||||
|
||||
const bytes extract_from_attrs_column_computation::MAP_NAME = executor::ATTRS_COLUMN_NAME;
|
||||
|
||||
// Returns a newly allocated copy of this computation object.
column_computation_ptr extract_from_attrs_column_computation::clone() const {
    column_computation_ptr copy = std::make_unique<extract_from_attrs_column_computation>(*this);
    return copy;
}
|
||||
|
||||
// Serialize the *definition* of this column computation into a JSON
|
||||
// string with a unique "type" string - TYPE_NAME - which then causes
|
||||
// column_computation::deserialize() to create an object from this class.
|
||||
// Produces the JSON text describing this computation: an object with the
// members "type" (TYPE_NAME), "attr_name" and "desired_type", in that order,
// returned as bytes.
bytes extract_from_attrs_column_computation::serialize() const {
    rjson::value definition = rjson::empty_object();
    rjson::add(definition, "type", TYPE_NAME);
    rjson::value attr = rjson::from_string(to_string_view(_attr_name));
    rjson::add(definition, "attr_name", std::move(attr));
    rjson::add(definition, "desired_type", represent_type(_desired_type).ident);
    auto json_text = rjson::print(definition);
    return to_bytes(json_text);
}
|
||||
|
||||
// Construct an extract_from_attrs_column_computation object based on the
|
||||
// saved output of serialize(). Calls on_internal_error() if the string
|
||||
// doesn't match the expected output format of serialize(). "type" is not
|
||||
// checked - we assume the caller (column_computation::deserialize()) won't
|
||||
// call this constructor if "type" doesn't match.
|
||||
// Rebuilds the computation from the JSON object produced by serialize().
//
// `v` must contain string members "attr_name" and "desired_type", and the
// desired type must be one of S, B or N. Anything else - including a member
// that is missing altogether - is reported through on_internal_error().
// The "type" member is not inspected here.
extract_from_attrs_column_computation::extract_from_attrs_column_computation(const rjson::value &v) {
    // rjson::find() returns nullptr when the member is absent, so each
    // pointer must be checked before IsString() is called on it. Without
    // the checks a truncated definition would be a null dereference
    // instead of the intended internal error.
    const rjson::value* attr_name = rjson::find(v, "attr_name");
    const rjson::value* desired_type = rjson::find(v, "desired_type");
    if (attr_name && attr_name->IsString() &&
            desired_type && desired_type->IsString()) {
        _attr_name = bytes(to_bytes_view(rjson::to_string_view(*attr_name)));
        _desired_type = type_info_from_string(rjson::to_string_view(*desired_type)).atype;
        switch (_desired_type) {
        case alternator_type::S:
        case alternator_type::B:
        case alternator_type::N:
            // A supported key type - construction succeeded.
            return;
        default:
            // Unsupported type: fall through to on_internal_error below.
            break;
        }
    }
    on_internal_error(elogger, format("Improperly formatted alternator::extract_from_attrs_column_computation computed column definition: {}", v));
}
|
||||
|
||||
// Looks up the attribute named _attr_name inside the row's MAP_NAME
// collection column. If the column has no cell in this row, or the map has
// no such key, an empty result is returned. Otherwise the result wraps the
// matching cell together with a transformation that applies
// serialized_value_if_type() with _desired_type.
regular_column_transformation::result extract_from_attrs_column_computation::compute_value(
    const schema& schema,
    const partition_key& key,
    const db::view::clustering_or_static_row& row) const
{
    const column_definition* map_col = schema.get_column_definition(MAP_NAME);
    const bool usable_map = map_col && map_col->is_regular() && map_col->is_multi_cell();
    if (!usable_map) {
        on_internal_error(elogger, "extract_from_attrs_column_computation::compute_value() on a table without an attrs map");
    }
    const atomic_cell_or_collection* map_cell = row.cells().find_cell(map_col->id);
    if (map_cell == nullptr) {
        // The row carries no attrs map at all.
        return regular_column_transformation::result();
    }
    collection_mutation_view map_view = map_cell->as_collection_mutation();
    return map_view.with_deserialized(*map_col->type,
        [this] (const collection_mutation_view_description& description) -> regular_column_transformation::result {
            // Scan the map entries for the wanted attribute name.
            for (auto&& [entry_name, entry_cell] : description.cells) {
                if (entry_name == _attr_name) {
                    return regular_column_transformation::result(entry_cell,
                        std::bind(serialized_value_if_type, std::placeholders::_1, _desired_type));
                }
            }
            return regular_column_transformation::result();
        });
}
|
||||
|
||||
// extract_from_attrs_column_computation needs the whole row to compute
|
||||
// value; it can't use just the partition key.
|
||||
// Partition-key-only overload. It never produces a value: any call is
// reported through on_internal_error().
bytes extract_from_attrs_column_computation::compute_value(
        const schema& /*schema*/,
        const partition_key& /*key*/) const {
    on_internal_error(elogger, "extract_from_attrs_column_computation::compute_value called without row");
}
|
||||
|
||||
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
@@ -1110,67 +1302,15 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
|
||||
schema_ptr partial_schema = builder.build();
|
||||
|
||||
// Parse GlobalSecondaryIndexes parameters before creating the base
|
||||
// table, so if we have a parse errors we can fail without creating
|
||||
// Parse Local/GlobalSecondaryIndexes parameters before creating the
|
||||
// base table, so if we have a parse errors we can fail without creating
|
||||
// any table.
|
||||
const rjson::value* gsi = rjson::find(request, "GlobalSecondaryIndexes");
|
||||
std::vector<schema_builder> view_builders;
|
||||
std::unordered_set<std::string> index_names;
|
||||
if (gsi) {
|
||||
if (!gsi->IsArray()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes must be an array.");
|
||||
}
|
||||
for (const rjson::value& g : gsi->GetArray()) {
|
||||
const rjson::value* index_name_v = rjson::find(g, "IndexName");
|
||||
if (!index_name_v || !index_name_v->IsString()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string.");
|
||||
}
|
||||
std::string_view index_name = rjson::to_string_view(*index_name_v);
|
||||
auto [it, added] = index_names.emplace(index_name);
|
||||
if (!added) {
|
||||
co_return api_error::validation(fmt::format("Duplicate IndexName '{}', ", index_name));
|
||||
}
|
||||
std::string vname(view_name(table_name, index_name));
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
// FIXME: read and handle "Projection" parameter. This will
|
||||
// require the MV code to copy just parts of the attrs map.
|
||||
schema_builder view_builder(keyspace_name, vname);
|
||||
auto [view_hash_key, view_range_key] = parse_key_schema(g);
|
||||
if (partial_schema->get_column_definition(to_bytes(view_hash_key)) == nullptr) {
|
||||
// A column that exists in a global secondary index is upgraded from being a map entry
|
||||
// to having a regular column definition in the base schema
|
||||
add_column(builder, view_hash_key, attribute_definitions, column_kind::regular_column);
|
||||
}
|
||||
add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key);
|
||||
unused_attribute_definitions.erase(view_hash_key);
|
||||
if (!view_range_key.empty()) {
|
||||
if (partial_schema->get_column_definition(to_bytes(view_range_key)) == nullptr) {
|
||||
// A column that exists in a global secondary index is upgraded from being a map entry
|
||||
// to having a regular column definition in the base schema
|
||||
if (partial_schema->get_column_definition(to_bytes(view_hash_key)) == nullptr) {
|
||||
// FIXME: this is alternator limitation only, because Scylla's materialized views
|
||||
// we use underneath do not allow more than 1 base regular column to be part of the MV key
|
||||
elogger.warn("Only 1 regular column from the base table should be used in the GSI key in order to ensure correct liveness management without assumptions");
|
||||
}
|
||||
add_column(builder, view_range_key, attribute_definitions, column_kind::regular_column);
|
||||
}
|
||||
add_column(view_builder, view_range_key, attribute_definitions, column_kind::clustering_key);
|
||||
unused_attribute_definitions.erase(view_range_key);
|
||||
}
|
||||
// Base key columns which aren't part of the index's key need to
|
||||
// be added to the view nonetheless, as (additional) clustering
|
||||
// key(s).
|
||||
if (hash_key != view_hash_key && hash_key != view_range_key) {
|
||||
add_column(view_builder, hash_key, attribute_definitions, column_kind::clustering_key);
|
||||
}
|
||||
if (!range_key.empty() && range_key != view_hash_key && range_key != view_range_key) {
|
||||
add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key);
|
||||
}
|
||||
// GSIs have no tags:
|
||||
view_builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>());
|
||||
view_builders.emplace_back(std::move(view_builder));
|
||||
}
|
||||
}
|
||||
// Remember the attributes used for LSI keys. Since LSI must be created
|
||||
// with the table, we make these attributes real schema columns, and need
|
||||
// to remember this below if the same attributes are used as GSI keys.
|
||||
std::unordered_set<std::string> lsi_range_keys;
|
||||
|
||||
const rjson::value* lsi = rjson::find(request, "LocalSecondaryIndexes");
|
||||
if (lsi) {
|
||||
@@ -1228,9 +1368,68 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
std::map<sstring, sstring> tags_map = {{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, "true"}};
|
||||
view_builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
view_builders.emplace_back(std::move(view_builder));
|
||||
lsi_range_keys.emplace(view_range_key);
|
||||
}
|
||||
}
|
||||
|
||||
const rjson::value* gsi = rjson::find(request, "GlobalSecondaryIndexes");
|
||||
if (gsi) {
|
||||
if (!gsi->IsArray()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes must be an array.");
|
||||
}
|
||||
for (const rjson::value& g : gsi->GetArray()) {
|
||||
const rjson::value* index_name_v = rjson::find(g, "IndexName");
|
||||
if (!index_name_v || !index_name_v->IsString()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string.");
|
||||
}
|
||||
std::string_view index_name = rjson::to_string_view(*index_name_v);
|
||||
auto [it, added] = index_names.emplace(index_name);
|
||||
if (!added) {
|
||||
co_return api_error::validation(fmt::format("Duplicate IndexName '{}', ", index_name));
|
||||
}
|
||||
std::string vname(view_name(table_name, index_name));
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
// FIXME: read and handle "Projection" parameter. This will
|
||||
// require the MV code to copy just parts of the attrs map.
|
||||
schema_builder view_builder(keyspace_name, vname);
|
||||
auto [view_hash_key, view_range_key] = parse_key_schema(g);
|
||||
|
||||
// If an attribute is already a real column in the base table
|
||||
// (i.e., a key attribute) or we already made it a real column
|
||||
// as an LSI key above, we can use it directly as a view key.
|
||||
// Otherwise, we need to add it as a "computed column", which
|
||||
// extracts and deserializes the attribute from the ":attrs" map.
|
||||
bool view_hash_key_real_column =
|
||||
partial_schema->get_column_definition(to_bytes(view_hash_key)) ||
|
||||
lsi_range_keys.contains(view_hash_key);
|
||||
add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key, !view_hash_key_real_column);
|
||||
unused_attribute_definitions.erase(view_hash_key);
|
||||
if (!view_range_key.empty()) {
|
||||
bool view_range_key_real_column =
|
||||
partial_schema->get_column_definition(to_bytes(view_range_key)) ||
|
||||
lsi_range_keys.contains(view_range_key);
|
||||
add_column(view_builder, view_range_key, attribute_definitions, column_kind::clustering_key, !view_range_key_real_column);
|
||||
if (!partial_schema->get_column_definition(to_bytes(view_range_key)) &&
|
||||
!partial_schema->get_column_definition(to_bytes(view_hash_key))) {
|
||||
// FIXME: This warning should go away. See issue #6714
|
||||
elogger.warn("Only 1 regular column from the base table should be used in the GSI key in order to ensure correct liveness management without assumptions");
|
||||
}
|
||||
unused_attribute_definitions.erase(view_range_key);
|
||||
}
|
||||
// Base key columns which aren't part of the index's key need to
|
||||
// be added to the view nonetheless, as (additional) clustering
|
||||
// key(s).
|
||||
if (hash_key != view_hash_key && hash_key != view_range_key) {
|
||||
add_column(view_builder, hash_key, attribute_definitions, column_kind::clustering_key);
|
||||
}
|
||||
if (!range_key.empty() && range_key != view_hash_key && range_key != view_range_key) {
|
||||
add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key);
|
||||
}
|
||||
// GSIs have no tags:
|
||||
view_builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>());
|
||||
view_builders.emplace_back(std::move(view_builder));
|
||||
}
|
||||
}
|
||||
if (!unused_attribute_definitions.empty()) {
|
||||
co_return api_error::validation(fmt::format(
|
||||
"AttributeDefinitions defines spurious attributes not used by any KeySchema: {}",
|
||||
@@ -1371,12 +1570,37 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
});
|
||||
}
|
||||
|
||||
// When UpdateTable adds a GSI, the type of its key columns must be specified
|
||||
// in AttributeDefinitions. If any of these key columns are *already* key
|
||||
// columns of the base table or any of its prior GSIs or LSIs, the type
|
||||
// given in AttributeDefinitions must match the type of the existing key -
|
||||
// otherwise Alternator will not know which type to enforce in new writes.
|
||||
// This function checks for such conflicts. It assumes that the structure of
|
||||
// the given attribute_definitions was already validated (with
|
||||
// validate_attribute_definitions()).
|
||||
// This function should be called multiple times - once for the base schema
|
||||
// and once for each of its views (existing GSIs and LSIs on this table).
|
||||
// For every primary-key column of `schema`, finds the first entry of
// attribute_definitions with the same AttributeName and throws a validation
// api_error if its AttributeType differs from the column's existing type.
// Key columns not mentioned in attribute_definitions are left alone.
static void check_attribute_definitions_conflicts(const rjson::value& attribute_definitions, const schema& schema) {
    for (auto& key_col : schema.primary_key_columns()) {
        std::string existing_type = type_to_string(key_col.type);
        for (const rjson::value& attribute_info : attribute_definitions.GetArray()) {
            if (!(attribute_info["AttributeName"].GetString() == key_col.name_as_text())) {
                continue;
            }
            auto requested_type = attribute_info["AttributeType"].GetString();
            if (requested_type != existing_type) {
                throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", key_col.name_as_text(), requested_type, existing_type));
            }
            // Only the first definition with this name is compared.
            break;
        }
    }
}
|
||||
|
||||
future<executor::request_return_type> executor::update_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.update_table++;
|
||||
elogger.trace("Updating table {}", request);
|
||||
|
||||
static const std::vector<sstring> unsupported = {
|
||||
"GlobalSecondaryIndexUpdates",
|
||||
"ProvisionedThroughput",
|
||||
"ReplicaUpdates",
|
||||
"SSESpecification",
|
||||
@@ -1388,11 +1612,14 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
}
|
||||
}
|
||||
|
||||
bool empty_request = true;
|
||||
|
||||
if (rjson::find(request, "BillingMode")) {
|
||||
empty_request = false;
|
||||
verify_billing_mode(request);
|
||||
}
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard()]
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
// FIXME: the following needs to be in a loop. If mm.announce() below
|
||||
// fails, we need to retry the whole thing.
|
||||
@@ -1412,6 +1639,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
|
||||
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
empty_request = false;
|
||||
add_stream_options(*stream_specification, builder, p.local());
|
||||
// Alternator Streams doesn't yet work when the table uses tablets (#16317)
|
||||
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
|
||||
@@ -1423,8 +1651,162 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
}
|
||||
|
||||
auto schema = builder.build();
|
||||
std::vector<view_ptr> new_views;
|
||||
std::vector<std::string> dropped_views;
|
||||
|
||||
rjson::value* gsi_updates = rjson::find(request, "GlobalSecondaryIndexUpdates");
|
||||
if (gsi_updates) {
|
||||
if (!gsi_updates->IsArray()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexUpdates must be an array");
|
||||
}
|
||||
if (gsi_updates->Size() > 1) {
|
||||
// Although UpdateTable takes an array of operations and could
|
||||
// support multiple Create and/or Delete operations in one
|
||||
// command, DynamoDB doesn't actually allow this, and throws
|
||||
// a LimitExceededException if this is attempted.
|
||||
co_return api_error::limit_exceeded("GlobalSecondaryIndexUpdates only allows one index creation or deletion");
|
||||
}
|
||||
if (gsi_updates->Size() == 1) {
|
||||
empty_request = false;
|
||||
if (!(*gsi_updates)[0].IsObject() || (*gsi_updates)[0].MemberCount() != 1) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexUpdates array must contain one object with a Create, Delete or Update operation");
|
||||
}
|
||||
auto it = (*gsi_updates)[0].MemberBegin();
|
||||
const std::string_view op = rjson::to_string_view(it->name);
|
||||
if (!it->value.IsObject()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexUpdates entries must be objects");
|
||||
}
|
||||
const rjson::value* index_name_v = rjson::find(it->value, "IndexName");
|
||||
if (!index_name_v || !index_name_v->IsString()) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexUpdates operation must have IndexName");
|
||||
}
|
||||
std::string_view index_name = rjson::to_string_view(*index_name_v);
|
||||
std::string_view table_name = schema->cf_name();
|
||||
std::string_view keyspace_name = schema->ks_name();
|
||||
std::string vname(view_name(table_name, index_name));
|
||||
if (op == "Create") {
|
||||
const rjson::value* attribute_definitions = rjson::find(request, "AttributeDefinitions");
|
||||
if (!attribute_definitions) {
|
||||
co_return api_error::validation("GlobalSecondaryIndexUpdates Create needs AttributeDefinitions");
|
||||
}
|
||||
std::unordered_set<std::string> unused_attribute_definitions =
|
||||
validate_attribute_definitions(*attribute_definitions);
|
||||
check_attribute_definitions_conflicts(*attribute_definitions, *schema);
|
||||
for (auto& view : p.local().data_dictionary().find_column_family(tab).views()) {
|
||||
check_attribute_definitions_conflicts(*attribute_definitions, *view);
|
||||
}
|
||||
|
||||
if (p.local().data_dictionary().has_schema(keyspace_name, vname)) {
|
||||
// Surprisingly, DynamoDB uses validation error here, not resource_in_use
|
||||
co_return api_error::validation(fmt::format(
|
||||
"GSI {} already exists in table {}", index_name, table_name));
|
||||
}
|
||||
if (p.local().data_dictionary().has_schema(keyspace_name, lsi_name(table_name, index_name))) {
|
||||
co_return api_error::validation(fmt::format(
|
||||
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
|
||||
}
|
||||
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
// FIXME: read and handle "Projection" parameter. This will
|
||||
// require the MV code to copy just parts of the attrs map.
|
||||
schema_builder view_builder(keyspace_name, vname);
|
||||
auto [view_hash_key, view_range_key] = parse_key_schema(it->value);
|
||||
// If an attribute is already a real column in the base
|
||||
// table (i.e., a key attribute in the base table or LSI),
|
||||
// we can use it directly as a view key. Otherwise, we
|
||||
// need to add it as a "computed column", which extracts
|
||||
// and deserializes the attribute from the ":attrs" map.
|
||||
bool view_hash_key_real_column =
|
||||
schema->get_column_definition(to_bytes(view_hash_key));
|
||||
add_column(view_builder, view_hash_key, *attribute_definitions, column_kind::partition_key, !view_hash_key_real_column);
|
||||
unused_attribute_definitions.erase(view_hash_key);
|
||||
if (!view_range_key.empty()) {
|
||||
bool view_range_key_real_column =
|
||||
schema->get_column_definition(to_bytes(view_range_key));
|
||||
add_column(view_builder, view_range_key, *attribute_definitions, column_kind::clustering_key, !view_range_key_real_column);
|
||||
if (!schema->get_column_definition(to_bytes(view_range_key)) &&
|
||||
!schema->get_column_definition(to_bytes(view_hash_key))) {
|
||||
// FIXME: This warning should go away. See issue #6714
|
||||
elogger.warn("Only 1 regular column from the base table should be used in the GSI key in order to ensure correct liveness management without assumptions");
|
||||
}
|
||||
unused_attribute_definitions.erase(view_range_key);
|
||||
}
|
||||
// Surprisingly, although DynamoDB checks for unused
|
||||
// AttributeDefinitions in CreateTable, it does not
|
||||
// check it in UpdateTable. We decided to check anyway.
|
||||
if (!unused_attribute_definitions.empty()) {
|
||||
co_return api_error::validation(fmt::format(
|
||||
"AttributeDefinitions defines spurious attributes not used by any KeySchema: {}",
|
||||
unused_attribute_definitions));
|
||||
}
|
||||
// Base key columns which aren't part of the index's key need to
|
||||
// be added to the view nonetheless, as (additional) clustering
|
||||
// key(s).
|
||||
for (auto& def : schema->primary_key_columns()) {
|
||||
if (def.name_as_text() != view_hash_key && def.name_as_text() != view_range_key) {
|
||||
view_builder.with_column(def.name(), def.type, column_kind::clustering_key);
|
||||
}
|
||||
}
|
||||
// GSIs have no tags:
|
||||
view_builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>());
|
||||
// Note below we don't need to add virtual columns, as all
|
||||
// base columns were copied to view. TODO: reconsider the need
|
||||
// for virtual columns when we support Projection.
|
||||
for (const column_definition& regular_cdef : schema->regular_columns()) {
|
||||
if (!view_builder.has_column(*cql3::to_identifier(regular_cdef))) {
|
||||
view_builder.with_column(regular_cdef.name(), regular_cdef.type, column_kind::regular_column);
|
||||
}
|
||||
}
|
||||
const bool include_all_columns = true;
|
||||
view_builder.with_view_info(*schema, include_all_columns, ""/*where clause*/);
|
||||
new_views.emplace_back(view_builder.build());
|
||||
} else if (op == "Delete") {
|
||||
elogger.trace("Deleting GSI {}", index_name);
|
||||
if (!p.local().data_dictionary().has_schema(keyspace_name, vname)) {
|
||||
co_return api_error::resource_not_found(fmt::format("No GSI {} in table {}", index_name, table_name));
|
||||
}
|
||||
dropped_views.emplace_back(vname);
|
||||
} else if (op == "Update") {
|
||||
co_return api_error::validation("GlobalSecondaryIndexUpdates Update not yet supported");
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("GlobalSecondaryIndexUpdates supports a Create, Delete or Update operation, saw '{}'", op));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (empty_request) {
|
||||
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
|
||||
}
|
||||
|
||||
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
|
||||
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
|
||||
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
|
||||
for (view_ptr view : new_views) {
|
||||
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
|
||||
std::move(m2.begin(), m2.end(), std::back_inserter(m));
|
||||
}
|
||||
for (const std::string& view_name : dropped_views) {
|
||||
auto m2 = co_await service::prepare_view_drop_announcement(p.local(), schema->ks_name(), view_name, group0_guard.write_timestamp());
|
||||
std::move(m2.begin(), m2.end(), std::back_inserter(m));
|
||||
}
|
||||
// If a role is allowed to create a GSI, we should give it permissions
|
||||
// to read the GSI it just created. This is known as "auto-grant".
|
||||
// Also, when we delete a GSI we should revoke any permissions set on
|
||||
// it - so if it's ever created again the old permissions wouldn't be
|
||||
// remembered for the new GSI. This is known as "auto-revoke"
|
||||
if (client_state_other_shard.get().user() && (!new_views.empty() || !dropped_views.empty())) {
|
||||
service::group0_batch mc(std::move(group0_guard));
|
||||
mc.add_mutations(std::move(m));
|
||||
for (view_ptr view : new_views) {
|
||||
auto resource = auth::make_data_resource(view->ks_name(), view->cf_name());
|
||||
co_await auth::grant_applicable_permissions(
|
||||
*client_state_other_shard.get().get_auth_service(), *client_state_other_shard.get().user(), resource, mc);
|
||||
}
|
||||
for (const auto& view_name : dropped_views) {
|
||||
auto resource = auth::make_data_resource(schema->ks_name(), view_name);
|
||||
co_await auth::revoke_all(*client_state_other_shard.get().get_auth_service(), resource, mc);
|
||||
}
|
||||
std::tie(m, group0_guard) = co_await std::move(mc).extract();
|
||||
}
|
||||
|
||||
co_await mm.announce(std::move(m), std::move(group0_guard), format("alternator-executor: update {} table", tab->cf_name()));
|
||||
|
||||
@@ -1546,7 +1928,7 @@ public:
|
||||
struct delete_item {};
|
||||
struct put_item {};
|
||||
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item);
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes);
|
||||
// put_or_delete_item doesn't keep a reference to schema (so it can be
|
||||
// moved between shards for LWT) so it needs to be given again to build():
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts) const;
|
||||
@@ -1578,7 +1960,75 @@ static inline const column_definition* find_attribute(const schema& schema, cons
|
||||
return cdef;
|
||||
}
|
||||
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item)
|
||||
|
||||
// Get a list of all attributes that serve as a key attributes for any of the
|
||||
// GSIs or LSIs of this table, and the declared type for each (can be only
|
||||
// "S", "B", or "N"). The implementation below will also list the base table's
|
||||
// key columns (they are the views' clustering keys).
|
||||
std::unordered_map<bytes, std::string> si_key_attributes(data_dictionary::table t) {
|
||||
std::unordered_map<bytes, std::string> ret;
|
||||
for (const view_ptr& v : t.views()) {
|
||||
for (const column_definition& cdef : v->partition_key_columns()) {
|
||||
ret[cdef.name()] = type_to_string(cdef.type);
|
||||
}
|
||||
for (const column_definition& cdef : v->clustering_key_columns()) {
|
||||
ret[cdef.name()] = type_to_string(cdef.type);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// When an attribute is a key (hash or sort) of one of the GSIs on a table,
|
||||
// DynamoDB refuses an update to that attribute with an unsuitable value.
|
||||
// Unsuitable values are:
|
||||
// 1. An empty string (those are normally allowed as values, but not allowed
|
||||
// as keys, including GSI keys).
|
||||
// 2. A value with a type different than that declared for the GSI key.
|
||||
// Normally non-key attributes can take values of any type (DynamoDB is
|
||||
// schema-less), but as soon as an attribute is used as a GSI key, it
|
||||
// must be set only to the specific type declared for that key.
|
||||
// (Note that a missing value for an GSI key attribute is fine - the update
|
||||
// will happen on the base table, but won't reach the view table. In this
|
||||
// case, this function simply won't be called for this attribute.)
|
||||
//
|
||||
// This function checks if the given attribute update is an update to some
|
||||
// GSI's key, and if the value is unsuitable, a api_error::validation is
|
||||
// thrown. The checking here is similar to the checking done in
|
||||
// get_key_from_typed_value() for the base table's key columns.
|
||||
//
|
||||
// validate_value_if_gsi_key() should only be called after validate_value()
|
||||
// already validated that the value itself has a valid form.
|
||||
static inline void validate_value_if_gsi_key(
|
||||
std::unordered_map<bytes, std::string> key_attributes,
|
||||
const bytes& attribute,
|
||||
const rjson::value& value) {
|
||||
if (key_attributes.empty()) {
|
||||
return;
|
||||
}
|
||||
auto it = key_attributes.find(attribute);
|
||||
if (it == key_attributes.end()) {
|
||||
// Given attribute is not a key column with a fixed type, so no
|
||||
// more validation to do.
|
||||
return;
|
||||
}
|
||||
const std::string& expected_type = it->second;
|
||||
// We assume that validate_value() was previously called on this value,
|
||||
// so value is known to be of the proper format (an object with one
|
||||
// member, whose key and value are strings)
|
||||
std::string_view value_type = rjson::to_string_view(value.MemberBegin()->name);
|
||||
if (expected_type != value_type) {
|
||||
throw api_error::validation(fmt::format(
|
||||
"Type mismatch: expected type {} for GSI key attribute {}, got type {}",
|
||||
expected_type, to_string_view(attribute), value_type));
|
||||
}
|
||||
std::string_view value_content = rjson::to_string_view(value.MemberBegin()->value);
|
||||
if (value_content.empty()) {
|
||||
throw api_error::validation(fmt::format(
|
||||
"GSI key attribute {} cannot be set to an empty string", to_string_view(attribute)));
|
||||
}
|
||||
}
|
||||
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes)
|
||||
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
@@ -1588,6 +2038,9 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
const column_definition* cdef = find_attribute(*schema, column_name);
|
||||
_length_in_bytes += column_name.size();
|
||||
if (!cdef) {
|
||||
// This attribute may be a key column of one of the GSI, in which
|
||||
// case there are some limitations on the value
|
||||
validate_value_if_gsi_key(key_attributes, column_name, it->value);
|
||||
bytes value = serialize_item(it->value);
|
||||
if (value.size()) {
|
||||
// ScyllaDB uses one extra byte compared to DynamoDB for the bytes length
|
||||
@@ -1595,7 +2048,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
}
|
||||
_cells->push_back({std::move(column_name), serialize_item(it->value)});
|
||||
} else if (!cdef->is_primary_key()) {
|
||||
// Fixed-type regular column can be used for GSI key
|
||||
// Fixed-type regular column can be used for LSI key
|
||||
bytes value = get_key_from_typed_value(it->value, *cdef);
|
||||
_cells->push_back({std::move(column_name),
|
||||
value});
|
||||
@@ -1954,7 +2407,8 @@ public:
|
||||
parsed::condition_expression _condition_expression;
|
||||
put_item_operation(service::storage_proxy& proxy, rjson::value&& request)
|
||||
: rmw_operation(proxy, std::move(request))
|
||||
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{}) {
|
||||
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{},
|
||||
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) {
|
||||
_pk = _mutation_builder.pk();
|
||||
_ck = _mutation_builder.ck();
|
||||
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
|
||||
@@ -2315,7 +2769,8 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
const rjson::value& put_request = r->value;
|
||||
const rjson::value& item = put_request["Item"];
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
item, schema, put_or_delete_item::put_item{}));
|
||||
item, schema, put_or_delete_item::put_item{},
|
||||
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name()))));
|
||||
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
|
||||
if (used_keys.contains(mut_key)) {
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
@@ -2859,6 +3314,10 @@ public:
|
||||
// them by top-level attribute, and detects forbidden overlaps/conflicts.
|
||||
attribute_path_map<parsed::update_expression::action> _update_expression;
|
||||
|
||||
// Saved list of GSI keys in the table being updated, used for
|
||||
// validate_value_if_gsi_key()
|
||||
std::unordered_map<bytes, std::string> _key_attributes;
|
||||
|
||||
parsed::condition_expression _condition_expression;
|
||||
|
||||
update_item_operation(service::storage_proxy& proxy, rjson::value&& request);
|
||||
@@ -2950,6 +3409,9 @@ update_item_operation::update_item_operation(service::storage_proxy& proxy, rjso
|
||||
if (expression_attribute_values) {
|
||||
_consumed_capacity._total_bytes += estimate_value_size(*expression_attribute_values);
|
||||
}
|
||||
|
||||
_key_attributes = si_key_attributes(proxy.data_dictionary().find_table(
|
||||
_schema->ks_name(), _schema->cf_name()));
|
||||
}
|
||||
|
||||
// These are the cases where update_item_operation::apply() needs to use
|
||||
@@ -3247,6 +3709,9 @@ update_item_operation::apply(std::unique_ptr<rjson::value> previous_item, api::t
|
||||
bytes column_value = get_key_from_typed_value(json_value, *cdef);
|
||||
row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, column_value));
|
||||
} else {
|
||||
// This attribute may be a key column of one of the GSIs, in which
|
||||
// case there are some limitations on the value.
|
||||
validate_value_if_gsi_key(_key_attributes, column_name, json_value);
|
||||
attrs_collector.put(std::move(column_name), serialize_item(json_value), ts);
|
||||
}
|
||||
};
|
||||
|
||||
73
alternator/extract_from_attrs.hh
Normal file
73
alternator/extract_from_attrs.hh
Normal file
@@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
#include "utils/rjson.hh"
|
||||
#include "serialization.hh"
|
||||
#include "column_computation.hh"
|
||||
#include "db/view/regular_column_transformation.hh"
|
||||
|
||||
namespace alternator {
|
||||
|
||||
// An implementation of a "column_computation" which extracts a specific
|
||||
// non-key attribute from the big map (":attrs") of all non-key attributes,
|
||||
// and deserializes it if it has the desired type. GSI will use this computed
|
||||
// column as a materialized-view key when the view key attribute isn't a
|
||||
// full-fledged CQL column but rather stored in ":attrs".
|
||||
class extract_from_attrs_column_computation : public regular_column_transformation {
|
||||
// The name of the CQL column name holding the attribute map. It is a
|
||||
// constant defined in executor.cc (as ":attrs"), so doesn't need
|
||||
// to be specified when constructing the column computation.
|
||||
static const bytes MAP_NAME;
|
||||
// The top-level attribute name to extract from the ":attrs" map.
|
||||
bytes _attr_name;
|
||||
// The type we expect for the value stored in the attribute. If the type
|
||||
// matches the expected type, it is decoded from the serialized format
|
||||
// we store in the map's values) into the raw CQL type value that we use
|
||||
// for keys, and returned by compute_value(). Only the types "S" (string),
|
||||
// "B" (bytes) and "N" (number) are allowed as keys in DynamoDB, and
|
||||
// therefore in desired_type.
|
||||
alternator_type _desired_type;
|
||||
public:
|
||||
virtual column_computation_ptr clone() const override;
|
||||
// TYPE_NAME is a unique string that distinguishes this class from other
|
||||
// column_computation subclasses. column_computation::deserialize() will
|
||||
// construct an object of this subclass if it sees a "type" TYPE_NAME.
|
||||
static inline const std::string TYPE_NAME = "alternator_extract_from_attrs";
|
||||
// Serialize the *definition* of this column computation into a JSON
|
||||
// string with a unique "type" string - TYPE_NAME - which then causes
|
||||
// column_computation::deserialize() to create an object from this class.
|
||||
virtual bytes serialize() const override;
|
||||
// Construct this object based on the previous output of serialize().
|
||||
// Calls on_internal_error() if the string doesn't match the output format
|
||||
// of serialize(). "type" is not checked column_computation::deserialize()
|
||||
// won't call this constructor if "type" doesn't match.
|
||||
extract_from_attrs_column_computation(const rjson::value &v);
|
||||
extract_from_attrs_column_computation(bytes_view attr_name, alternator_type desired_type)
|
||||
: _attr_name(attr_name), _desired_type(desired_type)
|
||||
{}
|
||||
// Implement regular_column_transformation's compute_value() that
|
||||
// accepts the full row:
|
||||
result compute_value(const schema& schema, const partition_key& key,
|
||||
const db::view::clustering_or_static_row& row) const override;
|
||||
// But do not implement column_computation's compute_value() that
|
||||
// accepts only a partition key - that's not enough so our implementation
|
||||
// of this function does on_internal_error().
|
||||
bytes compute_value(const schema& schema, const partition_key& key) const override;
|
||||
// This computed column does depend on a non-primary key column, so
|
||||
// its result may change in the update and we need to compute it
|
||||
// before and after the update.
|
||||
virtual bool depends_on_non_primary_key_column() const override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
} // namespace alternator
|
||||
@@ -245,6 +245,27 @@ rjson::value deserialize_item(bytes_view bv) {
|
||||
return deserialized;
|
||||
}
|
||||
|
||||
// This function takes a bytes_view created earlier by serialize_item(), and
|
||||
// if has the type "expected_type", the function returns the value as a
|
||||
// raw Scylla type. If the type doesn't match, returns an unset optional.
|
||||
// This function only supports the key types S (string), B (bytes) and N
|
||||
// (number) - serialize_item() serializes those types as a single-byte type
|
||||
// followed by the serialized raw Scylla type, so all this function needs to
|
||||
// do is to remove the first byte. This makes this function much more
|
||||
// efficient than deserialize_item() above because it avoids transformation
|
||||
// to/from JSON.
|
||||
std::optional<bytes> serialized_value_if_type(bytes_view bv, alternator_type expected_type) {
|
||||
if (bv.empty() || alternator_type(bv[0]) != expected_type) {
|
||||
return std::nullopt;
|
||||
}
|
||||
// Currently, serialize_item() for types in alternator_type (notably S, B
|
||||
// and N) are nothing more than Scylla's raw format for these types
|
||||
// preceded by a type byte. So we just need to skip that byte and we are
|
||||
// left by exactly what we need to return.
|
||||
bv.remove_prefix(1);
|
||||
return bytes(bv);
|
||||
}
|
||||
|
||||
std::string type_to_string(data_type type) {
|
||||
static thread_local std::unordered_map<data_type, std::string> types = {
|
||||
{utf8_type, "S"},
|
||||
|
||||
@@ -43,6 +43,7 @@ type_representation represent_type(alternator_type atype);
|
||||
|
||||
bytes serialize_item(const rjson::value& item);
|
||||
rjson::value deserialize_item(bytes_view bv);
|
||||
std::optional<bytes> serialized_value_if_type(bytes_view bv, alternator_type expected_type);
|
||||
|
||||
std::string type_to_string(data_type type);
|
||||
|
||||
|
||||
@@ -1573,7 +1573,7 @@ deps['test/boost/linearizing_input_stream_test'] = [
|
||||
"test/boost/linearizing_input_stream_test.cc",
|
||||
"test/lib/log.cc",
|
||||
]
|
||||
deps['test/boost/expr_test'] = ['test/boost/expr_test.cc', 'test/lib/expr_test_utils.cc'] + scylla_core
|
||||
deps['test/boost/expr_test'] = ['test/boost/expr_test.cc', 'test/lib/expr_test_utils.cc'] + scylla_core + alternator
|
||||
deps['test/boost/rate_limiter_test'] = ['test/boost/rate_limiter_test.cc', 'db/rate_limiter.cc']
|
||||
deps['test/boost/exceptions_optimized_test'] = ['test/boost/exceptions_optimized_test.cc', 'utils/exceptions.cc']
|
||||
deps['test/boost/exceptions_fallback_test'] = ['test/boost/exceptions_fallback_test.cc', 'utils/exceptions.cc']
|
||||
@@ -1590,8 +1590,8 @@ deps['test/raft/many_test'] = ['test/raft/many_test.cc', 'test/raft/replication.
|
||||
deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
|
||||
deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
|
||||
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
|
||||
scylla_core + scylla_tests_generic_dependencies
|
||||
deps['test/boost/address_map_test'] = ['test/boost/address_map_test.cc'] + scylla_core
|
||||
scylla_core + alternator + scylla_tests_generic_dependencies
|
||||
deps['test/boost/address_map_test'] = ['test/boost/address_map_test.cc'] + scylla_core + alternator
|
||||
deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
|
||||
'test/raft/helpers.cc',
|
||||
'test/lib/log.cc',
|
||||
|
||||
127
db/view/regular_column_transformation.hh
Normal file
127
db/view/regular_column_transformation.hh
Normal file
@@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "column_computation.hh"
|
||||
#include "mutation/atomic_cell.hh"
|
||||
#include "timestamp.hh"
|
||||
#include <type_traits>
|
||||
|
||||
class row_marker;
|
||||
|
||||
// In a basic column_computation defined in column_computation.hh, the
|
||||
// compute_value() method is only based on the partition key, and it must
|
||||
// return a value. That API has very limited applications - basically the
|
||||
// only thing we can implement with it is token_column_computation which
|
||||
// we used to create the token column in secondary indexes.
|
||||
// The regular_column_transformation base class here is more powerful, but
|
||||
// still is not a completely general computation: Its compute_value() virtual
|
||||
// method can transform the value read from a single cell of a regular column
|
||||
// into a new cell stored in a structure regular_column_transformation::result.
|
||||
//
|
||||
// In more detail, the assumptions of regular_column_transformation are:
|
||||
// 1. compute_value() computes the value based on a *single* column in a
|
||||
// row passed to compute_value().
|
||||
// This assumption means that the value or deletion of the value always
|
||||
// has a single known timestamp (and the value can't be half-missing)
|
||||
// and single TTL information. That would not have been possible if we
|
||||
// allowed the computation to depend on multiple columns.
|
||||
// 2. compute_value() computes the value based on a *regular* column in the
|
||||
// base table. This means that an update can modify this value (unlike a
|
||||
// base-table key column that can't change in an update), so the view
|
||||
// update code needs to compute the value before and after the update,
|
||||
// and potentially delete and create view rows.
|
||||
// 3. compute_value() returns a regular_column_transformation::result which includes
|
||||
// a value and its liveness information (timestamp and ttl/expiry) or
|
||||
// is missing a value.
|
||||
|
||||
class regular_column_transformation : public column_computation {
|
||||
public:
|
||||
struct result {
|
||||
// We can use "bytes" instead of "managed_bytes" here because we know
|
||||
// that a column_computation is only used for generating a key value,
|
||||
// and that is limited to 64K. This limitation is enforced below -
|
||||
// we never linearize a cell's value if its size is more than 64K.
|
||||
std::optional<bytes> _value;
|
||||
|
||||
// _ttl and _expiry are only defined if _value is set.
|
||||
// The default values below are used when the source cell does not
|
||||
// expire, and are the same values that row_marker uses for a non-
|
||||
// expiring marker. This is useful when creating a row_marker from
|
||||
// get_ttl() and get_expiry().
|
||||
gc_clock::duration _ttl { 0 };
|
||||
gc_clock::time_point _expiry { gc_clock::duration(0) };
|
||||
|
||||
// _ts may be set even if _value is missing, which can remember the
|
||||
// timestamp of a tombstone. Note that the current view-update code
|
||||
// that uses this class doesn't use _ts when _value is missing.
|
||||
api::timestamp_type _ts = api::missing_timestamp;
|
||||
|
||||
api::timestamp_type get_ts() const {
|
||||
return _ts;
|
||||
}
|
||||
|
||||
bool has_value() const {
|
||||
return _value.has_value();
|
||||
}
|
||||
|
||||
// Should only be called if has_value() is true:
|
||||
const bytes& get_value() const {
|
||||
return *_value;
|
||||
}
|
||||
gc_clock::duration get_ttl() const {
|
||||
return _ttl;
|
||||
}
|
||||
gc_clock::time_point get_expiry() const {
|
||||
return _expiry;
|
||||
}
|
||||
|
||||
// A missing computation result
|
||||
result() { }
|
||||
|
||||
// Construct a computation result by copying a given atomic_cell -
|
||||
// including its value, timestamp, and ttl - or deletion timestamp.
|
||||
// The second parameter is an optional transformation function f -
|
||||
// taking a bytes and returning an optional<bytes> - that transforms
|
||||
// the value of the cell but keeps its other liveness information.
|
||||
// If f returns a nullopt, it causes the view row should be deleted.
|
||||
template<typename Func=std::identity>
|
||||
requires std::invocable<Func, bytes> && std::convertible_to<std::invoke_result_t<Func, bytes>, std::optional<bytes>>
|
||||
result(atomic_cell_view cell, Func f = {}) {
|
||||
_ts = cell.timestamp();
|
||||
if (cell.is_live()) {
|
||||
// If the cell is larger than what a key can hold (64KB),
|
||||
// return a missing value. This lets us skip this item during
|
||||
// view building and avoid hanging the view build as described
|
||||
// in #8627. But it doesn't prevent later inserting such a item
|
||||
// to the base table, nor does it implement front-end specific
|
||||
// limits (such as Alternator's 1K or 2K limits - see #10347).
|
||||
// Those stricter limits should be validated in the base-table
|
||||
// write code, not here - deep inside the view update code.
|
||||
// Note also we assume that f() doesn't grow the value further.
|
||||
if (cell.value().size() >= 65536) {
|
||||
return;
|
||||
}
|
||||
_value = f(to_bytes(cell.value()));
|
||||
if (_value) {
|
||||
if (cell.is_live_and_has_ttl()) {
|
||||
_ttl = cell.ttl();
|
||||
_expiry = cell.expiry();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
virtual ~regular_column_transformation() = default;
|
||||
virtual result compute_value(
|
||||
const schema& schema,
|
||||
const partition_key& key,
|
||||
const db::view::clustering_or_static_row& row) const = 0;
|
||||
};
|
||||
419
db/view/view.cc
419
db/view/view.cc
@@ -37,6 +37,7 @@
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "db/view/view_update_generator.hh"
|
||||
#include "db/view/regular_column_transformation.hh"
|
||||
#include "db/system_keyspace_view_types.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
@@ -508,79 +509,6 @@ size_t view_updates::op_count() const {
|
||||
return _op_count;
|
||||
}
|
||||
|
||||
row_marker view_updates::compute_row_marker(const clustering_or_static_row& base_row) const {
|
||||
/*
|
||||
* We need to compute both the timestamp and expiration for view rows.
|
||||
*
|
||||
* Below there are several distinct cases depending on how many new key
|
||||
* columns the view has - i.e., how many of the view's key columns were
|
||||
* regular columns in the base. base_regular_columns_in_view_pk.size():
|
||||
*
|
||||
* Zero new key columns:
|
||||
* The view rows key is composed only from base key columns, and those
|
||||
* cannot be changed in an update, so the view row remains alive as
|
||||
* long as the base row is alive. We need to return the same row
|
||||
* marker as the base for the view - to keep an empty view row alive
|
||||
* for as long as an empty base row exists.
|
||||
* Note that in this case, if there are *unselected* base columns, we
|
||||
* may need to keep an empty view row alive even without a row marker
|
||||
* because the base row (which has additional columns) is still alive.
|
||||
* For that we have the "virtual columns" feature: In the zero new
|
||||
* key columns case, we put unselected columns in the view as empty
|
||||
* columns, to keep the view row alive.
|
||||
*
|
||||
* One new key column:
|
||||
* In this case, there is a regular base column that is part of the
|
||||
* view key. This regular column can be added or deleted in an update,
|
||||
* or its expiration be set, and those can cause the view row -
|
||||
* including its row marker - to need to appear or disappear as well.
|
||||
* So the liveness of cell of this one column determines the liveness
|
||||
* of the view row and the row marker that we return.
|
||||
*
|
||||
* Two or more new key columns:
|
||||
* This case is explicitly NOT supported in CQL - one cannot create a
|
||||
* view with more than one base-regular columns in its key. In general
|
||||
* picking one liveness (timestamp and expiration) is not possible
|
||||
* if there are multiple regular base columns in the view key, as
|
||||
* those can have different liveness.
|
||||
* However, we do allow this case for Alternator - we need to allow
|
||||
* the case of two (but not more) because the DynamoDB API allows
|
||||
* creating a GSI whose two key columns (hash and range key) were
|
||||
* regular columns.
|
||||
* We can support this case in Alternator because it doesn't use
|
||||
* expiration (the "TTL" it does support is different), and doesn't
|
||||
* support user-defined timestamps. But, the two columns can still
|
||||
* have different timestamps - this happens if an update modifies
|
||||
* just one of them. In this case the timestamp of the view update
|
||||
* (and that of the row marker we return) is the later of these two
|
||||
* updated columns.
|
||||
*/
|
||||
const auto& col_ids = base_row.is_clustering_row()
|
||||
? _base_info->base_regular_columns_in_view_pk()
|
||||
: _base_info->base_static_columns_in_view_pk();
|
||||
if (!col_ids.empty()) {
|
||||
auto& def = _base->column_at(base_row.column_kind(), col_ids[0]);
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto cell = base_row.cells().cell_at(col_ids[0]).as_atomic_cell(def);
|
||||
auto ts = cell.timestamp();
|
||||
if (col_ids.size() > 1){
|
||||
// As explained above, this case only happens in Alternator,
|
||||
// and we may need to pick a higher ts:
|
||||
auto& second_def = _base->column_at(base_row.column_kind(), col_ids[1]);
|
||||
auto second_cell = base_row.cells().cell_at(col_ids[1]).as_atomic_cell(second_def);
|
||||
auto second_ts = second_cell.timestamp();
|
||||
ts = std::max(ts, second_ts);
|
||||
// Alternator isn't supposed to have TTL or more than two col_ids!
|
||||
if (col_ids.size() != 2 || cell.is_live_and_has_ttl() || second_cell.is_live_and_has_ttl()) [[unlikely]] {
|
||||
utils::on_internal_error(format("Unexpected col_ids length {} or has TTL", col_ids.size()));
|
||||
}
|
||||
}
|
||||
return cell.is_live_and_has_ttl() ? row_marker(ts, cell.ttl(), cell.expiry()) : row_marker(ts);
|
||||
}
|
||||
|
||||
return base_row.marker();
|
||||
}
|
||||
|
||||
namespace {
|
||||
// The following struct is identical to view_key_with_action, except the key
|
||||
// is stored as a managed_bytes_view instead of bytes.
|
||||
@@ -656,8 +584,8 @@ public:
|
||||
return {_update.key()->get_component(_base, base_col->position())};
|
||||
default:
|
||||
if (base_col->kind != _update.column_kind()) {
|
||||
on_internal_error(vlogger, format("Tried to get a {} column from a {} row update, which is impossible",
|
||||
to_sstring(base_col->kind), _update.is_clustering_row() ? "clustering" : "static"));
|
||||
on_internal_error(vlogger, format("Tried to get a {} column {} from a {} row update, which is impossible",
|
||||
to_sstring(base_col->kind), base_col->name_as_text(), _update.is_clustering_row() ? "clustering" : "static"));
|
||||
}
|
||||
auto& c = _update.cells().cell_at(base_col->id);
|
||||
auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data;
|
||||
@@ -678,6 +606,22 @@ private:
|
||||
return handle_collection_column_computation(collection_computation);
|
||||
}
|
||||
|
||||
// TODO: we already calculated this computation in updatable_view_key_cols,
|
||||
// so perhaps we should pass it here and not re-compute it. But this will
|
||||
// mean computed columns will only work for view key columns (currently
|
||||
// we assume that anyway)
|
||||
if (auto* c = dynamic_cast<const regular_column_transformation*>(&computation)) {
|
||||
regular_column_transformation::result after =
|
||||
c->compute_value(_base, _base_key, _update);
|
||||
if (after.has_value()) {
|
||||
return {managed_bytes_view(linearized_values.emplace_back(after.get_value()))};
|
||||
}
|
||||
// We only get to this function when we know the _update row
|
||||
// exists and call it to read its key columns, so we don't expect
|
||||
// to see a missing value for any of those columns
|
||||
on_internal_error(vlogger, fmt::format("unexpected call to handle_computed_column {} missing in update", cdef.name_as_text()));
|
||||
}
|
||||
|
||||
auto computed_value = computation.compute_value(_base, _base_key);
|
||||
return {managed_bytes_view(linearized_values.emplace_back(std::move(computed_value)))};
|
||||
}
|
||||
@@ -729,7 +673,6 @@ view_updates::get_view_rows(const partition_key& base_key, const clustering_or_s
|
||||
if (partition.partition_tombstone() && partition.partition_tombstone() == row_delete_tomb.tomb()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ret.push_back({&partition.clustered_row(*_view, std::move(ckey)), action});
|
||||
};
|
||||
|
||||
@@ -936,13 +879,12 @@ static void add_cells_to_view(const schema& base, const schema& view, column_kin
|
||||
* Creates a view entry corresponding to the provided base row.
|
||||
* This method checks that the base row does match the view filter before applying anything.
|
||||
*/
|
||||
void view_updates::create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
void view_updates::create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now, row_marker update_marker) {
|
||||
if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt, {});
|
||||
auto update_marker = compute_row_marker(update);
|
||||
const auto kind = update.column_kind();
|
||||
for (const auto& [r, action]: view_rows) {
|
||||
if (auto rm = std::get_if<row_marker>(&action)) {
|
||||
@@ -960,48 +902,28 @@ void view_updates::create_entry(data_dictionary::database db, const partition_ke
|
||||
* Deletes the view entry corresponding to the provided base row.
|
||||
* This method checks that the base row does match the view filter before bothering.
|
||||
*/
|
||||
void view_updates::delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
void view_updates::delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts) {
|
||||
// Before deleting an old entry, make sure it was matching the view filter
|
||||
// (otherwise there is nothing to delete)
|
||||
if (matches_view_filter(db, *_base, _view_info, base_key, existing, now)) {
|
||||
do_delete_old_entry(base_key, existing, update, now);
|
||||
do_delete_old_entry(base_key, existing, update, now, deletion_ts);
|
||||
}
|
||||
}
|
||||
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts) {
|
||||
auto view_rows = get_view_rows(base_key, existing, std::nullopt, update.tomb());
|
||||
const auto kind = existing.column_kind();
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
const auto& col_ids = existing.is_clustering_row()
|
||||
? _base_info->base_regular_columns_in_view_pk()
|
||||
: _base_info->base_static_columns_in_view_pk();
|
||||
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
if (auto ts_tag = std::get_if<view_key_and_action::shadowable_tombstone_tag>(&action)) {
|
||||
r->apply(ts_tag->into_shadowable_tombstone(now));
|
||||
}
|
||||
} else if (!col_ids.empty()) {
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto& def = _base->column_at(kind, col_ids[0]);
|
||||
auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def);
|
||||
auto ts = cell.timestamp();
|
||||
if (col_ids.size() > 1) {
|
||||
// This is the Alternator-only support for two regular base
|
||||
// columns that become view key columns. See explanation in
|
||||
// view_updates::compute_row_marker().
|
||||
auto& second_def = _base->column_at(kind, col_ids[1]);
|
||||
auto second_cell = existing.cells().cell_at(col_ids[1]).as_atomic_cell(second_def);
|
||||
auto second_ts = second_cell.timestamp();
|
||||
ts = std::max(ts, second_ts);
|
||||
// Alternator isn't supposed to have more than two col_ids!
|
||||
if (col_ids.size() != 2) [[unlikely]] {
|
||||
utils::on_internal_error(format("Unexpected col_ids length {}", col_ids.size()));
|
||||
}
|
||||
}
|
||||
if (cell.is_live()) {
|
||||
r->apply(shadowable_tombstone(ts, now));
|
||||
}
|
||||
if (!col_ids.empty() || _view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
// The view key could have been modified because it contains or
|
||||
// depends on a non-primary-key. The fact that this function was
|
||||
// called instead of update_entry() means the caller knows it
|
||||
// wants to delete the old row (with the given deletion_ts) and
|
||||
// will create a different one. So let's honor this.
|
||||
r->apply(shadowable_tombstone(deletion_ts, now));
|
||||
} else {
|
||||
// "update" caused the base row to have been deleted, and !col_id
|
||||
// means view row is the same - so it needs to be deleted as well
|
||||
@@ -1102,15 +1024,15 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
|
||||
* This method checks that the base row (before and after) matches the view filter before
|
||||
* applying anything.
|
||||
*/
|
||||
void view_updates::update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now) {
|
||||
void view_updates::update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now, row_marker update_marker) {
|
||||
// While we know update and existing correspond to the same view entry,
|
||||
// they may not match the view filter.
|
||||
if (!matches_view_filter(db, *_base, _view_info, base_key, existing, now)) {
|
||||
create_entry(db, base_key, update, now);
|
||||
create_entry(db, base_key, update, now, update_marker);
|
||||
return;
|
||||
}
|
||||
if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) {
|
||||
do_delete_old_entry(base_key, existing, update, now);
|
||||
do_delete_old_entry(base_key, existing, update, now, update_marker.timestamp());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1119,7 +1041,7 @@ void view_updates::update_entry(data_dictionary::database db, const partition_ke
|
||||
}
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt, {});
|
||||
auto update_marker = compute_row_marker(update);
|
||||
|
||||
const auto kind = update.column_kind();
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
if (auto rm = std::get_if<row_marker>(&action)) {
|
||||
@@ -1135,6 +1057,8 @@ void view_updates::update_entry(data_dictionary::database db, const partition_ke
|
||||
_op_count += view_rows.size();
|
||||
}
|
||||
|
||||
// Note: despite the general-sounding name of this function, it is used
|
||||
// just for the case of collection indexing.
|
||||
void view_updates::update_entry_for_computed_column(
|
||||
const partition_key& base_key,
|
||||
const clustering_or_static_row& update,
|
||||
@@ -1157,30 +1081,72 @@ void view_updates::update_entry_for_computed_column(
|
||||
}
|
||||
}
|
||||
|
||||
// view_updates::generate_update() is the main function for taking an update
|
||||
// to a base table row - consisting of existing and updated versions of row -
|
||||
// and creating from it zero or more updates to a given materialized view.
|
||||
// These view updates may consist of updating an existing view row, deleting
|
||||
// an old view row, and/or creating a new view row.
|
||||
// There are several distinct cases depending on how many of the view's key
|
||||
// columns are "new key columns", i.e., were regular columns in the base
|
||||
// or are a computed column based on a regular column (these computed columns
|
||||
// are used by, for example, Alternator's GSI):
|
||||
//
|
||||
// Zero new key columns:
|
||||
// The view row's key is composed only from base key columns, and those can't
|
||||
// be changed in an update, so the view row remains alive as long as the
|
||||
// base row is alive. The row marker for the view needs to be set to the
|
||||
// same row marker in the base - to keep an empty view row alive for as long
|
||||
// as an empty base row exists.
|
||||
// Note that in this case, if there are *unselected* base columns, we may
|
||||
// need to keep an empty view row alive even without a row marker because
|
||||
// the base row (which has additional columns) is still alive. For that we
|
||||
// have the "virtual columns" feature: In the zero new key columns case, we
|
||||
// put unselected columns in the view as empty columns, to keep the view
|
||||
// row alive.
|
||||
//
|
||||
// One new key column:
|
||||
// In this case, there is a regular base column that is part of the view
|
||||
// key. This regular column can be added or deleted in an update, or its
|
||||
// expiration be set, and those can cause the view row - including its row
|
||||
// marker - to need to appear or disappear as well. So the liveness of cell
|
||||
// of this one column determines the liveness of the view row and the row
|
||||
// marker that we set for it.
|
||||
//
|
||||
// Two or more new key columns:
|
||||
// This case is explicitly NOT supported in CQL - one cannot create a view
|
||||
// with more than one base-regular columns in its key. In general picking
|
||||
// one liveness (timestamp and expiration) is not possible if there are
|
||||
// multiple regular base columns in the view key, as those can have different
|
||||
// liveness.
|
||||
// However, we do allow this case for Alternator - we need to allow the case
|
||||
// of two (but not more) because the DynamoDB API allows creating a GSI
|
||||
// whose two key columns (hash and range key) were regular columns. We can
|
||||
// support this case in Alternator because it doesn't use expiration (the
|
||||
// "TTL" it does support is different), and doesn't support user-defined
|
||||
// timestamps. But, the two columns can still have different timestamps -
|
||||
// this happens if an update modifies just one of them. In this case the
|
||||
// timestamp of the view update (and that of the row marker) is the later
|
||||
// of these two updated columns.
|
||||
void view_updates::generate_update(
|
||||
data_dictionary::database db,
|
||||
const partition_key& base_key,
|
||||
const clustering_or_static_row& update,
|
||||
const std::optional<clustering_or_static_row>& existing,
|
||||
gc_clock::time_point now) {
|
||||
|
||||
// Note that the base PK columns in update and existing are the same, since we're intrinsically dealing
|
||||
// with the same base row. So we have to check 3 things:
|
||||
// 1) that the clustering key doesn't have a null, which can happen for compact tables. If that's the case,
|
||||
// there are no corresponding entries.
|
||||
// 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
|
||||
// 3) whether the update actually matches the view SELECT filter
|
||||
|
||||
// FIXME: The following if() is old code which may be related to COMPACT
|
||||
// STORAGE. If this is a real case, refer to a test that demonstrates it.
|
||||
// If it's not a real case, remove this if().
|
||||
if (update.is_clustering_row()) {
|
||||
if (!update.key()->is_full(*_base)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
return update_entry_for_computed_column(base_key, update, existing, now);
|
||||
}
|
||||
if (!_base_info->has_base_non_pk_columns_in_view_pk) {
|
||||
// If the view key depends on any regular column in the base, the update
|
||||
// may change the view key and may require deleting an old view row and
|
||||
// inserting a new row. The other case, which we'll handle here first,
|
||||
// is easier and requires just modifying one view row.
|
||||
if (!_base_info->has_base_non_pk_columns_in_view_pk &&
|
||||
!_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
if (update.is_static_row()) {
|
||||
// TODO: support static rows in views with pk only including columns from base pk
|
||||
return;
|
||||
@@ -1188,85 +1154,186 @@ void view_updates::generate_update(
|
||||
// The view key is necessarily the same pre and post update.
|
||||
if (existing && existing->is_live(*_base)) {
|
||||
if (update.is_live(*_base)) {
|
||||
update_entry(db, base_key, update, *existing, now);
|
||||
update_entry(db, base_key, update, *existing, now, update.marker());
|
||||
} else {
|
||||
delete_old_entry(db, base_key, *existing, update, now);
|
||||
delete_old_entry(db, base_key, *existing, update, now, api::missing_timestamp);
|
||||
}
|
||||
} else if (update.is_live(*_base)) {
|
||||
create_entry(db, base_key, update, now);
|
||||
create_entry(db, base_key, update, now, update.marker());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const auto& col_ids = update.is_clustering_row()
|
||||
? _base_info->base_regular_columns_in_view_pk()
|
||||
: _base_info->base_static_columns_in_view_pk();
|
||||
|
||||
// The view has a non-primary-key column from the base table as its primary key.
|
||||
// That means it's either a regular or static column. If we are currently
|
||||
// processing an update which does not correspond to the column's kind,
|
||||
// just stop here.
|
||||
if (col_ids.empty()) {
|
||||
// Find the view key columns that may be changed by an update.
|
||||
// This case is interesting because a change to the view key means that
|
||||
// we may need to delete an old view row and/or create a new view row.
|
||||
// The columns we look for are view key columns that are neither base key
|
||||
// columns nor computed columns based just on key columns. In other words,
|
||||
// we look here for columns which were regular columns or static columns
|
||||
// in the base table, or computed columns based on regular columns.
|
||||
struct updatable_view_key_col {
|
||||
column_id view_col_id;
|
||||
regular_column_transformation::result before;
|
||||
regular_column_transformation::result after;
|
||||
};
|
||||
std::vector<updatable_view_key_col> updatable_view_key_cols;
|
||||
for (const column_definition& view_col : _view->primary_key_columns()) {
|
||||
if (view_col.is_computed()) {
|
||||
const column_computation& computation = view_col.get_computation();
|
||||
if (computation.depends_on_non_primary_key_column()) {
|
||||
// Column is a computed column that does not depend just on
|
||||
// the base key, so it may change in the update.
|
||||
if (auto* c = dynamic_cast<const regular_column_transformation*>(&computation)) {
|
||||
updatable_view_key_cols.emplace_back(view_col.id,
|
||||
existing ? c->compute_value(*_base, base_key, *existing) : regular_column_transformation::result(),
|
||||
c->compute_value(*_base, base_key, update));
|
||||
} else {
|
||||
// The only other column_computation we have which has
|
||||
// depends_on_non_primary_key_column is
|
||||
// collection_column_computation, and we have a special
|
||||
// function to handle that case:
|
||||
return update_entry_for_computed_column(base_key, update, existing, now);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const column_definition* base_col = _base->get_column_definition(view_col.name());
|
||||
if (!base_col) {
|
||||
on_internal_error(vlogger, fmt::format("Column {} in view {}.{} was not found in the base table {}.{}",
|
||||
view_col.name(), _view->ks_name(), _view->cf_name(), _base->ks_name(), _base->cf_name()));
|
||||
}
|
||||
// If the view key column was also a base primary key column, then
|
||||
// it can't possibly change in this update. But if the column was
|
||||
// not a primary key column - i.e., a regular column or static
|
||||
// column, the update might have changed it and we need to list it
|
||||
// on updatable_view_key_cols.
|
||||
// We check base_col->kind == update.column_kind() instead of just
|
||||
// !base_col->is_primary_key() because when update is a static row
|
||||
// we know it can't possibly update a regular column (and vice
|
||||
// versa).
|
||||
if (base_col->kind == update.column_kind()) {
|
||||
// This is a view key column, so we know it is atomic
|
||||
std::optional<atomic_cell_view> after;
|
||||
auto afterp = update.cells().find_cell(base_col->id);
|
||||
if (afterp) {
|
||||
after = afterp->as_atomic_cell(*base_col);
|
||||
}
|
||||
std::optional<atomic_cell_view> before;
|
||||
if (existing) {
|
||||
auto beforep = existing->cells().find_cell(base_col->id);
|
||||
if (beforep) {
|
||||
before = beforep->as_atomic_cell(*base_col);
|
||||
}
|
||||
}
|
||||
updatable_view_key_cols.emplace_back(view_col.id,
|
||||
before ? regular_column_transformation::result(*before) : regular_column_transformation::result(),
|
||||
after ? regular_column_transformation::result(*after) : regular_column_transformation::result());
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we reached here, the view has a non-primary-key column from the base
|
||||
// table as its primary key. That means it's either a regular or static
|
||||
// column. If we are currently processing an update which does not
|
||||
// correspond to the column's kind, updatable_view_key_cols will be empty
|
||||
// and we can just stop here.
|
||||
if (updatable_view_key_cols.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const auto kind = update.column_kind();
|
||||
|
||||
// If one of the key columns is missing, set has_new_row = false
|
||||
// meaning that after the update there will be no view row.
|
||||
// If one of the key columns is missing in the existing value,
|
||||
// set has_old_row = false meaning we don't have an old row to
|
||||
// delete.
|
||||
// Use updatable_view_key_cols - the before and after values of the
|
||||
// view key columns that may have changed - to determine if the update
|
||||
// changes an existing view row, deletes an old row or creates a new row.
|
||||
bool has_old_row = true;
|
||||
bool has_new_row = true;
|
||||
bool same_row = true;
|
||||
for (auto col_id : col_ids) {
|
||||
auto* after = update.cells().find_cell(col_id);
|
||||
auto& cdef = _base->column_at(kind, col_id);
|
||||
if (existing) {
|
||||
auto* before = existing->cells().find_cell(col_id);
|
||||
// Note that this cell is necessarily atomic, because col_ids are
|
||||
// view key columns, and keys must be atomic.
|
||||
if (before && before->as_atomic_cell(cdef).is_live()) {
|
||||
if (after && after->as_atomic_cell(cdef).is_live()) {
|
||||
// We need to compare just the values of the keys, not
|
||||
// metadata like the timestamp. This is because below,
|
||||
// if the old and new view row have the same key, we need
|
||||
// to be sure to reach the update_entry() case.
|
||||
auto cmp = compare_unsigned(before->as_atomic_cell(cdef).value(), after->as_atomic_cell(cdef).value());
|
||||
if (cmp != 0) {
|
||||
same_row = false;
|
||||
}
|
||||
bool same_row = true; // undefined if either has_old_row or has_new_row are false
|
||||
for (const auto& u : updatable_view_key_cols) {
|
||||
if (u.before.has_value()) {
|
||||
if (u.after.has_value()) {
|
||||
if (compare_unsigned(u.before.get_value(), u.after.get_value()) != 0) {
|
||||
same_row = false;
|
||||
}
|
||||
} else {
|
||||
has_old_row = false;
|
||||
has_new_row = false;
|
||||
}
|
||||
} else {
|
||||
has_old_row = false;
|
||||
}
|
||||
if (!after || !after->as_atomic_cell(cdef).is_live()) {
|
||||
has_new_row = false;
|
||||
if (!u.after.has_value()) {
|
||||
has_new_row = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If has_new_row, calculate a row marker for this view row - i.e., a
|
||||
// timestamp and ttl - based on those of the updatable view key column
|
||||
// (or, in an Alternator-only extension, more than one).
|
||||
row_marker new_row_rm; // only set if has_new_row
|
||||
if (has_new_row) {
|
||||
// Note:
|
||||
// 1. By reaching here we know that updatable_view_key_cols has at
|
||||
// least one member (in CQL, it's always one, in Alternator it
|
||||
// may be two).
|
||||
// 2. Because has_new_row, we know all elements in that array have
|
||||
// after.has_value() true, so we can use after.get_ts() et al.
|
||||
api::timestamp_type new_row_ts = updatable_view_key_cols[0].after.get_ts();
|
||||
// This is the Alternator-only support for *two* regular base columns
|
||||
// that become view key columns. The timestamp we use is the *maximum*
|
||||
// of the two key columns, as explained in pull-request #17172.
|
||||
if (updatable_view_key_cols.size() > 1) {
|
||||
auto second_ts = updatable_view_key_cols[1].after.get_ts();
|
||||
new_row_ts = std::max(new_row_ts, second_ts);
|
||||
// Alternator isn't supposed to have more than two updatable view key columns!
|
||||
if (updatable_view_key_cols.size() != 2) [[unlikely]] {
|
||||
utils::on_internal_error(format("Unexpected updatable_view_key_col length {}", updatable_view_key_cols.size()));
|
||||
}
|
||||
}
|
||||
// We assume that either updatable_view_key_cols has just one column
|
||||
// (the only situation allowed in CQL) or if there is more than one
|
||||
// they have the same expiry information (in Alternator, there is
|
||||
// never a CQL TTL set).
|
||||
new_row_rm = row_marker(new_row_ts, updatable_view_key_cols[0].after.get_ttl(), updatable_view_key_cols[0].after.get_expiry());
|
||||
}
|
||||
|
||||
if (has_old_row) {
|
||||
// As explained in #19977, when there is one updatable_view_key_cols
|
||||
// (the only case allowed in CQL) the deletion timestamp is before's
|
||||
// timestamp. As explained in #17119, if there are two of them (only
|
||||
// possible in Alternator), we take the maximum.
|
||||
// Note:
|
||||
// 1. By reaching here we know that updatable_view_key_cols has at
|
||||
// least one member (in CQL, it's always one, in Alternator it
|
||||
// may be two).
|
||||
// 2. Because has_old_row, we know all elements in that array have
|
||||
// before.has_value() true, so we can use before.get_ts().
|
||||
auto old_row_ts = updatable_view_key_cols[0].before.get_ts();
|
||||
if (updatable_view_key_cols.size() > 1) {
|
||||
// This is the Alternator-only support for two regular base
|
||||
// columns that become view key columns. See explanation in
|
||||
// the comment above view_updates::generate_update().
|
||||
auto second_ts = updatable_view_key_cols[1].before.get_ts();
|
||||
old_row_ts = std::max(old_row_ts, second_ts);
|
||||
// Alternator isn't supposed to have more than two updatable view key columns!
|
||||
if (updatable_view_key_cols.size() != 2) [[unlikely]] {
|
||||
utils::on_internal_error(format("Unexpected updatable_view_key_col length {}", updatable_view_key_cols.size()));
|
||||
}
|
||||
}
|
||||
if (has_new_row) {
|
||||
if (same_row) {
|
||||
update_entry(db, base_key, update, *existing, now);
|
||||
update_entry(db, base_key, update, *existing, now, new_row_rm);
|
||||
} else {
|
||||
// This code doesn't work if the old and new view row have the
|
||||
// same key, because if they do we get both data and tombstone
|
||||
// for the same timestamp (now) and the tombstone wins. This
|
||||
// is why we need the "same_row" case above - it's not just a
|
||||
// performance optimization.
|
||||
delete_old_entry(db, base_key, *existing, update, now);
|
||||
create_entry(db, base_key, update, now);
|
||||
// The following code doesn't work if the old and new view row
|
||||
// have the same key, because if they do we can get both data
|
||||
// and tombstone for the same timestamp and the tombstone
|
||||
// wins. This is why we need the "same_row" case above - it's
|
||||
// not just a performance optimization.
|
||||
delete_old_entry(db, base_key, *existing, update, now, old_row_ts);
|
||||
create_entry(db, base_key, update, now, new_row_rm);
|
||||
}
|
||||
} else {
|
||||
delete_old_entry(db, base_key, *existing, update, now);
|
||||
delete_old_entry(db, base_key, *existing, update, now, old_row_ts);
|
||||
}
|
||||
} else if (has_new_row) {
|
||||
create_entry(db, base_key, update, now);
|
||||
create_entry(db, base_key, update, now, new_row_rm);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
bool view_updates::is_partition_key_permutation_of_base_partition_key() const {
|
||||
|
||||
@@ -240,10 +240,10 @@ private:
|
||||
};
|
||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, row_tombstone update_tomb);
|
||||
bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const;
|
||||
void create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now);
|
||||
void create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now, row_marker update_marker);
|
||||
void delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts);
|
||||
void update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now, row_marker update_marker);
|
||||
void update_entry_for_computed_column(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
||||
};
|
||||
|
||||
|
||||
@@ -272,12 +272,6 @@ behave the same in Alternator. However, there are a few features which we have
|
||||
not implemented yet. Unimplemented features return an error when used, so
|
||||
they should be easy to detect. Here is a list of these unimplemented features:
|
||||
|
||||
* Currently in Alternator, a GSI (Global Secondary Index) can only be added
|
||||
to a table at table creation time. DynamoDB allows adding a GSI (but not an
|
||||
LSI) to an existing table using an UpdateTable operation, and similarly it
|
||||
allows removing a GSI from a table.
|
||||
<https://github.com/scylladb/scylla/issues/11567>
|
||||
|
||||
* GSI (Global Secondary Index) and LSI (Local Secondary Index) may be
|
||||
configured to project only a subset of the base-table attributes to the
|
||||
index. This option is not yet respected by Alternator - all attributes
|
||||
@@ -378,3 +372,14 @@ they should be easy to detect. Here is a list of these unimplemented features:
|
||||
that can be used to forbid table deletion. This table option was added to
|
||||
DynamoDB in March 2023.
|
||||
<https://github.com/scylladb/scylla/issues/14482>
|
||||
|
||||
* Alternator does not support the table option WarmThroughput that can be
|
||||
used to check or guarantee that the database has "warmed" to handle a
|
||||
particular throughput. This table option was added to DynamoDB in
|
||||
November 2024.
|
||||
<https://github.com/scylladb/scylladb/issues/21853>
|
||||
|
||||
* Alternator does not support the table option MultiRegionConsistency
|
||||
that can be used to achieve consistent reads on global (multi-region) tables.
|
||||
This table option was added as a preview to DynamoDB in December 2024.
|
||||
<https://github.com/scylladb/scylladb/issues/21852>
|
||||
|
||||
@@ -1591,7 +1591,7 @@ void row::apply_monotonically(const schema& our_schema, const schema& their_sche
|
||||
// we erase the live cells according to the shadowable_tombstone rules.
|
||||
static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) {
|
||||
return s.is_view()
|
||||
&& s.view_info()->has_base_non_pk_columns_in_view_pk()
|
||||
&& (s.view_info()->has_base_non_pk_columns_in_view_pk() || s.view_info()->has_computed_column_depending_on_base_non_primary_key())
|
||||
&& !marker.is_live()
|
||||
&& kind == column_kind::regular_column; // not applicable to static rows
|
||||
}
|
||||
|
||||
@@ -13,7 +13,9 @@ target_link_libraries(schema
|
||||
idl
|
||||
Seastar::seastar
|
||||
xxHash::xxhash
|
||||
absl::headers)
|
||||
absl::headers
|
||||
PRIVATE
|
||||
alternator)
|
||||
|
||||
check_headers(check-headers schema
|
||||
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "index/target_parser.hh"
|
||||
#include "utils/hashing.hh"
|
||||
#include "utils/hashers.hh"
|
||||
#include "alternator/extract_from_attrs.hh"
|
||||
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
@@ -2090,6 +2091,9 @@ column_computation_ptr column_computation::deserialize(bytes_view raw) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (type == alternator::extract_from_attrs_column_computation::TYPE_NAME) {
|
||||
return std::make_unique<alternator::extract_from_attrs_column_computation>(parsed);
|
||||
}
|
||||
throw std::runtime_error(format("Incorrect column computation type {} found when parsing {}", *type_json, parsed));
|
||||
}
|
||||
|
||||
|
||||
@@ -655,7 +655,6 @@ def test_rbac_updatetable(dynamodb, cql):
|
||||
# to create a completely separate and independent table.
|
||||
# Below we also have two additional tests for auto-grant when creating a GSI,
|
||||
# and auto-revoke when deleting it.
|
||||
@pytest.mark.xfail(reason="#11567")
|
||||
def test_rbac_updatetable_gsi(dynamodb, cql):
|
||||
schema = {
|
||||
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
@@ -713,7 +712,6 @@ def test_rbac_updatetable_gsi(dynamodb, cql):
|
||||
# Test the "autogrant" feature of UpdateTable's GSI Create feature: If a role
|
||||
# is allowed to create a GSI, this role is automatically given full (SELECT)
|
||||
# permissions to the newly-created GSI.
|
||||
@pytest.mark.xfail(reason="#11567")
|
||||
def test_rbac_updatetable_gsi_autogrant(dynamodb, cql):
|
||||
schema = {
|
||||
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
@@ -760,7 +758,6 @@ def test_rbac_updatetable_gsi_autogrant(dynamodb, cql):
|
||||
# on the deleted GSI are revoked. If we forgot to do this revocation, it is
|
||||
# possible for role1 to create a GSI, delete it, and then role2 creates a
|
||||
# different GSI with the same name and role1 might be able to read it.
|
||||
@pytest.mark.xfail(reason="#11567")
|
||||
def test_rbac_updatetable_gsi_autorevoke(dynamodb, cql):
|
||||
schema = {
|
||||
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import pytest
|
||||
import time
|
||||
from botocore.exceptions import ClientError
|
||||
from test.alternator.util import create_test_table, random_string, random_bytes, full_scan, full_query, multiset, list_tables, new_test_table
|
||||
from .util import create_test_table, random_string, random_bytes, full_scan, full_query, multiset, list_tables, new_test_table, wait_for_gsi
|
||||
|
||||
# GSIs only support eventually consistent reads, so tests that involve
|
||||
# writing to a table and then expect to read something from it cannot be
|
||||
@@ -261,8 +261,16 @@ def test_gsi_describe(test_table_gsi_1):
|
||||
# backfilled might be in other states, but that case is tested in different
|
||||
# tests in test_gsi_updatetable.py.
|
||||
# Reproduces #11471.
|
||||
@pytest.mark.xfail(reason="issue #11471")
|
||||
def test_gsi_describe_indexstatus(test_table_gsi_1):
|
||||
# In DynamoDB, a GSI created together with the table is always immediately
|
||||
# ACTIVE, but this is not always true in Alternator: Although a new table
|
||||
# is completely empty and its "view building" phase has nothing to do,
|
||||
# this "nothing" can still take a short while (especially in debug builds)
|
||||
# and in the mean time the test might see the CREATING state and be flaky.
|
||||
# So let's wait_for_gsi() just to be sure the view building is over.
|
||||
# Note that this makes the explicit IndexStatus check below redundant,
|
||||
# because wait_for_gsi() already does it.
|
||||
wait_for_gsi(test_table_gsi_1, 'hello')
|
||||
desc = test_table_gsi_1.meta.client.describe_table(TableName=test_table_gsi_1.name)
|
||||
gsis = desc['Table']['GlobalSecondaryIndexes']
|
||||
assert len(gsis) == 1
|
||||
@@ -607,6 +615,122 @@ def test_gsi_wrong_type_attribute_batch(test_table_gsi_2):
|
||||
for p in [p1, p2, p3]:
|
||||
assert not 'Item' in test_table_gsi_2.get_item(Key={'p': p}, ConsistentRead=True)
|
||||
|
||||
# Test when a table has a GSI, if the indexed attribute is a partition key
|
||||
# in the GSI and its value exceeds 2048 bytes, the update operation is rejected,
|
||||
# and is added to neither base table nor index. DynamoDB limits partition
|
||||
# keys to that length (see test_limits.py::test_limit_partition_key_len_2048)
|
||||
# so wants to limit the GSI keys as well.
|
||||
# Note that in test_gsi_updatetable.py we have a similar test for when adding
|
||||
# a GSI to a pre-existing table. In that case we can't reject the base-table update
|
||||
# because the oversized attribute is already there - but can just drop this
|
||||
# item from the GSI.
|
||||
@pytest.mark.xfail(reason="issue #10347: key length limits not enforced")
|
||||
def test_gsi_limit_partition_key_len_2048(test_table_gsi_2):
|
||||
# A value for 'x' (the GSI's partition key) of length 2048 is fine:
|
||||
p = random_string()
|
||||
x = 'a'*2048
|
||||
test_table_gsi_2.put_item(Item={'p': p, 'x': x})
|
||||
assert_index_query(test_table_gsi_2, 'hello', [{'p': p, 'x': x}],
|
||||
KeyConditions={
|
||||
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
|
||||
# PutItem with an oversized value for 'x' is rejected, item isn't created even
|
||||
# in the base table.
|
||||
p = random_string()
|
||||
x = 'a'*2049
|
||||
with pytest.raises(ClientError, match='ValidationException.*2048'):
|
||||
test_table_gsi_2.put_item(Item={'p': p, 'x': x})
|
||||
assert not 'Item' in test_table_gsi_2.get_item(Key={'p': p}, ConsistentRead=True)
|
||||
|
||||
# This is a variant of the above test, where we don't insist that the
|
||||
# partition key length limit must be exactly 2048 bytes as in DynamoDB,
|
||||
# but that it be *at least* 2048. I.e., we verify that 2048-byte values
|
||||
# are allowed for GSI partition keys, while very long keys that surpass
|
||||
# Scylla's low-level key-length limit (64 KB) are forbidden with an
|
||||
# appropriate error message and not an "internal server error". This test
|
||||
# should pass even if Alternator decides to adopt a different key length
|
||||
# limits from DynamoDB. We do have to adopt *some* limit because the
|
||||
# internal Scylla implementation has a 64 KB limit on key lengths.
|
||||
@pytest.mark.xfail(reason="issue #10347: key length limits not enforced")
|
||||
def test_gsi_limit_partition_key_len(test_table_gsi_2):
|
||||
# A value for 'x' (the GSI's partition key) of length 2048 is fine:
|
||||
p = random_string()
|
||||
x = 'a'*2048
|
||||
test_table_gsi_2.put_item(Item={'p': p, 'x': x})
|
||||
assert_index_query(test_table_gsi_2, 'hello', [{'p': p, 'x': x}],
|
||||
KeyConditions={
|
||||
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
|
||||
# Attribute, that is a GSI partition key, of length 64 KB + 1 is forbidden:
|
||||
# it obviously exceeds DynamoDB's limit (2048 bytes), but also exceeds
|
||||
# Scylla's internal limit on key length (64 KB - 1). We expect to get a
|
||||
# reasonable error on request validation - not some "internal server error".
|
||||
# We actually used to get this "internal server error" for 64 KB - 2
|
||||
# (this is probably related to issue #16772).
|
||||
p = random_string()
|
||||
x = 'a'*65536
|
||||
with pytest.raises(ClientError, match='ValidationException.*limit'):
|
||||
test_table_gsi_2.put_item(Item={'p': p, 'x': x})
|
||||
assert not 'Item' in test_table_gsi_2.get_item(Key={'p': p}, ConsistentRead=True)
|
||||
|
||||
# Test when a table has a GSI, if the indexed attribute is a sort key
|
||||
# in the GSI and its value exceeds 1024 bytes, the update operation is rejected,
|
||||
# and is added to neither base table nor index. DynamoDB limits sort
|
||||
# keys to that length (see test_limits.py::test_limit_sort_key_len_1024)
|
||||
# so wants to limit the GSI keys as well.
|
||||
# Note that in test_gsi_updatetable.py we have a similar test for when adding
|
||||
# a GSI to a pre-existing table. In that case we can't reject the base-table update
|
||||
# because the oversized attribute is already there - but can just drop this
|
||||
# item from the GSI.
|
||||
@pytest.mark.xfail(reason="issue #10347: key length limits not enforced")
|
||||
def test_gsi_limit_sort_key_len_1024(test_table_gsi_5):
|
||||
# A value for 'x' (the GSI's sort key) of length 1024 is fine:
|
||||
p = random_string()
|
||||
c = random_string()
|
||||
x = 'a'*1024
|
||||
test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x})
|
||||
assert_index_query(test_table_gsi_5, 'hello', [{'p': p, 'c': c, 'x': x}],
|
||||
KeyConditions={
|
||||
'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
|
||||
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
|
||||
# PutItem with an oversized value for 'x' is rejected, item isn't created even
|
||||
# in the base table.
|
||||
p = random_string()
|
||||
x = 'a'*1025
|
||||
with pytest.raises(ClientError, match='ValidationException.*1024'):
|
||||
test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x})
|
||||
assert not 'Item' in test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)
|
||||
|
||||
# This is a variant of the above test, where we don't insist that the
|
||||
# sort key length limit must be exactly 1024 bytes as in DynamoDB,
|
||||
# but that it be *at least* 1024. I.e., we verify that 1024-byte values
|
||||
# are allowed for GSI sort keys, while very long keys that surpass
|
||||
# Scylla's low-level key-length limit (64 KB) are forbidden with an
|
||||
# appropriate error message and not an "internal server error". This test
|
||||
# should pass even if Alternator decides to adopt a different key length
|
||||
# limits from DynamoDB. We do have to adopt *some* limit because the
|
||||
# internal Scylla implementation has a 64 KB limit on key lengths.
|
||||
@pytest.mark.xfail(reason="issue #10347: key length limits not enforced")
|
||||
def test_gsi_limit_sort_key_len(test_table_gsi_5):
|
||||
# A value for 'x' (the GSI's sort key) of length 1024 is fine:
|
||||
p = random_string()
|
||||
c = random_string()
|
||||
x = 'a'*1024
|
||||
test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x})
|
||||
assert_index_query(test_table_gsi_5, 'hello', [{'p': p, 'c': c, 'x': x}],
|
||||
KeyConditions={
|
||||
'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
|
||||
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
|
||||
# Attribute, that is a GSI sort key, of length 64 KB + 1 is forbidden:
|
||||
# it obviously exceeds DynamoDB's limit (1024 bytes), but also exceeds
|
||||
# Scylla's internal limit on key length (64 KB - 1). We expect to get a
|
||||
# reasonable error on request validation - not some "internal server error".
|
||||
# We actually used to get this "internal server error" for 64 KB - 2
|
||||
# (this is probably related to issue #16772).
|
||||
p = random_string()
|
||||
x = 'a'*65536
|
||||
with pytest.raises(ClientError, match='ValidationException.*limit'):
|
||||
test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x})
|
||||
assert not 'Item' in test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)
|
||||
|
||||
# A third scenario of GSI. Index has a hash key and a sort key, both are
|
||||
# non-key attributes from the base table. This scenario may be very
|
||||
# difficult to implement in Alternator because Scylla's materialized-views
|
||||
|
||||
@@ -12,59 +12,9 @@ import pytest
|
||||
import time
|
||||
from botocore.exceptions import ClientError
|
||||
from .util import random_string, full_scan, full_query, multiset, \
|
||||
new_test_table
|
||||
new_test_table, wait_for_gsi, wait_for_gsi_gone
|
||||
from .test_gsi import assert_index_query
|
||||
|
||||
# UpdateTable for creating a GSI is an asynchronous operation. The table's
|
||||
# TableStatus changes from ACTIVE to UPDATING for a short while, and then
|
||||
# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING,
|
||||
# until eventually (in Amazon DynamoDB - it takes a *long* time...) it
|
||||
# becomes ACTIVE. During the CREATING phase, at some point the Backfilling
|
||||
# attribute also appears, until it eventually disappears. We need to wait
|
||||
# until all three markers indicate completion.
|
||||
# Unfortunately, while boto3 has a client.get_waiter('table_exists') to
|
||||
# wait for a table to exist, there is no such function to wait for an
|
||||
# index to come up, so we need to code it ourselves.
|
||||
def wait_for_gsi(table, gsi_name):
|
||||
start_time = time.time()
|
||||
# The timeout needs to be long because on Amazon DynamoDB, even on a
|
||||
# tiny table, it sometimes takes minutes.
|
||||
while time.time() < start_time + 600:
|
||||
desc = table.meta.client.describe_table(TableName=table.name)
|
||||
table_status = desc['Table']['TableStatus']
|
||||
if table_status != 'ACTIVE':
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
|
||||
assert len(index_desc) == 1
|
||||
index_status = index_desc[0]['IndexStatus']
|
||||
if index_status != 'ACTIVE':
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
# When the index is ACTIVE, this must be after backfilling completed
|
||||
assert not 'Backfilling' in index_desc[0]
|
||||
return
|
||||
raise AssertionError("wait_for_gsi did not complete")
|
||||
|
||||
# Similarly to how wait_for_gsi() waits for a GSI to finish adding,
|
||||
# this function waits for a GSI to be finally deleted.
|
||||
def wait_for_gsi_gone(table, gsi_name):
|
||||
start_time = time.time()
|
||||
while time.time() < start_time + 600:
|
||||
desc = table.meta.client.describe_table(TableName=table.name)
|
||||
table_status = desc['Table']['TableStatus']
|
||||
if table_status != 'ACTIVE':
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
if 'GlobalSecondaryIndexes' in desc['Table']:
|
||||
index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
|
||||
if len(index_desc) != 0:
|
||||
index_status = index_desc[0]['IndexStatus']
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
return
|
||||
raise AssertionError("wait_for_gsi_gone did not complete")
|
||||
|
||||
# All tests in test_gsi.py involved creating a new table with a GSI up-front.
|
||||
# This test will be about creating a base table *without* a GSI, putting data
|
||||
# in it, and then adding a GSI with the UpdateTable operation. This starts
|
||||
@@ -73,7 +23,6 @@ def wait_for_gsi_gone(table, gsi_name):
|
||||
# the wrong type are silently ignored and not added to the index. We also
|
||||
# check that after adding the GSI, it is no longer possible to add more
|
||||
# items with wrong types to the base table.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_backfill(dynamodb):
|
||||
# First create, and fill, a table without GSI. The items in items1
|
||||
# will have the appropriate string type for 'x' and will later get
|
||||
@@ -141,7 +90,6 @@ def test_gsi_backfill(dynamodb):
|
||||
# check that the new GSI works. In Alternator's implementation, the LSI key
|
||||
# column will become a real column in the schema, and the GSI needs to use
|
||||
# that instead of the usual computed column.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_backfill_with_lsi(dynamodb):
|
||||
# First create, and fill, a table with an LSI but without GSI.
|
||||
with new_test_table(dynamodb,
|
||||
@@ -208,7 +156,6 @@ def test_gsi_backfill_with_lsi(dynamodb):
|
||||
# checked the case of a new GSI key being a real column because it was an
|
||||
# LSI key. In this test the GSI key is a real column because it was a
|
||||
# key column of the base table itself.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_backfill_with_real_column(dynamodb):
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[
|
||||
@@ -237,7 +184,6 @@ def test_gsi_backfill_with_real_column(dynamodb):
|
||||
assert multiset(items) == multiset(full_scan(table, ConsistentRead=False, IndexName='gsi'))
|
||||
|
||||
# Test deleting an existing GSI using UpdateTable
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_delete(dynamodb):
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
@@ -285,7 +231,6 @@ def test_gsi_delete(dynamodb):
|
||||
# still enforced because it is still an LSI key. In Alternator's
|
||||
# implementation this happens because the LSI key column was - and remains -
|
||||
# a real column in the schema.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_delete_with_lsi(dynamodb):
|
||||
# A table whose non-key column "x" serves as a range key in an LSI,
|
||||
# and partition key in a GSI.
|
||||
@@ -315,6 +260,10 @@ def test_gsi_delete_with_lsi(dynamodb):
|
||||
'Projection': { 'ProjectionType': 'ALL' }
|
||||
}
|
||||
]) as table:
|
||||
# We shouldn't need to wait for a GSI created together with the
|
||||
# table, but let's do it anyway to work around bug #9059 (which
|
||||
# isn't what this test is trying to reproduce).
|
||||
wait_for_gsi(table, 'gsi')
|
||||
items = [{'p': random_string(), 'c': random_string(), 'x': random_string()} for i in range(10)]
|
||||
with table.batch_writer() as batch:
|
||||
for item in items:
|
||||
@@ -362,7 +311,6 @@ def test_gsi_delete_with_lsi(dynamodb):
|
||||
# operation on a table set up by CreateTable. In this test we try several
|
||||
# of these operations in sequence, to check we can add more than one GSI,
|
||||
# delete a GSI that we just added, recreate a GSI that we just deleted, etc.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_creates_and_deletes(dynamodb):
|
||||
schema = {
|
||||
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
@@ -491,7 +439,6 @@ def test_gsi_backfill_empty_string(dynamodb):
|
||||
# happens during the table creation, and one here where the second GSI is
|
||||
# added after the table already exists with the first GSI.
|
||||
# Reproduces #13870.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_key_type_conflict_on_update(dynamodb):
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||
@@ -532,7 +479,6 @@ def table1(dynamodb):
|
||||
yield table
|
||||
|
||||
# An empty update_table() call, without any parameters changed, is not allowed.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_updatetable_empty(dynamodb, table1):
|
||||
with pytest.raises(ClientError, match='ValidationException.*UpdateTable'):
|
||||
dynamodb.meta.client.update_table(TableName=table1.name)
|
||||
@@ -543,7 +489,6 @@ def test_updatetable_empty(dynamodb, table1):
|
||||
GlobalSecondaryIndexUpdates=[])
|
||||
|
||||
# Test various invalid cases of UpdateTable's GlobalSecondaryIndexUpdates.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_updatetable_errors(dynamodb, table1):
|
||||
client = dynamodb.meta.client
|
||||
|
||||
@@ -637,7 +582,6 @@ def test_gsi_updatetable_errors(dynamodb, table1):
|
||||
# In Alternator, we decided to detect this case anyway - it can help users
|
||||
# notice problems (see #19784). So because we differ from DynamoDB on this,
|
||||
# this test is marked scylla_only.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_gsi_updatetable_spurious_attribute_definitions(table1, scylla_only):
|
||||
with pytest.raises(ClientError, match='ValidationException.*AttributeDefinitions'):
|
||||
table1.meta.client.update_table(TableName=table1.name,
|
||||
@@ -654,9 +598,132 @@ def test_gsi_updatetable_spurious_attribute_definitions(table1, scylla_only):
|
||||
|
||||
# Check that attempting to delete a GSI that doesn't exist results in
|
||||
# the expected ResourceNotFoundException.
|
||||
@pytest.mark.xfail(reason="issue #11567")
|
||||
def test_updatetable_delete_missing_gsi(dynamodb, table1):
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException'):
|
||||
dynamodb.meta.client.update_table(TableName=table1.name,
|
||||
GlobalSecondaryIndexUpdates=[{ 'Delete':
|
||||
{ 'IndexName': 'nonexistent' } }])
|
||||
|
||||
# Whereas DynamoDB allows attribute values to reach a generous length (they
|
||||
# are only limited by the item's size limit, 400 KB), an attribute which is
|
||||
# a *key* has much stricter limits - 2048 bytes for a partition key, 1024
|
||||
# bytes for a sort key. This means that if a table has a GSI or LSI and
|
||||
# one of the attributes serves as a key in that GSI and LSI, DynamoDB
|
||||
# limits its length. In the tests test_gsi.py::test_gsi_limit_* we verified
|
||||
# that attempts to write an oversized value to an attribute which is a
|
||||
# GSI key are rejected. Here we test what happens when adding a GSI to
|
||||
# a table with pre-existing data, which already includes items with oversized
|
||||
# values for the key attribute. These items can't be "rejected" - they
|
||||
# are already in the base table - but should be skipped while filling the
|
||||
# GSI. What we don't want to happen is to see the view building hang,
|
||||
# as described in issue #8627 and #10347.
|
||||
# The first test here, test_gsi_backfill_oversized_key(), doesn't check the
|
||||
# specific limits of 2048 and 1024 bytes, it only checks that an item with
|
||||
# a 65 KB attribute (above Scylla's internal limitations for keys) is
|
||||
# cleanly skipped and don't cause view build hangs. The following test
|
||||
# test_gsi_backfill_key_limits will check the specific limits.
|
||||
def test_gsi_backfill_oversized_key(dynamodb):
|
||||
# First create, and fill, a table without GSI:
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
|
||||
{ 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
||||
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table:
|
||||
p1 = random_string()
|
||||
p2 = random_string()
|
||||
c = random_string()
|
||||
# Create two items, one has a small "x" attribute, the other has
|
||||
# a 65 KB "x" attribute.
|
||||
table.put_item(Item={'p': p1, 'c': c, 'x': 'hello'})
|
||||
table.put_item(Item={'p': p2, 'c': c, 'x': 'a'*66500})
|
||||
# Now use UpdateTable to create two GSIs. In one of them "x" will be
|
||||
# the partition key, and in the other "x" will be a sort key.
|
||||
# DynamoDB limits the number of indexes that can be added in one
|
||||
# UpdateTable command to just one, so we need to do it in two separate
|
||||
# commands and wait for each to complete.
|
||||
dynamodb.meta.client.update_table(TableName=table.name,
|
||||
AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }],
|
||||
GlobalSecondaryIndexUpdates=[
|
||||
{ 'Create': { 'IndexName': 'index1',
|
||||
'KeySchema': [{ 'AttributeName': 'x', 'KeyType': 'HASH' }],
|
||||
'Projection': { 'ProjectionType': 'ALL' }}
|
||||
}
|
||||
])
|
||||
wait_for_gsi(table, 'index1')
|
||||
dynamodb.meta.client.update_table(TableName=table.name,
|
||||
AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' }],
|
||||
GlobalSecondaryIndexUpdates=[
|
||||
{ 'Create': { 'IndexName': 'index2',
|
||||
'KeySchema': [{ 'AttributeName': 'c', 'KeyType': 'HASH' },
|
||||
{ 'AttributeName': 'x', 'KeyType': 'RANGE' }],
|
||||
'Projection': { 'ProjectionType': 'ALL' }}
|
||||
}
|
||||
])
|
||||
wait_for_gsi(table, 'index2')
|
||||
# Verify that the items with the oversized x are missing from both
|
||||
# GSIs, so only the one item with x = hello should appear in both.
|
||||
# Note that we don't need to retry the reads here (i.e., use the
|
||||
# assert_index_scan() or assert_index_query() functions) because after
|
||||
# we waited for backfilling to complete, we know all the pre-existing
|
||||
# data is already in the index.
|
||||
assert [{'p': p1, 'c': c, 'x': 'hello'}] == full_scan(table, ConsistentRead=False, IndexName='index1')
|
||||
assert [{'p': p1, 'c': c, 'x': 'hello'}] == full_scan(table, ConsistentRead=False, IndexName='index2')
|
||||
|
||||
# The previous test, test_gsi_backfill_oversized_key(), checked that a
|
||||
# grossly oversized GSI key attribute (over Scylla's internal key limit
|
||||
# of 64 KB) doesn't hang the view building process. This test verifies
|
||||
# more specifically that DynamoDB's documented limits - 2048 bytes for
|
||||
# a GSI partition key and 1024 for a GSI sort key - are implemented. An
|
||||
# item that has an attribute longer than that should simply be skipped
|
||||
# during view building.
|
||||
# Reproduces issue #10347.
|
||||
@pytest.mark.xfail(reason="issue #10347: key length limits not enforced")
|
||||
def test_gsi_backfill_key_limits(dynamodb):
|
||||
# First create, and fill, a table without GSI:
|
||||
with new_test_table(dynamodb,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
|
||||
{ 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
||||
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table:
|
||||
# Create four items, with 'x' attribute sizes of 1024, 1025, 2048
|
||||
# and 2049. Only one item (1024) has x suitable for a sort key,
|
||||
# and three (1024, 1025 and 2048) have length suitable for a partition
|
||||
# key. The unsuitable items will be missing from the indexes.
|
||||
lengths = [1024, 1025, 2048, 2049]
|
||||
p = [random_string() for length in lengths]
|
||||
x = ['a'*length for length in lengths]
|
||||
c = random_string()
|
||||
for i in range(len(lengths)):
|
||||
table.put_item(Item={'p': p[i], 'c': c, 'x': x[i]})
|
||||
# Now use UpdateTable to create two GSIs. In one of them "x" will be
|
||||
# the partition key, and in the other "x" will be a sort key.
|
||||
# DynamoDB limits the number of indexes that can be added in one
|
||||
# UpdateTable command to just one, so we need to do it in two separate
|
||||
# commands and wait for each to complete.
|
||||
dynamodb.meta.client.update_table(TableName=table.name,
|
||||
AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }],
|
||||
GlobalSecondaryIndexUpdates=[
|
||||
{ 'Create': { 'IndexName': 'index1',
|
||||
'KeySchema': [{ 'AttributeName': 'x', 'KeyType': 'HASH' }],
|
||||
'Projection': { 'ProjectionType': 'ALL' }}
|
||||
}
|
||||
])
|
||||
wait_for_gsi(table, 'index1')
|
||||
dynamodb.meta.client.update_table(TableName=table.name,
|
||||
AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' }],
|
||||
GlobalSecondaryIndexUpdates=[
|
||||
{ 'Create': { 'IndexName': 'index2',
|
||||
'KeySchema': [{ 'AttributeName': 'c', 'KeyType': 'HASH' },
|
||||
{ 'AttributeName': 'x', 'KeyType': 'RANGE' }],
|
||||
'Projection': { 'ProjectionType': 'ALL' }}
|
||||
}
|
||||
])
|
||||
wait_for_gsi(table, 'index2')
|
||||
# Verify that the items with the oversized x are missing from both
|
||||
# GSIs. For index1 (x is a partition key, limited to 2048 bytes)
|
||||
# items 0,1,2 should appear, for index2 (x is a sort key, limited
|
||||
# to 1024 bytes), only item 0 should appear.
|
||||
assert multiset([{'p': p[i], 'c': c, 'x': x[i]} for i in range(3)]) == multiset(full_scan(table, ConsistentRead=False, IndexName='index1'))
|
||||
assert [{'p': p[0], 'c': c, 'x': x[0]}] == full_scan(table, ConsistentRead=False, IndexName='index2')
|
||||
|
||||
@@ -245,3 +245,53 @@ def scylla_inject_error(rest_api, err, one_shot=False):
|
||||
def scylla_log(optional_rest_api, message, level):
    """Send `message` with the given `level` to the `/system/log` REST endpoint.

    `optional_rest_api` is the base URL of the REST API. When it is falsy
    (e.g. None), the function does nothing and no request is made.
    The message is URL-quoted before being placed in the query string;
    `level` is inserted as-is.
    """
    if not optional_rest_api:
        return
    quoted_message = requests.utils.quote(message)
    url = f'{optional_rest_api}/system/log?message={quoted_message}&level={level}'
    requests.post(url)
|
||||
|
||||
# UpdateTable for creating a GSI is an asynchronous operation. The table's
|
||||
# TableStatus changes from ACTIVE to UPDATING for a short while, and then
|
||||
# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING,
|
||||
# until eventually (in Amazon DynamoDB - it takes a *long* time...) it
|
||||
# becomes ACTIVE. During the CREATING phase, at some point the Backfilling
|
||||
# attribute also appears, until it eventually disappears. We need to wait
|
||||
# until all three markers indicate completion.
|
||||
# Unfortunately, while boto3 has a client.get_waiter('table_exists') to
|
||||
# wait for a table to exist, there is no such function to wait for an
|
||||
# index to come up, so we need to code it ourselves.
|
||||
def wait_for_gsi(table, gsi_name):
    """Poll DescribeTable until the GSI `gsi_name` of `table` is usable.

    Returns (None) once the table's TableStatus is 'ACTIVE' and the index's
    IndexStatus is 'ACTIVE'. Between polls it sleeps for 0.1 seconds.

    Raises AssertionError if:
      * the table is ACTIVE but does not list exactly one GSI named
        `gsi_name`,
      * the index is ACTIVE but its description still has 'Backfilling',
      * the index did not become ACTIVE within 600 seconds.
    A KeyError propagates if the table is ACTIVE but its description has
    no 'GlobalSecondaryIndexes' entry at all.
    """
    # The deadline is generous because on Amazon DynamoDB creating a GSI
    # sometimes takes minutes, even on a tiny table.
    deadline = time.time() + 600
    while time.time() < deadline:
        table_desc = table.meta.client.describe_table(TableName=table.name)['Table']
        if table_desc['TableStatus'] == 'ACTIVE':
            matching = [gsi for gsi in table_desc['GlobalSecondaryIndexes']
                        if gsi['IndexName'] == gsi_name]
            assert len(matching) == 1
            gsi_desc = matching[0]
            if gsi_desc['IndexStatus'] == 'ACTIVE':
                # An ACTIVE index must already be past the backfilling stage
                assert 'Backfilling' not in gsi_desc
                return
        # Either the table or the index is not ACTIVE yet - poll again soon.
        time.sleep(0.1)
    raise AssertionError("wait_for_gsi did not complete")
|
||||
|
||||
# Similarly to how wait_for_gsi() waits for a GSI to finish adding,
|
||||
# this function waits for a GSI to be finally deleted.
|
||||
def wait_for_gsi_gone(table, gsi_name):
    """Poll DescribeTable until the GSI `gsi_name` no longer exists on `table`.

    Returns (None) once the table's TableStatus is 'ACTIVE' and no entry
    named `gsi_name` is listed under 'GlobalSecondaryIndexes' (the key may
    also be missing altogether, which is treated as "no GSIs"). Between
    polls it sleeps for 0.1 seconds.

    Raises AssertionError if the index is still listed (or the table is
    still not ACTIVE) after 600 seconds.
    """
    deadline = time.time() + 600
    while time.time() < deadline:
        table_desc = table.meta.client.describe_table(TableName=table.name)['Table']
        if table_desc['TableStatus'] == 'ACTIVE':
            # Only the *presence* of the index matters here; its IndexStatus
            # is deliberately not inspected.
            remaining = table_desc.get('GlobalSecondaryIndexes', [])
            if all(gsi['IndexName'] != gsi_name for gsi in remaining):
                return
        # The table is still being updated, or the index is still listed.
        time.sleep(0.1)
    raise AssertionError("wait_for_gsi_gone did not complete")
|
||||
|
||||
Reference in New Issue
Block a user