Merge 'alternator: fix remaining problems with new Stream ARN format' from Nadav Har'El
This small series includes a few followups to the patch that changed Alternator Stream ARNs from using our own UUID format to something that resembles Amazon's Stream ARNs (and the KCL library won't reject as bogus-looking ARNs). The first patch is the most important one, fixing ListStreams's LastEvaluatedStreamArn to also use the new ARN format. It fixes SCYLLADB-539. The following patches are additional cleanups and tests for the new ARN code. Closes scylladb/scylladb#29474 * github.com:scylladb/scylladb: alternator: fix ListStreams paging if table is deleted during paging test/alternator: test DescribeStream on non-existent table alternator: ListStreams: on last page, avoid LastEvaluatedStreamArn alternator: remove dead code stream_shard_id alternator: fix ListStreams to return real ARN as LastEvaluatedStreamArn
This commit is contained in:
@@ -167,46 +167,8 @@ static schema_ptr get_schema_from_arn(service::storage_proxy& proxy, const strea
|
||||
}
|
||||
}
|
||||
|
||||
// ShardId. Must be between 28 and 65 characters inclusive.
|
||||
// UUID is 36 bytes as string (including dashes).
|
||||
// Prepend a version/type marker (`S`) -> 37
|
||||
class stream_shard_id : public utils::UUID {
|
||||
public:
|
||||
using UUID = utils::UUID;
|
||||
static constexpr char marker = 'S';
|
||||
|
||||
stream_shard_id() = default;
|
||||
stream_shard_id(const UUID& uuid)
|
||||
: UUID(uuid)
|
||||
{}
|
||||
stream_shard_id(const table_id& tid)
|
||||
: UUID(tid.uuid())
|
||||
{}
|
||||
stream_shard_id(std::string_view v)
|
||||
: UUID(v.substr(1))
|
||||
{
|
||||
if (v[0] != marker) {
|
||||
throw std::invalid_argument(std::string(v));
|
||||
}
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const stream_shard_id& arn) {
|
||||
const UUID& uuid = arn;
|
||||
return os << marker << uuid;
|
||||
}
|
||||
friend std::istream& operator>>(std::istream& is, stream_shard_id& arn) {
|
||||
std::string s;
|
||||
is >> s;
|
||||
arn = stream_shard_id(s);
|
||||
return is;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace alternator
|
||||
|
||||
template<typename ValueType>
|
||||
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_shard_id>
|
||||
: public from_string_helper<ValueType, alternator::stream_shard_id>
|
||||
{};
|
||||
template<typename ValueType>
|
||||
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_arn>
|
||||
: public from_string_helper<ValueType, alternator::stream_arn>
|
||||
@@ -218,7 +180,8 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
_stats.api_operations.list_streams++;
|
||||
|
||||
auto limit = rjson::get_opt<int>(request, "Limit").value_or(100);
|
||||
auto streams_start = rjson::get_opt<stream_shard_id>(request, "ExclusiveStartStreamArn");
|
||||
auto streams_start = rjson::get_opt<stream_arn>(request, "ExclusiveStartStreamArn");
|
||||
|
||||
auto table = find_table(_proxy, request);
|
||||
auto db = _proxy.data_dictionary();
|
||||
|
||||
@@ -244,34 +207,34 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
cfs = db.get_tables();
|
||||
}
|
||||
|
||||
// # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never
|
||||
// generate duplicates in a paged listing here. Can obviously miss things if they
|
||||
// are added between paged calls and end up with a "smaller" UUID/ARN, but that
|
||||
// is to be expected.
|
||||
// We need to sort the tables to ensure a stable order for paging.
|
||||
// We sort by keyspace and table name, which will also allow us to skip to
|
||||
// the right position by ExclusiveStartStreamArn.
|
||||
auto cmp = [](std::string_view ks1, std::string_view cf1, std::string_view ks2, std::string_view cf2) {
|
||||
return ks1 == ks2 ? cf1 < cf2 : ks1 < ks2;
|
||||
};
|
||||
if (std::cmp_less(limit, cfs.size()) || streams_start) {
|
||||
std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
||||
return t1.schema()->id().uuid() < t2.schema()->id().uuid();
|
||||
});
|
||||
std::sort(cfs.begin(), cfs.end(),
|
||||
[&cmp](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
||||
return cmp(t1.schema()->ks_name(), t1.schema()->cf_name(),
|
||||
t2.schema()->ks_name(), t2.schema()->cf_name());
|
||||
});
|
||||
}
|
||||
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
if (streams_start) {
|
||||
i = std::find_if(i, e, [&](const data_dictionary::table& t) {
|
||||
return t.schema()->id().uuid() == streams_start
|
||||
&& cdc::get_base_table(db.real_database(), *t.schema())
|
||||
&& is_alternator_keyspace(t.schema()->ks_name())
|
||||
;
|
||||
});
|
||||
if (i != e) {
|
||||
++i;
|
||||
}
|
||||
i = std::upper_bound(i, e, *streams_start,
|
||||
[&cmp](const stream_arn& arn, const data_dictionary::table& t) {
|
||||
return cmp(arn.keyspace_name(), arn.table_name(),
|
||||
t.schema()->ks_name(), t.schema()->cf_name());
|
||||
});
|
||||
}
|
||||
|
||||
auto ret = rjson::empty_object();
|
||||
auto streams = rjson::empty_array();
|
||||
std::optional<stream_shard_id> last;
|
||||
std::optional<std::string> last;
|
||||
|
||||
for (;limit > 0 && i != e; ++i) {
|
||||
auto s = i->schema();
|
||||
@@ -282,8 +245,9 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
}
|
||||
if (cdc::is_log_for_some_table(db.real_database(), ks_name, cf_name)) {
|
||||
rjson::value new_entry = rjson::empty_object();
|
||||
last = i->schema()->id();
|
||||
|
||||
auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) };
|
||||
last = std::string(arn.unparsed());
|
||||
rjson::add(new_entry, "StreamArn", arn);
|
||||
rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s)));
|
||||
rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(s->cf_name())));
|
||||
@@ -294,8 +258,12 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
|
||||
rjson::add(ret, "Streams", std::move(streams));
|
||||
|
||||
if (last) {
|
||||
rjson::add(ret, "LastEvaluatedStreamArn", *last);
|
||||
// Only emit LastEvaluatedStreamArn when we stopped because we hit the
|
||||
// limit (limit == 0), meaning there may be more streams to list.
|
||||
// If we exhausted all tables naturally (limit > 0), there are no more
|
||||
// streams, so we must not emit a cookie.
|
||||
if (last && limit == 0) {
|
||||
rjson::add(ret, "LastEvaluatedStreamArn", rjson::from_string(*last));
|
||||
}
|
||||
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
||||
}
|
||||
|
||||
@@ -179,6 +179,136 @@ def test_list_streams_paged(dynamodb, dynamodbstreams):
|
||||
break
|
||||
streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn'])
|
||||
|
||||
# The previous test (test_list_streams_paged) verifies that paging in
|
||||
# ListStreams works by passing the cookie returned as LastEvaluatedStreamArn
|
||||
# from one call, into ExclusiveStartStreamArn given to the next call. But that
|
||||
# test did not check what the value of this "ExclusiveStartStreamArn" looks
|
||||
# like. The DynamoDB documentation says that it should be "The stream ARN of
|
||||
# the item where the operation stopped", so in this test we check that it
|
||||
# indeed is the last returned ARN - and not some opaque cookie of a different
|
||||
# form.
|
||||
# This test also verifies that the final page has no LastEvaluatedStreamArn,
|
||||
# something which test_list_streams_paged also did not check.
|
||||
def test_list_streams_paged_cookie(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1:
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2:
|
||||
wait_for_active_stream(dynamodbstreams, table1)
|
||||
wait_for_active_stream(dynamodbstreams, table2)
|
||||
# Page through all streams one at a time, checking on every page
|
||||
# that LastEvaluatedStreamArn equals the StreamArn of the last
|
||||
# returned stream. The DynamoDB documentation says it should be
|
||||
# "The stream ARN of the item where the operation stopped" - i.e.,
|
||||
# the actual ARN, not an opaque cookie of a different form.
|
||||
# We also verify that the final page has no LastEvaluatedStreamArn,
|
||||
# indicating the end of the list.
|
||||
seen_arns = []
|
||||
response = dynamodbstreams.list_streams(Limit=1)
|
||||
while True:
|
||||
assert 'Streams' in response
|
||||
# All but the last page must return exactly one stream because
|
||||
# we used Limit=1, but the last page may return 0 or 1 stream.
|
||||
if 'LastEvaluatedStreamArn' not in response:
|
||||
# Reached the last page.
|
||||
assert len(response['Streams']) <= 1
|
||||
if response['Streams']:
|
||||
seen_arns.append(response['Streams'][0]['StreamArn'])
|
||||
break
|
||||
assert len(response['Streams']) == 1
|
||||
seen_arns.append(response['Streams'][0]['StreamArn'])
|
||||
# The cookie must equal the StreamArn of the last returned item.
|
||||
assert response['LastEvaluatedStreamArn'] == response['Streams'][-1]['StreamArn']
|
||||
response = dynamodbstreams.list_streams(
|
||||
Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn'])
|
||||
# For completeness, validate that both test tables were listed in
|
||||
# the result, and there were no duplicates.
|
||||
assert len(seen_arns) == len(set(seen_arns))
|
||||
for table in [table1, table2]:
|
||||
table_arns = [s['StreamArn'] for s in
|
||||
dynamodbstreams.list_streams(TableName=table.name)['Streams']]
|
||||
assert any(arn in seen_arns for arn in table_arns)
|
||||
|
||||
# If the last page of ListStreams results is not full because there are no
|
||||
# more streams to list, it shouldn't have LastEvaluatedStreamArn: It's silly
|
||||
# to return one and then force the user to retrieve another empty page).
|
||||
# Note that we focus on the case of the last page not being full because if
|
||||
# the last page *is* full (list Limit streams), the implementation may not be
|
||||
# aware that there are no more streams to list.
|
||||
def test_list_streams_unfull_last_page_no_cookie(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
|
||||
wait_for_active_stream(dynamodbstreams, table)
|
||||
# With a very high Limit the page is definitely not full, so
|
||||
# LastEvaluatedStreamArn must be absent. Limit=100 is not very
|
||||
# high, but DynamoDB doesn't allow any higher and it is very
|
||||
# unlikely we'll ever run this test with 100 live streams so
|
||||
# we can expect the page to not be full.
|
||||
response = dynamodbstreams.list_streams(Limit=100)
|
||||
assert 'Streams' in response
|
||||
assert len(response['Streams']) >= 1
|
||||
assert 'LastEvaluatedStreamArn' not in response
|
||||
|
||||
# Test what happens if we do a paging ListStreams, a page returns as
|
||||
# LastEvaluatedStreamArn an ARN of some table's stream, but then we delete
|
||||
# that table before retrieving the next page. The paging should be able to
|
||||
# continue without error, and see the rest of the streams that we haven't
|
||||
# seen yet.
|
||||
def test_list_streams_paged_resume_on_deleted_table(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1:
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2:
|
||||
wait_for_active_stream(dynamodbstreams, table1)
|
||||
wait_for_active_stream(dynamodbstreams, table2)
|
||||
# Page through the streams one at a time (Limit=1), waiting until
|
||||
# we get either table1 or table2.
|
||||
# Set found_table to the table found, other_table to the one not
|
||||
# yet found, and last_arn to LastEvaluatedStreamArn where we stopped.
|
||||
response = dynamodbstreams.list_streams(Limit=1)
|
||||
while True:
|
||||
assert 'Streams' in response
|
||||
assert len(response['Streams']) == 1
|
||||
last_arn = response['LastEvaluatedStreamArn']
|
||||
if response['Streams'][0]['TableName'] == table1.name:
|
||||
found_table, other_table = table1, table2
|
||||
break
|
||||
if response['Streams'][0]['TableName'] == table2.name:
|
||||
found_table, other_table = table2, table1
|
||||
break
|
||||
response = dynamodbstreams.list_streams(
|
||||
Limit=1, ExclusiveStartStreamArn=last_arn)
|
||||
# Delete found_table, the one which last_arn that we are holding
|
||||
# refers to.
|
||||
found_table.delete()
|
||||
found_table.meta.client.get_waiter('table_not_exists').wait(TableName=found_table.name)
|
||||
# Try to continue the paging, with last_arn that now refers to the
|
||||
# already deleted table. We expect to be able to continue without
|
||||
# error, and see other_table which we haven't seen yet (and not
|
||||
# see found_table again, of course). Other than that, it's not clear
|
||||
# what we expect to see - it would be nice not to have the list
|
||||
# restarted from scratch, but this test doesn't rule this
|
||||
# implementation out (and I'm not sure we should rule it out).
|
||||
response = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=last_arn)
|
||||
found_both = False
|
||||
while True:
|
||||
assert 'Streams' in response
|
||||
if response['Streams']:
|
||||
assert len(response['Streams']) == 1
|
||||
assert response['Streams'][0]['TableName'] != found_table.name
|
||||
if response['Streams'][0]['TableName'] == other_table.name:
|
||||
found_both = True
|
||||
if 'LastEvaluatedStreamArn' not in response:
|
||||
break
|
||||
response = dynamodbstreams.list_streams(
|
||||
Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn'])
|
||||
assert found_both
|
||||
# When we'll go out of scope on create_stream_test_table that
|
||||
# create found_table - which we deleted - it will expect this
|
||||
# table to exist and fail when it doesn't. So let's recreate the table.
|
||||
# It doesn't matter which schema we use, it just needs to exist.
|
||||
found_table = dynamodb.create_table(TableName=found_table.name,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ],
|
||||
BillingMode='PAY_PER_REQUEST')
|
||||
found_table.meta.client.get_waiter('table_exists').wait(TableName=found_table.name)
|
||||
|
||||
|
||||
# ListStreams with paging should be able correctly return a full list of
|
||||
# pre-existing streams even if additional tables were added between pages
|
||||
# and caused Scylla's hash table of tables to be reorganized.
|
||||
@@ -294,6 +424,36 @@ def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
|
||||
dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')
|
||||
|
||||
# test_describe_nonexistent_stream checked the case where a StreamArn is
|
||||
# completely bogus. Here we want to check well-formed names that point to
|
||||
# table names that do not exist. We should return ResourceNotFoundException
|
||||
# in this case, not ValidationException.
|
||||
# To reduce our assumptions of what valid ARNs look like, we don't hardcode
|
||||
# any ARN formats here. Instead, we create a stream, get its real ARN, and
|
||||
# then modify it to point to a non-existent table.
|
||||
def test_describe_stream_nonexistent_table(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
|
||||
streams = dynamodbstreams.list_streams(TableName=table.name)
|
||||
arn = streams['Streams'][0]['StreamArn']
|
||||
# Replace the table name in the ARN with one that doesn't exist.
|
||||
# In both DynamoDB and Alternator, the table name appears in the ARN
|
||||
# at least once, and replacing it yields a well-formed but non-existent
|
||||
# ARN.
|
||||
# In Alternator, the table name appears twice - in both keyspace and
|
||||
# CDC log name parts - so we want to try both replacements to verify
|
||||
# that both incorrect keyspace name and incorrect table names are
|
||||
# handled.
|
||||
nonexistent = table.name + '_nonexistent_' + random_string()
|
||||
count = arn.count(table.name)
|
||||
assert count >= 1 # sanity check that the name appeared in the ARN at all
|
||||
for i in range(count):
|
||||
# Replace only the i-th occurrence of table.name.
|
||||
parts = arn.split(table.name)
|
||||
modified_arn = table.name.join(parts[:i+1]) + nonexistent + table.name.join(parts[i+1:])
|
||||
assert modified_arn != arn
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException'):
|
||||
dynamodbstreams.describe_stream(StreamArn=modified_arn)
|
||||
|
||||
def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
|
||||
streams = dynamodbstreams.list_streams(TableName=table.name)
|
||||
|
||||
Reference in New Issue
Block a user