diff --git a/include/iserver.hpp b/include/iserver.hpp index 449b8fe..ae0d5ae 100644 --- a/include/iserver.hpp +++ b/include/iserver.hpp @@ -19,12 +19,23 @@ along with this program. If not, see . #pragma once #include #include "error.hpp" +#include "event.hpp" #include "isocket.hpp" namespace datapath { class iserver { - virtual datapath::error accept(std::shared_ptr& socket) = 0; + public /*event*/: + /** Accepted Connection Event + * This event is called if a new connection is pending evaluation. + * + * @param bool& Set to true to accept, false to decline. + * @param std::shared_ptr Socket. + * @return void + */ + datapath::event> on_accept; + + public: virtual datapath::error close() = 0; }; } // namespace datapath diff --git a/source/windows/server.cpp b/source/windows/server.cpp index de0dc4f..3ea88a0 100644 --- a/source/windows/server.cpp +++ b/source/windows/server.cpp @@ -94,22 +94,18 @@ void datapath::windows::server::_watcher() std::map> ovmap; while (!this->watcher.shutdown) { - // Verify existing connections. - { - std::unique_lock ul(this->lock); - for (auto itr = this->active_sockets.begin(); itr != this->active_sockets.end(); itr++) { - if (itr->second.expired()) { - this->active_sockets.erase(itr); - this->waiting_sockets.push_back(itr->first); - continue; - } - auto obj = itr->second.lock(); - if (!obj->good()) { - this->active_sockets.erase(itr); - this->waiting_sockets.push_back(itr->first); - continue; - } + // Wait for any overlapped objects. + if (ovmap.size() > 0) { + // No lock as we aren't touching any list or map yet. + std::vector waits; + waits.reserve(ovmap.size()); + + for (auto kv : ovmap) { + waits.push_back(&(*kv.second)); } + + size_t index = 0; + datapath::error ec = datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(1)); } // Update list of overlappeds to track. @@ -136,26 +132,87 @@ void datapath::windows::server::_watcher() } } - // Wait for any overlapped objects. - if (ovmap.size() > 0) { - // No lock as we aren't touching any list or map yet. - std::vector waits; - waits.reserve(ovmap.size()); + // Notify of pending sockets. + { + std::unique_lock ul(this->lock); + if (this->on_accept) { + for (auto itr = this->pending_sockets.begin(); itr != this->pending_sockets.end(); + itr++) { + bool accept = true; - for (auto kv : ovmap) { - waits.push_back(&(*kv.second)); + auto sock = std::make_shared(); + sock->_connect(*itr); + + auto isock = std::dynamic_pointer_cast(sock); + this->on_accept(accept, isock); + + if (accept) { + this->pending_sockets.erase(itr); + this->active_sockets.insert({*itr, sock}); + + if ((this->waiting_sockets.size() + this->pending_sockets.size()) + < WIN_BACKLOG_NUM) { + if ((this->sockets.size() <= this->max_clients) + && (this->max_clients > 0)) { + HANDLE handle = _create_socket(this->path, false); + if (handle != INVALID_HANDLE_VALUE) { + this->sockets.push_back(handle); + this->waiting_sockets.push_back(handle); + } + } + } + } else { + // Force close and return to waiting. + sock->close(); + this->pending_sockets.erase(itr); + this->waiting_sockets.push_back(*itr); + } + } } + } - size_t index = 0; - datapath::error ec = datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(1)); - } else { + // Verify existing connections. + { + std::unique_lock ul(this->lock); + for (auto itr = this->active_sockets.begin(); itr != this->active_sockets.end(); itr++) { + if (itr->second.expired()) { + // Enforce backlog size + if ((this->waiting_sockets.size() + this->pending_sockets.size()) + < WIN_BACKLOG_NUM) { + this->waiting_sockets.push_back(itr->first); + } else { + DisconnectNamedPipe(itr->first); + CloseHandle(itr->first); + this->sockets.remove(itr->first); + } + this->active_sockets.erase(itr); + continue; + } + auto obj = itr->second.lock(); + if (!obj->good()) { + // Enforce backlog size + if ((this->waiting_sockets.size() + this->pending_sockets.size()) + < WIN_BACKLOG_NUM) { + this->waiting_sockets.push_back(itr->first); + } else { + DisconnectNamedPipe(itr->first); + CloseHandle(itr->first); + this->sockets.remove(itr->first); + } + this->active_sockets.erase(itr); + continue; + } + } + } + + if (ovmap.size() == 0) { // Just sleep 1ms to not use too much CPU. std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } -datapath::error datapath::windows::server::accept(std::shared_ptr& socket) +/*datapath::error datapath::windows::server::accept(std::shared_ptr& socket) { std::unique_lock ul(this->lock); if (this->pending_sockets.size() == 0) { @@ -185,7 +242,7 @@ datapath::error datapath::windows::server::accept(std::shared_ptractive_sockets.insert({handle, socket}); return datapath::error::Success; -} +}*/ datapath::error datapath::windows::server::close() { diff --git a/source/windows/server.hpp b/source/windows/server.hpp index 4c2166d..18a3532 100644 --- a/source/windows/server.hpp +++ b/source/windows/server.hpp @@ -31,7 +31,7 @@ extern "C" { namespace datapath { namespace windows { - class server : public iserver { + class server : public iserver, public std::enable_shared_from_this { bool is_created = false; size_t max_clients = -1; std::string path; @@ -61,8 +61,6 @@ namespace datapath { void _watcher(); public /*virtual override*/: - virtual datapath::error accept(std::shared_ptr& socket) override; - virtual datapath::error close() override; public: