diff --git a/.gitignore b/.gitignore index a7de0c6..c7ea5a8 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ __pycache__ /dist /obj .ninja_log +.ninja_deps +/boost_* \ No newline at end of file diff --git a/.idea/customTargets.xml b/.idea/customTargets.xml index a6033b3..861fa49 100644 --- a/.idea/customTargets.xml +++ b/.idea/customTargets.xml @@ -2,7 +2,7 @@ - + diff --git a/.idea/makefile.xml b/.idea/makefile.xml index 6278399..bde9212 100644 --- a/.idea/makefile.xml +++ b/.idea/makefile.xml @@ -4,7 +4,7 @@ - + runtime_lib/main.cpp @@ -13,7 +13,7 @@ - + clean diff --git a/.idea/misc.xml b/.idea/misc.xml index 1caec42..d13647f 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -4,6 +4,9 @@ + + + diff --git a/build.ninja b/build.ninja index a871e52..cddde28 100644 --- a/build.ninja +++ b/build.ninja @@ -1,11 +1,18 @@ -cflags = -Wall -Wextra -g +cflags = -Wall -Wextra -g -Iboost_1_79_0 print_flags = -fdiagnostics-color=always +link_args = -c libraries = -lsodium rule cc - command = g++ $print_flags $cflags $libraries $in -o $out + deps = gcc + depfile = $out.d + command = g++ -MD -MF $out.d $print_flags $link_args $cflags $libraries $in -o $out -build dist/main: cc runtime_lib/main.cpp +build obj/call_frame.o: cc runtime_lib/call_frame.cpp +build obj/global_thread_pool.o: cc runtime_lib/global_thread_pool.cpp + +build dist/main: cc runtime_lib/main.cpp obj/call_frame.o obj/global_thread_pool.o + link_args = diff --git a/runtime_lib/call_frame.cpp b/runtime_lib/call_frame.cpp new file mode 100644 index 0000000..79fb186 --- /dev/null +++ b/runtime_lib/call_frame.cpp @@ -0,0 +1,56 @@ +// +// Created by anton on 6/24/22. +// + +#include "call_frame.h" +#include "helpers.h" + + +namespace pmp::core { + + + // public + + bool call_frame::evaluate() { + std::unique_lock lock(evaluation_lock, std::try_to_lock); + + // check if lock was acquired + if (!lock.owns_lock()) { + return false; + } + // we now hold the lock + + // only run if we are in the initial state + if (state != initial) + return false; + + // set housekeeping variables + state = running; + + try { + result = target_function(arguments, scope); + } catch (std::exception& ex) + { + std::cerr << "Error evaluating " << id << ": " << ex.what() << std::endl; + m_has_error = true; + } + catch (...) + { + std::cerr << "Error evaluating " << id << std::endl; + m_has_error = true; + } + + // children are empty, when the "end" of an execution chain has been reached + // this means that no further nodes were created during the execution + // we don't need to acquire any lock, since no children will ever be added + if (children.empty()) { + state = completed; + parent->mark_completion(); + } else { + // we do have children, so we must wait until they are finished to move on. + state = waiting; + } + + return true; + } +} diff --git a/runtime_lib/call_frame.h b/runtime_lib/call_frame.h new file mode 100644 index 0000000..5333726 --- /dev/null +++ b/runtime_lib/call_frame.h @@ -0,0 +1,88 @@ +// +// Created by anton on 6/24/22. +// + +#ifndef PMP_CALL_FRAME_H +#define PMP_CALL_FRAME_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "helpers.h" + + +namespace pmp::core { + + typedef void *call_frame_target(void * args, void * scope); + + typedef u64 frame_id_t; + + enum call_frame_state { + initial = 0, + running = 1, + waiting = 2, + completed = 3 + }; + + class call_frame { + + public: + call_frame(void *argument, void *scope, call_frame_target *target, frame_id_t id, const std::weak_ptr&parent) : + state(initial), arguments(argument), scope(scope), result(nullptr), + target_function(target), id(id), children(), children_lock(), parent(parent), + evaluation_lock(), m_has_error(false) { + + } + + std::atomic state; + void *arguments; + void *scope; + void* result; + call_frame_target *target_function; + frame_id_t id; + + bool evaluate(); + + private: + + void mark_completion() { + // we can only be notified of child completion when we are waiting... + assert(state == waiting); + + if (state == waiting && has_unfinished_children()) { + state = completed; + if (parent != nullptr) { + parent->mark_completion(); + } + } + } + + bool has_unfinished_children() { + // make sure the list of children is not modified + WITH_LOCK(children_lock); + // return true as soon as any child is not complete + return std::any_of(children.begin(), children.end(), [](auto child) { + return child->state != completed; + }); + + } + + std::list> children; + std::mutex children_lock; + const std::shared_ptr parent; + + // this lock is taken when the function is executed to prevent double-execution + std::mutex evaluation_lock; + std::atomic m_has_error; + + }; +} + +#endif //PMP_CALL_FRAME_H diff --git a/runtime_lib/global_thread_pool.cpp b/runtime_lib/global_thread_pool.cpp new file mode 100644 index 0000000..5458885 --- /dev/null +++ b/runtime_lib/global_thread_pool.cpp @@ -0,0 +1,62 @@ +// +// Created by anton on 6/24/22. +// + + +#include "global_thread_pool.h" + + +namespace pmp::core { + + // public + frame_id_t global_thread_pool::insert_frame(void *argument, void *scope, call_frame_target *target_function, frame_id_t parent) { + assert(("ThreadPool must be active when inserting! stop() or join() were called too early!", m_active)); + + frame_id_t id = m_last_id++; + + + + + return id; + } + + void global_thread_pool::join() { + assert(m_active); + + stop(); + + std::cout << "joining..." << std::endl; + + for (size_t i = 0; i < m_thread_count; i++) { + if (m_threads[i].joinable()) { + m_threads[i].join(); + std::cout << "thread " << i << " joined!" << std::endl; + } else { + std::cout << "thread " << i << " not joinable" << std::endl; + } + } + std::cout << "done!" << std::endl; + + + std::cout << "GlobalThreadPool finished!" << std::endl; + } + + // private + + void global_thread_pool::worker() { + // std::cout << "Hello from worker #" << std::this_thread::get_id() << std::endl; + using namespace std::chrono_literals; + + call_frame* current = m_root_frame; + + while (true) { + break; + + + if (!current->evaluate()) { + // someone else was faster + + } + } + } +} \ No newline at end of file diff --git a/runtime_lib/global_thread_pool.h b/runtime_lib/global_thread_pool.h new file mode 100644 index 0000000..4836374 --- /dev/null +++ b/runtime_lib/global_thread_pool.h @@ -0,0 +1,92 @@ +// +// Created by anton on 6/24/22. +// + +#ifndef PMP_GLOBAL_THREAD_POOL_H +#define PMP_GLOBAL_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "call_frame.h" +#include "helpers.h" + +namespace pmp::core { + + class global_thread_pool { + + private: + + call_frame* m_root_frame; + + // frame creation / lookup stuff + std::atomic m_last_id; + std::unordered_map m_frames; + std::mutex m_frames_lock; + + + // thread stuff + std::atomic m_active; + const size_t m_thread_count; + std::vector m_threads; + + boost::lockfree::stack> stack; + + void worker(); + + public: + explicit global_thread_pool(call_frame* root_frame) : m_root_frame(root_frame), m_last_id(0), m_frames(), + m_frames_lock(), m_active(true), m_thread_count(std::thread::hardware_concurrency()), + stack(100) + { + std::cout << "Init GlobalThreadPool" << std::endl; + + m_threads = std::vector(m_thread_count); + + for (size_t i = 0; i < m_thread_count; i++) { + m_threads[i] = std::thread([&] { + worker(); + }); + } + } + + ~global_thread_pool() { + if (m_active) { + std::cout << "GlobalThreadPool destructor called before join!" << std::endl; + join(); + } + + assert(!m_active); + } + + frame_id_t insert_frame(void *argument, void *scope, call_frame_target *target_function, frame_id_t parent); + + void stop() { m_active = false; } + + void join(); + + size_t thread_count() const { return m_thread_count; } + size_t thread_count() { return m_thread_count; } + + + }; + +} + +#endif //PMP_GLOBAL_THREAD_POOL_H diff --git a/runtime_lib/helpers.h b/runtime_lib/helpers.h new file mode 100644 index 0000000..ada5a41 --- /dev/null +++ b/runtime_lib/helpers.h @@ -0,0 +1,24 @@ +// +// Created by anton on 6/24/22. +// + +#ifndef PMP_HELPERS_H +#define PMP_HELPERS_H + +#include + + +#define NOT_REACHED() assert(0) + + +#define WITH_LOCK(lock_name) std::lock_guard lock_name ## _guard(lock_name) + + + +namespace pmp::core { + typedef uint64_t u64; + typedef int64_t i64; + +} + +#endif //PMP_HELPERS_H diff --git a/runtime_lib/main.cpp b/runtime_lib/main.cpp index 133403a..900a57d 100644 --- a/runtime_lib/main.cpp +++ b/runtime_lib/main.cpp @@ -17,137 +17,17 @@ #include #include #include +#include +#include "call_frame.h" +#include "global_thread_pool.h" +#include "helpers.h" -typedef void *call_frame_target(void *, void *); +using namespace pmp::core; -struct call_frame { - call_frame(void *argument, void *scope, call_frame_target *target, int id) : - arguments(argument), scope(scope), - target_function(target), id(id) { - - } - - void *arguments; - void *scope; - call_frame_target *target_function; - int id; -}; - - -class GlobalThreadPool { -public: - GlobalThreadPool() : m_queue(), m_queue_lock(), m_last_id(0) { - std::cout << "Init GlobalThreadPool" << std::endl; - - m_thread_count = std::thread::hardware_concurrency(); - - m_threads = std::vector(m_thread_count); - - for (size_t i = 0; i < m_thread_count; i++) { - m_threads[i] = std::thread([&] { - worker(); - }); - } - } - - ~GlobalThreadPool() { - if (m_active) { - std::cout << "GlobalThreadPool destructor called before join!" << std::endl; - join(); - } - - assert(!m_active); - } - - int insert(void *arguments, void *scope, call_frame_target *target_function) { - assert(("ThreadPool must be active when inserting! stop() or join() were called too early!", m_active)); - - int id = m_last_id++; - m_queue_lock.lock(); - m_queue.emplace(arguments, scope, target_function, id); - m_queue_lock.unlock(); - - return id; - } - - void stop() { - m_active = false; - } - - void join() { - assert(m_active); - - stop(); - - std::cout << "joining..." << std::endl; - - for (size_t i = 0; i < m_thread_count; i++) { - if (m_threads[i].joinable()) { - m_threads[i].join(); - std::cout << "thread " << i << " joined!" << std::endl; - } else { - std::cout << "thread " << i << " not joinable" << std::endl; - } - } - std::cout << "done!" << std::endl; - - - std::cout << "GlobalThreadPool finished!" << std::endl; - } - - size_t thread_count() const { return m_thread_count; } - -private: - - std::queue m_queue; - std::mutex m_queue_lock; - std::atomic m_last_id; - std::vector m_threads; - - std::atomic m_active = true; - - size_t m_thread_count; - - - void worker() { - // std::cout << "Hello from worker #" << std::this_thread::get_id() << std::endl; - using namespace std::chrono_literals; - - while (true) { - m_queue_lock.lock(); - - if (m_queue.empty()) { - m_queue_lock.unlock(); - if (!m_active) { - break; - } - std::this_thread::sleep_for(10ms); - continue; - } - - struct call_frame item = m_queue.front(); - m_queue.pop(); - m_queue_lock.unlock(); - - try { - item.target_function(item.arguments, item.scope); - } catch (const std::exception& t) { - std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << t.what() << "\n" - << " in work package #" << item.id << std::endl; - } catch (...) { - std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << "unknown" << "\n" - << " in work package #" << item.id << std::endl; - } - } - } - -}; - - void *task1(void *args, void *scope) { //std::cout << "hello from task 1" << std::endl; @@ -216,16 +96,16 @@ int main() { std::cout << "Hello, World!" << std::endl; - auto pool = GlobalThreadPool(); + auto pool = global_thread_pool(nullptr); std::mutex stdout_lock; std::cout << "Using " << pool.thread_count() << " workers" << std::endl; for (int i = 0; i < 2; i++) { - pool.insert(nullptr, &stdout_lock, task1); + pool.insert_frame(nullptr, &stdout_lock, task1, 0); } - std::cout << pool.insert(nullptr, &stdout_lock, task1) << std::endl; + std::cout << pool.insert_frame(nullptr, &stdout_lock, task1,0) << std::endl; pool.join();