From 8a4df2cc3a8c42ede35991c82511539e666ec20e Mon Sep 17 00:00:00 2001 From: Michael Fabian 'Xaymar' Dirks Date: Tue, 8 Jan 2019 03:06:08 +0100 Subject: [PATCH] samples/single-process-ipc: Add single process IPC sample A basic sample to show the layout required to do any IPC work for both client and server. It does not have a multi-process architecture and instead relies on threading, but this already shows the speed that should be expected when using DataPath. --- samples/CMakeLists.txt | 1 + samples/single-process-ipc/.gitignore | 1 + samples/single-process-ipc/CMakeLists.txt | 25 ++ samples/single-process-ipc/main.cpp | 308 ++++++++++++++++++++++ 4 files changed, 335 insertions(+) create mode 100644 samples/CMakeLists.txt create mode 100644 samples/single-process-ipc/.gitignore create mode 100644 samples/single-process-ipc/CMakeLists.txt create mode 100644 samples/single-process-ipc/main.cpp diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt new file mode 100644 index 0000000..e56c96c --- /dev/null +++ b/samples/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(single-process-ipc) \ No newline at end of file diff --git a/samples/single-process-ipc/.gitignore b/samples/single-process-ipc/.gitignore new file mode 100644 index 0000000..796b96d --- /dev/null +++ b/samples/single-process-ipc/.gitignore @@ -0,0 +1 @@ +/build diff --git a/samples/single-process-ipc/CMakeLists.txt b/samples/single-process-ipc/CMakeLists.txt new file mode 100644 index 0000000..b51309c --- /dev/null +++ b/samples/single-process-ipc/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.5) +project(sample_single-process-ipc) + +SET(PROJECT_SOURCES + "${PROJECT_SOURCE_DIR}/main.cpp" +) + +SET(PROJECT_LIBRARIES + datapath +) + +# Includes +include_directories( + ${PROJECT_SOURCE_DIR} +) + +# Building +ADD_EXECUTABLE(${PROJECT_NAME} + ${PROJECT_SOURCES} +) + +# Linking +TARGET_LINK_LIBRARIES(${PROJECT_NAME} + ${PROJECT_LIBRARIES} +) diff --git a/samples/single-process-ipc/main.cpp b/samples/single-process-ipc/main.cpp new file mode 100644 index 0000000..e7840fa --- /dev/null +++ b/samples/single-process-ipc/main.cpp @@ -0,0 +1,308 @@ +/* +Sample for DataPath +Copyright (C) 2019 Michael Fabian Dirks + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "datapath.hpp" + +std::vector messages = { + "This is an introduction to the basics of IPC. It will simply send strings to the other side, with the other side " + "verifying the string and then sending that string back unmodified.", + "As this is just a sample, there is no need for this to be as accurate as possible."}; + +static auto log_time = std::chrono::high_resolution_clock::now(); + +void log(std::string format, ...) +{ + // Time + uint32_t hour, minute, second; + uint64_t nanosecond; + + { + auto log_now = std::chrono::high_resolution_clock::now(); + + nanosecond = (log_now - log_time).count(); + second = nanosecond / 1000000000; + nanosecond %= 1000000000; + minute = second / 60; + second %= 60; + hour = minute / 60; + minute %= 60; + } + + // Message + std::vector msg; + { + va_list args; + va_start(args, format); + msg.resize(vsnprintf(nullptr, 0, format.c_str(), args)); + vsnprintf(msg.data(), msg.size(), format.c_str(), args); + va_end(args); + } + + printf("[%02" PRIu32 ":%02" PRIu32 ":%02" PRIu32 ".%09" PRIu64 "] %.*s\n", hour, minute, second, nanosecond, + (int)msg.size(), msg.data()); +} + +// For simplicity, client and server will be isolated. +class server { + std::shared_ptr dp; + + struct client { + std::shared_ptr dp; + server* parent; + + std::mutex task_lock; + std::list> tasks; + + struct { + std::thread task; + bool shutdown; + } watcher; + + void _watcher() + { + while (!this->watcher.shutdown) { + size_t wait_cnt = 0; + + { + std::unique_lock ul(this->task_lock); + wait_cnt = this->tasks.size(); + } + + if (wait_cnt == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } else { + std::vector waits; + { + std::unique_lock ul(this->task_lock); + waits.reserve(this->tasks.size()); + for (auto task : this->tasks) { + waits.push_back(&(*task)); + } + } + + size_t index = 0; + datapath::error ec = + datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(0)); + if (ec != datapath::error::Success) { + ec = datapath::waitable::wait_any(waits, index, + std::chrono::milliseconds(1)); + } + + if (ec == datapath::error::Success) { + std::unique_lock ul(this->task_lock); + std::list>::iterator task; + for (auto itr = this->tasks.begin(); itr != this->tasks.end(); itr++) { + if (&*(*itr) == waits[index]) { + task = itr; + break; + } + } + this->tasks.erase(task); + } + } + } + } + + void handle_close() + { + std::unique_lock ul(this->parent->socket_lock); + this->parent->sockets.erase(dp); + } + + void handle_message(const std::vector& data) + { + std::shared_ptr task; + datapath::error ec = dp->write(task, data); + if (ec == datapath::error::Success) { + std::unique_lock ul(this->task_lock); + tasks.push_back(task); + } + } + + public: + client(server* parent, std::shared_ptr socket) + { + this->parent = parent; + dp = socket; + socket->on_close.add(std::bind(&server::client::handle_close, this)); + socket->on_message.add(std::bind(&server::client::handle_message, this, std::placeholders::_1)); + + this->watcher.shutdown = false; + this->watcher.task = std::thread(std::bind(&server::client::_watcher, this)); + } + + ~client() + { + this->watcher.shutdown = true; + if (this->watcher.task.joinable()) { + this->watcher.task.join(); + } + + dp->close(); + }; + }; + + std::mutex socket_lock; + std::map, std::shared_ptr> sockets; + + private: + void handle_accept(bool& should_accept, std::shared_ptr socket) + { + if (!socket->good()) { + should_accept = false; + return; + } + + { + std::unique_lock ul(this->socket_lock); + this->sockets.insert({socket, std::make_shared(this, socket)}); + } + } + + public: + server(std::string path) + { + datapath::error ec = datapath::host(dp, path, + datapath::permissions::User | datapath::permissions::Group + | datapath::permissions::World); + if (ec != datapath::error::Success) { + throw std::exception(); + } + + dp->on_accept.add( + std::bind(&server::handle_accept, this, std::placeholders::_1, std::placeholders::_2)); + } + + ~server() + { + if (dp) { + dp->close(); + } + } +}; + +class client { + std::shared_ptr dp; + + struct { + std::thread task; + bool shutdown; + } watcher; + + private: + void _watcher() + { + std::shared_ptr task; + + while (!this->watcher.shutdown) { + auto start = std::chrono::high_resolution_clock::now(); + if (!task) { + std::vector msg; + msg.resize(sizeof(uint64_t)); + reinterpret_cast(msg[0]) = + std::chrono::high_resolution_clock::now().time_since_epoch().count(); + datapath::error ec = dp->write(task, msg); + } + + datapath::error ec = task->wait(std::chrono::milliseconds(0)); + if (ec != datapath::error::Success) { + ec = task->wait(std::chrono::milliseconds(100)); + } + + if (ec == datapath::error::Success) { + log("Sent Message, error code %lu...", uint32_t(ec)); + task.reset(); + } else if (ec != datapath::error::TimedOut) { + log("Failed, error code %lu...", uint32_t(ec)); + task.reset(); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto dur = end - start; + + if (!task) { + if (dur < std::chrono::milliseconds(250)) { + std::this_thread::sleep_for(std::chrono::milliseconds(250) - dur); + } + } + } + } + + void handle_message(const std::vector& data) + { + // Message is a timestamp of when it was sent. + uint64_t cur = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + uint64_t msg = reinterpret_cast(data[0]); + uint64_t dlt = cur - msg; + + log("Message RTT of %llu ns.", dlt); + } + + void handle_close() + { + this->watcher.shutdown = true; + } + + void close() + { + this->watcher.shutdown = true; + if (this->watcher.task.joinable()) { + this->watcher.task.join(); + } + } + + public: + client(std::string path) + { + datapath::error ec = datapath::connect(dp, path); + if (ec != datapath::error::Success) { + throw std::exception(); + } + this->watcher.shutdown = false; + this->watcher.task = std::thread(std::bind(&client::_watcher, this)); + + dp->on_close.add(std::bind(&client::handle_close, this)); + dp->on_message.add(std::bind(&client::handle_message, this, std::placeholders::_1)); + } + + ~client() + { + close(); + } +}; + +int main(int argc, const char* argv[]) +{ + std::shared_ptr myServer = std::make_shared("single-process-ipc"); + std::shared_ptr myClient = std::make_shared("single-process-ipc"); + + std::cin.get(); + myClient.reset(); + myServer.reset(); + + return 0; +}