server: Asynchronous Accept events

This detaches accept from the task workflow.
This commit is contained in:
Michael Fabian 'Xaymar' Dirks
2019-01-06 23:41:33 +01:00
parent 98654b68f1
commit 18c772fafb
3 changed files with 97 additions and 31 deletions
+12 -1
View File
@@ -19,12 +19,23 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once #pragma once
#include <memory> #include <memory>
#include "error.hpp" #include "error.hpp"
#include "event.hpp"
#include "isocket.hpp" #include "isocket.hpp"
namespace datapath { namespace datapath {
class iserver { class iserver {
virtual datapath::error accept(std::shared_ptr<datapath::isocket>& socket) = 0; public /*event*/:
/** Accepted Connection Event
* This event is called if a new connection is pending evaluation.
*
* @param bool& Set to true to accept, false to decline.
* @param std::shared_ptr<datapath::isocket> Socket.
* @return void
*/
datapath::event<bool&, std::shared_ptr<datapath::isocket>> on_accept;
public:
virtual datapath::error close() = 0; virtual datapath::error close() = 0;
}; };
} // namespace datapath } // namespace datapath
+83 -26
View File
@@ -94,22 +94,18 @@ void datapath::windows::server::_watcher()
std::map<HANDLE, std::shared_ptr<datapath::windows::overlapped>> ovmap; std::map<HANDLE, std::shared_ptr<datapath::windows::overlapped>> ovmap;
while (!this->watcher.shutdown) { while (!this->watcher.shutdown) {
// Verify existing connections. // Wait for any overlapped objects.
{ if (ovmap.size() > 0) {
std::unique_lock<std::mutex> ul(this->lock); // No lock as we aren't touching any list or map yet.
for (auto itr = this->active_sockets.begin(); itr != this->active_sockets.end(); itr++) { std::vector<datapath::waitable*> waits;
if (itr->second.expired()) { waits.reserve(ovmap.size());
this->active_sockets.erase(itr);
this->waiting_sockets.push_back(itr->first); for (auto kv : ovmap) {
continue; waits.push_back(&(*kv.second));
}
auto obj = itr->second.lock();
if (!obj->good()) {
this->active_sockets.erase(itr);
this->waiting_sockets.push_back(itr->first);
continue;
}
} }
size_t index = 0;
datapath::error ec = datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(1));
} }
// Update list of overlappeds to track. // Update list of overlappeds to track.
@@ -136,26 +132,87 @@ void datapath::windows::server::_watcher()
} }
} }
// Wait for any overlapped objects. // Notify of pending sockets.
if (ovmap.size() > 0) { {
// No lock as we aren't touching any list or map yet. std::unique_lock<std::mutex> ul(this->lock);
std::vector<datapath::waitable*> waits; if (this->on_accept) {
waits.reserve(ovmap.size()); for (auto itr = this->pending_sockets.begin(); itr != this->pending_sockets.end();
itr++) {
bool accept = true;
for (auto kv : ovmap) { auto sock = std::make_shared<datapath::windows::socket>();
waits.push_back(&(*kv.second)); sock->_connect(*itr);
auto isock = std::dynamic_pointer_cast<datapath::isocket>(sock);
this->on_accept(accept, isock);
if (accept) {
this->pending_sockets.erase(itr);
this->active_sockets.insert({*itr, sock});
if ((this->waiting_sockets.size() + this->pending_sockets.size())
< WIN_BACKLOG_NUM) {
if ((this->sockets.size() <= this->max_clients)
&& (this->max_clients > 0)) {
HANDLE handle = _create_socket(this->path, false);
if (handle != INVALID_HANDLE_VALUE) {
this->sockets.push_back(handle);
this->waiting_sockets.push_back(handle);
}
}
}
} else {
// Force close and return to waiting.
sock->close();
this->pending_sockets.erase(itr);
this->waiting_sockets.push_back(*itr);
}
}
}
} }
size_t index = 0; // Verify existing connections.
datapath::error ec = datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(1)); {
std::unique_lock<std::mutex> ul(this->lock);
for (auto itr = this->active_sockets.begin(); itr != this->active_sockets.end(); itr++) {
if (itr->second.expired()) {
// Enforce backlog size
if ((this->waiting_sockets.size() + this->pending_sockets.size())
< WIN_BACKLOG_NUM) {
this->waiting_sockets.push_back(itr->first);
} else { } else {
DisconnectNamedPipe(itr->first);
CloseHandle(itr->first);
this->sockets.remove(itr->first);
}
this->active_sockets.erase(itr);
continue;
}
auto obj = itr->second.lock();
if (!obj->good()) {
// Enforce backlog size
if ((this->waiting_sockets.size() + this->pending_sockets.size())
< WIN_BACKLOG_NUM) {
this->waiting_sockets.push_back(itr->first);
} else {
DisconnectNamedPipe(itr->first);
CloseHandle(itr->first);
this->sockets.remove(itr->first);
}
this->active_sockets.erase(itr);
continue;
}
}
}
if (ovmap.size() == 0) {
// Just sleep 1ms to not use too much CPU. // Just sleep 1ms to not use too much CPU.
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
} }
} }
datapath::error datapath::windows::server::accept(std::shared_ptr<datapath::isocket>& socket) /*datapath::error datapath::windows::server::accept(std::shared_ptr<datapath::isocket>& socket)
{ {
std::unique_lock<std::mutex> ul(this->lock); std::unique_lock<std::mutex> ul(this->lock);
if (this->pending_sockets.size() == 0) { if (this->pending_sockets.size() == 0) {
@@ -185,7 +242,7 @@ datapath::error datapath::windows::server::accept(std::shared_ptr<datapath::isoc
this->active_sockets.insert({handle, socket}); this->active_sockets.insert({handle, socket});
return datapath::error::Success; return datapath::error::Success;
} }*/
datapath::error datapath::windows::server::close() datapath::error datapath::windows::server::close()
{ {
+1 -3
View File
@@ -31,7 +31,7 @@ extern "C" {
namespace datapath { namespace datapath {
namespace windows { namespace windows {
class server : public iserver { class server : public iserver, public std::enable_shared_from_this<datapath::windows::server> {
bool is_created = false; bool is_created = false;
size_t max_clients = -1; size_t max_clients = -1;
std::string path; std::string path;
@@ -61,8 +61,6 @@ namespace datapath {
void _watcher(); void _watcher();
public /*virtual override*/: public /*virtual override*/:
virtual datapath::error accept(std::shared_ptr<datapath::isocket>& socket) override;
virtual datapath::error close() override; virtual datapath::error close() override;
public: public: