checkpoint

This commit is contained in:
Avi Kivity
2014-08-12 17:06:18 +03:00
parent 1b05c9ab9c
commit ebc58c56c3
6 changed files with 300 additions and 39 deletions

View File

@@ -1,5 +1,5 @@
mode = release
mode = debug
sanitize.debug = -fsanitize=address -fsanitize=leak -fsanitize=undefined
sanitize.release =
@@ -10,11 +10,11 @@ opt.release = -O2 -flto
sanitize = $(sanitize.$(mode))
opt = $(opt.$(mode))
CXXFLAGS = -std=gnu++1y -g -Wall $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden
CXXFLAGS = -std=gnu++1y -g -Wall -Werror $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden
tests = test-reactor
all: seastar $(tests)
all: seastar $(tests) httpd
clean:
rm seastar $(tests) *.o
@@ -25,5 +25,7 @@ seastar: main.o reactor.o
test-reactor: test-reactor.o reactor.o
$(CXX) $(CXXFLAGS) -o $@ $^
httpd: httpd.o reactor.o
$(CXX) $(CXXFLAGS) -o $@ $^
-include *.d

56
httpd.cc Normal file
View File

@@ -0,0 +1,56 @@
/*
* Copyright 2014 Cloudius Systems
*/
#include "reactor.hh"
#include <iostream>
#include <algorithm>
class http_server {
reactor& _r;
std::vector<std::unique_ptr<pollable_fd>> _listeners;
public:
http_server(reactor& r) : _r(r) {}
void listen(ipv4_addr addr) {
listen_options lo;
lo.reuse_address = true;
do_accepts(_r.listen(make_ipv4_address(addr), lo));
}
void do_accepts(std::unique_ptr<pollable_fd> lfd) {
auto l = lfd.get();
l->accept().then([this, lfd = std::move(lfd)] (future<accept_result> res) mutable {
accept_result ar = res.get();
auto fd = std::move(std::get<0>(ar));
auto addr = std::get<1>(ar);
(new connection(std::move(fd), addr))->read();
do_accepts(std::move(lfd));
});
}
class connection {
std::unique_ptr<pollable_fd> _fd;
socket_address _addr;
input_stream_buffer<char> _read_buf;
static constexpr size_t limit = 4096;
using tmp_buf = temporary_buffer<char>;
public:
connection(std::unique_ptr<pollable_fd>&& fd, socket_address addr)
: _fd(std::move(fd)), _addr(addr), _read_buf(*_fd, 8192) {}
void read() {
_read_buf.read_until(limit, '\n').then([this] (future<tmp_buf> fut_start_line) {
auto start_line = fut_start_line.get();
std::cout << std::string(start_line.begin(), start_line.end());
});
}
};
};
int main(int ac, char** av) {
reactor r;
http_server server(r);
server.listen({{}, 10000});
r.run();
return 0;
}

View File

