alternator: Fix item counting in batch operations
This patch fixes the logic for counting items in batch operations.
Previously, the item count in requests was inaccurate, it count the
number of tabels in get_item and the request_items in write_items.
The new logic correctly counts each individual item in `BatchGetItem`
and `BatchWriteItem` requests.
Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 905408f764)
This commit is contained in:
@@ -2195,7 +2195,6 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
mutation_builders.reserve(request_items.MemberCount());
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
batch_size++;
|
||||
schema_ptr schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
@@ -2216,6 +2215,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else if (r_name == "DeleteRequest") {
|
||||
const rjson::value& key = (r->value)["Key"];
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
@@ -2226,6 +2226,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("Unknown BatchWriteItem request type: {}", r_name));
|
||||
}
|
||||
@@ -3483,7 +3484,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
};
|
||||
std::vector<table_requests> requests;
|
||||
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
|
||||
@@ -3497,6 +3498,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rs.add(key);
|
||||
check_key(key, rs.schema);
|
||||
}
|
||||
batch_size += rs.requests.size();
|
||||
requests.emplace_back(std::move(rs));
|
||||
}
|
||||
|
||||
@@ -3504,7 +3506,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
co_await verify_permission(client_state, tr.schema, auth::permission::SELECT);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += requests.size();
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
// If we got here, all "requests" are valid, so let's start the
|
||||
// requests for the different partitions all in parallel.
|
||||
std::vector<future<std::vector<rjson::value>>> response_futures;
|
||||
|
||||
Reference in New Issue
Block a user