You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

241 lines
5.5 KiB
C++

//
// Created by anton on 6/22/22.
//
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <stack>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <sodium.h>
/// Function type of a unit of work: it is invoked with two opaque pointers
/// (the frame's `arguments` and `scope`) and yields an opaque pointer.
using call_frame_target = void *(void *, void *);
/// A single queued work item: the function to call, the two opaque pointers
/// handed to it, and the numeric id the work item was given.
struct call_frame {
    /// Stores all four fields verbatim; nothing is copied or owned.
    call_frame(void *argument, void *scope, call_frame_target *target, int id)
        : arguments{argument},
          scope{scope},
          target_function{target},
          id{id} {}

    void *arguments;                     ///< First pointer passed to `target_function`.
    void *scope;                         ///< Second pointer passed to `target_function`.
    call_frame_target *target_function;  ///< Function to invoke for this work item.
    int id;                              ///< Identifier of this work item.
};
/// Fixed-size pool of worker threads that run queued `call_frame`s.
///
/// Work items are dequeued in insertion order; several items may execute
/// concurrently on different workers. Exceptions escaping a work item are
/// logged to `std::cerr` and swallowed so that a worker never dies.
///
/// Shutdown semantics: `stop()` lets the workers drain whatever is still
/// queued and then exit; `join()` calls `stop()` and waits for them. The
/// destructor joins any worker that is still running, so destroying a pool
/// that was only `stop()`-ed (or never joined) is safe.
class GlobalThreadPool {
public:
    /// Starts one worker per hardware thread, and at least one worker.
    GlobalThreadPool() {
        std::cout << "Init GlobalThreadPool" << std::endl;
        m_thread_count = std::thread::hardware_concurrency();
        if (m_thread_count == 0) {
            // hardware_concurrency() is allowed to return 0 when the value is
            // not computable; a pool without workers would never run anything.
            m_thread_count = 1;
        }
        m_threads.reserve(m_thread_count);
        try {
            for (size_t i = 0; i < m_thread_count; i++) {
                m_threads.emplace_back([this] { worker(); });
            }
        } catch (...) {
            // The destructor does not run for a half-constructed object, so
            // shut down the workers that did start before propagating.
            stop();
            for (auto& thread : m_threads) {
                if (thread.joinable()) {
                    thread.join();
                }
            }
            throw;
        }
    }

    // Workers hold a pointer to this object, so it must never be copied.
    GlobalThreadPool(const GlobalThreadPool&) = delete;
    GlobalThreadPool& operator=(const GlobalThreadPool&) = delete;

    /// Joins any worker that is still running.
    ~GlobalThreadPool() {
        if (m_active) {
            std::cout << "GlobalThreadPool destructor called before join!" << std::endl;
        }
        if (has_unjoined_threads()) {
            // Also covers stop() without join(): destroying a joinable
            // std::thread would otherwise call std::terminate.
            join();
        }
        assert(!m_active);
    }

    /// Queues a work item and wakes one idle worker.
    ///
    /// @param arguments        first pointer handed to `target_function`
    /// @param scope            second pointer handed to `target_function`
    /// @param target_function  function to run; must not be null
    /// @return the id assigned to the work item (ids count up from 0)
    ///
    /// Must be called while the pool is active; this is only checked by an
    /// assertion, so in builds without assertions an item inserted after
    /// stop()/join() may never be executed.
    int insert(void *arguments, void *scope, call_frame_target *target_function) {
        assert(m_active &&
               "ThreadPool must be active when inserting! stop() or join() were called too early!");
        assert(target_function != nullptr && "target_function must not be null");
        const int id = m_last_id++;
        {
            // Scoped guard: the mutex is released even if emplace throws.
            std::lock_guard<std::mutex> guard(m_queue_lock);
            m_queue.emplace(arguments, scope, target_function, id);
        }
        m_queue_cv.notify_one();
        return id;
    }

    /// Marks the pool inactive. Workers finish the items already queued and
    /// then exit. Safe to call more than once.
    void stop() {
        {
            // Flip the flag under the queue mutex so a worker cannot miss the
            // wake-up between evaluating its wait condition and blocking.
            std::lock_guard<std::mutex> guard(m_queue_lock);
            m_active = false;
        }
        m_queue_cv.notify_all();
    }

    /// Stops the pool and waits for every worker to exit. May be called after
    /// stop() and may be called repeatedly; already-joined workers are
    /// reported as "not joinable".
    void join() {
        stop();
        std::cout << "joining..." << std::endl;
        for (size_t i = 0; i < m_threads.size(); i++) {
            if (m_threads[i].joinable()) {
                m_threads[i].join();
                std::cout << "thread " << i << " joined!" << std::endl;
            } else {
                std::cout << "thread " << i << " not joinable" << std::endl;
            }
        }
        std::cout << "done!" << std::endl;
        std::cout << "GlobalThreadPool finished!" << std::endl;
    }

    /// Number of worker threads the pool was started with.
    size_t thread_count() const { return m_thread_count; }

private:
    std::queue<struct call_frame> m_queue;  // pending work, guarded by m_queue_lock
    std::mutex m_queue_lock;
    std::condition_variable m_queue_cv;     // signalled on insert() and stop()
    std::atomic<int> m_last_id{0};
    std::vector<std::thread> m_threads;
    std::atomic<bool> m_active{true};
    size_t m_thread_count = 0;

    /// True while at least one worker thread has not been joined yet.
    bool has_unjoined_threads() const {
        for (const auto& thread : m_threads) {
            if (thread.joinable()) {
                return true;
            }
        }
        return false;
    }

    /// Worker loop: blocks until work arrives or the pool is stopped, runs
    /// one item at a time, and returns once the pool is stopped and the
    /// queue is empty.
    void worker() {
        while (true) {
            std::unique_lock<std::mutex> lock(m_queue_lock);
            m_queue_cv.wait(lock, [this] { return !m_queue.empty() || !m_active; });
            if (m_queue.empty()) {
                // Woken with nothing to do: only possible after stop().
                return;
            }
            struct call_frame item = m_queue.front();
            m_queue.pop();
            // Run the item without holding the queue mutex.
            lock.unlock();
            try {
                item.target_function(item.arguments, item.scope);
            } catch (const std::exception& t) {
                std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << t.what() << "\n"
                          << " in work package #" << item.id << std::endl;
            } catch (...) {
                std::cerr << "Error in thread " << std::this_thread::get_id() << ": " << "unknown" << "\n"
                          << " in work package #" << item.id << std::endl;
            }
        }
    }
};
/// Demo work item: draws random symbols, treats symbol 0 as a word
/// separator, tallies how long each "word" was, and prints the tally.
///
/// @param args   unused
/// @param scope  pointer to the `std::mutex` that serialises printing to
///               `std::cout`; must not be null
/// @return always `nullptr`
void *task1(void *args, void *scope) {
    (void) args;
    assert(scope != nullptr && "Scope can't be nullptr");
    auto *print_lock = static_cast<std::mutex *>(scope);

    using u64 = unsigned long long int;
    constexpr u64 kDraws = 10000000;         // number of random symbols drawn
    constexpr unsigned kAlphabetSize = 26;   // symbols 1..26 are letters, 0 ends a word
    constexpr u64 kLongestReported = 20;     // longer words are left out of the report

    long long int seed = 0;
    std::random_device dev;
    // FIXME: random_device entropy seems to be stuck at 0.
    // currently falling back to libsodium, but that's not sustainable.
    if (dev.entropy() == 0) {
        randombytes_buf(&seed, sizeof(seed));
    } else {
        seed = dev();
    }
    // The engine takes a result_type seed; make the narrowing explicit.
    std::mt19937 rng(static_cast<std::mt19937::result_type>(seed));
    std::uniform_int_distribution<std::mt19937::result_type> next_number(0, kAlphabetSize);

    // Word length -> number of words of that length, ordered by length.
    std::map<u64, u64> lengths;
    u64 len = 0;
    for (u64 i = 0; i < kDraws; i++) {
        if (next_number(rng) == 0) {
            lengths[len]++;
            len = 0;
        } else {
            len++;
        }
    }

    // Hold the print mutex for the whole report so that reports from
    // different tasks do not interleave; released automatically on return.
    std::lock_guard<std::mutex> guard(*print_lock);
    std::cout << "Word lengths:\n";
    for (const auto& [length, count] : lengths) {
        if (length > kLongestReported) {
            break;  // the map is ordered, so every later entry is longer still
        }
        std::cout << length << ": " << count << "\n";
    }
    // No per-word tally is collected, so this header has no entries under it.
    std::cout << "Most used words:\n";
    return nullptr;
}
int main() {
sodium_init();
std::cout << "Hello, World!" << std::endl;
auto pool = GlobalThreadPool();
std::mutex stdout_lock;
std::cout << "Using " << pool.thread_count() << " workers" << std::endl;
for (int i = 0; i < 2; i++) {
pool.insert(nullptr, &stdout_lock, task1);
}
std::cout << pool.insert(nullptr, &stdout_lock, task1) << std::endl;
pool.join();
return 0;
}