alternator test: use ConsistentRead=True for full_query/scan
Many of the Alternator tests use the convenience functions full_query()/full_scan() to read from the table. Almost all of these tests need to be able to read their own writes, i.e., they want ConsistentRead=True, but none of them explicitly specified this parameter. Such tests may sporadically fail when running on a cluster with multiple nodes. So this patch follows a TODO in the code, and makes ConsistentRead=True the default for the full_*() functions. The caller can still override it with ConsistentRead=False - and this is necessary in the GSI tests, because ConsistentRead=True is not allowed in GSIs. Note that while ConsistentRead=True is now the default for the full_*() convenience functions, it is still not the default for the lower-level boto3 functions scan(), query() and get_item() - so usages of those should be evaluated as well, and any missing ConsistentRead=True should be added. Signed-off-by: Nadav Har'El <nyh@scylladb.com> Message-Id: <20200616073821.824784-1-nyh@scylladb.com>
This commit is contained in:
committed by
Piotr Sarna
parent
2f680b3458
commit
c298088375
@@ -37,19 +37,19 @@ from util import create_test_table, random_string, full_scan, full_query, multis
|
||||
# retry.
|
||||
def assert_index_query(table, index_name, expected_items, **kwargs):
|
||||
for i in range(3):
|
||||
if multiset(expected_items) == multiset(full_query(table, IndexName=index_name, **kwargs)):
|
||||
if multiset(expected_items) == multiset(full_query(table, IndexName=index_name, ConsistentRead=False, **kwargs)):
|
||||
return
|
||||
print('assert_index_query retrying')
|
||||
time.sleep(1)
|
||||
assert multiset(expected_items) == multiset(full_query(table, IndexName=index_name, **kwargs))
|
||||
assert multiset(expected_items) == multiset(full_query(table, IndexName=index_name, ConsistentRead=False, **kwargs))
|
||||
|
||||
def assert_index_scan(table, index_name, expected_items, **kwargs):
|
||||
for i in range(3):
|
||||
if multiset(expected_items) == multiset(full_scan(table, IndexName=index_name, **kwargs)):
|
||||
if multiset(expected_items) == multiset(full_scan(table, IndexName=index_name, ConsistentRead=False, **kwargs)):
|
||||
return
|
||||
print('assert_index_scan retrying')
|
||||
time.sleep(1)
|
||||
assert multiset(expected_items) == multiset(full_scan(table, IndexName=index_name, **kwargs))
|
||||
assert multiset(expected_items) == multiset(full_scan(table, IndexName=index_name, ConsistentRead=False, **kwargs))
|
||||
|
||||
# Although quite silly, it is actually allowed to create an index which is
|
||||
# identical to the base table.
|
||||
@@ -76,7 +76,7 @@ def test_gsi_identical(dynamodb):
|
||||
assert_index_scan(table, 'hello', items)
|
||||
# We can't scan a non-existent index
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
full_scan(table, IndexName='wrong')
|
||||
full_scan(table, ConsistentRead=False, IndexName='wrong')
|
||||
table.delete()
|
||||
|
||||
# One of the simplest forms of a non-trivial GSI: The base table has a hash
|
||||
@@ -308,7 +308,7 @@ def test_gsi_missing_attribute(test_table_gsi_2):
|
||||
# and item will "never" appear in the index. We do this test last,
|
||||
# so if we had a bug and such item did appear, hopefully we had enough
|
||||
# time for the bug to become visible. At least sometimes.
|
||||
assert not any([i['p'] == p2 for i in full_scan(test_table_gsi_2, IndexName='hello')])
|
||||
assert not any([i['p'] == p2 for i in full_scan(test_table_gsi_2, ConsistentRead=False, IndexName='hello')])
|
||||
|
||||
# Test when a table has a GSI, if the indexed attribute has the wrong type,
|
||||
# the update operation is rejected, and is added to neither base table nor
|
||||
@@ -415,15 +415,15 @@ def test_gsi_missing_attribute_3(test_table_gsi_3):
|
||||
# an item will "never" appear in the index. We hope that if a bug exists
|
||||
# and such an item did appear, sometimes the delay here will be enough
|
||||
# for the unexpected item to become visible.
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, ConsistentRead=False, IndexName='hello')])
|
||||
# Same thing for an item with a missing "b" value:
|
||||
test_table_gsi_3.put_item(Item={'p': p, 'a': a})
|
||||
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p, 'a': a}
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, ConsistentRead=False, IndexName='hello')])
|
||||
# And for an item missing both:
|
||||
test_table_gsi_3.put_item(Item={'p': p})
|
||||
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p}
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, ConsistentRead=False, IndexName='hello')])
|
||||
|
||||
# A fourth scenario of GSI. Two GSIs on a single base table.
|
||||
@pytest.fixture(scope="session")
|
||||
@@ -735,10 +735,10 @@ def test_gsi_backfill(dynamodb):
|
||||
# assert_index_scan() or assert_index_query() functions) because after
|
||||
# we waited for backfilling to complete, we know all the pre-existing
|
||||
# data is already in the index.
|
||||
assert multiset(items1) == multiset(full_scan(table, IndexName='hello'))
|
||||
assert multiset(items1) == multiset(full_scan(table, ConsistentRead=False, IndexName='hello'))
|
||||
# We can also use Query on the new GSI, to search on the attribute x:
|
||||
assert multiset([items1[3]]) == multiset(full_query(table,
|
||||
IndexName='hello',
|
||||
ConsistentRead=False, IndexName='hello',
|
||||
KeyConditions={'x': {'AttributeValueList': [items1[3]['x']], 'ComparisonOperator': 'EQ'}}))
|
||||
# Let's also test that we cannot add another index with the same name
|
||||
# that already exists
|
||||
@@ -785,7 +785,7 @@ def test_gsi_delete(dynamodb):
|
||||
wait_for_gsi_gone(table, 'hello')
|
||||
# Now index is gone. We cannot query using it.
|
||||
with pytest.raises(ClientError, match='ValidationException.*hello'):
|
||||
full_query(table, IndexName='hello',
|
||||
full_query(table, ConsistentRead=False, IndexName='hello',
|
||||
KeyConditions={'x': {'AttributeValueList': [items[3]['x']], 'ComparisonOperator': 'EQ'}})
|
||||
table.delete()
|
||||
|
||||
|
||||
@@ -29,19 +29,19 @@ from util import create_test_table, random_string, full_scan, full_query, multis
|
||||
# LSIs support strongly-consistent reads, so the following functions do not
|
||||
# need to retry like we did in test_gsi.py for GSIs:
|
||||
def assert_index_query(table, index_name, expected_items, **kwargs):
|
||||
assert multiset(expected_items) == multiset(full_query(table, IndexName=index_name, ConsistentRead=True, **kwargs))
|
||||
assert multiset(expected_items) == multiset(full_query(table, IndexName=index_name, **kwargs))
|
||||
def assert_index_scan(table, index_name, expected_items, **kwargs):
|
||||
assert multiset(expected_items) == multiset(full_scan(table, IndexName=index_name, ConsistentRead=True, **kwargs))
|
||||
assert multiset(expected_items) == multiset(full_scan(table, IndexName=index_name, **kwargs))
|
||||
|
||||
# A version doing retries instead of ConsistentRead, to be used just for the
|
||||
# one test below which has both GSI and LSI:
|
||||
def retrying_assert_index_query(table, index_name, expected_items, **kwargs):
|
||||
for i in range(3):
|
||||
if multiset(expected_items) == multiset(full_query(table, IndexName=index_name, **kwargs)):
|
||||
if multiset(expected_items) == multiset(full_query(table, IndexName=index_name, ConsistentRead=False, **kwargs)):
|
||||
return
|
||||
print('retrying_assert_index_query retrying')
|
||||
time.sleep(1)
|
||||
assert multiset(expected_items) == multiset(full_query(table, IndexName=index_name, **kwargs))
|
||||
assert multiset(expected_items) == multiset(full_query(table, IndexName=index_name, ConsistentRead=False, **kwargs))
|
||||
|
||||
# Although quite silly, it is actually allowed to create an index which is
|
||||
# identical to the base table.
|
||||
|
||||
@@ -28,14 +28,18 @@ def random_string(length=10, chars=string.ascii_uppercase + string.digits):
|
||||
def random_bytes(length=10):
|
||||
return bytearray(random.getrandbits(8) for _ in range(length))
|
||||
|
||||
# Utility functions for scan and query into an array of items:
|
||||
# TODO: add to full_scan and full_query by default ConsistentRead=True, as
|
||||
# it's not useful for tests without it!
|
||||
def full_scan(table, **kwargs):
|
||||
response = table.scan(**kwargs)
|
||||
# Utility functions for scan and query into an array of items, reading
|
||||
# the full results (possibly requiring multiple requests to read successive pages).
|
||||
# For convenience, ConsistentRead=True is used by default, as most tests
|
||||
# need it to run correctly on a multi-node cluster. Callers who need to
|
||||
# override it can do so (this is necessary in GSI tests, where ConsistentRead=True
|
||||
# is not supported).
|
||||
def full_scan(table, ConsistentRead=True, **kwargs):
|
||||
response = table.scan(ConsistentRead=ConsistentRead, **kwargs)
|
||||
items = response['Items']
|
||||
while 'LastEvaluatedKey' in response:
|
||||
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
|
||||
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
|
||||
ConsistentRead=ConsistentRead, **kwargs)
|
||||
items.extend(response['Items'])
|
||||
return items
|
||||
|
||||
@@ -43,8 +47,8 @@ def full_scan(table, **kwargs):
|
||||
# Note that count isn't simply len(items) - the server returns them
|
||||
# independently. e.g., with Select='COUNT' the items are not returned, but
|
||||
# count is.
|
||||
def full_scan_and_count(table, **kwargs):
|
||||
response = table.scan(**kwargs)
|
||||
def full_scan_and_count(table, ConsistentRead=True, **kwargs):
|
||||
response = table.scan(ConsistentRead=ConsistentRead, **kwargs)
|
||||
items = []
|
||||
count = 0
|
||||
if 'Items' in response:
|
||||
@@ -52,7 +56,8 @@ def full_scan_and_count(table, **kwargs):
|
||||
if 'Count' in response:
|
||||
count = count + response['Count']
|
||||
while 'LastEvaluatedKey' in response:
|
||||
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
|
||||
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
|
||||
ConsistentRead=ConsistentRead, **kwargs)
|
||||
if 'Items' in response:
|
||||
items.extend(response['Items'])
|
||||
if 'Count' in response:
|
||||
@@ -60,11 +65,12 @@ def full_scan_and_count(table, **kwargs):
|
||||
return (count, items)
|
||||
|
||||
# Utility function for fetching the entire results of a query into an array of items
|
||||
def full_query(table, **kwargs):
|
||||
response = table.query(**kwargs)
|
||||
def full_query(table, ConsistentRead=True, **kwargs):
|
||||
response = table.query(ConsistentRead=ConsistentRead, **kwargs)
|
||||
items = response['Items']
|
||||
while 'LastEvaluatedKey' in response:
|
||||
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
|
||||
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
|
||||
ConsistentRead=ConsistentRead, **kwargs)
|
||||
items.extend(response['Items'])
|
||||
return items
|
||||
|
||||
@@ -73,8 +79,8 @@ def full_query(table, **kwargs):
|
||||
# Note that count isn't simply len(items) - the server returns them
|
||||
# independently. e.g., with Select='COUNT' the items are not returned, but
|
||||
# count is.
|
||||
def full_query_and_counts(table, **kwargs):
|
||||
response = table.query(**kwargs)
|
||||
def full_query_and_counts(table, ConsistentRead=True, **kwargs):
|
||||
response = table.query(ConsistentRead=ConsistentRead, **kwargs)
|
||||
items = []
|
||||
prefilter_count = 0
|
||||
postfilter_count = 0
|
||||
@@ -87,7 +93,8 @@ def full_query_and_counts(table, **kwargs):
|
||||
if 'ScannedCount' in response:
|
||||
prefilter_count = prefilter_count + response['ScannedCount']
|
||||
while 'LastEvaluatedKey' in response:
|
||||
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
|
||||
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
|
||||
ConsistentRead=ConsistentRead, **kwargs)
|
||||
if 'Items' in response:
|
||||
items.extend(response['Items'])
|
||||
pages = pages + 1
|
||||
|
||||
Reference in New Issue
Block a user