Merge 'Fix bad performance for densely populated partition index pages' from Tomasz Grabiec

This applies to small partition workload where index pages have high partition count, and the index doesn't fit in cache. It was observed that the count can be in the order of hundreds. In such a workload pages undergo constant population, LSA compaction, and LSA eviction, which has severe impact on CPU utilization.

Refs https://scylladb.atlassian.net/browse/SCYLLADB-620

This PR reduces the impact by several changes:

  - reducing memory footprint in the partition index. Assuming partition key size is 16 bytes, the cost dropped from 96 bytes to 36 bytes per partition.

  - flattening the object graph and amortizing storage. Storing entries directly in the vector. Storing all key values in a single managed_bytes. Making index_entry a trivial struct.

  - index entries and key storage are now trivially moveable, and batched inside vector storage
    so LSA migration can use memcpy(), which amortizes the cost per key. This reduces the cost of LSA segment compaction.

 - LSA eviction is now pretty much constant time for the whole page
   regardless of the number of entries, because elements are trivial and batched inside vectors.
   Page eviction cost dropped from 50 us to 1 us.

Performance evaluated with:

   scylla perf-simple-query -c1 -m200M --partitions=1000000

Before:

```
7774.96 tps (166.0 allocs/op, 521.7 logallocs/op,  54.0 tasks/op,  802428 insns/op,  430457 cycles/op,        0 errors)
7511.08 tps (166.1 allocs/op, 527.2 logallocs/op,  54.0 tasks/op,  804185 insns/op,  430752 cycles/op,        0 errors)
7740.44 tps (166.3 allocs/op, 526.2 logallocs/op,  54.2 tasks/op,  805347 insns/op,  432117 cycles/op,        0 errors)
7818.72 tps (165.2 allocs/op, 517.6 logallocs/op,  53.7 tasks/op,  794965 insns/op,  427751 cycles/op,        0 errors)
7865.49 tps (165.1 allocs/op, 513.3 logallocs/op,  53.6 tasks/op,  788898 insns/op,  425171 cycles/op,        0 errors)
```

After (+318%):

```
32492.40 tps (130.7 allocs/op,  12.8 logallocs/op,  36.1 tasks/op,  109236 insns/op,  103203 cycles/op,        0 errors)
32591.99 tps (130.4 allocs/op,  12.8 logallocs/op,  36.0 tasks/op,  108947 insns/op,  102889 cycles/op,        0 errors)
32514.52 tps (130.6 allocs/op,  12.8 logallocs/op,  36.0 tasks/op,  109118 insns/op,  103219 cycles/op,        0 errors)
32491.14 tps (130.6 allocs/op,  12.8 logallocs/op,  36.0 tasks/op,  109349 insns/op,  103272 cycles/op,        0 errors)
32582.90 tps (130.5 allocs/op,  12.8 logallocs/op,  36.0 tasks/op,  109269 insns/op,  102872 cycles/op,        0 errors)
32479.43 tps (130.6 allocs/op,  12.8 logallocs/op,  36.0 tasks/op,  109313 insns/op,  103242 cycles/op,        0 errors)
32418.48 tps (130.7 allocs/op,  12.8 logallocs/op,  36.1 tasks/op,  109201 insns/op,  103301 cycles/op,        0 errors)
31394.14 tps (130.7 allocs/op,  12.8 logallocs/op,  36.1 tasks/op,  109267 insns/op,  103301 cycles/op,        0 errors)
32298.55 tps (130.7 allocs/op,  12.8 logallocs/op,  36.1 tasks/op,  109323 insns/op,  103551 cycles/op,        0 errors)
```

When the workload is miss-only, with both row cache and index cache disabled (no cache maintenance cost):

  perf-simple-query -c1 -m200M --duration 6000 --partitions=100000 --enable-index-cache=0 --enable-cache=0

Before:

```
9124.57 tps (146.2 allocs/op, 789.0 logallocs/op,  45.3 tasks/op,  889320 insns/op,  357937 cycles/op,        0 errors)
9437.23 tps (146.1 allocs/op, 789.3 logallocs/op,  45.3 tasks/op,  889613 insns/op,  357782 cycles/op,        0 errors)
9455.65 tps (146.0 allocs/op, 787.4 logallocs/op,  45.2 tasks/op,  887606 insns/op,  357167 cycles/op,        0 errors)
9451.22 tps (146.0 allocs/op, 787.4 logallocs/op,  45.3 tasks/op,  887627 insns/op,  357357 cycles/op,        0 errors)
9429.50 tps (146.0 allocs/op, 787.4 logallocs/op,  45.3 tasks/op,  887761 insns/op,  358148 cycles/op,        0 errors)
9430.29 tps (146.1 allocs/op, 788.2 logallocs/op,  45.3 tasks/op,  888501 insns/op,  357679 cycles/op,        0 errors)
9454.08 tps (146.0 allocs/op, 787.3 logallocs/op,  45.3 tasks/op,  887545 insns/op,  357132 cycles/op,        0 errors)
```

After (+55%):

```
14484.84 tps (150.7 allocs/op,   6.5 logallocs/op,  44.7 tasks/op,  396164 insns/op,  229490 cycles/op,        0 errors)
14526.21 tps (150.8 allocs/op,   6.5 logallocs/op,  44.8 tasks/op,  396401 insns/op,  228824 cycles/op,        0 errors)
14567.53 tps (150.7 allocs/op,   6.5 logallocs/op,  44.7 tasks/op,  396319 insns/op,  228701 cycles/op,        0 errors)
14545.63 tps (150.6 allocs/op,   6.5 logallocs/op,  44.7 tasks/op,  395889 insns/op,  228493 cycles/op,        0 errors)
14626.06 tps (150.5 allocs/op,   6.5 logallocs/op,  44.7 tasks/op,  395254 insns/op,  227891 cycles/op,        0 errors)
14593.74 tps (150.5 allocs/op,   6.5 logallocs/op,  44.7 tasks/op,  395480 insns/op,  227993 cycles/op,        0 errors)
14538.10 tps (150.8 allocs/op,   6.5 logallocs/op,  44.8 tasks/op,  397035 insns/op,  228831 cycles/op,        0 errors)
14527.18 tps (150.8 allocs/op,   6.5 logallocs/op,  44.8 tasks/op,  396992 insns/op,  228839 cycles/op,        0 errors)
```

