cpp-mate  0.7
Helpful library for C++.
ThreadPool.hpp
1 /*
2  * Copyright (C) 2021 Alexander Kornilov (akornilov.82@gmail.com)
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef CPP_MATE_THREAD_POOL_HPP
18 #define CPP_MATE_THREAD_POOL_HPP
19 
20 #include <CppMate/Checkers.hpp>
21 #include <CppMate/SharedState.hpp>
22 
23 #include <memory>
24 #include <functional>
25 #include <future>
26 #include <vector>
27 #include <queue>
28 #include <thread>
29 
30 namespace CppMate {
31 
35 template<typename R = int>
36 class ThreadPool final
37 {
38 public:
39 
44  explicit ThreadPool(unsigned maxThreads = 0): _state(maxThreads, [this] { return std::thread(&ThreadPool::worker, this); }) {}
45 
46  ThreadPool(const ThreadPool& other) = delete;
47  ThreadPool(ThreadPool&& other) = default;
48 
49  ThreadPool& operator=(const ThreadPool& other) = delete;
50  ThreadPool& operator=(ThreadPool&& other) = default;
51 
56  for (auto& worker: _state.modify().template notifyAll<std::vector<std::thread>>([](auto& state) { return state.finish(); })) {
57  if (worker.joinable()) { worker.join(); }
58  }
59  }
60 
65  unsigned getMaxThreads() const { return _state.template view<unsigned>([](auto& state) { return state.getMaxThreads(); }); }
66 
71  unsigned getWorkersCount() const { return _state.template view<unsigned>([](auto& state) { return state.getWorkersCount(); }); }
72 
77  unsigned getPendingTasksCount() const { return _state.template view<unsigned>([](auto& state) { return state.getPendingTasksCount(); }); }
78 
83  bool hasPendingTasks() const { return getPendingTasksCount() > 0; }
84 
89  unsigned getWaitingCount() const { return _state.template view<unsigned>([](auto& state) { return state.getWaitingCount(); }); }
90 
98  template<typename F, typename... A>
99  std::future<R> addTask(F&& task, A&&... args) {
100  return _state.modify().template notifyOne<std::future<R>>([&](auto& state) {
101  return state.addTask(std::bind(std::forward<F>(task), std::forward<A>(args)...));
102  });
103  }
104 
105 private:
106  class State;
107 
108  void worker() {
109  auto exit = false;
110  auto isWorking = true;
111  while(!exit) {
112  auto task = _state.modify().when([&isWorking](auto& state) {
113  const auto wakeupIf = state.isFinished() || state.hasTasks();
114  if (!wakeupIf && isWorking) { isWorking = false; state.waitingBegin(); }
115  return wakeupIf;
116 
117  }).template extract<std::packaged_task<R()>>([&isWorking, &exit](auto& state) {
118  exit = state.isFinished();
119  if (!exit && !isWorking) {
120  isWorking = true;
121  state.waitingFinish();
122  }
123  return state.takeTask();
124  });
125  if (!exit) { task(); }
126  }
127  }
128 
129  SharedState<State> _state;
130 };
131 
132 template<typename R>
133 class ThreadPool<R>::State final
134 {
135 public:
136  using CreateWorker = std::function<std::thread()>;
137 
138  State(unsigned maxThreads, CreateWorker createWorker):
139  _createWorker(std::move(createWorker)),
140  _workers(detectMaxThreads(maxThreads)),
141  _workersCount(0),
142  _waitingCount(0),
143  _exit(false)
144  {
145  }
146 
147  unsigned getMaxThreads() const { return _workers.size(); }
148 
149  unsigned getWorkersCount() const { return _workersCount; }
150 
151  unsigned getPendingTasksCount() const { return _tasks.size(); }
152 
153  bool hasTasks() const { return !_tasks.empty(); }
154 
155  unsigned getWaitingCount() const { return _waitingCount; }
156 
157  bool isFinished() const { return _exit; }
158 
159  template<typename F, typename... A>
160  inline std::future<R> addTask(F&& task, A&&... args) {
161  checkLogic(!isFinished());
162  _tasks.emplace(std::bind(std::forward<F>(task), std::forward<A>(args)...));
163  reviewWorkers();
164  return _tasks.back().get_future();
165  }
166 
167  void waitingBegin() {
168  _waitingCount++;
169  }
170 
171  void waitingFinish() {
172  if (_waitingCount > 0) { _waitingCount--; }
173  }
174 
175  std::packaged_task<R()> takeTask() {
176  if (_tasks.empty()) { return std::packaged_task<R()>(); }
177  auto task = std::move(_tasks.front());
178  _tasks.pop();
179  return task;
180  }
181 
182  std::vector<std::thread> finish() {
183  _exit = true;
184  clearTasks();
185  std::vector<std::thread> result(_workersCount);
186  for (auto i = 0u; i < _workersCount; i++) {
187  result[i] = std::move(_workers[i]);
188  }
189  return result;
190  }
191 
192 private:
193  void reviewWorkers() {
194  if (_workersCount < _workers.size() && !_waitingCount) { _workers[_workersCount++] = _createWorker(); }
195  }
196 
197  void clearTasks() {
198  std::queue<std::packaged_task<R()>> empty;
199  std::swap(_tasks, empty);
200  }
201 
202  static unsigned defaultMaxThreads() {
203  constexpr auto DefaultMaxThreads = 4u;
204  const auto systemCpuCount = std::thread::hardware_concurrency();
205  return systemCpuCount ? systemCpuCount : DefaultMaxThreads;
206  }
207 
208  static unsigned detectMaxThreads(unsigned desiredMaxThreads) {
209  return desiredMaxThreads > 0 ? desiredMaxThreads : defaultMaxThreads();
210  }
211 
212  std::function<std::thread()> _createWorker;
213  std::vector<std::thread> _workers;
214  std::queue<std::packaged_task<R()>> _tasks;
215  unsigned _workersCount;
216  unsigned _waitingCount;
217  bool _exit;
218 };
219 
/**
 * Represents abstract pipeline.
 *
 * @tparam R type of the value produced by a task.
 * @tparam A types of the arguments a task is invoked with.
 */
template<class R, class... A>
class Pipeline
{
public:

    /**
     * Type of the callable accepted by the pipeline.
     */
    using Signature = std::function<R(A...)>;

    /**
     * Virtual destructor: implementations may be destroyed through a pointer to Pipeline.
     */
    virtual ~Pipeline() = default;

    /**
     * Adds a new task for execution.
     *
     * @param task callable to execute.
     * @param args arguments for the callable; taken as rvalue references.
     * @return future for the result of the task.
     */
    virtual std::future<R> addTask(Signature task, A&&... args) = 0;
};
246 
250 template<typename R, typename... A>
251 class PipelineThreadPool: public Pipeline<R, A...>
252 {
253 public:
254  PipelineThreadPool(const PipelineThreadPool& other) = default;
256 
259 
260  std::future<R> addTask(typename Pipeline<R, A...>::Signature task, A&&... args) override {
261  return _pool->addTask(std::forward<decltype(task)>(task), std::forward<A>(args)...);
262  }
263 
268  explicit PipelineThreadPool(unsigned maxThreads = 0): _pool(std::make_shared<ThreadPool<R>>(maxThreads)) {}
269 
274  explicit PipelineThreadPool(std::shared_ptr<ThreadPool<R>> pool): _pool(std::move(pool)) {}
275 
280  std::shared_ptr<ThreadPool<R>> getPool() const { return _pool; }
281 
287  static std::unique_ptr<Pipeline<R, A...>> create(unsigned maxThreads = 0) {
288  return std::make_unique<PipelineThreadPool>(maxThreads);
289  }
290 
296  static std::unique_ptr<Pipeline<R, A...>> create(std::shared_ptr<ThreadPool<R>> pool) {
297  return std::make_unique<PipelineThreadPool>(pool);
298  }
299 
300 private:
301  std::shared_ptr<ThreadPool<R>> _pool;
302 };
303 
304 } // namespace CppMate
305 
306 #endif // CPP_MATE_THREAD_POOL_HPP
Represents pipeline implementation with ThreadPool backend.
Definition: ThreadPool.hpp:252
PipelineThreadPool & operator=(const PipelineThreadPool &other)=default
std::shared_ptr< ThreadPool< R > > getPool() const
getPool
Definition: ThreadPool.hpp:280
static std::unique_ptr< Pipeline< R, A... > > create(std::shared_ptr< ThreadPool< R >> pool)
Creates pipeline instance.
Definition: ThreadPool.hpp:296
static std::unique_ptr< Pipeline< R, A... > > create(unsigned maxThreads=0)
Creates pipeline instance.
Definition: ThreadPool.hpp:287
std::future< R > addTask(typename Pipeline< R, A... >::Signature task, A &&... args) override
Definition: ThreadPool.hpp:260
PipelineThreadPool(unsigned maxThreads=0)
Constructor.
Definition: ThreadPool.hpp:268
PipelineThreadPool(const PipelineThreadPool &other)=default
PipelineThreadPool(std::shared_ptr< ThreadPool< R >> pool)
Constructor.
Definition: ThreadPool.hpp:274
PipelineThreadPool(PipelineThreadPool &&other)=default
PipelineThreadPool & operator=(PipelineThreadPool &&other)=default
Represents abstract pipeline.
Definition: ThreadPool.hpp:225
virtual std::future< R > addTask(Signature task, A &&... args)=0
Adds a new task for execution.
virtual ~Pipeline()=default
std::function< R(A...)> Signature
Definition: ThreadPool.hpp:231
Definition: ThreadPool.hpp:134
bool hasTasks() const
Definition: ThreadPool.hpp:153
std::future< R > addTask(F &&task, A &&... args)
Definition: ThreadPool.hpp:160
std::packaged_task< R()> takeTask()
Definition: ThreadPool.hpp:175
State(unsigned maxThreads, CreateWorker createWorker)
Definition: ThreadPool.hpp:138
unsigned getPendingTasksCount() const
Definition: ThreadPool.hpp:151
void waitingBegin()
Definition: ThreadPool.hpp:167
unsigned getMaxThreads() const
Definition: ThreadPool.hpp:147
std::vector< std::thread > finish()
Definition: ThreadPool.hpp:182
unsigned getWaitingCount() const
Definition: ThreadPool.hpp:155
void waitingFinish()
Definition: ThreadPool.hpp:171
std::function< std::thread()> CreateWorker
Definition: ThreadPool.hpp:136
unsigned getWorkersCount() const
Definition: ThreadPool.hpp:149
bool isFinished() const
Definition: ThreadPool.hpp:157
Represents thread pool.
Definition: ThreadPool.hpp:37
ThreadPool & operator=(const ThreadPool &other)=delete
ThreadPool & operator=(ThreadPool &&other)=default
ThreadPool(const ThreadPool &other)=delete
std::future< R > addTask(F &&task, A &&... args)
Adds a new task for execution.
Definition: ThreadPool.hpp:99
ThreadPool(ThreadPool &&other)=default
unsigned getPendingTasksCount() const
Returns number of pending tasks at the moment.
Definition: ThreadPool.hpp:77
unsigned getWorkersCount() const
Returns the number of active workers in the pool at the moment.
Definition: ThreadPool.hpp:71
ThreadPool(unsigned maxThreads=0)
Constructor.
Definition: ThreadPool.hpp:44
unsigned getMaxThreads() const
Returns the maximum possible number of threads in the pool.
Definition: ThreadPool.hpp:65
bool hasPendingTasks() const
Indicates that pool has at least one pending task at the moment.
Definition: ThreadPool.hpp:83
unsigned getWaitingCount() const
Returns the number of workers waiting for a job.
Definition: ThreadPool.hpp:89
~ThreadPool()
Definition: ThreadPool.hpp:55
Definition: BinaryData.hpp:28