diff --git a/.gitignore b/.gitignore index 7f93ebf..a7de0c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ -venv +/venv __pycache__ +/dist +/obj +.ninja_log diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/customTargets.xml b/.idea/customTargets.xml new file mode 100644 index 0000000..a6033b3 --- /dev/null +++ b/.idea/customTargets.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/makefile.xml b/.idea/makefile.xml new file mode 100644 index 0000000..6278399 --- /dev/null +++ b/.idea/makefile.xml @@ -0,0 +1,25 @@ + + + + + + + + + runtime_lib/main.cpp + + + + + + + + + clean + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..1caec42 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Makefile b/Makefile deleted file mode 100644 index e69de29..0000000 diff --git a/build.ninja b/build.ninja new file mode 100644 index 0000000..a871e52 --- /dev/null +++ b/build.ninja @@ -0,0 +1,11 @@ +cflags = -Wall -Wextra -g +print_flags = -fdiagnostics-color=always + +libraries = -lsodium + +rule cc + command = g++ $print_flags $cflags $libraries $in -o $out + +build dist/main: cc runtime_lib/main.cpp + + diff --git a/runtime_lib/ProgramQueue.h b/runtime_lib/ProgramQueue.h index 3f59c93..6a2a319 100644 --- a/runtime_lib/ProgramQueue.h +++ b/runtime_lib/ProgramQueue.h @@ -1,2 +1,117 @@ #pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +using call_frame_target = void*call_frame_target(void*, void*); + +struct packaged_call_frame { + void* arguments; +} + +class package_of_work { + public: + + package_of_work(int kind, call_frame_target target, void* scope_) : + kind_id(kind), work_factor(1), target_function(target), scope(scope_), finalized(false) + { + queue = std::queue() + } + + int kind_id; + float work_factor; + std::queue queue; + + call_frame_target target_function; + void* scope; + + bool finalized; +} + +struct call_frame { + void* arguments; + void* scope; + call_frame_target target_function; + int id; +} + +struct call_result { + int nothing; +} + + +class GlobalThreadPool { + public: + GlobalThreadPool(); + + int create_package(void* scope, call_frame_target target_function); + void submit_to_package(int kind_id, void* arguments); + struct call_result join_package(int kind_id); + + + private: + std::unordered_map m_work_packages; + std::stack m_work_package_stack; + std::mutex m_work_packages_lock; + + std::queue m_calls; + std::mutex m_calls_lock; + + std::unordered_map m_results; + std::mutex m_results_lock; + + volatile int m_last_id; + std::mutex m_id_lock; + + std::vector m_trheads; + + void thread_worker(); +} + +void GlobalThreadPool::thread_worker() { + using namespace std::chrono_literals; + package_of_work* current = NULL; + + while (true) { + if (m_work_package_stack.size > 0) { + // grab a reference to the first queue + current = m_work_package_stack.top; + } + + // if not finalized, try again + if (!current->finalized && current->queue.size == 0) { + std::this_thread::sleep_for(100ms); + continue + } + + + + } + +} + +int GlobalThreadPool::create_package(void* scope, call_frame_target target_function) +{ + this->id_lock.lock() + int kind_id = this->last_id++; + this->id_lock.unlock() + + this->work_packages_lock.lock() + auto package = package_of_work(kind_id, target_function, scope); + this->work_packages[kind_id] = package; + this->work_package_stack.push(package); + this->work_packages_lock.unlock() + + return kind_id; +} + + diff --git a/runtime_lib/main.cpp b/runtime_lib/main.cpp new file mode 100644 index 0000000..133403a --- /dev/null +++ b/runtime_lib/main.cpp @@ -0,0 +1,240 @@ +// +// Created by anton on 6/22/22. +// +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +typedef void *call_frame_target(void *, void *); + + + + +struct call_frame { + call_frame(void *argument, void *scope, call_frame_target *target, int id) : + arguments(argument), scope(scope), + target_function(target), id(id) { + + } + + void *arguments; + void *scope; + call_frame_target *target_function; + int id; +}; + + +class GlobalThreadPool { +public: + GlobalThreadPool() : m_queue(), m_queue_lock(), m_last_id(0) { + std::cout << "Init GlobalThreadPool" << std::endl; + + m_thread_count = std::thread::hardware_concurrency(); + + m_threads = std::vector(m_thread_count); + + for (size_t i = 0; i < m_thread_count; i++) { + m_threads[i] = std::thread([&] { + worker(); + }); + } + } + + ~GlobalThreadPool() { + if (m_active) { + std::cout << "GlobalThreadPool destructor called before join!" << std::endl; + join(); + } + + assert(!m_active); + } + + int insert(void *arguments, void *scope, call_frame_target *target_function) { + assert(("ThreadPool must be active when inserting! stop() or join() were called too early!", m_active)); + + int id = m_last_id++; + m_queue_lock.lock(); + m_queue.emplace(arguments, scope, target_function, id); + m_queue_lock.unlock(); + + return id; + } + + void stop() { + m_active = false; + } + + void join() { + assert(m_active); + + stop(); + + std::cout << "joining..." << std::endl; + + for (size_t i = 0; i < m_thread_count; i++) { + if (m_threads[i].joinable()) { + m_threads[i].join(); + std::cout << "thread " << i << " joined!" << std::endl; + } else { + std::cout << "thread " << i << " not joinable" << std::endl; + } + } + std::cout << "done!" << std::endl; + + + std::cout << "GlobalThreadPool finished!" << std::endl; + } + + size_t thread_count() const { return m_thread_count; } + +private: + + std::queue m_queue; + std::mutex m_queue_lock; + std::atomic m_last_id; + std::vector m_threads; + + std::atomic m_active = true; + + size_t m_thread_count; + + + void worker() { + // std::cout << "Hello from worker #" << std::this_thread::get_id() << std::endl; + using namespace std::chrono_literals; + + while (true) { + m_queue_lock.lock(); + + if (m_queue.empty()) { + m_queue_lock.unlock(); + if (!m_active) { + break; + } + std::this_thread::sleep_for(10ms); + continue; + } + + struct call_frame item = m_queue.front(); + m_queue.pop(); + m_queue_lock.unlock(); + + try { + item.target_function(item.arguments, item.scope); + } catch (const std::exception& t) { + std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << t.what() << "\n" + << " in work package #" << item.id << std::endl; + } catch (...) { + std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << "unknown" << "\n" + << " in work package #" << item.id << std::endl; + } + } + } + +}; + + +void *task1(void *args, void *scope) { + //std::cout << "hello from task 1" << std::endl; + + assert(("Scope can't be nullptr", scope != nullptr)); + + auto *print_lock = reinterpret_cast(scope); + + using u64 = unsigned long long int; + + u64 word = 0; + u64 len = 0; + + long long int seed = 0; + + std::random_device dev; + + // FIXME: random_device entropy seems to be stuck at 0. + // currently falling back to libsodium, but that's not sustainable. + if (dev.entropy() == 0) { + randombytes_buf(&seed, sizeof(long long int)); + } else { + seed = dev(); + } + + std::mt19937 rng(seed); + std::uniform_int_distribution next_number(0, 26); + + std::map lengths; + //std::unordered_map words; + + for (u64 i = 0; i < 10000000; i++) { + unsigned long num = next_number(rng); + + if (num == 0) { + lengths[len]++; + //words[word]++; + word = 0; + len = 0; + continue; + } + + word *= 26; + word += num; + len++; + } + + print_lock->lock(); + + std::cout << "Word lengths:\n"; + for (auto& pair : lengths) { + if (pair.first > 20) { + break; + } + std::cout << pair.first << ": " << pair.second << "\n"; + } + + std::cout << "Most used words:\n"; + + print_lock->unlock(); + + return nullptr; +} + +int main() { + sodium_init(); + + std::cout << "Hello, World!" << std::endl; + + auto pool = GlobalThreadPool(); + std::mutex stdout_lock; + + std::cout << "Using " << pool.thread_count() << " workers" << std::endl; + + for (int i = 0; i < 2; i++) { + pool.insert(nullptr, &stdout_lock, task1); + } + + std::cout << pool.insert(nullptr, &stdout_lock, task1) << std::endl; + + pool.join(); + + return 0; +} + + + + + + +