Same as above, but with summary ratio increased from 0.0005 to 0.005 (smaller pages):

Before:

```
33906.70 tps (146.1 allocs/op,  83.6 logallocs/op,  45.1 tasks/op,  170553 insns/op,   98104 cycles/op,        0 errors)
32696.16 tps (146.0 allocs/op,  83.5 logallocs/op,  45.1 tasks/op,  170369 insns/op,   98405 cycles/op,        0 errors)
33889.05 tps (146.1 allocs/op,  83.6 logallocs/op,  45.1 tasks/op,  170551 insns/op,   98135 cycles/op,        0 errors)
33893.24 tps (146.1 allocs/op,  83.5 logallocs/op,  45.1 tasks/op,  170488 insns/op,   98168 cycles/op,        0 errors)
33836.73 tps (146.1 allocs/op,  83.6 logallocs/op,  45.1 tasks/op,  170528 insns/op,   98226 cycles/op,        0 errors)
33897.61 tps (146.0 allocs/op,  83.5 logallocs/op,  45.1 tasks/op,  170428 insns/op,   98081 cycles/op,        0 errors)
33834.73 tps (146.1 allocs/op,  83.5 logallocs/op,  45.1 tasks/op,  170438 insns/op,   98178 cycles/op,        0 errors)
33776.31 tps (146.3 allocs/op,  83.9 logallocs/op,  45.2 tasks/op,  170958 insns/op,   98418 cycles/op,        0 errors)
33808.08 tps (146.3 allocs/op,  83.9 logallocs/op,  45.2 tasks/op,  170940 insns/op,   98388 cycles/op,        0 errors)
```

After (+18%):

```
40081.51 tps (148.2 allocs/op,   4.4 logallocs/op,  45.0 tasks/op,  121047 insns/op,   82231 cycles/op,        0 errors)
40005.85 tps (148.6 allocs/op,   4.4 logallocs/op,  45.2 tasks/op,  121327 insns/op,   82545 cycles/op,        0 errors)
39816.75 tps (148.3 allocs/op,   4.4 logallocs/op,  45.1 tasks/op,  121067 insns/op,   82419 cycles/op,        0 errors)
39953.11 tps (148.1 allocs/op,   4.4 logallocs/op,  45.0 tasks/op,  121027 insns/op,   82258 cycles/op,        0 errors)
40073.96 tps (148.2 allocs/op,   4.4 logallocs/op,  45.0 tasks/op,  121006 insns/op,   82313 cycles/op,        0 errors)
39882.25 tps (148.2 allocs/op,   4.4 logallocs/op,  45.0 tasks/op,  120925 insns/op,   82320 cycles/op,        0 errors)
39916.08 tps (148.3 allocs/op,   4.4 logallocs/op,  45.1 tasks/op,  121054 insns/op,   82393 cycles/op,        0 errors)
39786.30 tps (148.2 allocs/op,   4.4 logallocs/op,  45.0 tasks/op,  121027 insns/op,   82465 cycles/op,        0 errors)
38662.45 tps (148.3 allocs/op,   4.4 logallocs/op,  45.0 tasks/op,  121108 insns/op,   82312 cycles/op,        0 errors)
39849.42 tps (148.3 allocs/op,   4.4 logallocs/op,  45.1 tasks/op,  121098 insns/op,   82447 cycles/op,        0 errors)
```

Closes scylladb/scylladb#28603

* github.com:scylladb/scylladb:
  sstables: mx: index_reader: Optimize parsing for no promoted index case
  vint: Use std::countl_zero()
  test: sstable_partition_index_cache_test: Validate scenario of pages with sparse promoted index placement
  sstables: mx: index_reader: Amoritze partition key storage
  managed_bytes: Hoist write_fragmented() to common header
  utils: managed_vector: Use std::uninitialized_move() to move objects
  sstables: mx: index_reader: Keep promoted_index info next to index_entry
  sstables: mx: index_reader: Extract partition_index_page::clear_gently()
  sstables: mx: index_reader: Shave-off 16 bytes from index_entry by using raw_token
  sstables: mx: index_reader: Reduce allocation_section overhead during index page parsing by batching allocation
  sstables: mx: index_reader: Keep index_entry directly in the vector
  dht: Introduce raw_token
  test: perf_simple_query: Add 'sstable-format' command-line option
  test: perf_simple_query: Add 'sstable-summary-ratio' command-line option
  test: perf-simple-query: Add option to disable index cache
  test: cql_test_env: Respect enable-index-cache config

(cherry picked from commit 5e7fb08bf3)

Closes scylladb/scylladb#29136

Closes scylladb/scylladb#29140
This commit is contained in:
Avi Kivity
2026-03-19 14:42:49 +02:00
committed by Botond Dénes
parent 6b9aa303d8
commit 1f7dca0225
14 changed files with 419 additions and 232 deletions

View File

