17 #ifndef CPP_MATE_THREAD_POOL_HPP
18 #define CPP_MATE_THREAD_POOL_HPP
20 #include <CppMate/Checkers.hpp>
21 #include <CppMate/SharedState.hpp>
35 template<
typename R =
int>
/// Constructor.
///
/// Initializes the shared state with the requested thread limit and a factory
/// lambda that starts a std::thread running ThreadPool::worker on this pool.
///
/// @param maxThreads Desired maximum number of threads. The value is handed to
///                   the state, which passes it to detectMaxThreads(); there a
///                   value of 0 is replaced by defaultMaxThreads().
44 explicit ThreadPool(
unsigned maxThreads = 0): _state(maxThreads, [this] {
return std::thread(&ThreadPool::worker,
this); }) {}
56 for (
auto& worker: _state.modify().template notifyAll<std::vector<std::thread>>([](
auto& state) { return state.finish(); })) {
57 if (worker.joinable()) { worker.join(); }
/// Returns the maximum possible number of threads in the pool.
///
/// The value is read through _state.view() by calling getMaxThreads() on the
/// state object passed to the callback.
65 unsigned getMaxThreads()
const {
return _state.template view<unsigned>([](
auto& state) {
return state.getMaxThreads(); }); }
/// Returns the number of active workers in the pool at the moment.
///
/// The value is read through _state.view() by calling getWorkersCount() on the
/// state object passed to the callback.
71 unsigned getWorkersCount()
const {
return _state.template view<unsigned>([](
auto& state) {
return state.getWorkersCount(); }); }
/// Returns the number of pending tasks at the moment.
///
/// The value is read through _state.view() by calling getPendingTasksCount()
/// on the state object passed to the callback.
77 unsigned getPendingTasksCount()
const {
return _state.template view<unsigned>([](
auto& state) {
return state.getPendingTasksCount(); }); }
/// Returns the number of workers waiting for a job.
///
/// The value is read through _state.view() by calling getWaitingCount() on the
/// state object passed to the callback.
89 unsigned getWaitingCount()
const {
return _state.template view<unsigned>([](
auto& state) {
return state.getWaitingCount(); }); }
98 template<
typename F,
typename... A>
99 std::future<R>
addTask(F&& task, A&&... args) {
100 return _state.modify().template notifyOne<std::future<R>>([&](
auto& state) {
101 return state.addTask(std::bind(std::forward<F>(task), std::forward<A>(args)...));
110 auto isWorking =
true;
112 auto task = _state.modify().when([&isWorking](
auto& state) {
113 const auto wakeupIf = state.isFinished() || state.hasTasks();
114 if (!wakeupIf && isWorking) { isWorking =
false; state.waitingBegin(); }
117 }).
template extract<std::packaged_task<R()>>([&isWorking, &exit](
auto& state) {
118 exit = state.isFinished();
119 if (!exit && !isWorking) {
121 state.waitingFinish();
123 return state.takeTask();
125 if (!exit) { task(); }
129 SharedState<State> _state;
139 _createWorker(std::move(createWorker)),
140 _workers(detectMaxThreads(maxThreads)),
159 template<
typename F,
typename... A>
160 inline std::future<R>
addTask(F&& task, A&&... args) {
161 checkLogic(!isFinished());
162 _tasks.emplace(std::bind(std::forward<F>(task), std::forward<A>(args)...));
164 return _tasks.back().get_future();
172 if (_waitingCount > 0) { _waitingCount--; }
176 if (_tasks.empty()) {
return std::packaged_task<R()>(); }
177 auto task = std::move(_tasks.front());
185 std::vector<std::thread> result(_workersCount);
186 for (
auto i = 0u; i < _workersCount; i++) {
187 result[i] = std::move(_workers[i]);
/// Starts one more worker when there is room for it and nobody is idle.
///
/// If fewer workers have been created than _workers has slots and
/// _waitingCount is zero, the thread returned by _createWorker() is stored in
/// the next free slot and _workersCount is incremented.
193 void reviewWorkers() {
194 if (_workersCount < _workers.size() && !_waitingCount) { _workers[_workersCount++] = _createWorker(); }
198 std::queue<std::packaged_task<R()>> empty;
199 std::swap(_tasks, empty);
/// Returns the thread limit used when the caller does not request one.
///
/// @return std::thread::hardware_concurrency() when it is non-zero,
///         otherwise the fallback constant 4.
202 static unsigned defaultMaxThreads() {
// Fallback used when the hardware concurrency query yields 0.
203 constexpr
auto DefaultMaxThreads = 4u;
204 const auto systemCpuCount = std::thread::hardware_concurrency();
205 return systemCpuCount ? systemCpuCount : DefaultMaxThreads;
/// Resolves the effective thread limit from the requested one.
///
/// @param desiredMaxThreads Requested limit; 0 means "use the default".
/// @return desiredMaxThreads when it is greater than 0, otherwise
///         defaultMaxThreads().
208 static unsigned detectMaxThreads(
unsigned desiredMaxThreads) {
209 return desiredMaxThreads > 0 ? desiredMaxThreads : defaultMaxThreads();
212 std::function<std::thread()> _createWorker;
213 std::vector<std::thread> _workers;
214 std::queue<std::packaged_task<R()>> _tasks;
215 unsigned _workersCount;
216 unsigned _waitingCount;
223 template<
typename R,
typename... A>
250 template<
typename R,
typename... A>
261 return _pool->addTask(std::forward<decltype(task)>(task), std::forward<A>(args)...);
/// Returns the thread pool used by this pipeline.
///
/// @return A copy of the std::shared_ptr held in _pool.
280 std::shared_ptr<ThreadPool<R>>
getPool()
const {
return _pool; }
288 return std::make_unique<PipelineThreadPool>(maxThreads);
297 return std::make_unique<PipelineThreadPool>(pool);
301 std::shared_ptr<ThreadPool<R>> _pool;
Represents pipeline implementation with ThreadPool backend.
Definition: ThreadPool.hpp:252
PipelineThreadPool & operator=(const PipelineThreadPool &other)=default
std::shared_ptr< ThreadPool< R > > getPool() const
getPool
Definition: ThreadPool.hpp:280
static std::unique_ptr< Pipeline< R, A... > > create(std::shared_ptr< ThreadPool< R >> pool)
Creates pipeline instance.
Definition: ThreadPool.hpp:296
static std::unique_ptr< Pipeline< R, A... > > create(unsigned maxThreads=0)
Creates pipeline instance.
Definition: ThreadPool.hpp:287
std::future< R > addTask(typename Pipeline< R, A... >::Signature task, A &&... args) override
Definition: ThreadPool.hpp:260
PipelineThreadPool(unsigned maxThreads=0)
Constructor.
Definition: ThreadPool.hpp:268
PipelineThreadPool(const PipelineThreadPool &other)=default
PipelineThreadPool(std::shared_ptr< ThreadPool< R >> pool)
Constructor.
Definition: ThreadPool.hpp:274
PipelineThreadPool(PipelineThreadPool &&other)=default
PipelineThreadPool & operator=(PipelineThreadPool &&other)=default
Represents abstract pipeline.
Definition: ThreadPool.hpp:225
virtual std::future< R > addTask(Signature task, A &&... args)=0
Adds a new task for execution.
virtual ~Pipeline()=default
std::function< R(A...)> Signature
Definition: ThreadPool.hpp:231
Definition: ThreadPool.hpp:134
bool hasTasks() const
Definition: ThreadPool.hpp:153
std::future< R > addTask(F &&task, A &&... args)
Definition: ThreadPool.hpp:160
std::packaged_task< R()> takeTask()
Definition: ThreadPool.hpp:175
State(unsigned maxThreads, CreateWorker createWorker)
Definition: ThreadPool.hpp:138
unsigned getPendingTasksCount() const
Definition: ThreadPool.hpp:151
void waitingBegin()
Definition: ThreadPool.hpp:167
unsigned getMaxThreads() const
Definition: ThreadPool.hpp:147
std::vector< std::thread > finish()
Definition: ThreadPool.hpp:182
unsigned getWaitingCount() const
Definition: ThreadPool.hpp:155
void waitingFinish()
Definition: ThreadPool.hpp:171
std::function< std::thread()> CreateWorker
Definition: ThreadPool.hpp:136
unsigned getWorkersCount() const
Definition: ThreadPool.hpp:149
bool isFinished() const
Definition: ThreadPool.hpp:157
Represents thread pool.
Definition: ThreadPool.hpp:37
ThreadPool & operator=(const ThreadPool &other)=delete
ThreadPool & operator=(ThreadPool &&other)=default
ThreadPool(const ThreadPool &other)=delete
std::future< R > addTask(F &&task, A &&... args)
Adds a new task for execution.
Definition: ThreadPool.hpp:99
ThreadPool(ThreadPool &&other)=default
unsigned getPendingTasksCount() const
Returns number of pending tasks at the moment.
Definition: ThreadPool.hpp:77
unsigned getWorkersCount() const
Returns the number of active workers in the pool at the moment.
Definition: ThreadPool.hpp:71
ThreadPool(unsigned maxThreads=0)
Constructor.
Definition: ThreadPool.hpp:44
unsigned getMaxThreads() const
Returns the maximum possible number of threads in the pool.
Definition: ThreadPool.hpp:65
bool hasPendingTasks() const
Indicates that pool has at least one pending task at the moment.
Definition: ThreadPool.hpp:83
unsigned getWaitingCount() const
Returns the number of workers waiting for a job.
Definition: ThreadPool.hpp:89
~ThreadPool()
Definition: ThreadPool.hpp:55
Definition: BinaryData.hpp:28