trying out ninja, branching out into multiple files

master
Anton Lydike 2 years ago
parent b404b2f567
commit 17c49a8b8d

2
.gitignore vendored

@ -3,3 +3,5 @@ __pycache__
/dist
/obj
.ninja_log
.ninja_deps
/boost_*

@ -2,7 +2,7 @@
<project version="4">
<component name="CLionExternalBuildManager">
<target id="5c1cb7ec-1aa4-4f6f-97c1-504abfc76bd3" name="main" defaultType="MAKE">
<configuration id="ea6404e0-1feb-4070-b1c1-4295409dace7" name="main">
<configuration id="ea6404e0-1feb-4070-b1c1-4295409dace7" name="main" toolchainName="Default">
<build type="MAKE">
<make targetName="runtime_lib/main.cpp" />
</build>

@ -4,7 +4,7 @@
<user-build-targets>
<build-target name="runtime_lib/main.cpp">
<build-configurations>
<build-configuration>
<build-configuration toolchainName="Default">
<make-targets>
<make-target>runtime_lib/main.cpp</make-target>
</make-targets>
@ -13,7 +13,7 @@
</build-target>
<build-target name="clean">
<build-configurations>
<build-configuration>
<build-configuration toolchainName="Default">
<make-targets>
<make-target>clean</make-target>
</make-targets>

@ -4,6 +4,9 @@
<sourceRoots>
<file path="$PROJECT_DIR$/runtime_lib" />
</sourceRoots>
<libraryRoots>
<file path="$PROJECT_DIR$/boost_1_79_0" />
</libraryRoots>
<excludeRoots>
<file path="$PROJECT_DIR$/dist" />
<file path="$PROJECT_DIR$/obj" />

@ -1,11 +1,18 @@
cflags = -Wall -Wextra -g
cflags = -Wall -Wextra -g -Iboost_1_79_0
print_flags = -fdiagnostics-color=always
link_args = -c
libraries = -lsodium
rule cc
command = g++ $print_flags $cflags $libraries $in -o $out
deps = gcc
depfile = $out.d
command = g++ -MD -MF $out.d $print_flags $link_args $cflags $libraries $in -o $out
build dist/main: cc runtime_lib/main.cpp
build obj/call_frame.o: cc runtime_lib/call_frame.cpp
build obj/global_thread_pool.o: cc runtime_lib/global_thread_pool.cpp
build dist/main: cc runtime_lib/main.cpp obj/call_frame.o obj/global_thread_pool.o
link_args =

@ -0,0 +1,56 @@
//
// Created by anton on 6/24/22.
//
#include "call_frame.h"
#include "helpers.h"
namespace pmp::core {
// public
bool call_frame::evaluate() {
std::unique_lock<std::mutex> lock(evaluation_lock, std::try_to_lock);
// check if lock was acquired
if (!lock.owns_lock()) {
return false;
}
// we now hold the lock
// only run if we are in the initial state
if (state != initial)
return false;
// set housekeeping variables
state = running;
try {
result = target_function(arguments, scope);
} catch (std::exception& ex)
{
std::cerr << "Error evaluating " << id << ": " << ex.what() << std::endl;
m_has_error = true;
}
catch (...)
{
std::cerr << "Error evaluating " << id << std::endl;
m_has_error = true;
}
// children are empty, when the "end" of an execution chain has been reached
// this means that no further nodes were created during the execution
// we don't need to acquire any lock, since no children will ever be added
if (children.empty()) {
state = completed;
parent->mark_completion();
} else {
// we do have children, so we must wait until they are finished to move on.
state = waiting;
}
return true;
}
}

