lwt: for each statement in cas_request provide a row in CAS result set

Previously batch statement result set included rows for only
those updates which have a prefetch data present (i.e. there
was an "old" (pre-existing) row for a key).

Also, these rows were sorted not in the order in which statements
appear in the batch, but in the order of updated clustering keys.

If we have a batch which updates a few non-existent keys, then
it's impossible to figure out which update inserted a new key
by looking at the query response. Not only because the responses
may not correspond to the order of statements in the batch, but
even some rows may not show up in the result set at all.

The patch proposes the following fix:

For conditional batch statements the result set now always
includes a row for each LWT statement, in the same order
in which individual statements appear in the batch.

This way we can always tell which update did actually insert
a new key or update the existing one.

`update_parameters::prefetch_data::row::is_in_cas_result_set`
member variable was removed as well as supporting code in
`cas_request::applies_to` which iterated through cas updates
and marked individual `prefetch_data` rows as "need to be in
cas result set".

Instead now `cas_request::applies_to` is significantly
simplified since it doesn't do anything more than checking
`stmt.applies_to()` in short-circuiting manner.

A few tests for the issue are written, other lwt-batch-related
tests were adjusted accordingly to include rows in result set
for each statement inside conditional batches.

Tests: unit(dev, debug)

Co-authored-by: Konstantin Osipov <kostja@scylladb.com>
Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2020-09-04 13:13:26 +03:00
parent feaf2b6320
commit 92fd515186
5 changed files with 377 additions and 75 deletions

View File