@@ -60,11 +60,20 @@ reactor::listen(socket_address sa, listen_options opts) {
int r = ::bind(fd, &sa.u.sa, sizeof(sa.u.sas));
assert(r != -1);
::listen(fd, 100);
return std::unique_ptr<pollable_fd>(new pollable_fd(*this, fd));
return std::unique_ptr<pollable_fd>(new pollable_fd(fd));
}
void reactor::run() {
std::vector<std::unique_ptr<task>> current_tasks;
while (true) {
while (!_pending_tasks.empty()) {
std::swap(_pending_tasks, current_tasks);
for (auto&& tsk : current_tasks) {
tsk->run();
tsk.reset();
}
current_tasks.clear();
}
std::array<epoll_event, 128> eevt;
int nr = ::epoll_wait(_epollfd, eevt.data(), eevt.size(), -1);
assert(nr != -1);
@@ -99,3 +108,4 @@ socket_address make_ipv4_address(ipv4_addr addr) {
return sa;
}
reactor the_reactor;

View File

@@ -20,6 +20,8 @@
#include <stdexcept>
#include <iostream>
#include <unistd.h>
#include <vector>
#include <algorithm>
class socket_address;
class reactor;
@@ -111,38 +113,31 @@ struct future_state {
bool has_promise() const { return _promise; }
bool has_future() const { return _future; }
void wait();
void make_ready();
void set(const T& value) {
assert(_state == state::future);
_state = state::result;
new (&_u.value) T(value);
if (_task) {
_task->run();
}
make_ready();
}
void set(T&& value) {
assert(_state == state::future);
_state = state::result;
new (&_u.value) T(std::move(value));
if (_task) {
_task->run();
}
make_ready();
}
template <typename... A>
void set(A... a) {
assert(_state == state::future);
_state = state::result;
new (&_u.value) T(std::forward(a)...);
if (_task) {
_task->run();
}
schedule();
}
void set_exception(std::exception_ptr ex) {
assert(_state == state::future);
_state = state::exception;
new (&_u.ex) std::exception(ex);
if (_task) {
_task->run();
}
make_ready();
}
T get() {
while (_state == state::future) {
@@ -244,6 +239,7 @@ class reactor {
public:
int _epollfd;
io_context_t _io_context;
std::vector<std::unique_ptr<task>> _pending_tasks;
private:
void epoll_add_in(pollable_fd& fd, std::unique_ptr<task> t);
void epoll_add_out(pollable_fd& fd, std::unique_ptr<task> t);
@@ -260,28 +256,37 @@ public:
future<accept_result> accept(pollable_fd& listen_fd);
future<size_t> read_some(pollable_fd& fd, void* buffer, size_t size);
future<size_t> read_some(pollable_fd& fd, const std::vector<iovec>& iov);
future<size_t> write_some(pollable_fd& fd, void* buffer, size_t size);
future<size_t> write_some(pollable_fd& fd, const void* buffer, size_t size);
future<size_t> write_all(pollable_fd& fd, void* buffer, size_t size);
future<size_t> write_all(pollable_fd& fd, const void* buffer, size_t size);
void run();
void add_task(std::unique_ptr<task>&& t) { _pending_tasks.push_back(std::move(t)); }
private:
void write_all_part(pollable_fd& fd, void* buffer, size_t size,
void write_all_part(pollable_fd& fd, const void* buffer, size_t size,
promise<size_t> result, size_t completed);
friend class pollable_fd;
};
extern reactor the_reactor;
class pollable_fd {
public:
~pollable_fd() { r.forget(*this); ::close(fd); }
~pollable_fd() { the_reactor.forget(*this); ::close(fd); }
future<size_t> read_some(char* buffer, size_t size) { return the_reactor.read_some(*this, buffer, size); }
future<size_t> read_some(uint8_t* buffer, size_t size) { return the_reactor.read_some(*this, buffer, size); }
future<size_t> read_some(const std::vector<iovec>& iov) { return the_reactor.read_some(*this, iov); }
future<size_t> write_all(const char* buffer, size_t size) { return the_reactor.write_all(*this, buffer, size); }
future<size_t> write_all(const uint8_t* buffer, size_t size) { return the_reactor.write_all(*this, buffer, size); }
future<accept_result> accept() { return the_reactor.accept(*this); }
protected:
explicit pollable_fd(reactor& r, int fd) : r(r), fd(fd) {}
explicit pollable_fd(int fd) : fd(fd) {}
pollable_fd(const pollable_fd&) = delete;
void operator=(const pollable_fd&) = delete;
reactor& r;
int fd;
int events = 0;
std::unique_ptr<task> pollin;
@@ -289,6 +294,85 @@ protected:
friend class reactor;
};
// A temporary_buffer either points inside a larger buffer, or, if the requested size
// is too large, or if the larger buffer is scattered, contains its own storage.
//
// A temporary_buffer must be consumed before the next operation on the underlying
// input_stream_buffer is initiated.
template <typename CharType>
class temporary_buffer {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
std::unique_ptr<CharType[]> _own_buffer;
CharType* _buffer;
size_t _size;
public:
explicit temporary_buffer(size_t size) : _own_buffer(new CharType[size]), _buffer(_own_buffer.get()), _size(size) {}
explicit temporary_buffer(CharType* borrow, size_t size) : _own_buffer(), _buffer(borrow), _size(size) {}
temporary_buffer() = delete;
temporary_buffer(const temporary_buffer&) = delete;
temporary_buffer(temporary_buffer&& x) : _own_buffer(std::move(x._own_buffer)), _buffer(x._buffer), _size(x._size) {
x._buffer = nullptr;
x._size = 0;
}
void operator=(const temporary_buffer&) = delete;
temporary_buffer& operator=(temporary_buffer&& x) {
_own_buffer = std::move(x._own_buffer);
_buffer = x._buffer;
_size = x._size;
x._buffer = nullptr;
x._size = 0;
return *this;
}
const CharType* get() const { return _buffer; }
CharType* get_write() { return _buffer; }
size_t size() const { return _size; }
const CharType* begin() { return _buffer; }
const CharType* end() { return _buffer + _size; }
bool owning() const { return bool(_own_buffer); }
temporary_buffer prefix(size_t size) && {
auto ret = std::move(*this);
ret._size = size;
return ret;
}
};
template <typename CharType>
class input_stream_buffer {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
pollable_fd& _fd;
std::unique_ptr<CharType[]> _buf;
size_t _size;
size_t _begin = 0;
size_t _end = 0;
private:
using tmp_buf = temporary_buffer<CharType>;
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
tmp_buf allocate(size_t n) {
if (n <= possibly_available()) {
return tmp_buf(_buf.get() + _begin, n);
} else {
return tmp_buf(n);
}
}
void advance(size_t n) {
_begin += n;
if (_begin == _end) {
_begin = _end = 0;
}
}
public:
using char_type = CharType;
input_stream_buffer(pollable_fd& fd, size_t size) : _fd(fd), _buf(new char_type[size]), _size(size) {}
future<temporary_buffer<CharType>> read_exactly(size_t n);
future<temporary_buffer<CharType>> read_until(size_t limit, CharType eol);
future<temporary_buffer<CharType>> read_until(size_t limit, const CharType* eol, size_t eol_len);
private:
void read_exactly_part(size_t n, promise<tmp_buf> pr, tmp_buf buf, size_t completed);
void read_until_part(size_t n, CharType eol, promise<tmp_buf> pr, tmp_buf buf, size_t completed);
};
inline
future<accept_result>
reactor::accept(pollable_fd& listenfd) {
@@ -299,7 +383,7 @@ reactor::accept(pollable_fd& listenfd) {
socklen_t sl = sizeof(&sa.u.sas);
int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC);
assert(fd != -1);
pr.set_value(accept_result{std::unique_ptr<pollable_fd>(new pollable_fd(*this, fd)), sa});
pr.set_value(accept_result{std::unique_ptr<pollable_fd>(new pollable_fd(fd)), sa});
}));
return fut;
}
@@ -319,7 +403,23 @@ reactor::read_some(pollable_fd& fd, void* buffer, size_t len) {
inline
future<size_t>
reactor::write_some(pollable_fd& fd, void* buffer, size_t len) {
reactor::read_some(pollable_fd& fd, const std::vector<iovec>& iov) {
promise<size_t> pr;
auto fut = pr.get_future();
epoll_add_in(fd, make_task([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable {
::msghdr mh = {};
mh.msg_iov = &iov[0];
mh.msg_iovlen = iov.size();
ssize_t r = ::recvmsg(rfd, &mh, 0);
assert(r != -1);
pr.set_value(r);
}));
return fut;
}
inline
future<size_t>
reactor::write_some(pollable_fd& fd, const void* buffer, size_t len) {
promise<size_t> pr;
auto fut = pr.get_future();
epoll_add_out(fd, make_task([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable {
@@ -332,12 +432,12 @@ reactor::write_some(pollable_fd& fd, void* buffer, size_t len) {
inline
void
reactor::write_all_part(pollable_fd& fd, void* buffer, size_t len,
reactor::write_all_part(pollable_fd& fd, const void* buffer, size_t len,
promise<size_t> result, size_t completed) {
if (completed == len) {
result.set_value(completed);
} else {
write_some(fd, static_cast<char*>(buffer) + completed, len - completed).then(
write_some(fd, static_cast<const char*>(buffer) + completed, len - completed).then(
[&fd, buffer, len, result = std::move(result), completed, this] (future<size_t> part) mutable {
write_all_part(fd, buffer, len, std::move(result), completed + part.get());
});
@@ -346,7 +446,7 @@ reactor::write_all_part(pollable_fd& fd, void* buffer, size_t len,
inline
future<size_t>
reactor::write_all(pollable_fd& fd, void* buffer, size_t len) {
reactor::write_all(pollable_fd& fd, const void* buffer, size_t len) {
assert(len);
promise<size_t> pr;
auto fut = pr.get_future();
@@ -354,6 +454,99 @@ reactor::write_all(pollable_fd& fd, void* buffer, size_t len) {
return fut;
}
template <typename CharType>
void input_stream_buffer<CharType>::read_exactly_part(size_t n, promise<tmp_buf> pr, tmp_buf out, size_t completed) {
if (available()) {
auto now = std::min(n - completed, available());
if (out.owned()) {
std::copy(_buf.get() + _begin, _buf.get() + _begin + now, out.get() + completed);
}
advance(now);
completed += now;
}
if (completed == n) {
pr.set_value(out);
return;
}
// _buf is now empty
if (out.owned()) {
_fd.read_some(out.get() + completed, n - completed).then(
[this, pr = std::move(pr), out = std::move(out), completed, n] (future<size_t> now) mutable {
completed += now.get();
read_exactly_part(n, std::move(pr), std::move(out), completed);
});
} else {
_fd.read_some(_buf.get(), _size).then(
[this, pr = std::move(pr), out = std::move(out), completed, n] (future<size_t> now) mutable {
_end = now.get();
read_exactly_part(n, std::move(pr), std::move(out), completed);
});
}
}
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream_buffer<CharType>::read_exactly(size_t n) {
promise<tmp_buf> pr;
auto fut = pr.get_future();
auto buf = allocate(n);
read_exactly_part(n, std::move(pr), buf, 0);
return fut;
}
template <typename CharType>
void input_stream_buffer<CharType>::read_until_part(size_t limit, CharType eol, promise<tmp_buf> pr, tmp_buf out,
size_t completed) {
auto to_search = std::min(limit - completed, available());
auto i = std::find(_buf.get() + _begin, _buf.get() + _begin + to_search, eol);
auto nr_found = i - (_buf.get() + _begin);
if (i != _buf.get() + _begin + to_search || completed + nr_found == limit) {
if (out.owning()) {
std::copy(_buf.get() + _begin, i, out.get_write() + completed);
}
advance(nr_found);
completed += nr_found;
pr.set_value(std::move(out).prefix(completed));
} else {
if (!out.owning() && _end == _size) {
// wrapping around, must allocate
auto new_out = tmp_buf(limit);
std::copy(out.begin(), out.end(), new_out.get_write());
out = std::move(new_out);
}
if (!out.owning()) {
std::copy(_buf.get() + _begin, _buf.get() + _end, out.get_write() + completed);
completed += _end - _begin;
_begin = _end = 0;
}
_fd.read_some(_buf.get() + _end, _size - _end).then(
[this, limit, eol, pr = std::move(pr), out = std::move(out), completed] (future<size_t> now) mutable {
_end += now.get();
read_until_part(limit, eol, std::move(pr), std::move(out), completed);
});
}
}
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream_buffer<CharType>::read_until(size_t limit, CharType eol) {
promise<tmp_buf> pr;
auto fut = pr.get_future();
read_until_part(limit, eol, std::move(pr), allocate(possibly_available()), 0);
return fut;
}
template <typename T>
void future_state<T>::make_ready() {
if (_task) {
the_reactor.add_task(std::move(_task));
}
}
#if 0
future<temporary_buffer<CharType>> read_until(size_t limit, const CharType* eol, size_t eol_len);
#endif
#endif /* REACTOR_HH_ */

View File

@@ -13,6 +13,7 @@
#include <string>
#include <cstring>
#include <stdexcept>
#include <initializer_list>
template <typename char_type, typename size_type, size_type max_size>
class basic_sstring {
@@ -59,7 +60,7 @@ public:
x.u.internal.size = 0;
x.u.internal.str[0] = '\0';
}
basic_sstring(const char* x, size_t len) {
basic_sstring(const char_type* x, size_t len) {
if (size_type(size) != size) {
throw std::overflow_error("sstring overflow");
}
@@ -73,8 +74,9 @@ public:
std::copy(x, x + size + 1, u.external.str);
}
}
basic_sstring(const char* x) : basic_sstring(x, std::strlen(x)) {}
basic_sstring(std::string& x) : basic_sstring(x.c_str(), x.size()) {}
basic_sstring(const char_type* x) : basic_sstring(x, std::strlen(x)) {}
basic_sstring(std::basic_string<char_type>& x) : basic_sstring(x.c_str(), x.size()) {}
basic_sstring(std::initializer_list<char_type> x) : basic_sstring(x.begin(), x.end() - x.begin()) {}
~basic_sstring() noexcept {
if (!is_external()) {
delete[] u.external.str;

View File

@@ -9,18 +9,16 @@
#include <iostream>
struct test {
reactor r;
std::unique_ptr<pollable_fd> listener;
struct connection {
connection(reactor& r, std::unique_ptr<pollable_fd> fd) : r(r), fd(std::move(fd)) {}
reactor& r;
connection(std::unique_ptr<pollable_fd> fd) : fd(std::move(fd)) {}
std::unique_ptr<pollable_fd> fd;
char buffer[8192];
void copy_data() {
r.read_some(*fd, buffer, sizeof(buffer)).then([this] (future<size_t> fut) {
fd->read_some(buffer, sizeof(buffer)).then([this] (future<size_t> fut) {
auto n = fut.get();
if (n) {
r.write_all(*fd, buffer, n).then([this, n] (future<size_t> fut) {
fd->write_all(buffer, n).then([this, n] (future<size_t> fut) {
if (fut.get() == n) {
copy_data();
}
@@ -32,11 +30,11 @@ struct test {
}
};
void new_connection(accept_result&& accepted) {
auto c = new connection(r, std::move(std::get<0>(accepted)));
auto c = new connection(std::move(std::get<0>(accepted)));
c->copy_data();
}
void start_accept() {
r.accept(*listener).then([this] (future<accept_result> fut) {
the_reactor.accept(*listener).then([this] (future<accept_result> fut) {
new_connection(fut.get());
start_accept();
});
@@ -49,9 +47,9 @@ int main(int ac, char** av)
ipv4_addr addr{{}, 10000};
listen_options lo;
lo.reuse_address = true;
t.listener = t.r.listen(make_ipv4_address(addr), lo);
t.listener = the_reactor.listen(make_ipv4_address(addr), lo);
t.start_accept();
t.r.run();
the_reactor.run();
return 0;
}