@ -0,0 +1,88 @@
//
// Created by anton on 6/24/22.
//
#ifndef PMP_CALL_FRAME_H
#define PMP_CALL_FRAME_H
#include <iostream>
#include <cassert>
#include <atomic>
#include <utility>
#include <vector>
#include <mutex>
#include <algorithm>
#include <memory>
#include <list>
#include "helpers.h"
namespace pmp::core {
typedef void *call_frame_target(void * args, void * scope);
typedef u64 frame_id_t;
enum call_frame_state {
initial = 0,
running = 1,
waiting = 2,
completed = 3
};
class call_frame {
public:
call_frame(void *argument, void *scope, call_frame_target *target, frame_id_t id, const std::weak_ptr<call_frame>&parent) :
state(initial), arguments(argument), scope(scope), result(nullptr),
target_function(target), id(id), children(), children_lock(), parent(parent),
evaluation_lock(), m_has_error(false) {
}
std::atomic<call_frame_state> state;
void *arguments;
void *scope;
void* result;
call_frame_target *target_function;
frame_id_t id;
bool evaluate();
private:
void mark_completion() {
// we can only be notified of child completion when we are waiting...
assert(state == waiting);
if (state == waiting && has_unfinished_children()) {
state = completed;
if (parent != nullptr) {
parent->mark_completion();
}
}
}
bool has_unfinished_children() {
// make sure the list of children is not modified
WITH_LOCK(children_lock);
// return true as soon as any child is not complete
return std::any_of(children.begin(), children.end(), [](auto child) {
return child->state != completed;
});
}
std::list<std::shared_ptr<call_frame>> children;
std::mutex children_lock;
const std::shared_ptr<call_frame> parent;
// this lock is taken when the function is executed to prevent double-execution
std::mutex evaluation_lock;
std::atomic<bool> m_has_error;
};
}
#endif //PMP_CALL_FRAME_H

@ -0,0 +1,62 @@
//
// Created by anton on 6/24/22.
//
#include "global_thread_pool.h"
namespace pmp::core {
// public
frame_id_t global_thread_pool::insert_frame(void *argument, void *scope, call_frame_target *target_function, frame_id_t parent) {
assert(("ThreadPool must be active when inserting! stop() or join() were called too early!", m_active));
frame_id_t id = m_last_id++;
return id;
}
void global_thread_pool::join() {
assert(m_active);
stop();
std::cout << "joining..." << std::endl;
for (size_t i = 0; i < m_thread_count; i++) {
if (m_threads[i].joinable()) {
m_threads[i].join();
std::cout << "thread " << i << " joined!" << std::endl;
} else {
std::cout << "thread " << i << " not joinable" << std::endl;
}
}
std::cout << "done!" << std::endl;
std::cout << "GlobalThreadPool finished!" << std::endl;
}
// private
void global_thread_pool::worker() {
// std::cout << "Hello from worker #" << std::this_thread::get_id() << std::endl;
using namespace std::chrono_literals;
call_frame* current = m_root_frame;
while (true) {
break;
if (!current->evaluate()) {
// someone else was faster
}
}
}
}

@ -0,0 +1,92 @@
//
// Created by anton on 6/24/22.
//
#ifndef PMP_GLOBAL_THREAD_POOL_H
#define PMP_GLOBAL_THREAD_POOL_H
#include <iostream>
#include <cassert>
#include <functional>
#include <unordered_map>
#include <stack>
#include <queue>
#include <memory>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
#include <atomic>
#include <random>
#include <sodium.h>
#include <unordered_set>
#include <map>
#include <optional>
#include <boost/lockfree/stack.hpp>
#include "call_frame.h"
#include "helpers.h"
namespace pmp::core {
class global_thread_pool {
private:
call_frame* m_root_frame;
// frame creation / lookup stuff
std::atomic<frame_id_t> m_last_id;
std::unordered_map<int, call_frame*> m_frames;
std::mutex m_frames_lock;
// thread stuff
std::atomic<bool> m_active;
const size_t m_thread_count;
std::vector<std::thread> m_threads;
boost::lockfree::stack<std::shared_ptr<call_frame>> stack;
void worker();
public:
explicit global_thread_pool(call_frame* root_frame) : m_root_frame(root_frame), m_last_id(0), m_frames(),
m_frames_lock(), m_active(true), m_thread_count(std::thread::hardware_concurrency()),
stack(100)
{
std::cout << "Init GlobalThreadPool" << std::endl;
m_threads = std::vector<std::thread>(m_thread_count);
for (size_t i = 0; i < m_thread_count; i++) {
m_threads[i] = std::thread([&] {
worker();
});
}
}
~global_thread_pool() {
if (m_active) {
std::cout << "GlobalThreadPool destructor called before join!" << std::endl;
join();
}
assert(!m_active);
}
frame_id_t insert_frame(void *argument, void *scope, call_frame_target *target_function, frame_id_t parent);
void stop() { m_active = false; }
void join();
size_t thread_count() const { return m_thread_count; }
size_t thread_count() { return m_thread_count; }
};
}
#endif //PMP_GLOBAL_THREAD_POOL_H