@@ -123,46 +123,16 @@ lw_shared_ptr<query::read_command> cas_request::read_command(service::storage_pr
}
bool cas_request::applies_to() const {
const partition_key& pkey = _key.front().start()->value().key().value();
const clustering_key empty_ckey = clustering_key::make_empty();
bool applies = true;
bool is_cas_result_set_empty = true;
bool has_static_column_conditions = false;
for (const cas_row_update& op: _updates) {
if (op.statement.has_conditions() == false) {
if (!op.statement.has_conditions()) {
continue;
}
if (op.statement.has_static_column_conditions()) {
has_static_column_conditions = true;
}
const auto* row = find_old_row(op);
if (row) {
row->is_in_cas_result_set = true;
is_cas_result_set_empty = false;
}
if (!applies) {
// No need to check this condition as we have already failed a previous one.
// Continuing the loop just to set is_in_cas_result_set flag for all involved
// statements, which is necessary to build the CAS result set.
continue;
}
applies = op.statement.applies_to(row, op.options);
}
if (has_static_column_conditions && is_cas_result_set_empty) {
// If none of the fetched rows matches clustering key restrictions and hence none of them is
// included into the CAS result set, but there is a static column condition in the CAS batch,
// we must still include the static row into the result set. Consider the following example:
// CREATE TABLE t(p int, c int, s int static, v int, PRIMARY KEY(p, c));
// INSERT INTO t(p, s) VALUES(1, 1);
// DELETE v FROM t WHERE p=1 AND c=1 IF v=1 AND s=1;
// In this case the conditional DELETE must return [applied=False, v=null, s=1].
const auto* row = _rows.find_row(pkey, empty_ckey);
if (row) {
row->is_in_cas_result_set = true;
// No need to check subsequent conditions as we have already failed the current one.
if (!op.statement.applies_to(find_old_row(op), op.options)) {
return false;
}
}
return applies;
return true;
}
std::optional<mutation> cas_request::apply(foreign_ptr<lw_shared_ptr<query::result>> qr,
@@ -197,45 +167,66 @@ const update_parameters::prefetch_data::row* cas_request::find_old_row(const cas
seastar::shared_ptr<cql_transport::messages::result_message>
cas_request::build_cas_result_set(seastar::shared_ptr<cql3::metadata> metadata,
const column_set& columns,
bool is_applied) const {
const column_set& columns,
bool is_applied) const {
const partition_key& pkey = _key.front().start()->value().key().value();
const clustering_key empty_ckey = clustering_key::make_empty();
auto result_set = std::make_unique<cql3::result_set>(metadata);
for (const auto& it : _rows.rows) {
const update_parameters::prefetch_data::row& cell_map = it.second;
if (!cell_map.is_in_cas_result_set) {
continue;
}
std::vector<bytes_opt> row;
row.reserve(metadata->value_count());
row.emplace_back(boolean_type->decompose(is_applied));
for (ordinal_column_id id = columns.find_first(); id != column_set::npos; id = columns.find_next(id)) {
const auto it = cell_map.cells.find(id);
if (it == cell_map.cells.end()) {
row.emplace_back(bytes_opt{});
} else {
const data_value& cell = it->second;
const abstract_type& cell_type = *cell.type();
const abstract_type& column_type = *_rows.schema->column_at(id).type;
if (column_type.is_listlike() && cell_type.is_map()) {
// List/sets are fetched as maps, but need to be stored as sets.
const listlike_collection_type_impl& list_type = static_cast<const listlike_collection_type_impl&>(column_type);
const map_type_impl& map_type = static_cast<const map_type_impl&>(cell_type);
row.emplace_back(list_type.serialize_map(map_type, cell));
} else {
row.emplace_back(cell_type.decompose(cell));
}
for (const cas_row_update& op: _updates) {
// Construct the result set row
std::vector<bytes_opt> rs_row;
rs_row.reserve(metadata->value_count());
rs_row.emplace_back(boolean_type->decompose(is_applied));
// Get old row from prefetched data for the row update
const auto* old_row = find_old_row(op);
if (!old_row) {
if (!op.statement.has_static_column_conditions()) {
// In case there is no old row, leave all other columns null
// so that we can infer whether the update attempts to insert a
// non-existing row.
rs_row.resize(metadata->value_count());
result_set->add_row(std::move(rs_row));
continue;
}
// If none of the fetched rows matches clustering key restrictions,
// but there is a static column condition in the CAS batch,
// we must still include the static row into the result set. Consider the following example:
// CREATE TABLE t(p int, c int, s int static, v int, PRIMARY KEY(p, c));
// INSERT INTO t(p, s) VALUES(1, 1);
// DELETE v FROM t WHERE p=1 AND c=1 IF v=1 AND s=1;
// In this case the conditional DELETE must return [applied=False, v=null, s=1].
old_row = _rows.find_row(pkey, empty_ckey);
if (!old_row) {
// In case there is no old row, leave all other columns null
// so that we can infer whether the update attempts to insert a
// non-existing row.
rs_row.resize(metadata->value_count());
result_set->add_row(std::move(rs_row));
continue;
}
}
result_set->add_row(std::move(row));
}
if (result_set->empty()) {
// Is the case when, e.g., IF EXISTS or IF NOT EXISTS finds no row.
std::vector<bytes_opt> row;
row.emplace_back(boolean_type->decompose(is_applied));
row.resize(metadata->value_count());
result_set->add_row(std::move(row));
// Fill in the cells from prefetch data (old row) into the result set row
for (ordinal_column_id id = columns.find_first(); id != column_set::npos; id = columns.find_next(id)) {
const auto it = old_row->cells.find(id);
if (it == old_row->cells.end()) {
rs_row.emplace_back(bytes_opt{});
continue;
}
const data_value& cell = it->second;
const abstract_type& cell_type = *cell.type();
const abstract_type& column_type = *_rows.schema->column_at(id).type;
if (column_type.is_listlike() && cell_type.is_map()) {
// List/sets are fetched as maps, but need to be stored as sets.
const listlike_collection_type_impl& list_type = static_cast<const listlike_collection_type_impl&>(column_type);
const map_type_impl& map_type = static_cast<const map_type_impl&>(cell_type);
rs_row.emplace_back(list_type.serialize_map(map_type, cell));
} else {
rs_row.emplace_back(cell_type.decompose(cell));
}
}
result_set->add_row(std::move(rs_row));
}
cql3::result result(std::move(result_set));
return seastar::make_shared<cql_transport::messages::result_message::rows>(std::move(result));

View File

@@ -96,10 +96,6 @@ public:
struct row {
// Order CAS columns by ordinal column id.
std::map<ordinal_column_id, data_value> cells;
// Set if the statement is used for checking conditions of a CAS request.
// Only those statements that have this flag set should be included into
// the CAS result set.
mutable bool is_in_cas_result_set = false;
// Return true if this row has at least one static column set.
bool has_static_columns(const schema& schema) const {
if (!schema.has_static_columns()) {

View File

@@ -1105,6 +1105,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -1130,6 +1133,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -1532,6 +1538,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true",
"a" : "2",
"b" : "2"
},
{
"[applied]" : "true",
"a" : "2",
@@ -1560,6 +1571,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true",
"a" : "4",
"b" : "4"
},
{
"[applied]" : "true",
"a" : "4",
@@ -1588,6 +1604,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true",
"a" : "5",
"b" : "5"
},
{
"[applied]" : "true",
"a" : "5",
@@ -1616,6 +1637,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -1642,6 +1666,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "3",
"b" : "3"
},
{
"[applied]" : "false",
"a" : "3",
@@ -1656,6 +1685,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "3",
"b" : "3"
},
{
"[applied]" : "false",
"a" : "3",
@@ -1670,6 +1704,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "3",
"b" : "3"
},
{
"[applied]" : "false",
"a" : "3",
@@ -1684,6 +1723,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "3",
"b" : "3"
},
{
"[applied]" : "false",
"a" : "3",
@@ -1698,6 +1742,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "3",
"b" : "3"
},
{
"[applied]" : "false",
"a" : "3",
@@ -1724,6 +1773,11 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "6",
"b" : "6"
},
{
"[applied]" : "false",
"a" : "6",
@@ -1970,6 +2024,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -1995,6 +2052,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false"
},
{
"[applied]" : "false"
}
@@ -2011,6 +2071,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false"
},
{
"[applied]" : "false"
}
@@ -2027,6 +2090,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false"
},
{
"[applied]" : "false"
}
@@ -2043,6 +2109,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false"
},
{
"[applied]" : "false"
}
@@ -2059,6 +2128,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false"
},
{
"[applied]" : "false"
}
@@ -2077,6 +2149,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "false"
},
{
"[applied]" : "false"
}
@@ -2093,6 +2168,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -2117,6 +2195,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -2141,6 +2222,9 @@ APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}

View File

@@ -221,4 +221,46 @@ begin batch
insert into lwt (key, ck, cv) values (1, 0, {'b', 'c'})
apply batch;
select * from lwt;
drop table lwt;
drop table lwt;
--
-- A test case for Issue #7113
-- Return one row per each LWT statement
-- in a batch, in statement order.
--
CREATE TABLE IF NOT EXISTS gh7113 (
part int,
key int,
lwt_trivial int,
int1 int,
int2 int,
PRIMARY KEY (part, key)
);
BEGIN BATCH
UPDATE gh7113 SET int1 = 6 WHERE part = 0 AND key = 4 IF lwt_trivial = null
APPLY BATCH;
BEGIN BATCH
UPDATE gh7113 SET int2 = 0, int1 = 0 WHERE part = 0 AND key = 0 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 1, int1 = 6 WHERE part = 0 AND key = 7 IF lwt_trivial = null
APPLY BATCH;
BEGIN BATCH
UPDATE gh7113 SET int2 = 0, int1 = 2 WHERE part = 0 AND key = 9 IF lwt_trivial = null
UPDATE gh7113 SET int1 = 7 WHERE part = 0 AND key = 0 IF lwt_trivial = null
APPLY BATCH;
BEGIN BATCH
UPDATE gh7113 SET int1 = 6, int2 = 7 WHERE part = 0 AND key = 1 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 4 WHERE part = 0 AND key = 0 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 2 WHERE part = 0 AND key = 3 IF lwt_trivial = null
APPLY BATCH;
BEGIN BATCH
UPDATE gh7113 SET int2 = 1 WHERE part = 0 AND key = 4 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 1 WHERE part = 0 AND key = 0 IF lwt_trivial = null
UPDATE gh7113 SET int1 = 4, int2 = 8 WHERE part = 0 AND key = 9 IF lwt_trivial = null
UPDATE gh7113 SET int1 = 0, int2 = 9 WHERE part = 0 AND key = 0 IF lwt_trivial = null
APPLY BATCH;

View File

@@ -68,6 +68,9 @@ apply batch;
"[applied]" : "false",
"a" : "1",
"b" : "1"
},
{
"[applied]" : "false"
}
]
}
@@ -83,6 +86,9 @@ apply batch;
"[applied]" : "false",
"a" : "1",
"b" : "1"
},
{
"[applied]" : "false"
}
]
}
@@ -127,6 +133,11 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "false",
"a" : "1",
"b" : "1"
},
{
"[applied]" : "false",
"a" : "1",
@@ -204,6 +215,9 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -245,6 +259,9 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
@@ -279,6 +296,9 @@ apply batch;
"b" : "1",
"c" : "[\"1\"]",
"d" : "[\"1\", \"2\"]"
},
{
"[applied]" : "true"
}
]
}
@@ -310,6 +330,9 @@ apply batch;
"a" : "1",
"b" : "1",
"c" : "[\"1\", \"3\"]"
},
{
"[applied]" : "true"
}
]
}
@@ -450,6 +473,13 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "true",
"a" : "1",
"b" : "1",
"c" : "1",
"d" : "1"
},
{
"[applied]" : "true",
"a" : "1",
@@ -543,6 +573,12 @@ apply batch;
"c" : "1",
"i" : "1",
"p" : "1"
},
{
"[applied]" : "false",
"c" : "2",
"i" : "2",
"p" : "1"
}
]
}
@@ -559,6 +595,12 @@ apply batch;
"c" : "1",
"i" : "1",
"p" : "1"
},
{
"[applied]" : "true",
"c" : "2",
"i" : "2",
"p" : "1"
}
]
}
@@ -572,6 +614,13 @@ apply batch;
[
{
"[applied]" : "false"
},
{
"[applied]" : "false",
"c" : "2",
"i" : "2",
"l" : "[1, 3, 4]",
"p" : "1"
}
]
}
@@ -589,6 +638,13 @@ apply batch;
"i" : "2",
"l" : "[1, 2]",
"p" : "1"
},
{
"[applied]" : "true",
"c" : "2",
"i" : "2",
"l" : "[1, 3, 4]",
"p" : "1"
}
]
}
@@ -633,6 +689,9 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true",
"p" : "1",
@@ -690,6 +749,12 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "true",
"ck" : "0",
"cv" : "[\"a\", \"b\"]",
"key" : "1"
},
{
"[applied]" : "true",
"ck" : "0",
@@ -730,6 +795,12 @@ apply batch;
{
"rows" :
[
{
"[applied]" : "true",
"ck" : "0",
"cv" : "[\"a\", \"b\"]",
"key" : "1"
},
{
"[applied]" : "true",
"ck" : "0",
@@ -753,3 +824,121 @@ drop table lwt;
{
"status" : "ok"
}
--
-- A test case for Issue #7113
-- Return one row per each LWT statement
-- in a batch, in statement order.
--
CREATE TABLE IF NOT EXISTS gh7113 (
part int,
key int,
lwt_trivial int,
int1 int,
int2 int,
PRIMARY KEY (part, key)
);
{
"status" : "ok"
}
BEGIN BATCH
UPDATE gh7113 SET int1 = 6 WHERE part = 0 AND key = 4 IF lwt_trivial = null
APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
}
]
}
BEGIN BATCH
UPDATE gh7113 SET int2 = 0, int1 = 0 WHERE part = 0 AND key = 0 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 1, int1 = 6 WHERE part = 0 AND key = 7 IF lwt_trivial = null
APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true"
}
]
}
BEGIN BATCH
UPDATE gh7113 SET int2 = 0, int1 = 2 WHERE part = 0 AND key = 9 IF lwt_trivial = null
UPDATE gh7113 SET int1 = 7 WHERE part = 0 AND key = 0 IF lwt_trivial = null
APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true",
"key" : "0",
"part" : "0"
}
]
}
BEGIN BATCH
UPDATE gh7113 SET int1 = 6, int2 = 7 WHERE part = 0 AND key = 1 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 4 WHERE part = 0 AND key = 0 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 2 WHERE part = 0 AND key = 3 IF lwt_trivial = null
APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true"
},
{
"[applied]" : "true",
"key" : "0",
"part" : "0"
},
{
"[applied]" : "true"
}
]
}
BEGIN BATCH
UPDATE gh7113 SET int2 = 1 WHERE part = 0 AND key = 4 IF lwt_trivial = null
UPDATE gh7113 SET int2 = 1 WHERE part = 0 AND key = 0 IF lwt_trivial = null
UPDATE gh7113 SET int1 = 4, int2 = 8 WHERE part = 0 AND key = 9 IF lwt_trivial = null
UPDATE gh7113 SET int1 = 0, int2 = 9 WHERE part = 0 AND key = 0 IF lwt_trivial = null
APPLY BATCH;
{
"rows" :
[
{
"[applied]" : "true",
"key" : "4",
"part" : "0"
},
{
"[applied]" : "true",
"key" : "0",
"part" : "0"
},
{
"[applied]" : "true",
"key" : "9",
"part" : "0"
},
{
"[applied]" : "true",
"key" : "0",
"part" : "0"
}
]
}