untyped_result_set: mark get_blob() as returning unfragmented data
Blobs can be large, and unfragmented blobs can easily exceed 128k (as seen in #23903). Rename get_blob() to get_blob_unfragmented() to warn users. Note that most uses are fine as the blobs are really short strings. Closes scylladb/scylladb#24102
This commit is contained in:
committed by
Patryk Jędrzejczak
parent
ff8a119f26
commit
f195c05b0d
@@ -885,7 +885,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
for (const auto& col : schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(
|
||||
col.type->deserialize(row.get_blob(col.name_as_text())));
|
||||
col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ static future<record> require_record(cql3::query_processor& qp, std::string_view
|
||||
}
|
||||
|
||||
static bool has_can_login(const cql3::untyped_result_set_row& row) {
|
||||
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob("can_login")).is_null());
|
||||
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
|
||||
}
|
||||
|
||||
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
|
||||
|
||||
@@ -1442,7 +1442,7 @@ indexed_table_select_statement::find_index_partition_ranges(query_processor& qp,
|
||||
const auto& columns = row.get_columns();
|
||||
pk_columns.reserve(columns.size());
|
||||
for (const auto& column : columns) {
|
||||
pk_columns.push_back(row.get_blob(column->name->to_string()));
|
||||
pk_columns.push_back(row.get_blob_unfragmented(column->name->to_string()));
|
||||
}
|
||||
auto pk = partition_key::from_exploded(*_schema, pk_columns);
|
||||
auto dk = dht::decorate_key(*_schema, pk);
|
||||
@@ -1500,12 +1500,12 @@ indexed_table_select_statement::find_index_clustering_rows(query_processor& qp,
|
||||
for (size_t i = 0; i < rs.size(); i++) {
|
||||
const auto& row = rs.at(i);
|
||||
auto pk_columns = _schema->partition_key_columns() | std::views::transform([&] (auto& cdef) {
|
||||
return row.get_blob(cdef.name_as_text());
|
||||
return row.get_blob_unfragmented(cdef.name_as_text());
|
||||
});
|
||||
auto pk = partition_key::from_range(pk_columns);
|
||||
auto dk = dht::decorate_key(*_schema, pk);
|
||||
auto ck_columns = _schema->clustering_key_columns() | std::views::transform([&] (auto& cdef) {
|
||||
return row.get_blob(cdef.name_as_text());
|
||||
return row.get_blob_unfragmented(cdef.name_as_text());
|
||||
});
|
||||
auto ck = clustering_key::from_range(ck_columns);
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ public:
|
||||
|
||||
bool has(std::string_view) const;
|
||||
view_type get_view(std::string_view name) const;
|
||||
bytes get_blob(std::string_view name) const {
|
||||
bytes get_blob_unfragmented(std::string_view name) const {
|
||||
return to_bytes(get_view(name));
|
||||
}
|
||||
managed_bytes get_blob_fragmented(std::string_view name) const {
|
||||
@@ -128,7 +128,7 @@ public:
|
||||
value_cast<set_type_impl::native_type>(
|
||||
set_type_impl::get_instance(valtype,
|
||||
false)->deserialize(
|
||||
get_blob(name)));
|
||||
get_blob_unfragmented(name)));
|
||||
std::transform(vec.begin(), vec.end(), out, [](auto& p) {
|
||||
return value_cast<V>(p);
|
||||
});
|
||||
|
||||
@@ -185,7 +185,7 @@ future<> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cle
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
auto data = row.get_blob("data");
|
||||
auto data = row.get_blob_unfragmented("data");
|
||||
|
||||
blogger.debug("Replaying batch {}", id);
|
||||
|
||||
|
||||
@@ -449,7 +449,7 @@ public:
|
||||
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES);
|
||||
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
|
||||
return parallel_for_each(*result, [this, &dst](row_type& row) {
|
||||
auto name = row.get_blob("type_name");
|
||||
auto name = row.get_blob_unfragmented("type_name");
|
||||
auto columns = row.get_list<bytes>("field_names");
|
||||
auto types = row.get_list<sstring>("field_types");
|
||||
std::vector<data_type> field_types;
|
||||
|
||||
@@ -2787,7 +2787,7 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_i
|
||||
for (const auto& row : *results) {
|
||||
auto kind = deserialize_kind(row.get_as<sstring>("kind"));
|
||||
auto type = cql_type_parser::parse("" /*unused*/, row.get_as<sstring>("type"), data_dictionary::dummy_user_types_storage());
|
||||
auto name_bytes = row.get_blob("column_name_bytes");
|
||||
auto name_bytes = row.get_blob_unfragmented("column_name_bytes");
|
||||
column_id position = row.get_as<int32_t>("position");
|
||||
|
||||
auto order = row.get_as<sstring>("clustering_order");
|
||||
|
||||
@@ -1872,7 +1872,7 @@ future<system_keyspace::commitlog_cleanup_map> system_keyspace::get_commitlog_cl
|
||||
}
|
||||
|
||||
static set_type_impl::native_type deserialize_set_column(const schema& s, const cql3::untyped_result_set_row& row, const char* name) {
|
||||
auto blob = row.get_blob(name);
|
||||
auto blob = row.get_blob_unfragmented(name);
|
||||
auto cdef = s.get_column_definition(name);
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
return value_cast<set_type_impl::native_type>(deserialized);
|
||||
@@ -2655,7 +2655,7 @@ future<service::paxos::paxos_state> system_keyspace::load_paxos_state(partition_
|
||||
std::optional<service::paxos::proposal> accepted;
|
||||
if (row.has("proposal")) {
|
||||
accepted = service::paxos::proposal(row.get_as<utils::UUID>("proposal_ballot"),
|
||||
ser::deserialize_from_buffer<>(row.get_blob("proposal"), std::type_identity<frozen_mutation>(), 0));
|
||||
ser::deserialize_from_buffer<>(row.get_blob_unfragmented("proposal"), std::type_identity<frozen_mutation>(), 0));
|
||||
}
|
||||
|
||||
std::optional<service::paxos::proposal> most_recent;
|
||||
@@ -2663,7 +2663,7 @@ future<service::paxos::paxos_state> system_keyspace::load_paxos_state(partition_
|
||||
// the value can be missing if it was pruned, supply empty one since
|
||||
// it will not going to be used anyway
|
||||
auto fm = row.has("most_recent_commit") ?
|
||||
ser::deserialize_from_buffer<>(row.get_blob("most_recent_commit"), std::type_identity<frozen_mutation>(), 0) :
|
||||
ser::deserialize_from_buffer<>(row.get_blob_unfragmented("most_recent_commit"), std::type_identity<frozen_mutation>(), 0) :
|
||||
freeze(mutation(s, key));
|
||||
most_recent = service::paxos::proposal(row.get_as<utils::UUID>("most_recent_commit_at"),
|
||||
std::move(fm));
|
||||
|
||||
@@ -2885,7 +2885,7 @@ future<> view_builder::migrate_to_v2(locator::token_metadata_ptr tmptr, db::syst
|
||||
std::vector<data_value_or_unset> values;
|
||||
for (const auto& col: schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(col.type->deserialize(row.get_blob(col.name_as_text())));
|
||||
values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
}
|
||||
|
||||
@@ -835,7 +835,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
|
||||
std::vector<data_value_or_unset> values;
|
||||
for (const auto& col: schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(col.type->deserialize(row.get_blob(col.name_as_text())));
|
||||
values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user