@ -0,0 +1,24 @@
//
// Created by anton on 6/24/22.
//
#ifndef PMP_HELPERS_H
#define PMP_HELPERS_H
#include <mutex>
#define NOT_REACHED() assert(0)
#define WITH_LOCK(lock_name) std::lock_guard<std::mutex> lock_name ## _guard(lock_name)
namespace pmp::core {
typedef uint64_t u64;
typedef int64_t i64;
}
#endif //PMP_HELPERS_H

@ -17,137 +17,17 @@
#include <sodium.h>
#include <unordered_set>
#include <map>
#include <optional>
#include "call_frame.h"
#include "global_thread_pool.h"
#include "helpers.h"
typedef void *call_frame_target(void *, void *);
using namespace pmp::core;
struct call_frame {
call_frame(void *argument, void *scope, call_frame_target *target, int id) :
arguments(argument), scope(scope),
target_function(target), id(id) {
}
void *arguments;
void *scope;
call_frame_target *target_function;
int id;
};
class GlobalThreadPool {
public:
GlobalThreadPool() : m_queue(), m_queue_lock(), m_last_id(0) {
std::cout << "Init GlobalThreadPool" << std::endl;
m_thread_count = std::thread::hardware_concurrency();
m_threads = std::vector<std::thread>(m_thread_count);
for (size_t i = 0; i < m_thread_count; i++) {
m_threads[i] = std::thread([&] {
worker();
});
}
}
~GlobalThreadPool() {
if (m_active) {
std::cout << "GlobalThreadPool destructor called before join!" << std::endl;
join();
}
assert(!m_active);
}
int insert(void *arguments, void *scope, call_frame_target *target_function) {
assert(("ThreadPool must be active when inserting! stop() or join() were called too early!", m_active));
int id = m_last_id++;
m_queue_lock.lock();
m_queue.emplace(arguments, scope, target_function, id);
m_queue_lock.unlock();
return id;
}
void stop() {
m_active = false;
}
void join() {
assert(m_active);
stop();
std::cout << "joining..." << std::endl;
for (size_t i = 0; i < m_thread_count; i++) {
if (m_threads[i].joinable()) {
m_threads[i].join();
std::cout << "thread " << i << " joined!" << std::endl;
} else {
std::cout << "thread " << i << " not joinable" << std::endl;
}
}
std::cout << "done!" << std::endl;
std::cout << "GlobalThreadPool finished!" << std::endl;
}
size_t thread_count() const { return m_thread_count; }
private:
std::queue<struct call_frame> m_queue;
std::mutex m_queue_lock;
std::atomic<int> m_last_id;
std::vector<std::thread> m_threads;
std::atomic<bool> m_active = true;
size_t m_thread_count;
void worker() {
// std::cout << "Hello from worker #" << std::this_thread::get_id() << std::endl;
using namespace std::chrono_literals;
while (true) {
m_queue_lock.lock();
if (m_queue.empty()) {
m_queue_lock.unlock();
if (!m_active) {
break;
}
std::this_thread::sleep_for(10ms);
continue;
}
struct call_frame item = m_queue.front();
m_queue.pop();
m_queue_lock.unlock();
try {
item.target_function(item.arguments, item.scope);
} catch (const std::exception& t) {
std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << t.what() << "\n"
<< " in work package #" << item.id << std::endl;
} catch (...) {
std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << "unknown" << "\n"
<< " in work package #" << item.id << std::endl;
}
}
}
};
void *task1(void *args, void *scope) {
//std::cout << "hello from task 1" << std::endl;
@ -216,16 +96,16 @@ int main() {
std::cout << "Hello, World!" << std::endl;
auto pool = GlobalThreadPool();
auto pool = global_thread_pool(nullptr);
std::mutex stdout_lock;
std::cout << "Using " << pool.thread_count() << " workers" << std::endl;
for (int i = 0; i < 2; i++) {
pool.insert(nullptr, &stdout_lock, task1);
pool.insert_frame(nullptr, &stdout_lock, task1, 0);
}
std::cout << pool.insert(nullptr, &stdout_lock, task1) << std::endl;
std::cout << pool.insert_frame(nullptr, &stdout_lock, task1,0) << std::endl;
pool.join();

Loading…
Cancel
Save