From ebc58c56c3771d38e7fa2281b16fd69f96758953 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 12 Aug 2014 17:06:18 +0300 Subject: [PATCH] checkpoint --- Makefile | 8 +- httpd.cc | 56 ++++++++++++ reactor.cc | 12 ++- reactor.hh | 239 +++++++++++++++++++++++++++++++++++++++++++----- sstring.hh | 8 +- test-reactor.cc | 16 ++-- 6 files changed, 300 insertions(+), 39 deletions(-) create mode 100644 httpd.cc diff --git a/Makefile b/Makefile index 07db558b85..14e0cf0356 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -mode = release +mode = debug sanitize.debug = -fsanitize=address -fsanitize=leak -fsanitize=undefined sanitize.release = @@ -10,11 +10,11 @@ opt.release = -O2 -flto sanitize = $(sanitize.$(mode)) opt = $(opt.$(mode)) -CXXFLAGS = -std=gnu++1y -g -Wall $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden +CXXFLAGS = -std=gnu++1y -g -Wall -Werror $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden tests = test-reactor -all: seastar $(tests) +all: seastar $(tests) httpd clean: rm seastar $(tests) *.o @@ -25,5 +25,7 @@ seastar: main.o reactor.o test-reactor: test-reactor.o reactor.o $(CXX) $(CXXFLAGS) -o $@ $^ +httpd: httpd.o reactor.o + $(CXX) $(CXXFLAGS) -o $@ $^ -include *.d diff --git a/httpd.cc b/httpd.cc new file mode 100644 index 0000000000..574450740d --- /dev/null +++ b/httpd.cc @@ -0,0 +1,56 @@ +/* + * Copyright 2014 Cloudius Systems + */ + +#include "reactor.hh" +#include +#include + +class http_server { + reactor& _r; + std::vector> _listeners; +public: + http_server(reactor& r) : _r(r) {} + void listen(ipv4_addr addr) { + listen_options lo; + lo.reuse_address = true; + do_accepts(_r.listen(make_ipv4_address(addr), lo)); + } + void do_accepts(std::unique_ptr lfd) { + auto l = lfd.get(); + l->accept().then([this, lfd = std::move(lfd)] (future res) mutable { + accept_result ar = res.get(); + auto fd = std::move(std::get<0>(ar)); + auto addr = std::get<1>(ar); + (new connection(std::move(fd), addr))->read(); + do_accepts(std::move(lfd)); + }); + } + class connection { + std::unique_ptr _fd; + socket_address _addr; + input_stream_buffer _read_buf; + static constexpr size_t limit = 4096; + using tmp_buf = temporary_buffer; + public: + connection(std::unique_ptr&& fd, socket_address addr) + : _fd(std::move(fd)), _addr(addr), _read_buf(*_fd, 8192) {} + void read() { + _read_buf.read_until(limit, '\n').then([this] (future fut_start_line) { + auto start_line = fut_start_line.get(); + std::cout << std::string(start_line.begin(), start_line.end()); + }); + } + }; +}; + +int main(int ac, char** av) { + reactor r; + http_server server(r); + server.listen({{}, 10000}); + r.run(); + return 0; + +} + + diff --git a/reactor.cc b/reactor.cc index c0e93f46b1..af420eb83a 100644 --- a/reactor.cc +++ b/reactor.cc @@ -60,11 +60,20 @@ reactor::listen(socket_address sa, listen_options opts) { int r = ::bind(fd, &sa.u.sa, sizeof(sa.u.sas)); assert(r != -1); ::listen(fd, 100); - return std::unique_ptr(new pollable_fd(*this, fd)); + return std::unique_ptr(new pollable_fd(fd)); } void reactor::run() { + std::vector> current_tasks; while (true) { + while (!_pending_tasks.empty()) { + std::swap(_pending_tasks, current_tasks); + for (auto&& tsk : current_tasks) { + tsk->run(); + tsk.reset(); + } + current_tasks.clear(); + } std::array eevt; int nr = ::epoll_wait(_epollfd, eevt.data(), eevt.size(), -1); assert(nr != -1); @@ -99,3 +108,4 @@ socket_address make_ipv4_address(ipv4_addr addr) { return sa; } +reactor the_reactor; diff --git a/reactor.hh b/reactor.hh index c1d5893b89..e7d790fe5c 100644 --- a/reactor.hh +++ b/reactor.hh @@ -20,6 +20,8 @@ #include #include #include +#include +#include class socket_address; class reactor; @@ -111,38 +113,31 @@ struct future_state { bool has_promise() const { return _promise; } bool has_future() const { return _future; } void wait(); + void make_ready(); void set(const T& value) { assert(_state == state::future); _state = state::result; new (&_u.value) T(value); - if (_task) { - _task->run(); - } + make_ready(); } void set(T&& value) { assert(_state == state::future); _state = state::result; new (&_u.value) T(std::move(value)); - if (_task) { - _task->run(); - } + make_ready(); } template void set(A... a) { assert(_state == state::future); _state = state::result; new (&_u.value) T(std::forward(a)...); - if (_task) { - _task->run(); - } + schedule(); } void set_exception(std::exception_ptr ex) { assert(_state == state::future); _state = state::exception; new (&_u.ex) std::exception(ex); - if (_task) { - _task->run(); - } + make_ready(); } T get() { while (_state == state::future) { @@ -244,6 +239,7 @@ class reactor { public: int _epollfd; io_context_t _io_context; + std::vector> _pending_tasks; private: void epoll_add_in(pollable_fd& fd, std::unique_ptr t); void epoll_add_out(pollable_fd& fd, std::unique_ptr t); @@ -260,28 +256,37 @@ public: future accept(pollable_fd& listen_fd); future read_some(pollable_fd& fd, void* buffer, size_t size); + future read_some(pollable_fd& fd, const std::vector& iov); - future write_some(pollable_fd& fd, void* buffer, size_t size); + future write_some(pollable_fd& fd, const void* buffer, size_t size); - future write_all(pollable_fd& fd, void* buffer, size_t size); + future write_all(pollable_fd& fd, const void* buffer, size_t size); void run(); + void add_task(std::unique_ptr&& t) { _pending_tasks.push_back(std::move(t)); } private: - void write_all_part(pollable_fd& fd, void* buffer, size_t size, + void write_all_part(pollable_fd& fd, const void* buffer, size_t size, promise result, size_t completed); friend class pollable_fd; }; +extern reactor the_reactor; + class pollable_fd { public: - ~pollable_fd() { r.forget(*this); ::close(fd); } + ~pollable_fd() { the_reactor.forget(*this); ::close(fd); } + future read_some(char* buffer, size_t size) { return the_reactor.read_some(*this, buffer, size); } + future read_some(uint8_t* buffer, size_t size) { return the_reactor.read_some(*this, buffer, size); } + future read_some(const std::vector& iov) { return the_reactor.read_some(*this, iov); } + future write_all(const char* buffer, size_t size) { return the_reactor.write_all(*this, buffer, size); } + future write_all(const uint8_t* buffer, size_t size) { return the_reactor.write_all(*this, buffer, size); } + future accept() { return the_reactor.accept(*this); } protected: - explicit pollable_fd(reactor& r, int fd) : r(r), fd(fd) {} + explicit pollable_fd(int fd) : fd(fd) {} pollable_fd(const pollable_fd&) = delete; void operator=(const pollable_fd&) = delete; - reactor& r; int fd; int events = 0; std::unique_ptr pollin; @@ -289,6 +294,85 @@ protected: friend class reactor; }; +// A temporary_buffer either points inside a larger buffer, or, if the requested size +// is too large, or if the larger buffer is scattered, contains its own storage. +// +// A temporary_buffer must be consumed before the next operation on the underlying +// input_stream_buffer is initiated. +template +class temporary_buffer { + static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); + std::unique_ptr _own_buffer; + CharType* _buffer; + size_t _size; +public: + explicit temporary_buffer(size_t size) : _own_buffer(new CharType[size]), _buffer(_own_buffer.get()), _size(size) {} + explicit temporary_buffer(CharType* borrow, size_t size) : _own_buffer(), _buffer(borrow), _size(size) {} + temporary_buffer() = delete; + temporary_buffer(const temporary_buffer&) = delete; + temporary_buffer(temporary_buffer&& x) : _own_buffer(std::move(x._own_buffer)), _buffer(x._buffer), _size(x._size) { + x._buffer = nullptr; + x._size = 0; + } + void operator=(const temporary_buffer&) = delete; + temporary_buffer& operator=(temporary_buffer&& x) { + _own_buffer = std::move(x._own_buffer); + _buffer = x._buffer; + _size = x._size; + x._buffer = nullptr; + x._size = 0; + return *this; + } + const CharType* get() const { return _buffer; } + CharType* get_write() { return _buffer; } + size_t size() const { return _size; } + const CharType* begin() { return _buffer; } + const CharType* end() { return _buffer + _size; } + bool owning() const { return bool(_own_buffer); } + temporary_buffer prefix(size_t size) && { + auto ret = std::move(*this); + ret._size = size; + return ret; + } +}; + +template +class input_stream_buffer { + static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); + pollable_fd& _fd; + std::unique_ptr _buf; + size_t _size; + size_t _begin = 0; + size_t _end = 0; +private: + using tmp_buf = temporary_buffer; + size_t available() const { return _end - _begin; } + size_t possibly_available() const { return _size - _begin; } + tmp_buf allocate(size_t n) { + if (n <= possibly_available()) { + return tmp_buf(_buf.get() + _begin, n); + } else { + return tmp_buf(n); + } + } + void advance(size_t n) { + _begin += n; + if (_begin == _end) { + _begin = _end = 0; + } + } +public: + using char_type = CharType; + input_stream_buffer(pollable_fd& fd, size_t size) : _fd(fd), _buf(new char_type[size]), _size(size) {} + future> read_exactly(size_t n); + future> read_until(size_t limit, CharType eol); + future> read_until(size_t limit, const CharType* eol, size_t eol_len); +private: + void read_exactly_part(size_t n, promise pr, tmp_buf buf, size_t completed); + void read_until_part(size_t n, CharType eol, promise pr, tmp_buf buf, size_t completed); +}; + + inline future reactor::accept(pollable_fd& listenfd) { @@ -299,7 +383,7 @@ reactor::accept(pollable_fd& listenfd) { socklen_t sl = sizeof(&sa.u.sas); int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC); assert(fd != -1); - pr.set_value(accept_result{std::unique_ptr(new pollable_fd(*this, fd)), sa}); + pr.set_value(accept_result{std::unique_ptr(new pollable_fd(fd)), sa}); })); return fut; } @@ -319,7 +403,23 @@ reactor::read_some(pollable_fd& fd, void* buffer, size_t len) { inline future -reactor::write_some(pollable_fd& fd, void* buffer, size_t len) { +reactor::read_some(pollable_fd& fd, const std::vector& iov) { + promise pr; + auto fut = pr.get_future(); + epoll_add_in(fd, make_task([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable { + ::msghdr mh = {}; + mh.msg_iov = &iov[0]; + mh.msg_iovlen = iov.size(); + ssize_t r = ::recvmsg(rfd, &mh, 0); + assert(r != -1); + pr.set_value(r); + })); + return fut; +} + +inline +future +reactor::write_some(pollable_fd& fd, const void* buffer, size_t len) { promise pr; auto fut = pr.get_future(); epoll_add_out(fd, make_task([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable { @@ -332,12 +432,12 @@ reactor::write_some(pollable_fd& fd, void* buffer, size_t len) { inline void -reactor::write_all_part(pollable_fd& fd, void* buffer, size_t len, +reactor::write_all_part(pollable_fd& fd, const void* buffer, size_t len, promise result, size_t completed) { if (completed == len) { result.set_value(completed); } else { - write_some(fd, static_cast(buffer) + completed, len - completed).then( + write_some(fd, static_cast(buffer) + completed, len - completed).then( [&fd, buffer, len, result = std::move(result), completed, this] (future part) mutable { write_all_part(fd, buffer, len, std::move(result), completed + part.get()); }); @@ -346,7 +446,7 @@ reactor::write_all_part(pollable_fd& fd, void* buffer, size_t len, inline future -reactor::write_all(pollable_fd& fd, void* buffer, size_t len) { +reactor::write_all(pollable_fd& fd, const void* buffer, size_t len) { assert(len); promise pr; auto fut = pr.get_future(); @@ -354,6 +454,99 @@ reactor::write_all(pollable_fd& fd, void* buffer, size_t len) { return fut; } +template +void input_stream_buffer::read_exactly_part(size_t n, promise pr, tmp_buf out, size_t completed) { + if (available()) { + auto now = std::min(n - completed, available()); + if (out.owned()) { + std::copy(_buf.get() + _begin, _buf.get() + _begin + now, out.get() + completed); + } + advance(now); + completed += now; + } + if (completed == n) { + pr.set_value(out); + return; + } + // _buf is now empty + if (out.owned()) { + _fd.read_some(out.get() + completed, n - completed).then( + [this, pr = std::move(pr), out = std::move(out), completed, n] (future now) mutable { + completed += now.get(); + read_exactly_part(n, std::move(pr), std::move(out), completed); + }); + } else { + _fd.read_some(_buf.get(), _size).then( + [this, pr = std::move(pr), out = std::move(out), completed, n] (future now) mutable { + _end = now.get(); + read_exactly_part(n, std::move(pr), std::move(out), completed); + }); + } +} + +template +future> +input_stream_buffer::read_exactly(size_t n) { + promise pr; + auto fut = pr.get_future(); + auto buf = allocate(n); + read_exactly_part(n, std::move(pr), buf, 0); + return fut; +} + + +template +void input_stream_buffer::read_until_part(size_t limit, CharType eol, promise pr, tmp_buf out, + size_t completed) { + auto to_search = std::min(limit - completed, available()); + auto i = std::find(_buf.get() + _begin, _buf.get() + _begin + to_search, eol); + auto nr_found = i - (_buf.get() + _begin); + if (i != _buf.get() + _begin + to_search || completed + nr_found == limit) { + if (out.owning()) { + std::copy(_buf.get() + _begin, i, out.get_write() + completed); + } + advance(nr_found); + completed += nr_found; + pr.set_value(std::move(out).prefix(completed)); + } else { + if (!out.owning() && _end == _size) { + // wrapping around, must allocate + auto new_out = tmp_buf(limit); + std::copy(out.begin(), out.end(), new_out.get_write()); + out = std::move(new_out); + } + if (!out.owning()) { + std::copy(_buf.get() + _begin, _buf.get() + _end, out.get_write() + completed); + completed += _end - _begin; + _begin = _end = 0; + } + _fd.read_some(_buf.get() + _end, _size - _end).then( + [this, limit, eol, pr = std::move(pr), out = std::move(out), completed] (future now) mutable { + _end += now.get(); + read_until_part(limit, eol, std::move(pr), std::move(out), completed); + }); + } +} + +template +future> +input_stream_buffer::read_until(size_t limit, CharType eol) { + promise pr; + auto fut = pr.get_future(); + read_until_part(limit, eol, std::move(pr), allocate(possibly_available()), 0); + return fut; +} + +template +void future_state::make_ready() { + if (_task) { + the_reactor.add_task(std::move(_task)); + } +} + +#if 0 +future> read_until(size_t limit, const CharType* eol, size_t eol_len); +#endif #endif /* REACTOR_HH_ */ diff --git a/sstring.hh b/sstring.hh index f20197f8c2..7c0f15401f 100644 --- a/sstring.hh +++ b/sstring.hh @@ -13,6 +13,7 @@ #include #include #include +#include template class basic_sstring { @@ -59,7 +60,7 @@ public: x.u.internal.size = 0; x.u.internal.str[0] = '\0'; } - basic_sstring(const char* x, size_t len) { + basic_sstring(const char_type* x, size_t len) { if (size_type(size) != size) { throw std::overflow_error("sstring overflow"); } @@ -73,8 +74,9 @@ public: std::copy(x, x + size + 1, u.external.str); } } - basic_sstring(const char* x) : basic_sstring(x, std::strlen(x)) {} - basic_sstring(std::string& x) : basic_sstring(x.c_str(), x.size()) {} + basic_sstring(const char_type* x) : basic_sstring(x, std::strlen(x)) {} + basic_sstring(std::basic_string& x) : basic_sstring(x.c_str(), x.size()) {} + basic_sstring(std::initializer_list x) : basic_sstring(x.begin(), x.end() - x.begin()) {} ~basic_sstring() noexcept { if (!is_external()) { delete[] u.external.str; diff --git a/test-reactor.cc b/test-reactor.cc index 1d18f07b68..ade3b53aa6 100644 --- a/test-reactor.cc +++ b/test-reactor.cc @@ -9,18 +9,16 @@ #include struct test { - reactor r; std::unique_ptr listener; struct connection { - connection(reactor& r, std::unique_ptr fd) : r(r), fd(std::move(fd)) {} - reactor& r; + connection(std::unique_ptr fd) : fd(std::move(fd)) {} std::unique_ptr fd; char buffer[8192]; void copy_data() { - r.read_some(*fd, buffer, sizeof(buffer)).then([this] (future fut) { + fd->read_some(buffer, sizeof(buffer)).then([this] (future fut) { auto n = fut.get(); if (n) { - r.write_all(*fd, buffer, n).then([this, n] (future fut) { + fd->write_all(buffer, n).then([this, n] (future fut) { if (fut.get() == n) { copy_data(); } @@ -32,11 +30,11 @@ struct test { } }; void new_connection(accept_result&& accepted) { - auto c = new connection(r, std::move(std::get<0>(accepted))); + auto c = new connection(std::move(std::get<0>(accepted))); c->copy_data(); } void start_accept() { - r.accept(*listener).then([this] (future fut) { + the_reactor.accept(*listener).then([this] (future fut) { new_connection(fut.get()); start_accept(); }); @@ -49,9 +47,9 @@ int main(int ac, char** av) ipv4_addr addr{{}, 10000}; listen_options lo; lo.reuse_address = true; - t.listener = t.r.listen(make_ipv4_address(addr), lo); + t.listener = the_reactor.listen(make_ipv4_address(addr), lo); t.start_accept(); - t.r.run(); + the_reactor.run(); return 0; }