@@ -30,6 +30,31 @@ enum class token_kind {
after_all_keys,
};
// Represents a token for partition keys.
// Has a disengaged state, which sorts before all engaged states.
struct raw_token {
int64_t value;
/// Constructs a disengaged token.
raw_token() : value(std::numeric_limits<int64_t>::min()) {}
/// Constructs an engaged token.
/// The token must be of token_kind::key kind.
explicit raw_token(const token&);
explicit raw_token(int64_t v) : value(v) {};
std::strong_ordering operator<=>(const raw_token& o) const noexcept = default;
std::strong_ordering operator<=>(const token& o) const noexcept;
/// Returns true iff engaged.
explicit operator bool() const noexcept {
return value != std::numeric_limits<int64_t>::min();
}
};
using raw_token_opt = seastar::optimized_optional<raw_token>;
class token {
// INT64_MIN is not a legal token, but a special value used to represent
// infinity in token intervals.
@@ -52,6 +77,10 @@ public:
constexpr explicit token(int64_t d) noexcept : token(kind::key, normalize(d)) {}
token(raw_token raw) noexcept
: token(raw ? kind::key : kind::before_all_keys, raw.value)
{ }
// This constructor seems redundant with the bytes_view constructor, but
// it's necessary for IDL, which passes a deserialized_bytes_proxy here.
// (deserialized_bytes_proxy is convertible to bytes&&, but not bytes_view.)
@@ -223,6 +252,29 @@ public:
}
};
inline
raw_token::raw_token(const token& t)
: value(t.raw())
{
#ifdef DEBUG
assert(t._kind == token::kind::key);
#endif
}
inline
std::strong_ordering raw_token::operator<=>(const token& o) const noexcept {
switch (o._kind) {
case token::kind::after_all_keys:
return std::strong_ordering::less;
case token::kind::before_all_keys:
// before_all_keys has a raw value set to the same raw value as a disengaged raw_token, and sorts before all keys.
// So we can order them by just comparing raw values.
[[fallthrough]];
case token::kind::key:
return value <=> o._data;
}
}
inline constexpr std::strong_ordering tri_compare_raw(const int64_t l1, const int64_t l2) noexcept {
if (l1 == l2) {
return std::strong_ordering::equal;
@@ -329,6 +381,17 @@ struct fmt::formatter<dht::token> : fmt::formatter<string_view> {
}
};
template <>
struct fmt::formatter<dht::raw_token> : fmt::formatter<string_view> {
template <typename FormatContext>
auto format(const dht::raw_token& t, FormatContext& ctx) const {
if (!t) {
return fmt::format_to(ctx.out(), "null");
}
return fmt::format_to(ctx.out(), "{}", t.value);
}
};
namespace std {
template<>

View File

@@ -201,95 +201,49 @@ public:
virtual future<std::optional<entry_info>> next_entry() = 0;
};
// Allocated inside LSA.
class promoted_index {
deletion_time _del_time;
uint64_t _promoted_index_start;
uint32_t _promoted_index_size;
uint32_t _num_blocks;
public:
promoted_index(const schema& s,
deletion_time del_time,
uint64_t promoted_index_start,
uint32_t promoted_index_size,
uint32_t num_blocks)
: _del_time{del_time}
, _promoted_index_start(promoted_index_start)
, _promoted_index_size(promoted_index_size)
, _num_blocks(num_blocks)
{ }
[[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
[[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }
// Call under allocating_section.
// For sstable versions >= mc the returned cursor will be of type `bsearch_clustered_cursor`.
std::unique_ptr<clustered_index_cursor> make_cursor(shared_sstable,
reader_permit,
tracing::trace_state_ptr,
file_input_stream_options,
use_caching);
// Promoted index information produced by the parser.
struct parsed_promoted_index_entry {
deletion_time del_time;
uint64_t promoted_index_start;
uint32_t promoted_index_size;
uint32_t num_blocks;
};
using promoted_index = parsed_promoted_index_entry;
// A partition index element.
// Allocated inside LSA.
class index_entry {
private:
managed_bytes _key;
mutable std::optional<dht::token> _token;
uint64_t _position;
managed_ref<promoted_index> _index;
struct [[gnu::packed]] index_entry {
mutable int64_t raw_token;
uint64_t data_file_offset;
uint32_t key_offset;
public:
key_view get_key() const {
return key_view{_key};
}
// May allocate so must be called under allocating_section.
decorated_key_view get_decorated_key(const schema& s) const {
if (!_token) {
_token.emplace(s.get_partitioner().get_token(get_key()));
}
return decorated_key_view(*_token, get_key());
}
uint64_t position() const { return _position; };
std::optional<deletion_time> get_deletion_time() const {
if (_index) {
return _index->get_deletion_time();
}
return {};
}
index_entry(managed_bytes&& key, uint64_t position, managed_ref<promoted_index>&& index)
: _key(std::move(key))
, _position(position)
, _index(std::move(index))
{}
index_entry(index_entry&&) = default;
index_entry& operator=(index_entry&&) = default;
// Can be nullptr
const managed_ref<promoted_index>& get_promoted_index() const { return _index; }
managed_ref<promoted_index>& get_promoted_index() { return _index; }
uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; }
size_t external_memory_usage() const {
return _key.external_memory_usage() + _index.external_memory_usage();
}
uint64_t position() const { return data_file_offset; }
dht::raw_token token() const { return dht::raw_token(raw_token); }
};
// Required for optimized LSA migration of storage of managed_vector.
static_assert(std::is_trivially_move_assignable_v<index_entry>);
static_assert(std::is_trivially_move_assignable_v<parsed_promoted_index_entry>);
// A partition index page.
//
// Allocated in the standard allocator space but with an LSA allocator as the current allocator.
// So the shallow part is in the standard allocator but all indirect objects are inside LSA.
class partition_index_page {
public:
lsa::chunked_managed_vector<managed_ref<index_entry>> _entries;
lsa::chunked_managed_vector<index_entry> _entries;
managed_bytes _key_storage;
// Stores promoted index information of index entries.
// The i-th element corresponds to the i-th entry in _entries.
// Can be smaller than _entries. If _entries[i] doesn't have a matching element in _promoted_indexes then
// that entry doesn't have a promoted index.
// It's not chunked, because promoted index is present only when there are large partitions in the page,
// which also means the page will have typically only 1 entry due to summary:data_file size ratio.
// Kept separately to avoid paying for storage cost in pages where no entry has a promoted index,
// which is typical in workloads with small partitions.
managed_vector<promoted_index> _promoted_indexes;
public:
partition_index_page() = default;
partition_index_page(partition_index_page&&) noexcept = default;
@@ -298,15 +252,68 @@ public:
bool empty() const { return _entries.empty(); }
size_t size() const { return _entries.size(); }
stop_iteration clear_gently() {
// Vectors have trivial storage, so are fast to destroy.
return stop_iteration::yes;
}
void clear_one_entry() {
_entries.pop_back();
}
bool has_promoted_index(size_t i) const {
return i < _promoted_indexes.size() && _promoted_indexes[i].promoted_index_size > 0;
}
/// Get promoted index for the i-th entry.
/// Call only when has_promoted_index(i) is true.
const promoted_index& get_promoted_index(size_t i) const {
return _promoted_indexes[i];
}
/// Get promoted index for the i-th entry.
/// Call only when has_promoted_index(i) is true.
promoted_index& get_promoted_index(size_t i) {
return _promoted_indexes[i];
}
/// Get promoted index size for the i-th entry.
uint32_t get_promoted_index_size(size_t i) const {
return has_promoted_index(i) ? get_promoted_index(i).promoted_index_size : 0;
}
/// Get deletion_time for partition represented by the i-th entry.
/// Returns disengaged optional if the entry doesn't have a promoted index, so we don't know the deletion_time.
/// It has to be read from the data file.
std::optional<deletion_time> get_deletion_time(size_t i) const {
if (has_promoted_index(i)) {
return get_promoted_index(i).del_time;
}
return {};
}
key_view get_key(size_t i) const {
auto start = _entries[i].key_offset;
auto end = i + 1 < _entries.size() ? _entries[i + 1].key_offset : _key_storage.size();
auto v = managed_bytes_view(_key_storage).prefix(end);
v.remove_prefix(start);
return key_view(v);
}
decorated_key_view get_decorated_key(const schema& s, size_t i) const {
auto key = get_key(i);
auto t = _entries[i].token();
if (!t) {
t = dht::raw_token(s.get_partitioner().get_token(key));
_entries[i].raw_token = t.value;
}
return decorated_key_view(dht::token(t), key);
}
size_t external_memory_usage() const {
size_t size = _entries.external_memory_usage();
for (auto&& e : _entries) {
size += sizeof(index_entry) + e->external_memory_usage();
}
size += _promoted_indexes.external_memory_usage();
size += _key_storage.external_memory_usage();
return size;
}
};

View File

@@ -25,14 +25,6 @@ namespace sstables {
extern seastar::logger sstlog;
extern thread_local mc::cached_promoted_index::metrics promoted_index_cache_metrics;
// Promoted index information produced by the parser.
struct parsed_promoted_index_entry {
deletion_time del_time;
uint64_t promoted_index_start;
uint32_t promoted_index_size;
uint32_t num_blocks;
};
// Partition index entry information produced by the parser.
struct parsed_partition_index_entry {
temporary_buffer<char> key;
@@ -53,44 +45,72 @@ class index_consumer {
schema_ptr _s;
logalloc::allocating_section _alloc_section;
logalloc::region& _region;
utils::chunked_vector<parsed_partition_index_entry> _parsed_entries;
size_t _max_promoted_index_entry_plus_one = 0; // Highest index +1 in _parsed_entries which has a promoted index.
size_t _key_storage_size = 0;
public:
index_list indexes;
index_consumer(logalloc::region& r, schema_ptr s)
: _s(std::move(s))
, _region(r)
{ }
~index_consumer() {
with_allocator(_region.allocator(), [&] {
indexes._entries.clear_and_release();
});
void consume_entry(parsed_partition_index_entry&& e) {
_key_storage_size += e.key.size();
_parsed_entries.emplace_back(std::move(e));
if (e.promoted_index) {
_max_promoted_index_entry_plus_one = std::max(_max_promoted_index_entry_plus_one, _parsed_entries.size());
}
}
void consume_entry(parsed_partition_index_entry&& e) {
_alloc_section(_region, [&] {
future<index_list> finalize() {
index_list result;
// In case of exception, need to deallocate under region allocator.
auto delete_result = seastar::defer([&] {
with_allocator(_region.allocator(), [&] {
managed_ref<promoted_index> pi;
if (e.promoted_index) {
pi = make_managed<promoted_index>(*_s,
e.promoted_index->del_time,
e.promoted_index->promoted_index_start,
e.promoted_index->promoted_index_size,
e.promoted_index->num_blocks);
}
auto key = managed_bytes(reinterpret_cast<const bytes::value_type*>(e.key.get()), e.key.size());
indexes._entries.emplace_back(make_managed<index_entry>(std::move(key), e.data_file_offset, std::move(pi)));
result._entries = {};
result._promoted_indexes = {};
result._key_storage = {};
});
});
auto i = _parsed_entries.begin();
size_t key_offset = 0;
while (i != _parsed_entries.end()) {
_alloc_section(_region, [&] {
with_allocator(_region.allocator(), [&] {
result._entries.reserve(_parsed_entries.size());
result._promoted_indexes.resize(_max_promoted_index_entry_plus_one);
if (result._key_storage.empty()) {
result._key_storage = managed_bytes(managed_bytes::initialized_later(), _key_storage_size);
}
managed_bytes_mutable_view key_out(result._key_storage);
key_out.remove_prefix(key_offset);
while (i != _parsed_entries.end()) {
parsed_partition_index_entry& e = *i;
if (e.promoted_index) {
result._promoted_indexes[result._entries.size()] = *e.promoted_index;
}
write_fragmented(key_out, std::string_view(e.key.begin(), e.key.size()));
result._entries.emplace_back(index_entry{dht::raw_token().value, e.data_file_offset, key_offset});
++i;
key_offset += e.key.size();
if (need_preempt()) {
break;
}
}
});
});
co_await coroutine::maybe_yield();
}
delete_result.cancel();
_parsed_entries.clear();
co_return std::move(result);
}
void prepare(uint64_t size) {
_alloc_section = logalloc::allocating_section();
_alloc_section(_region, [&] {
with_allocator(_region.allocator(), [&] {
indexes._entries.reserve(size);
});
});
_max_promoted_index_entry_plus_one = 0;
_key_storage_size = 0;
_parsed_entries.clear();
_parsed_entries.reserve(size);
}
};
@@ -195,10 +215,14 @@ public:
switch (_state) {
// START comes first, to make the handling of the 0-quantity case simpler
state_START:
case state::START:
sstlog.trace("{}: pos {} state {} - data.size()={}", fmt::ptr(this), current_pos(), state::START, data.size());
_state = state::KEY_SIZE;
break;
if (data.size() == 0) {
break;
}
[[fallthrough]];
case state::KEY_SIZE:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_SIZE);
_entry_offset = current_pos();
@@ -224,7 +248,16 @@ public:
case state::PROMOTED_SIZE:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::PROMOTED_SIZE);
_position = this->_u64;
if (read_vint_or_uint32(data) != continuous_data_consumer::read_status::ready) {
if (is_mc_format() && data.size() && *data.begin() == 0) { // promoted_index_size == 0
data.trim_front(1);
_consumer.consume_entry(parsed_partition_index_entry{
.key = std::move(_key),
.data_file_offset = _position,
.index_offset = _entry_offset,
.promoted_index = std::nullopt
});
goto state_START;
} else if (read_vint_or_uint32(data) != continuous_data_consumer::read_status::ready) {
_state = state::PARTITION_HEADER_LENGTH_1;
break;
}
@@ -336,33 +369,6 @@ inline file make_tracked_index_file(sstable& sst, reader_permit permit, tracing:
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.index_filename()));
}
inline
std::unique_ptr<clustered_index_cursor> promoted_index::make_cursor(shared_sstable sst,
reader_permit permit,
tracing::trace_state_ptr trace_state,
file_input_stream_options options,
use_caching caching)
{
if (sst->get_version() >= sstable_version_types::mc) [[likely]] {
seastar::shared_ptr<cached_file> cached_file_ptr = caching
? sst->_cached_index_file
: seastar::make_shared<cached_file>(make_tracked_index_file(*sst, permit, trace_state, caching),
sst->manager().get_cache_tracker().get_index_cached_file_stats(),
sst->manager().get_cache_tracker().get_lru(),
sst->manager().get_cache_tracker().region(),
sst->_index_file_size);
return std::make_unique<mc::bsearch_clustered_cursor>(*sst->get_schema(),
_promoted_index_start, _promoted_index_size,
promoted_index_cache_metrics, permit,
sst->get_column_translation(), cached_file_ptr, _num_blocks, trace_state, sst->features());
}
auto file = make_tracked_index_file(*sst, permit, std::move(trace_state), caching);
auto promoted_index_stream = make_file_input_stream(std::move(file), _promoted_index_start, _promoted_index_size,options);
return std::make_unique<scanning_clustered_index_cursor>(*sst->get_schema(), permit,
std::move(promoted_index_stream), _promoted_index_size, _num_blocks, std::nullopt);
}
// Less-comparator for lookups in the partition index.
class index_comparator {
dht::ring_position_comparator_for_sstables _tri_cmp;
@@ -373,27 +379,17 @@ public:
return _tri_cmp(e.get_decorated_key(), rp) < 0;
}
bool operator()(const index_entry& e, dht::ring_position_view rp) const {
return _tri_cmp(e.get_decorated_key(_tri_cmp.s), rp) < 0;
}
bool operator()(const managed_ref<index_entry>& e, dht::ring_position_view rp) const {
return operator()(*e, rp);
}
bool operator()(dht::ring_position_view rp, const managed_ref<index_entry>& e) const {
return operator()(rp, *e);
}
bool operator()(dht::ring_position_view rp, const summary_entry& e) const {
return _tri_cmp(e.get_decorated_key(), rp) > 0;
}
bool operator()(dht::ring_position_view rp, const index_entry& e) const {
return _tri_cmp(e.get_decorated_key(_tri_cmp.s), rp) > 0;
}
};
inline
std::strong_ordering index_entry_tri_cmp(const schema& s, partition_index_page& page, size_t idx, dht::ring_position_view rp) {
dht::ring_position_comparator_for_sstables tri_cmp(s);
return tri_cmp(page.get_decorated_key(s, idx), rp);
}
// Contains information about index_reader position in the index file
struct index_bound {
index_bound() = default;
@@ -534,7 +530,7 @@ private:
if (ex) {
return make_exception_future<index_list>(std::move(ex));
}
return make_ready_future<index_list>(std::move(bound.consumer->indexes));
return bound.consumer->finalize();
});
});
};
@@ -547,17 +543,18 @@ private:
if (bound.current_list->empty()) {
throw malformed_sstable_exception(format("missing index entry for summary index {} (bound {})", summary_idx, fmt::ptr(&bound)), _sstable->index_filename());
}
bound.data_file_position = bound.current_list->_entries[0]->position();
bound.data_file_position = bound.current_list->_entries[0].position();
bound.element = indexable_element::partition;
bound.end_open_marker.reset();
if (sstlog.is_enabled(seastar::log_level::trace)) {
sstlog.trace("index {} bound {}: page:", fmt::ptr(this), fmt::ptr(&bound));
logalloc::reclaim_lock rl(_region);
for (auto&& e : bound.current_list->_entries) {
for (size_t i = 0; i < bound.current_list->_entries.size(); ++i) {
auto& e = bound.current_list->_entries[i];
auto dk = dht::decorate_key(*_sstable->_schema,
e->get_key().to_partition_key(*_sstable->_schema));
sstlog.trace(" {} -> {}", dk, e->position());
bound.current_list->get_key(i).to_partition_key(*_sstable->_schema));
sstlog.trace(" {} -> {}", dk, e.position());
}
}
@@ -601,7 +598,13 @@ private:
// Valid if partition_data_ready(bound)
index_entry& current_partition_entry(index_bound& bound) {
parse_assert(bool(bound.current_list), _sstable->index_filename());
return *bound.current_list->_entries[bound.current_index_idx];
return bound.current_list->_entries[bound.current_index_idx];
}
// Valid if partition_data_ready(bound)
partition_index_page& current_page(index_bound& bound) {
parse_assert(bool(bound.current_list), _sstable->index_filename());
return *bound.current_list;
}
future<> advance_to_next_partition(index_bound& bound) {
@@ -614,7 +617,7 @@ private:
if (bound.current_index_idx + 1 < bound.current_list->size()) {
++bound.current_index_idx;
bound.current_pi_idx = 0;
bound.data_file_position = bound.current_list->_entries[bound.current_index_idx]->position();
bound.data_file_position = bound.current_list->_entries[bound.current_index_idx].position();
bound.element = indexable_element::partition;
bound.end_open_marker.reset();
return reset_clustered_cursor(bound);
@@ -677,9 +680,13 @@ private:
return advance_to_page(bound, summary_idx).then([this, &bound, pos, summary_idx] {
sstlog.trace("index {}: old page index = {}", fmt::ptr(this), bound.current_index_idx);
auto i = _alloc_section(_region, [&] {
auto& entries = bound.current_list->_entries;
return std::lower_bound(std::begin(entries) + bound.current_index_idx, std::end(entries), pos,
index_comparator(*_sstable->_schema));
auto& page = *bound.current_list;
auto& s = *_sstable->_schema;
auto r = std::views::iota(bound.current_index_idx, page._entries.size());
auto it = std::ranges::partition_point(r, [&] (int idx) {
return index_entry_tri_cmp(s, page, idx, pos) < 0;
});
return page._entries.begin() + bound.current_index_idx + std::ranges::distance(r.begin(), it);
});
// i is valid until next allocation point
auto& entries = bound.current_list->_entries;
@@ -694,7 +701,7 @@ private:
}
bound.current_index_idx = std::distance(std::begin(entries), i);
bound.current_pi_idx = 0;
bound.data_file_position = (*i)->position();
bound.data_file_position = (*i).position();
bound.element = indexable_element::partition;
bound.end_open_marker.reset();
sstlog.trace("index {}: new page index = {}, pos={}", fmt::ptr(this), bound.current_index_idx, bound.data_file_position);
@@ -794,6 +801,34 @@ public:
}
}
static
std::unique_ptr<clustered_index_cursor> make_cursor(const parsed_promoted_index_entry& pi,
shared_sstable sst,
reader_permit permit,
tracing::trace_state_ptr trace_state,
file_input_stream_options options,
use_caching caching)
{
if (sst->get_version() >= sstable_version_types::mc) [[likely]] {
seastar::shared_ptr<cached_file> cached_file_ptr = caching
? sst->_cached_index_file
: seastar::make_shared<cached_file>(make_tracked_index_file(*sst, permit, trace_state, caching),
sst->manager().get_cache_tracker().get_index_cached_file_stats(),
sst->manager().get_cache_tracker().get_lru(),
sst->manager().get_cache_tracker().region(),
sst->_index_file_size);
return std::make_unique<mc::bsearch_clustered_cursor>(*sst->get_schema(),
pi.promoted_index_start, pi.promoted_index_size,
promoted_index_cache_metrics, permit,
sst->get_column_translation(), cached_file_ptr, pi.num_blocks, trace_state, sst->features());
}
auto file = make_tracked_index_file(*sst, permit, std::move(trace_state), caching);
auto promoted_index_stream = make_file_input_stream(std::move(file), pi.promoted_index_start, pi.promoted_index_size,options);
return std::make_unique<scanning_clustered_index_cursor>(*sst->get_schema(), permit,
std::move(promoted_index_stream), pi.promoted_index_size, pi.num_blocks, std::nullopt);
}
// Ensures that partition_data_ready() returns true.
// Can be called only when !eof()
future<> read_partition_data() override {
@@ -829,10 +864,10 @@ public:
clustered_index_cursor* current_clustered_cursor(index_bound& bound) {
if (!bound.clustered_cursor) {
_alloc_section(_region, [&] {
index_entry& e = current_partition_entry(bound);
promoted_index* pi = e.get_promoted_index().get();
if (pi) {
bound.clustered_cursor = pi->make_cursor(_sstable, _permit, _trace_state,
partition_index_page& page = current_page(bound);
if (page.has_promoted_index(bound.current_index_idx)) {
promoted_index& pi = page.get_promoted_index(bound.current_index_idx);
bound.clustered_cursor = make_cursor(pi, _sstable, _permit, _trace_state,
get_file_input_stream_options(), _use_caching);
}
});
@@ -855,15 +890,15 @@ public:
// It may be unavailable for old sstables for which this information was not generated.
// Can be called only when partition_data_ready().
std::optional<sstables::deletion_time> partition_tombstone() override {
return current_partition_entry(_lower_bound).get_deletion_time();
return current_page(_lower_bound).get_deletion_time(_lower_bound.current_index_idx);
}
// Returns the key for current partition.
// Can be called only when partition_data_ready().
std::optional<partition_key> get_partition_key() override {
return _alloc_section(_region, [this] {
index_entry& e = current_partition_entry(_lower_bound);
return e.get_key().to_partition_key(*_sstable->_schema);
return current_page(_lower_bound).get_key(_lower_bound.current_index_idx)
.to_partition_key(*_sstable->_schema);
});
}
@@ -877,8 +912,8 @@ public:
// Returns the number of promoted index entries for the current partition.
// Can be called only when partition_data_ready().
uint64_t get_promoted_index_size() {
index_entry& e = current_partition_entry(_lower_bound);
return e.get_promoted_index_size();
partition_index_page& page = current_page(_lower_bound);
return page.get_promoted_index_size(_lower_bound.current_index_idx);
}
bool partition_data_ready() const override {
@@ -969,9 +1004,9 @@ public:
return make_ready_future<bool>(false);
}
return read_partition_data().then([this, key] {
index_comparator cmp(*_sstable->_schema);
bool found = _alloc_section(_region, [&] {
return cmp(key, current_partition_entry(_lower_bound)) == 0;
auto& page = current_page(_lower_bound);
return index_entry_tri_cmp(*_sstable->_schema, page, _lower_bound.current_index_idx, key) == 0;
});
return make_ready_future<bool>(found);
});

View File

@@ -257,14 +257,11 @@ public:
while (partial_page || i != _cache.end()) {
if (partial_page) {
auto preempted = with_allocator(_region.allocator(), [&] {
while (!partial_page->empty()) {
partial_page->clear_one_entry();
if (need_preempt()) {
return true;
}
while (partial_page->clear_gently() != stop_iteration::yes) {
return true;
}
partial_page.reset();
return false;
return need_preempt();
});
if (preempted) {
auto key = (i != _cache.end()) ? std::optional(i->key()) : std::nullopt;

View File

@@ -1073,7 +1073,6 @@ public:
friend class mc::writer;
friend class index_reader;
friend class promoted_index;
friend class sstables_manager;
template <typename DataConsumeRowsContext>
friend future<std::unique_ptr<DataConsumeRowsContext>>

View File

@@ -20,16 +20,24 @@ static void add_entry(logalloc::region& r,
const schema& s,
partition_index_page& page,
const partition_key& key,
uint64_t position)
uint64_t position,
std::optional<parsed_promoted_index_entry> promoted_index = std::nullopt)
{
logalloc::allocating_section as;
as(r, [&] {
with_allocator(r.allocator(), [&] {
sstables::key sst_key = sstables::key::from_partition_key(s, key);
page._entries.push_back(make_managed<index_entry>(
managed_bytes(sst_key.get_bytes()),
position,
managed_ref<promoted_index>()));
auto key_offset = page._key_storage.size();
auto old_storage = std::move(page._key_storage);
page._key_storage = managed_bytes(managed_bytes::initialized_later(), key_offset + sst_key.get_bytes().size());
auto out = managed_bytes_mutable_view(page._key_storage);
write_fragmented(out, managed_bytes_view(old_storage));
write_fragmented(out, single_fragmented_view(bytes_view(sst_key)));
page._entries.push_back(index_entry{dht::raw_token_opt()->value, position, key_offset});
if (promoted_index) {
page._promoted_indexes.resize(page._entries.size());
page._promoted_indexes[page._entries.size() - 1] = *promoted_index;
}
});
});
}
@@ -54,10 +62,10 @@ static partition_index_page make_page0(logalloc::region& r, simple_schema& s) {
static void has_page0(partition_index_cache::entry_ptr ptr) {
BOOST_REQUIRE(!ptr->empty());
BOOST_REQUIRE_EQUAL(ptr->_entries.size(), 4);
BOOST_REQUIRE_EQUAL(ptr->_entries[0]->position(), 0);
BOOST_REQUIRE_EQUAL(ptr->_entries[1]->position(), 1);
BOOST_REQUIRE_EQUAL(ptr->_entries[2]->position(), 2);
BOOST_REQUIRE_EQUAL(ptr->_entries[3]->position(), 3);
BOOST_REQUIRE_EQUAL(ptr->_entries[0].position(), 0);
BOOST_REQUIRE_EQUAL(ptr->_entries[1].position(), 1);
BOOST_REQUIRE_EQUAL(ptr->_entries[2].position(), 2);
BOOST_REQUIRE_EQUAL(ptr->_entries[3].position(), 3);
};
SEASTAR_THREAD_TEST_CASE(test_caching) {
@@ -139,6 +147,59 @@ SEASTAR_THREAD_TEST_CASE(test_caching) {
}
}
SEASTAR_THREAD_TEST_CASE(test_sparse_promoted_index) {
::lru lru;
simple_schema s;
logalloc::region r;
partition_index_cache_stats stats;
partition_index_cache cache(lru, r, stats);
auto page0_loader = [&] (partition_index_cache::key_type k) -> future<partition_index_page> {
partition_index_page page;
auto destroy_page = defer([&] {
with_allocator(r.allocator(), [&] {
auto p = std::move(page);
});
});
add_entry(r, *s.schema(), page, s.make_pkey(0).key(), 0);
add_entry(r, *s.schema(), page, s.make_pkey(1).key(), 1, parsed_promoted_index_entry{
.promoted_index_start = 1,
.promoted_index_size = 10,
.num_blocks = 3
});
add_entry(r, *s.schema(), page, s.make_pkey(2).key(), 2);
add_entry(r, *s.schema(), page, s.make_pkey(3).key(), 3, parsed_promoted_index_entry{
.promoted_index_start = 2,
.promoted_index_size = 13,
.num_blocks = 1
});
add_entry(r, *s.schema(), page, s.make_pkey(4).key(), 4);
destroy_page.cancel();
co_return std::move(page);
};
auto page = cache.get_or_load(0, page0_loader).get();
BOOST_REQUIRE_EQUAL(page->has_promoted_index(0), false);
BOOST_REQUIRE_EQUAL(page->has_promoted_index(1), true);
BOOST_REQUIRE_EQUAL(page->has_promoted_index(2), false);
BOOST_REQUIRE_EQUAL(page->has_promoted_index(3), true);
BOOST_REQUIRE_EQUAL(page->has_promoted_index(4), false);
BOOST_REQUIRE_EQUAL(page->get_promoted_index(1).promoted_index_start, 1);
BOOST_REQUIRE_EQUAL(page->get_promoted_index(1).promoted_index_size, 10);
BOOST_REQUIRE_EQUAL(page->get_promoted_index(1).num_blocks, 3);
BOOST_REQUIRE_EQUAL(page->get_promoted_index(3).promoted_index_start, 2);
BOOST_REQUIRE_EQUAL(page->get_promoted_index(3).promoted_index_size, 13);
BOOST_REQUIRE_EQUAL(page->get_promoted_index(3).num_blocks, 1);
with_allocator(r.allocator(), [&] {
lru.evict_all();
});
}
template <typename T>
static future<> ignore_result(future<T>&& f) {
return f.then_wrapped([] (auto&& f) {

View File

@@ -330,4 +330,28 @@ SEASTAR_THREAD_TEST_CASE(test_stale_version_notification) {
std::cerr.rdbuf(oldCerr);
BOOST_TEST(my_stream.str().find("topology version 0 held for") != std::string::npos);
}
}
SEASTAR_THREAD_TEST_CASE(test_raw_token) {
const auto t1 = dht::token::from_int64(1);
const auto t2 = dht::token::from_int64(2);
dht::raw_token_opt rt_opt;
BOOST_REQUIRE(!rt_opt);
rt_opt = dht::raw_token(t1);
BOOST_REQUIRE(*rt_opt == t1);
BOOST_REQUIRE(dht::raw_token() == dht::minimum_token());
BOOST_REQUIRE(dht::raw_token() < dht::raw_token(dht::first_token()));
BOOST_REQUIRE(dht::raw_token() < dht::first_token());
BOOST_REQUIRE(dht::raw_token() < dht::maximum_token());
auto rt1 = dht::raw_token(t1);
BOOST_REQUIRE(bool(rt1));
BOOST_REQUIRE(rt1 > dht::raw_token());
BOOST_REQUIRE(rt1 > dht::minimum_token());
BOOST_REQUIRE_EQUAL(rt1, t1);
BOOST_REQUIRE(rt1 == t1);
BOOST_REQUIRE(rt1 < t2);
BOOST_REQUIRE(rt1 < dht::maximum_token());
}

View File

@@ -545,6 +545,9 @@ private:
cfg->ring_delay_ms.set(500);
cfg->shutdown_announce_in_ms.set(0);
cfg->broadcast_to_all_shards().get();
smp::invoke_on_all([&] {
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
}).get();
create_directories((data_dir_path + "/system").c_str());
create_directories(cfg->commitlog_directory().c_str());
create_directories(cfg->schema_commitlog_directory().c_str());

View File

@@ -330,10 +330,13 @@ int scylla_simple_query_main(int argc, char** argv) {
("counters", "test counters")
("tablets", "use tablets")
("initial-tablets", bpo::value<unsigned>()->default_value(128), "initial number of tablets")
("sstable-summary-ratio", bpo::value<double>(), "Generate summary entry, so that summary file size / data file size ~= this ratio")
("sstable-format", bpo::value<std::string>(), "SSTable format name to use")
("flush", "flush memtables before test")
("memtable-partitions", bpo::value<unsigned>(), "apply this number of partitions to memtable, then flush")
("json-result", bpo::value<std::string>(), "name of the json result file")
("enable-cache", bpo::value<bool>()->default_value(true), "enable row cache")
("enable-index-cache", bpo::value<bool>()->default_value(true), "enable partition index cache")
("stop-on-error", bpo::value<bool>()->default_value(true), "stop after encountering the first error")
("timeout", bpo::value<std::string>()->default_value(""), "use timeout")
("bypass-cache", "use bypass cache when querying")
@@ -357,8 +360,19 @@ int scylla_simple_query_main(int argc, char** argv) {
auto db_cfg = ::make_shared<db::config>(ext);
const auto enable_cache = app.configuration()["enable-cache"].as<bool>();
const auto enable_index_cache = app.configuration()["enable-index-cache"].as<bool>();
std::cout << "enable-cache=" << enable_cache << '\n';
std::cout << "enable-index-cache=" << enable_index_cache << '\n';
db_cfg->enable_cache(enable_cache);
db_cfg->cache_index_pages(enable_index_cache);
if (app.configuration().contains("sstable-summary-ratio")) {
db_cfg->sstable_summary_ratio(app.configuration()["sstable-summary-ratio"].as<double>());
}
std::cout << "sstable-summary-ratio=" << db_cfg->sstable_summary_ratio() << '\n';
if (app.configuration().contains("sstable-format")) {
db_cfg->sstable_format(app.configuration()["sstable-format"].as<std::string>());
}
std::cout << "sstable-format=" << db_cfg->sstable_format() << '\n';
cql_test_config cfg(db_cfg);
if (app.configuration().contains("tablets")) {
cfg.db_config->tablets_mode_for_new_keyspaces.set(db::tablets_mode_t::mode::enabled);

View File

@@ -715,15 +715,6 @@ void write_collection_value(bytes_ostream& out, atomic_cell_value_view val) {
}
}
void write_fragmented(managed_bytes_mutable_view& out, std::string_view val) {
while (val.size() > 0) {
size_t current_n = std::min(val.size(), out.current_fragment().size());
memcpy(out.current_fragment().data(), val.data(), current_n);
val.remove_prefix(current_n);
out.remove_prefix(current_n);
}
}
template<std::integral T>
void write_simple(managed_bytes_mutable_view& out, std::type_identity_t<T> val) {
val = net::hton(val);

View File

@@ -566,6 +566,16 @@ inline managed_bytes::managed_bytes(const managed_bytes& o) {
}
}
inline
void write_fragmented(managed_bytes_mutable_view& out, std::string_view val) {
while (val.size() > 0) {
size_t current_n = std::min(val.size(), out.current_fragment().size());
memcpy(out.current_fragment().data(), val.data(), current_n);
val.remove_prefix(current_n);
out.remove_prefix(current_n);
}
}
template<>
struct appending_hash<managed_bytes_view> {
template<Hasher Hasher>

View File

@@ -10,6 +10,7 @@
#include <array>
#include <type_traits>
#include <algorithm>
#include "utils/allocation_strategy.hh"
@@ -27,10 +28,8 @@ private:
T _data[0];
external(external&& other) noexcept : _backref(other._backref) {
for (unsigned i = 0; i < _backref->size(); i++) {
new (_data + i) T(std::move(other._data[i]));
other._data[i].~T();
}
std::uninitialized_move(other._data, other._data + other._backref->_size, _data);
std::destroy(other._data, other._data + other._backref->_size);
_backref->_data = _data;
}
size_t storage_size() const noexcept {

View File

@@ -18,15 +18,6 @@
static_assert(-1 == ~0, "Not a twos-complement architecture");
// Accounts for the case that all bits are zero.
static vint_size_type count_leading_zero_bits(uint64_t n) noexcept {
if (n == 0) {
return vint_size_type(std::numeric_limits<uint64_t>::digits);
}
return vint_size_type(count_leading_zeros(n));
}
static constexpr uint64_t encode_zigzag(int64_t n) noexcept {
// The right shift has to be arithmetic and not logical.
return (static_cast<uint64_t>(n) << 1) ^ static_cast<uint64_t>(n >> 63);
@@ -55,16 +46,9 @@ int64_t signed_vint::deserialize(bytes_view v) {
return decode_zigzag(un);
}
vint_size_type signed_vint::serialized_size_from_first_byte(bytes::value_type first_byte) {
return unsigned_vint::serialized_size_from_first_byte(first_byte);
}
// The number of additional bytes that we need to read.
static vint_size_type count_extra_bytes(int8_t first_byte) {
// Sign extension.
const int64_t v(first_byte);
return count_leading_zero_bits(static_cast<uint64_t>(~v)) - vint_size_type(64 - 8);
return std::countl_zero(static_cast<uint8_t>(~first_byte));
}
static void encode(uint64_t value, vint_size_type size, bytes::iterator out) {
@@ -139,8 +123,3 @@ uint64_t unsigned_vint::deserialize(bytes_view v) {
#endif
return result;
}
vint_size_type unsigned_vint::serialized_size_from_first_byte(bytes::value_type first_byte) {
int8_t first_byte_casted = first_byte;
return 1 + (first_byte_casted >= 0 ? 0 : count_extra_bytes(first_byte_casted));
}

View File

@@ -35,6 +35,7 @@
#include "bytes.hh"
#include <cstdint>
#include <bit>
using vint_size_type = bytes::size_type;
@@ -49,7 +50,9 @@ struct unsigned_vint final {
static value_type deserialize(bytes_view v);
static vint_size_type serialized_size_from_first_byte(bytes::value_type first_byte);
static vint_size_type serialized_size_from_first_byte(bytes::value_type first_byte) {
return 1 + std::countl_zero(static_cast<uint8_t>(~first_byte));
}
};
struct signed_vint final {
@@ -61,5 +64,7 @@ struct signed_vint final {
static value_type deserialize(bytes_view v);
static vint_size_type serialized_size_from_first_byte(bytes::value_type first_byte);
static vint_size_type serialized_size_from_first_byte(bytes::value_type first_byte) {
return unsigned_vint::serialized_size_from_first_byte(first_byte);
}
};