diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt index e56c96c..0b8bd2e 100644 --- a/samples/CMakeLists.txt +++ b/samples/CMakeLists.txt @@ -1 +1,2 @@ +add_subdirectory(benchmark) add_subdirectory(single-process-ipc) \ No newline at end of file diff --git a/samples/benchmark/CMakeLists.txt b/samples/benchmark/CMakeLists.txt new file mode 100644 index 0000000..6971265 --- /dev/null +++ b/samples/benchmark/CMakeLists.txt @@ -0,0 +1,27 @@ +cmake_minimum_required(VERSION 3.5) +project(sample_benchmark) + +SET(PROJECT_SOURCES + "main.cpp" + "measurer.hpp" + "measurer.cpp" +) + +SET(PROJECT_LIBRARIES + datapath +) + +# Includes +include_directories( + ${PROJECT_SOURCE_DIR} +) + +# Building +ADD_EXECUTABLE(${PROJECT_NAME} + ${PROJECT_SOURCES} +) + +# Linking +TARGET_LINK_LIBRARIES(${PROJECT_NAME} + ${PROJECT_LIBRARIES} +) diff --git a/samples/benchmark/main.cpp b/samples/benchmark/main.cpp new file mode 100644 index 0000000..fe28f3f --- /dev/null +++ b/samples/benchmark/main.cpp @@ -0,0 +1,341 @@ +/* +Sample for DataPath +Copyright (C) 2019 Michael Fabian Dirks + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "datapath.hpp" +#include "measurer.hpp" + +static auto log_time = std::chrono::high_resolution_clock::now(); + +#define COUNT 10000 + +void log(std::string format, ...) +{ + // Time + uint32_t hour, minute, second; + uint64_t nanosecond; + + { + auto log_now = std::chrono::high_resolution_clock::now(); + + nanosecond = (log_now - log_time).count(); + second = nanosecond / 1000000000; + nanosecond %= 1000000000; + minute = second / 60; + second %= 60; + hour = minute / 60; + minute %= 60; + } + + // Message + std::vector msg; + { + va_list args; + va_start(args, format); + msg.resize(vsnprintf(nullptr, 0, format.c_str(), args) + 1); + vsnprintf(msg.data(), msg.size(), format.c_str(), args); + va_end(args); + } + + printf("[%02" PRIu32 ":%02" PRIu32 ":%02" PRIu32 ".%09" PRIu64 "] %.*s\n", hour, minute, second, nanosecond, + (int)msg.size(), msg.data()); +} + +// For simplicity, client and server will be isolated. +class server { + std::shared_ptr dp; + + struct client { + std::shared_ptr dp; + server* parent; + + std::mutex task_lock; + std::list> tasks; + + struct { + std::thread task; + bool shutdown; + } watcher; + + void _watcher() + { + std::vector waits; + + while (!this->watcher.shutdown) { + size_t wait_cnt = 0; + + { + std::unique_lock ul(this->task_lock); + wait_cnt = this->tasks.size(); + } + + if (wait_cnt == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } else { + { + std::unique_lock ul(this->task_lock); + waits.resize(0); + waits.reserve(this->tasks.size()); + for (auto task : this->tasks) { + waits.push_back(&(*task)); + if (waits.size() >= 64) + break; + } + } + + size_t index = 0; + datapath::error ec = + datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(0)); + if (ec != datapath::error::Success) { + ec = datapath::waitable::wait_any(waits, index, + std::chrono::milliseconds(1)); + } + + if (ec == datapath::error::Success) { + std::unique_lock ul(this->task_lock); + std::list>::iterator task; + for (auto itr = this->tasks.begin(); itr != this->tasks.end(); itr++) { + if (&*(*itr) == waits[index]) { + task = itr; + break; + } + } + this->tasks.erase(task); + } + waits.resize(0); + } + } + } + + void handle_close() + { + //std::unique_lock ul(this->parent->socket_lock); + //this->parent->sockets.erase(dp); + } + + void handle_message(const std::vector& data) + { + std::shared_ptr task; + datapath::error ec = dp->write(task, data); + if (ec == datapath::error::Success) { + std::unique_lock ul(this->task_lock); + tasks.push_back(task); + } + } + + public: + client(server* parent, std::shared_ptr socket) + { + this->parent = parent; + dp = socket; + socket->on_close.add(std::bind(&server::client::handle_close, this)); + socket->on_message.add(std::bind(&server::client::handle_message, this, std::placeholders::_1)); + + this->watcher.shutdown = false; + this->watcher.task = std::thread(std::bind(&server::client::_watcher, this)); + } + + ~client() + { + this->watcher.shutdown = true; + if (this->watcher.task.joinable()) { + this->watcher.task.join(); + } + + dp->close(); + }; + }; + + std::mutex socket_lock; + std::map, std::shared_ptr> sockets; + + private: + void handle_accept(bool& should_accept, std::shared_ptr socket) + { + if (!socket->good()) { + should_accept = false; + return; + } + + { + std::unique_lock ul(this->socket_lock); + this->sockets.insert({socket, std::make_shared(this, socket)}); + } + } + + public: + server(std::string path) + { + datapath::error ec = datapath::host(dp, path, + datapath::permissions::User | datapath::permissions::Group + | datapath::permissions::World); + if (ec != datapath::error::Success) { + throw std::exception(); + } + + dp->on_accept.add( + std::bind(&server::handle_accept, this, std::placeholders::_1, std::placeholders::_2)); + } + + ~server() + { + { + std::unique_lock ul(this->socket_lock); + this->sockets.clear(); + } + if (dp) { + dp->close(); + } + } +}; + +class client { + std::shared_ptr dp; + measurer ms; + std::vector write_buf; + + uint64_t cnt = 0; + uint64_t send_cnt = 0; + bool can_send_msg = true; + + struct { + std::thread task; + bool shutdown; + } watcher; + + private: + void _watcher() + { + std::shared_ptr task; + + while (!this->watcher.shutdown) { + if (send_cnt >= COUNT) { + break; + } + + auto start = std::chrono::high_resolution_clock::now(); + if (!task && send_cnt < COUNT && can_send_msg) { + write_buf.resize(sizeof(uint64_t)); + reinterpret_cast(write_buf[0]) = + std::chrono::high_resolution_clock::now().time_since_epoch().count(); + datapath::error ec = dp->write(task, write_buf); + send_cnt++; + can_send_msg = false; + } + + if (task) { + datapath::error ec = task->wait(std::chrono::milliseconds(0)); + if (ec != datapath::error::Success) { + ec = task->wait(std::chrono::milliseconds(100)); + } + + if (ec == datapath::error::Success) { + //log("Sent Message, error code %lu...", uint32_t(ec)); + task.reset(); + } else if (ec != datapath::error::TimedOut) { + log("Failed, error code %lu...", uint32_t(ec)); + task.reset(); + } + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + } + + void handle_message(const std::vector& data) + { + can_send_msg = true; + // Message is a timestamp of when it was sent. + uint64_t cur = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + uint64_t msg = reinterpret_cast(data[0]); + uint64_t dlt = cur - msg; + + ms.track(std::chrono::nanoseconds(dlt)); + + cnt++; + if (cnt % 1000 == 0) { + log("%llu messages.", cnt); + } + if (cnt >= COUNT) { + // Print stats. + log("Buffer Size: %llu", write_buf.size()); + log("99.9%%ile: %10llu ns", ms.percentile(0.999).count()); + log("99.0%%ile: %10llu ns", ms.percentile(0.99).count()); + log("90.0%%ile: %10llu ns", ms.percentile(0.9).count()); + log("50.0%%ile: %10llu ns", ms.percentile(0.5).count()); + log("10.0%%ile: %10llu ns", ms.percentile(0.1).count()); + log(" 1.0%%ile: %10llu ns", ms.percentile(0.01).count()); + log(" 0.1%%ile: %10llu ns", ms.percentile(0.001).count()); + } + } + + void handle_close() + { + this->watcher.shutdown = true; + } + + void close() + { + this->watcher.shutdown = true; + if (this->watcher.task.joinable()) { + this->watcher.task.join(); + } + } + + public: + client(std::string path) + { + datapath::error ec = datapath::connect(dp, path); + if (ec != datapath::error::Success) { + throw std::exception(); + } + + write_buf.resize(sizeof(uint64_t)); + + this->watcher.shutdown = false; + this->watcher.task = std::thread(std::bind(&client::_watcher, this)); + + dp->on_close.add(std::bind(&client::handle_close, this)); + dp->on_message.add(std::bind(&client::handle_message, this, std::placeholders::_1)); + } + + ~client() + { + close(); + } +}; + +int main(int argc, const char* argv[]) +{ + std::shared_ptr myServer = std::make_shared("single-process-ipc"); + std::shared_ptr myClient = std::make_shared("single-process-ipc"); + + std::cin.get(); + myClient.reset(); + myServer.reset(); + + return 0; +} diff --git a/samples/benchmark/measurer.cpp b/samples/benchmark/measurer.cpp new file mode 100644 index 0000000..81c19ec --- /dev/null +++ b/samples/benchmark/measurer.cpp @@ -0,0 +1,169 @@ +/* +Sample for DataPath +Copyright (C) 2019 Michael Fabian Dirks + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +#include "measurer.hpp" +#include + +measurer::instance::instance(std::shared_ptr parent) + : parent(parent), start(std::chrono::high_resolution_clock::now()) +{} + +measurer::instance::~instance() +{ + auto end = std::chrono::high_resolution_clock::now(); + auto dur = end - this->start; + if (this->parent) { + this->parent->track(dur); + } +} + +void measurer::instance::cancel() +{ + this->parent.reset(); +} + +void measurer::instance::reparent(std::shared_ptr parent) +{ + this->parent = parent; +} + +measurer::measurer() {} + +measurer::~measurer() {} + +std::shared_ptr measurer::track() +{ + return std::make_shared(this->shared_from_this()); +} + +void measurer::track(std::chrono::nanoseconds duration) +{ + std::unique_lock ul(this->lock); + auto itr = timings.find(duration); + if (itr == timings.end()) { + timings.insert({duration, 1}); + } else { + itr->second++; + } +} + +uint64_t measurer::count() +{ + uint64_t count = 0; + + std::map copy_timings; + { + std::unique_lock ul(this->lock); + std::copy(this->timings.begin(), this->timings.end(), std::inserter(copy_timings, copy_timings.end())); + } + + for (auto kv : copy_timings) { + count += kv.second; + } + + return count; +} + +std::chrono::nanoseconds measurer::total_duration() +{ + std::chrono::nanoseconds duration; + + std::map copy_timings; + { + std::unique_lock ul(this->lock); + std::copy(this->timings.begin(), this->timings.end(), std::inserter(copy_timings, copy_timings.end())); + } + + for (auto kv : copy_timings) { + duration += kv.first * kv.second; + } + + return duration; +} + +double_t measurer::average_duration() +{ + std::chrono::nanoseconds duration; + uint64_t count = 0; + + std::map copy_timings; + { + std::unique_lock ul(this->lock); + std::copy(this->timings.begin(), this->timings.end(), std::inserter(copy_timings, copy_timings.end())); + } + + for (auto kv : copy_timings) { + duration += kv.first * kv.second; + count += kv.second; + } + + return double_t(duration.count()) / double_t(count); +} + +template +inline bool is_equal(T a, T b, T c) +{ + return (a == b) || ((a >= (b - c)) && (a <= (b + c))); +} + +std::chrono::nanoseconds measurer::percentile(double_t percentile, bool by_time) +{ + uint64_t calls = count(); + + std::map copy_timings; + { + std::unique_lock ul(this->lock); + std::copy(this->timings.begin(), this->timings.end(), std::inserter(copy_timings, copy_timings.end())); + } + if (by_time) { // Return by time percentile. + // Find largest and smallest time. + std::chrono::nanoseconds smallest = copy_timings.begin()->first; + std::chrono::nanoseconds largest = copy_timings.rbegin()->first; + + std::chrono::nanoseconds variance = largest - smallest; + std::chrono::nanoseconds threshold = + std::chrono::nanoseconds(smallest.count() + int64_t(variance.count() * percentile)); + + for (auto kv : copy_timings) { + double_t kv_pct = double_t((kv.first - smallest).count()) / double_t(variance.count()); + if (is_equal(kv_pct, percentile, 0.00005) || (kv_pct > percentile)) { + return std::chrono::nanoseconds(kv.first); + } + } + } else { // Return by call percentile. + if (percentile == 0.0) { + return copy_timings.begin()->first; + } + + uint64_t accu_calls_now = 0; + for (auto kv : copy_timings) { + uint64_t accu_calls_last = accu_calls_now; + accu_calls_now += kv.second; + + double_t percentile_last = double_t(accu_calls_last) / double_t(calls); + double_t percentile_now = double_t(accu_calls_now) / double_t(calls); + + if (is_equal(percentile, percentile_now, 0.0005) + || ((percentile_last < percentile) && (percentile_now > percentile))) { + return std::chrono::nanoseconds(kv.first); + } + } + } + + return std::chrono::nanoseconds(-1); +} diff --git a/samples/benchmark/measurer.hpp b/samples/benchmark/measurer.hpp new file mode 100644 index 0000000..386b036 --- /dev/null +++ b/samples/benchmark/measurer.hpp @@ -0,0 +1,60 @@ +/* +Sample for DataPath +Copyright (C) 2019 Michael Fabian Dirks + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +#include +#include +#include +#include + +class measurer : std::enable_shared_from_this { + std::map timings; + + std::mutex lock; + + public: + class instance { + std::shared_ptr parent; + std::chrono::high_resolution_clock::time_point start; + + public: + instance(std::shared_ptr parent); + + ~instance(); + + void cancel(); + + void reparent(std::shared_ptr parent); + }; + + public: + measurer(); + + ~measurer(); + + std::shared_ptr track(); + + void track(std::chrono::nanoseconds duration); + + uint64_t count(); + + std::chrono::nanoseconds total_duration(); + + double_t average_duration(); + + std::chrono::nanoseconds percentile(double_t percentile, bool by_time = false); +}; diff --git a/samples/single-process-ipc/.gitignore b/samples/single-process-ipc/.gitignore deleted file mode 100644 index 796b96d..0000000 --- a/samples/single-process-ipc/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/build