GDAL
cpl_worker_thread_pool.h
Go to the documentation of this file.
1/**********************************************************************
2 *
3 * Project: CPL - Common Portability Library
4 * Purpose: CPL worker thread pool
5 * Author: Even Rouault, <even dot rouault at spatialys dot com>
6 *
7 **********************************************************************
8 * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
9 *
10 * SPDX-License-Identifier: MIT
11 ****************************************************************************/
12
13#ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_
14#define CPL_WORKER_THREAD_POOL_H_INCLUDED_
15
16#include "cpl_multiproc.h"
17#include "cpl_list.h"
18
19#include <condition_variable>
20#include <functional>
21#include <memory>
22#include <mutex>
23#include <queue>
24#include <vector>
25
31
32#ifndef DOXYGEN_SKIP
34
35struct CPLWorkerThread
36{
37 CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread)
38 CPLWorkerThread() = default;
39
40 CPLThreadFunc pfnInitFunc = nullptr;
41 void *pInitData = nullptr;
42 CPLWorkerThreadPool *poTP = nullptr;
43 CPLJoinableThread *hThread = nullptr;
44 bool bMarkedAsWaiting = false;
45
46 std::mutex m_mutex{};
47 std::condition_variable m_cv{};
48};
49
50typedef enum
51{
52 CPLWTS_OK,
53 CPLWTS_STOP,
54 CPLWTS_ERROR
55} CPLWorkerThreadState;
56#endif // ndef DOXYGEN_SKIP
57
58class CPLJobQueue;
60using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>;
61
64{
66
67 std::vector<std::unique_ptr<CPLWorkerThread>> aWT{};
68 mutable std::mutex m_mutex{};
69 std::condition_variable m_cv{};
70 volatile CPLWorkerThreadState eState = CPLWTS_OK;
71 std::queue<std::function<void()>> jobQueue;
72 int nPendingJobs = 0;
73 bool m_bNotifyEvent = false;
74
75 CPLList *psWaitingWorkerThreadsList = nullptr;
76 int nWaitingWorkerThreads = 0;
77
78 int m_nMaxThreads = 0;
79
80 static void WorkerThreadFunction(void *user_data);
81
82 void DeclareJobFinished();
83 std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread);
84
85 public:
87 explicit CPLWorkerThreadPool(int nThreads);
89
90 bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData);
91 bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData,
92 bool bWaitallStarted);
93
95
96 bool SubmitJob(std::function<void()> task);
97 bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
98 bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData);
99 void WaitCompletion(int nMaxRemainingJobs = 0);
100 void WaitEvent();
101 void WakeUpWaitEvent();
102
104 int GetThreadCount() const;
105};
106
108class CPL_DLL CPLJobQueue
109{
111 CPLWorkerThreadPool *m_poPool = nullptr;
112 std::mutex m_mutex{};
113 std::condition_variable m_cv{};
114 int m_nPendingJobs = 0;
115
116 void DeclareJobFinished();
117
119 protected:
120 friend class CPLWorkerThreadPool;
121 explicit CPLJobQueue(CPLWorkerThreadPool *poPool);
123
124 public:
125 ~CPLJobQueue();
126
129 {
130 return m_poPool;
131 }
132
133 bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
134 bool SubmitJob(std::function<void()> task);
135 void WaitCompletion(int nMaxRemainingJobs = 0);
136 bool WaitEvent();
137};
138
139#endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_
Job queue.
Definition cpl_worker_thread_pool.h:109
CPLWorkerThreadPool * GetPool()
Return the owning worker thread pool.
Definition cpl_worker_thread_pool.h:128
Pool of worker threads.
Definition cpl_worker_thread_pool.h:64
CPLWorkerThreadPool()
Instantiate a new pool of worker threads.
Definition cpl_worker_thread_pool.cpp:34
bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData)
Setup the pool.
Definition cpl_worker_thread_pool.cpp:394
int GetThreadCount() const
Return the number of threads setup.
Definition cpl_worker_thread_pool.cpp:80
CPLJobQueuePtr CreateJobQueue()
Create a new job queue based on this worker thread pool.
Definition cpl_worker_thread_pool.cpp:552
void WakeUpWaitEvent()
Wake-up WaitEvent().
Definition cpl_worker_thread_pool.cpp:375
bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector< void * > &apData)
Queue several jobs.
Definition cpl_worker_thread_pool.cpp:239
void WaitEvent()
Wait for completion of at least one job, if there are any remaining, or for WakeUpWaitEvent() to have...
Definition cpl_worker_thread_pool.cpp:351
bool SubmitJob(std::function< void()> task)
Queue a new job.
Definition cpl_worker_thread_pool.cpp:134
void WaitCompletion(int nMaxRemainingJobs=0)
Wait for completion of part or whole jobs.
Definition cpl_worker_thread_pool.cpp:335
Simplest list implementation.
struct _CPLList CPLList
List element structure.
Definition cpl_list.h:31
#define CPL_DISALLOW_COPY_ASSIGN(ClassName)
Helper to remove the copy and assignment constructors so that the compiler will not generate the defa...
Definition cpl_port.h:936
std::unique_ptr< CPLJobQueue > CPLJobQueuePtr
Unique pointer to a job queue.
Definition cpl_worker_thread_pool.h:60