Merge "Remove default timeouts" from Botond
" Timeouts defaulted to `db::no_timeout` are dangerous. They allow any modifications to the code to drop timeouts and introduce a source of unbounded request queue to the system. This series removes the last such default timeouts from the code. No problems were found, only test code had to be updated. tests: unit(dev) " * 'no-default-timeouts/v1' of https://github.com/denesb/scylla: database: database::query*(), database::apply*(): remove default timeouts database: table::query(): remove default timeout mutation_query: data_query(): remove default timeout mutation_query: mutation_query(): remove default timeout multishard_mutation_query: query_mutations_on_all_shards(): remove default timeout reader_concurrency_semaphore: wait_admission(): remove default timeout utils/logallog: run_when_memory_available(): remove default timeout
This commit is contained in:
@@ -1208,10 +1208,10 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
cmd.row_limit,
|
||||
cmd.partition_limit,
|
||||
cmd.timestamp,
|
||||
timeout,
|
||||
cf.get_config().max_memory_for_unlimited_query,
|
||||
std::move(accounter),
|
||||
std::move(trace_state),
|
||||
timeout,
|
||||
std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {
|
||||
if (f.failed()) {
|
||||
++s->total_reads_failed;
|
||||
@@ -1606,7 +1606,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, c
|
||||
auto uuid = m.column_family_id();
|
||||
auto& cf = find_column_family(uuid);
|
||||
cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
10
database.hh
10
database.hh
@@ -787,7 +787,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state,
|
||||
query::result_memory_limiter& memory_limiter,
|
||||
uint64_t max_result_size,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx = { });
|
||||
|
||||
void start();
|
||||
@@ -1474,14 +1474,14 @@ public:
|
||||
unsigned shard_of(const frozen_mutation& m);
|
||||
future<lw_shared_ptr<query::result>, cache_temperature> query(schema_ptr, const query::read_command& cmd, query::result_options opts,
|
||||
const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
|
||||
uint64_t max_result_size, db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
uint64_t max_result_size, db::timeout_clock::time_point timeout);
|
||||
future<reconcilable_result, cache_temperature> query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range,
|
||||
query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
db::timeout_clock::time_point timeout);
|
||||
// Apply the mutation atomically.
|
||||
// Throws timed_out_error when timeout is reached.
|
||||
future<> apply(schema_ptr, const frozen_mutation&, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
future<> apply_hint(schema_ptr, const frozen_mutation&, db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
future<> apply(schema_ptr, const frozen_mutation&, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout);
|
||||
future<> apply_hint(schema_ptr, const frozen_mutation&, db::timeout_clock::time_point timeout);
|
||||
future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented);
|
||||
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
||||
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
|
||||
|
||||
@@ -68,4 +68,4 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
|
||||
const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
uint64_t max_size,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
@@ -2168,9 +2168,9 @@ future<> data_query(
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
query::result::builder& builder,
|
||||
db::timeout_clock::time_point timeout,
|
||||
uint64_t max_memory_reverse_query,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
{
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
|
||||
@@ -2263,10 +2263,10 @@ static do_mutation_query(schema_ptr s,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
{
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
|
||||
@@ -2303,14 +2303,14 @@ mutation_query(schema_ptr s,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
{
|
||||
return do_mutation_query(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice),
|
||||
row_limit, partition_limit, query_time, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx));
|
||||
row_limit, partition_limit, query_time, timeout, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), std::move(cache_ctx));
|
||||
}
|
||||
|
||||
deletable_row::deletable_row(clustering_row&& cr)
|
||||
|
||||
@@ -161,10 +161,10 @@ future<reconcilable_result> mutation_query(
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter = { },
|
||||
tracing::trace_state_ptr trace_ptr = nullptr,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
query::querier_cache_context cache_ctx = { });
|
||||
|
||||
future<> data_query(
|
||||
@@ -176,9 +176,9 @@ future<> data_query(
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
query::result::builder& builder,
|
||||
db::timeout_clock::time_point timeout,
|
||||
uint64_t max_memory_reverse_query,
|
||||
tracing::trace_state_ptr trace_ptr = nullptr,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
query::querier_cache_context cache_ctx = { });
|
||||
|
||||
|
||||
@@ -191,10 +191,10 @@ class mutation_query_stage {
|
||||
uint32_t,
|
||||
uint32_t,
|
||||
gc_clock::time_point,
|
||||
db::timeout_clock::time_point,
|
||||
uint64_t,
|
||||
query::result_memory_accounter&&,
|
||||
tracing::trace_state_ptr,
|
||||
db::timeout_clock::time_point,
|
||||
query::querier_cache_context> _execution_stage;
|
||||
public:
|
||||
explicit mutation_query_stage();
|
||||
|
||||
@@ -199,7 +199,7 @@ public:
|
||||
return _inactive_read_stats;
|
||||
}
|
||||
|
||||
future<reader_permit> wait_admission(size_t memory, db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
future<reader_permit> wait_admission(size_t memory, db::timeout_clock::time_point timeout);
|
||||
|
||||
/// Consume the specific amount of resources without waiting.
|
||||
reader_permit consume_resources(resources r);
|
||||
|
||||
2
table.cc
2
table.cc
@@ -2397,7 +2397,7 @@ table::query(schema_ptr s,
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
|
||||
auto&& range = *qs.current_partition_range++;
|
||||
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(),
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, _config.max_memory_for_unlimited_query, trace_state, timeout, cache_ctx);
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, timeout, _config.max_memory_for_unlimited_query, trace_state, cache_ctx);
|
||||
}).then([qs_ptr = std::move(qs_ptr), &qs] {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
|
||||
@@ -56,13 +56,13 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
|
||||
mutation m(s, pkey);
|
||||
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
|
||||
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no).get();
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
}
|
||||
|
||||
auto assert_query_result = [&] (size_t expected_size) {
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 1000);
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
|
||||
};
|
||||
assert_query_result(1000);
|
||||
@@ -92,34 +92,34 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
|
||||
mutation m(s, pkey);
|
||||
m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no).get();
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
}
|
||||
for (uint32_t i = 3; i <= 8; ++i) {
|
||||
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
|
||||
mutation m(s, pkey);
|
||||
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no).get();
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
|
||||
}
|
||||
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 3);
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
|
||||
}
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
|
||||
query::max_rows, gc_clock::now(), std::nullopt, 5);
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(5);
|
||||
}
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
|
||||
query::max_rows, gc_clock::now(), std::nullopt, 3);
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
|
||||
auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
|
||||
}
|
||||
});
|
||||
@@ -138,7 +138,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
|
||||
service::get_local_migration_manager().announce_new_column_family(s, true).get();
|
||||
column_family& cf = e.local_db().find_column_family(s);
|
||||
for (auto&& m : partitions) {
|
||||
e.local_db().apply(cf.schema(), freeze(m), db::commitlog::force_sync::no).get();
|
||||
e.local_db().apply(cf.schema(), freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
}
|
||||
cf.flush().get();
|
||||
cf.get_row_cache().invalidate([] {}).get();
|
||||
|
||||
@@ -649,11 +649,11 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling) {
|
||||
//
|
||||
// The allocation of the first element won't change the memory usage inside
|
||||
// the group and we'll be okay to do that a second time.
|
||||
auto fut = simple.run_when_memory_available([&simple_region] { simple_region->alloc_small(); });
|
||||
auto fut = simple.run_when_memory_available([&simple_region] { simple_region->alloc_small(); }, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), true);
|
||||
BOOST_REQUIRE_EQUAL(simple.memory_used(), logalloc::segment_size);
|
||||
|
||||
fut = simple.run_when_memory_available([&simple_region] { simple_region->alloc_small(); });
|
||||
fut = simple.run_when_memory_available([&simple_region] { simple_region->alloc_small(); }, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), true);
|
||||
BOOST_REQUIRE_EQUAL(simple.memory_used(), logalloc::segment_size);
|
||||
|
||||
@@ -663,7 +663,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling) {
|
||||
|
||||
// We should not be permitted to go forward with a new allocation now...
|
||||
BOOST_TEST_MESSAGE(format("now = {}", lowres_clock::now().time_since_epoch().count()));
|
||||
fut = simple.run_when_memory_available([&simple_region] { simple_region->alloc_small(); });
|
||||
fut = simple.run_when_memory_available([&simple_region] { simple_region->alloc_small(); }, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
BOOST_REQUIRE_GT(simple.memory_used(), logalloc::segment_size);
|
||||
|
||||
@@ -705,7 +705,7 @@ SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_child_alloc) {
|
||||
child_region->alloc();
|
||||
BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);
|
||||
|
||||
auto fut = parent.run_when_memory_available([&parent_region] { parent_region->alloc_small(); });
|
||||
auto fut = parent.run_when_memory_available([&parent_region] { parent_region->alloc_small(); }, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), true);
|
||||
BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);
|
||||
|
||||
@@ -714,7 +714,7 @@ SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_child_alloc) {
|
||||
child_region->alloc();
|
||||
BOOST_REQUIRE_GE(child.memory_used(), 2 * logalloc::segment_size);
|
||||
|
||||
fut = parent.run_when_memory_available([&parent_region] { parent_region->alloc_small(); });
|
||||
fut = parent.run_when_memory_available([&parent_region] { parent_region->alloc_small(); }, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);
|
||||
|
||||
@@ -735,7 +735,7 @@ SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_parent_alloc) {
|
||||
parent_region->alloc();
|
||||
BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);
|
||||
|
||||
auto fut = child.run_when_memory_available([] {});
|
||||
auto fut = child.run_when_memory_available([] {}, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
|
||||
parent_region.reset();
|
||||
@@ -762,7 +762,7 @@ SEASTAR_TEST_CASE(test_region_groups_fifo_order) {
|
||||
for (auto index = 0; index < 100; ++index) {
|
||||
auto fut = rg.run_when_memory_available([exec_cnt, index] {
|
||||
BOOST_REQUIRE_EQUAL(index, (*exec_cnt)++);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
executions.push_back(std::move(fut));
|
||||
}
|
||||
@@ -794,7 +794,7 @@ SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_moving_restrict
|
||||
});
|
||||
BOOST_REQUIRE_GE(inner.memory_used(), logalloc::segment_size);
|
||||
|
||||
auto fut = child.run_when_memory_available([] {});
|
||||
auto fut = child.run_when_memory_available([] {}, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
|
||||
// Now fill the root...
|
||||
@@ -846,7 +846,7 @@ SEASTAR_TEST_CASE(test_region_groups_tree_hierarchy_throttling_leaf_alloc) {
|
||||
future<> try_alloc(size_t size) {
|
||||
return _rg.run_when_memory_available([this, size] {
|
||||
alloc(size);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
}
|
||||
void reset() {
|
||||
_region.reset(new test_region(_rg));
|
||||
@@ -981,7 +981,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
|
||||
(void)simple.unleash(make_ready_future<>());
|
||||
|
||||
// Can't run this function until we have reclaimed something
|
||||
auto fut = simple.rg().run_when_memory_available([] {});
|
||||
auto fut = simple.rg().run_when_memory_available([] {}, db::no_timeout);
|
||||
|
||||
// Initially not available
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
@@ -1009,7 +1009,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offen
|
||||
// Can't run this function until we have reclaimed
|
||||
auto fut = simple.rg().run_when_memory_available([&simple] {
|
||||
BOOST_REQUIRE_EQUAL(simple.reclaim_sizes().size(), 3);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
|
||||
// Initially not available
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
@@ -1043,7 +1043,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_leaf_offend
|
||||
// that the leaves are forced correctly.
|
||||
auto fut = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
|
||||
// Initially not available
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
@@ -1072,7 +1072,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_bl
|
||||
// that the root reclaims
|
||||
auto fut = leaf.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
|
||||
// Initially not available
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
@@ -1098,7 +1098,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_big_region_
|
||||
|
||||
auto fut = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
|
||||
// Initially not available
|
||||
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
||||
@@ -1127,11 +1127,11 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
|
||||
|
||||
auto fut_root = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
|
||||
auto fut_leaf = leaf.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
|
||||
// Initially not available
|
||||
BOOST_REQUIRE_EQUAL(fut_root.available(), false);
|
||||
@@ -1185,7 +1185,7 @@ SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_reg
|
||||
// Trigger group size change (Refs issue #2021)
|
||||
gr.update(-10);
|
||||
gr.update(+10);
|
||||
});
|
||||
}, db::no_timeout);
|
||||
BOOST_REQUIRE(!f.available());
|
||||
}
|
||||
|
||||
|
||||
@@ -84,7 +84,7 @@ SEASTAR_THREAD_TEST_CASE(test_abandoned_read) {
|
||||
auto cmd = query::read_command(s->id(), s->version(), s->full_slice(), 7, gc_clock::now(), std::nullopt, query::max_partitions,
|
||||
utils::make_random_uuid(), true);
|
||||
|
||||
query_mutations_on_all_shards(env.db(), s, cmd, {query::full_partition_range}, nullptr, std::numeric_limits<uint64_t>::max()).get();
|
||||
query_mutations_on_all_shards(env.db(), s, cmd, {query::full_partition_range}, nullptr, std::numeric_limits<uint64_t>::max(), db::no_timeout).get();
|
||||
|
||||
check_cache_population(env.db(), 1);
|
||||
|
||||
@@ -108,7 +108,7 @@ static std::vector<mutation> read_all_partitions_one_by_one(distributed<database
|
||||
const auto cmd = query::read_command(s->id(), s->version(), s->full_slice(), query::max_rows);
|
||||
const auto range = dht::partition_range::make_singular(pkey);
|
||||
return make_foreign(std::make_unique<reconcilable_result>(
|
||||
db.query_mutations(std::move(s), cmd, range, std::move(accounter), nullptr).get0()));
|
||||
db.query_mutations(std::move(s), cmd, range, std::move(accounter), nullptr, db::no_timeout).get0()));
|
||||
});
|
||||
}).get0();
|
||||
|
||||
@@ -132,7 +132,7 @@ read_partitions_with_paged_scan(distributed<database>& db, schema_ptr s, uint32_
|
||||
|
||||
// First page is special, needs to have `is_first_page` set.
|
||||
{
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {range}, nullptr, max_size).get0());
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {range}, nullptr, max_size, db::no_timeout).get0());
|
||||
for (auto& part : res->partitions()) {
|
||||
auto mut = part.mut().unfreeze(s);
|
||||
results.emplace_back(std::move(mut));
|
||||
@@ -176,7 +176,7 @@ read_partitions_with_paged_scan(distributed<database>& db, schema_ptr s, uint32_
|
||||
cmd.slice.set_range(*s, last_pkey.key(), std::move(ckranges));
|
||||
}
|
||||
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {pkrange}, nullptr, max_size).get0());
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {pkrange}, nullptr, max_size, db::no_timeout).get0());
|
||||
|
||||
if (is_stateful) {
|
||||
BOOST_REQUIRE(aggregate_querier_cache_stat(db, &query::querier_cache::stats::lookups) >= npages);
|
||||
|
||||
@@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
// FIXME: use mutation assertions
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
@@ -124,7 +124,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -160,7 +160,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -174,7 +174,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -207,7 +207,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -237,7 +237,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -265,7 +265,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -285,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -297,7 +297,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -324,7 +324,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -348,7 +348,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -370,7 +370,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -396,7 +396,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.is_empty();
|
||||
@@ -447,7 +447,7 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
auto query_time = now + std::chrono::seconds(1);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time,
|
||||
max_memory_for_reverse_query).get0();
|
||||
db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(result.partitions().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
@@ -466,28 +466,28 @@ SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
auto src = make_source({m1});
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(s, partition_key::from_single_value(*s, "key2"));
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
@@ -510,7 +510,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 10, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -526,7 +526,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 1, now, max_memory_for_reverse_query).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -549,11 +549,11 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), digest_only_builder, max_memory_for_reverse_query).get0();
|
||||
gc_clock::now(), digest_only_builder, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), result_and_digest_builder, max_memory_for_reverse_query).get0();
|
||||
gc_clock::now(), result_and_digest_builder, db::no_timeout, max_memory_for_reverse_query).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
@@ -1083,7 +1083,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
return async([&] {
|
||||
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
|
||||
// Testing the tracker here, no need to have a base cost.
|
||||
auto permit = semaphore.wait_admission(0).get0();
|
||||
auto permit = semaphore.wait_admission(0, db::no_timeout).get0();
|
||||
|
||||
{
|
||||
auto tracked_file = make_tracked_file(file(shared_ptr<file_impl>(make_shared<dummy_file_impl>())), permit);
|
||||
@@ -2063,7 +2063,7 @@ public:
|
||||
// Add a waiter, so that all registered inactive reads are
|
||||
// immediately evicted.
|
||||
// We don't care about the returned future.
|
||||
(void)_contexts[shard].semaphore->wait_admission(1);
|
||||
(void)_contexts[shard].semaphore->wait_admission(1, db::no_timeout);
|
||||
} else {
|
||||
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}));
|
||||
}
|
||||
|
||||
@@ -685,7 +685,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
const auto per_permit_memory = resources.memory / resources.count;
|
||||
|
||||
for (int i = 0; i < resources.count; ++i) {
|
||||
permits.emplace_back(semaphore.wait_admission(per_permit_memory).get0());
|
||||
permits.emplace_back(semaphore.wait_admission(per_permit_memory, db::no_timeout).get0());
|
||||
}
|
||||
|
||||
BOOST_CHECK_EQUAL(semaphore.available_resources().count, 0);
|
||||
@@ -752,7 +752,7 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.available_resources().count, 0);
|
||||
|
||||
auto permit2_fut = sem.wait_admission(1);
|
||||
auto permit2_fut = sem.wait_admission(1, db::no_timeout);
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.waiters(), 1);
|
||||
|
||||
|
||||
@@ -187,7 +187,7 @@ void execute_reads(reader_concurrency_semaphore& sem, unsigned reads, unsigned c
|
||||
|
||||
if (sem.waiters()) {
|
||||
testlog.trace("Waiting for queue to drain");
|
||||
sem.wait_admission(1).get();
|
||||
sem.wait_admission(1, db::no_timeout).get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,7 +282,7 @@ int main(int argc, char** argv) {
|
||||
uint64_t i = 0;
|
||||
while (i < sstables) {
|
||||
auto m = gen();
|
||||
env.local_db().apply(s, freeze(m), db::commitlog::force_sync::no).get();
|
||||
env.local_db().apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
if (tab.active_memtable().occupancy().used_space() > sstable_size) {
|
||||
tab.flush().get();
|
||||
++i;
|
||||
|
||||
@@ -172,7 +172,7 @@ int main(int argc, char** argv) {
|
||||
auto&& col = *s->get_column_definition(to_bytes("v"));
|
||||
m.set_clustered_cell(ck, col, atomic_cell::make_live(*col.type, api::new_timestamp(), serialized(value)));
|
||||
auto t0 = clock::now();
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no).get();
|
||||
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
writes_hist.add(std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - t0).count());
|
||||
++mutations;
|
||||
}
|
||||
|
||||
@@ -335,7 +335,7 @@ public:
|
||||
//
|
||||
// When timeout is reached first, the returned future is resolved with timed_out_error exception.
|
||||
template <typename Func>
|
||||
futurize_t<std::result_of_t<Func()>> run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||
futurize_t<std::result_of_t<Func()>> run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout) {
|
||||
// We disallow future-returning functions here, because otherwise memory may be available
|
||||
// when we start executing it, but no longer available in the middle of the execution.
|
||||
static_assert(!is_future<std::result_of_t<Func()>>::value, "future-returning functions are not permitted.");
|
||||
|
||||
Reference in New Issue
Block a user