1 /**
2  * Threads and thread-pool.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2012.
5  * Copyright: Copyright (c) 2009-2011, David Simcha.
6  * Copyright: Copyright Guillaume Piolat 2016.
7  * License: Distributed under the
8  *      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
9  *    (See accompanying file LICENSE)
10  */
11 module dplug.core.thread;
12 
13 import core.stdc.stdlib;
14 import core.stdc.stdio;
15 
16 import dplug.core.nogc;
17 import dplug.core.lockedqueue;
18 import dplug.core.sync;
19 
20 version(Posix)
21     import core.sys.posix.pthread;
22 else version(Windows)
23 {
24     import core.stdc.stdint : uintptr_t;
25     import core.sys.windows.windef;
26     import core.sys.windows.winbase;
27     import core.thread;
28 
29     extern (Windows) alias btex_fptr = uint function(void*) ;
30     extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
31 }
32 else
33     static assert(false, "Platform not supported");
34 
// Group every Apple platform under a single `Darwin` version identifier, so that the
// Apple-specific code below only needs one `version(Darwin)` check.
version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

// C library `sysctlbyname(3)`, declared here directly. It is used below to query the
// CPU core count (`machdep.cpu.core_count`) and the kernel release (`kern.osrelease`).
version(Darwin)
{
    extern(C) nothrow @nogc
    int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
}
49 
50 //debug = threadPoolIsActuallySynchronous;
51 
52 
/// Legacy thread entry point: a delegate taking no argument.
alias ThreadDelegate = void delegate() @nogc nothrow;

/// Thread entry point receiving an opaque user pointer.
/// The thread pool uses this pointer to hand each worker its own index.
alias ThreadDelegateUser = void delegate(void* userData) @nogc nothrow;
58 
59 
/// Convenience factory building a `Thread` that will run `callback`.
/// The OS thread does not exist until `start()` is called on the result.
/// Params:
///     callback  = delegate executed by the new thread.
///     stackSize = stack size in bytes, 0 for the platform default.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    Thread created = Thread(callback, stackSize);
    return created;
}
64 
/// Convenience factory building a `Thread` that will run `callback(userData)`.
/// The OS thread does not exist until `start()` is called on the result.
/// Params:
///     callback  = delegate executed by the new thread.
///     stackSize = stack size in bytes, 0 for the platform default.
///     userData  = opaque pointer forwarded to `callback`.
Thread makeThread(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null) nothrow @nogc
{
    Thread created = Thread(callback, stackSize, userData);
    return created;
}
69 
/// Optimistic thread, failure not supported: every failing allocation or OS call
/// ends in `assert(false)`.
///
/// Lifetime: the destructor frees the context block from which the running thread
/// reads its delegate. A started `Thread` should therefore be `join()`ed before it is
/// destroyed. On Windows, `join()` is also what closes the thread handle.
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a thread. The OS thread is not created until `start` has been called.
    ///
    /// Params:
    ///     callback  = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have additional context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _context = allocateContext();
        _context.callback = callback;
        _context.callbackUser = null;
        // malloc does not zero memory: don't leave a garbage pointer in the context.
        _context.userData = null;
    }

    /// Create a thread with user data. The OS thread is not created until `start`
    /// has been called.
    ///
    /// Params:
    ///     callback  = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///     userData  = a pointer to be passed to thread delegate
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have additional context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null)
    {
        _stackSize = stackSize;
        _context = allocateContext();
        _context.callback = null;
        _context.callbackUser = callback;
        _context.userData = userData;
    }

    /// Frees the thread context. This does not join or detach the OS thread.
    ~this()
    {
        if (_context !is null)
        {
            free(_context);
            _context = null;
        }
    }

    @disable this(this);

    /// Starts the thread; nothing runs before this call.
    /// This function can only be called once.
    void start()
    {
        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, _context);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }
        else version(Windows)
        {
            uint threadId; // required by _beginthreadex, otherwise unused

            // The thread is created suspended, then resumed right away.
            _id = cast(HANDLE) _beginthreadex(null,
                                              cast(uint)_stackSize,
                                              &windowsThreadEntryPoint,
                                              _context,
                                              CREATE_SUSPENDED,
                                              &threadId);
            if (cast(size_t)_id == 0)
                assert(false);

            // ResumeThread signals failure with (DWORD)-1.
            if (ResumeThread(_id) == cast(DWORD) -1)
                assert(false);
        }
        else
            static assert(false);
    }

    /// Wait for that thread termination.
    /// Again, this function can be called only once.
    /// This actually releases the thread resource (on Windows, closes the handle).
    void join()
    {
        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }
    }

    /// Returns: the OS thread identifier (`pthread_t` or `HANDLE`) cast to a pointer.
    void* getThreadID()
    {
        version(Posix) return cast(void*)_id;
        else version(Windows) return cast(void*)_id;
        else assert(false);
    }

private:
    version(Posix)
    {
        pthread_t _id;
    }
    else version(Windows)
    {
        HANDLE _id;
    }
    else
        static assert(false);

    // The thread context given to the OS thread creation function needs a constant address,
    // since there is no guarantee the `Thread` struct stays at the same address.
    static struct CreateContext
    {
    nothrow:
    @nogc:
        ThreadDelegate callback;
        ThreadDelegateUser callbackUser;
        void* userData;
        void call()
        {
            if (callback !is null)
                callback();
            else
                callbackUser(userData);
        }
    }
    CreateContext* _context;

    size_t _stackSize;

    // Allocates an (uninitialized) context block with malloc; out of memory is not supported.
    static CreateContext* allocateContext()
    {
        CreateContext* context = cast(CreateContext*) malloc(CreateContext.sizeof);
        if (context is null)
            assert(false);
        return context;
    }
}
236 
version(Posix)
{
    /// C-ABI entry point given to `pthread_create`.
    /// It reinterprets its argument as the owning `Thread`'s context and runs the stored delegate.
    /// Returns: always `null`; `Thread.join` ignores the thread's return value.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        (cast(Thread.CreateContext*) threadContext).call();
        return null;
    }
}
246 
version(Windows)
{
    /// stdcall entry point given to `_beginthreadex`.
    /// It reinterprets its argument as the owning `Thread`'s context and runs the stored delegate.
    /// Returns: always 0 as the thread exit code.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        (cast(Thread.CreateContext*) threadContext).call();
        return 0;
    }
}
256 
unittest
{
    int outerInt = 0;

    // A class member delegate: the thread must see this very object as its context.
    class A
    {
    nothrow @nogc:
        this()
        {
            t = makeThread(&f);
            t.start();
        }

        void join()
        {
            t.join();
        }

        void f()
        {
            outerInt = 1;
            innerInt = 2;

            // The delegate context must point at the object's real fields.
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread t;
    }

    A instance = new A;
    instance.join();
    assert(instance.innerInt == 2);
    destroy(instance);
    assert(outerInt == 1);
}
297 
/// Creates a thread running `dg` and starts it immediately.
/// Returns: the started thread. The caller owns it and should call `.join()` on it
///          before it is destroyed.
Thread launchInAThread(ThreadDelegate dg, size_t stackSize = 0) nothrow @nogc
{
    auto launched = makeThread(dg, stackSize);
    launched.start();
    return launched;
}
306 
307 //
308 // Thread-pool
309 //
310 
/// Returns: Number of CPUs, always at least 1.
///
/// Sources by platform:
/// - Windows: `dwNumberOfProcessors`.
/// - Darwin: `machdep.cpu.core_count`.
/// - other POSIX systems: the number of online processors.
///
/// If the platform query fails or reports nonsense, 1 is returned, so that callers
/// such as `ThreadPool` (which asserts at least one thread) always get a usable count.
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs = 0;

    version(Windows)
    {
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(Darwin)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans = 0;
        size_t len = uint.sizeof;
        // On failure `ans` must not be trusted; keep procs at 0 and clamp below.
        if (sysctlbyname(nameStr, &ans, &len, null, 0) == 0)
            procs = cast(int) ans;
    }
    else version(Posix)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN); // -1 on error
    }
    else
        static assert(false, "OS unsupported");

    if (procs < 1)
        procs = 1;
    return procs;
}
340 
/// Work function for `ThreadPool.parallelFor`.
/// It receives the work item index and the index of the pool thread running it.
alias ThreadPoolDelegate = void delegate(int workItem, int threadIndex) @nogc nothrow;
342 
343 
344 debug(threadPoolIsActuallySynchronous)
345 {
346     /// Fake synchronous version of the thread pool
347     /// For measurement purpose, makes it easier to measure actual CPU time spent.
348     class ThreadPool
349     {
350     public:
351     nothrow:
352     @nogc:
353 
354         enum constantThreadId = 0;
355 
356         this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
357         {
358         }
359 
360         ~this()
361         {
362         }
363 
364         void parallelFor(int count, scope ThreadPoolDelegate dg)
365         {
366             foreach(i; 0..count)
367                 dg(cast(int)i, constantThreadId);
368         }
369 
370         void parallelForAsync(int count, scope ThreadPoolDelegate dg)
371         {
372             foreach(i; 0..count)
373                 dg(cast(int)i, constantThreadId);
374         }
375 
376         /// Wait for completion of the previous parallelFor, if any.
377         // It's always safe to call this function before doing another parallelFor.
378         void waitForCompletion()
379         {
380         }
381 
382         int numThreads() pure const
383         {
384             return 1;
385         }
386     }
387 }
388 else
389 {
390 
391     /// Rewrite of the ThreadPool using condition variables.
392     /// FUTURE: this could be speed-up by using futures. Description of the task
393     ///         and associated condition+mutex would go in an external struct.
394     /// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
395     ///       multiple threads at once.
396     class ThreadPool
397     {
398     public:
399     nothrow:
400     @nogc:
401 
402         /// Creates a thread-pool.
403         /// Params:
404         ///     numThreads = Number of threads to create (0 = auto).
405         ///     maxThreads = A maximum number of threads to create (0 = none).
406         ///     stackSize = Stack size to create threads with (0 = auto).
407         this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
408         {
409             // Create sync first
410             _workMutex = makeMutex();
411             _workCondition = makeConditionVariable();
412 
413             _finishMutex = makeMutex();
414             _finishCondition = makeConditionVariable();
415 
416             // Create threads
417             if (numThreads == 0)
418                 numThreads = getTotalNumberOfCPUs();
419 
420             // Limit number of threads eventually (this is done to give other software some leeway
421             // in a soft real-time OS)
422             if (maxThreads != 0)
423             {
424                 if (numThreads > maxThreads)
425                     numThreads = maxThreads;
426             }
427 
428             assert(numThreads >= 1);
429 
430             _threads = mallocSlice!Thread(numThreads);
431             foreach(size_t threadIndex, ref thread; _threads)
432             {
433                 // Pass the index of the thread through user data, so that it can be passed to the task in 
434                 // case these task need thread-local buffers.
435                 void* userData = cast(void*)(threadIndex);
436                 thread = makeThread(&workerThreadFunc, stackSize, userData);
437             }
438 
439             // because of calling currentThreadId, don't start threads until all are created
440             foreach(ref thread; _threads)
441             {
442                 thread.start();
443             }
444         }
445 
446         /// Destroys a thread-pool.
447         ~this()
448         {
449             if (_threads !is null)
450             {
451                 assert(_state == State.initial);
452 
453                 // Put the threadpool is stop state
454                 _workMutex.lock();
455                     _stopFlag = true;
456                 _workMutex.unlock();
457 
458                 // Notify all workers
459                 _workCondition.notifyAll();
460 
461                 // Wait for each thread termination
462                 foreach(ref thread; _threads)
463                     thread.join();
464 
465                 // Detroys each thread
466                 foreach(ref thread; _threads)
467                     thread.destroy();
468                 freeSlice(_threads);
469                 _threads = null;
470                 destroy(_workMutex);
471             }
472         }
473 
474         /// Calls the delegate in parallel, with 0..count as index.
475         /// Immediate waiting for completion.
476         /// If there is only one task, it is run directly on this thread.
477         /// IMPORTANT to be reentrant there! widget drawn alone can then launch same threadpool.
478         void parallelFor(int count, scope ThreadPoolDelegate dg)
479         {
480             assert(_state == State.initial);
481 
482             // Do not launch worker threads for one work-item, not worth it.
483             // (but it is worth it in async).
484             if (count == 1)
485             {
486                 int dummythreadID = 0; // it should not matter which is passed as long as it's a valid ID.
487                 dg(0, dummythreadID);
488                 return;
489             }
490 
491             // Unleash parallel threads.
492             parallelForAsync(count, dg);
493 
494             // Wait for completion immediately.
495             waitForCompletion(); 
496         }
497 
498         /// Same, but does not wait for completion. 
499         /// You cannot have 2 concurrent parallelFor for the same thread-pool.
500         void parallelForAsync(int count, scope ThreadPoolDelegate dg)
501         {
502             assert(_state == State.initial);
503 
504             if (count == 0) // no tasks, exit immediately
505                 return;
506 
507             // At this point we assume all worker threads are waiting for messages
508 
509             // Sets the current task
510             _workMutex.lock();
511 
512             _taskDelegate = dg;       // immutable during this parallelFor
513             _taskNumWorkItem = count; // immutable during this parallelFor
514             _taskCurrentWorkItem = 0;
515             _taskCompleted = 0;
516 
517             _workMutex.unlock();
518 
519             if (count >= _threads.length)
520             {
521                 // wake up all threads
522                 // FUTURE: if number of tasks < number of threads only wake up the necessary amount of threads
523                 _workCondition.notifyAll();
524             }
525             else
526             {
527                 // Less tasks than threads in the pool: only wake-up some threads.
528                 for (int t = 0; t < count; ++t)
529                     _workCondition.notifyOne();
530             }
531 
532             _state = State.parallelForInProgress;
533         }
534 
535         /// Wait for completion of the previous parallelFor, if any.
536         // It's always safe to call this function before doing another parallelFor.
537         void waitForCompletion()
538         {
539             if (_state == State.initial)
540                 return; // that means that parallel threads were not launched
541 
542             assert(_state == State.parallelForInProgress);
543 
544             _finishMutex.lock();
545             scope(exit) _finishMutex.unlock();
546 
547             // FUTURE: order thread will be waken up multiple times
548             //         (one for every completed task)
549             //         maybe that can be optimized
550             while (_taskCompleted < _taskNumWorkItem)
551             {
552                 _finishCondition.wait(&_finishMutex);
553             }
554 
555             _state = State.initial;
556         }
557 
558         int numThreads() pure const
559         {
560             return cast(int)_threads.length;
561         }
562 
563     private:
564         Thread[] _threads = null;
565 
566         // A map to find back thread index from thread system ID
567         void*[] _threadID = null;
568 
569         // Used to signal more work
570         UncheckedMutex _workMutex;
571         ConditionVariable _workCondition;
572 
573         // Used to signal completion
574         UncheckedMutex _finishMutex;
575         ConditionVariable _finishCondition;
576 
577         // These fields represent the current task group (ie. a parallelFor)
578         ThreadPoolDelegate _taskDelegate;
579         int _taskNumWorkItem;     // total number of tasks in this task group
580         int _taskCurrentWorkItem; // current task still left to do (protected by _workMutex)
581         int _taskCompleted;       // every task < taskCompleted has already been completed (protected by _finishMutex)
582 
583         bool _stopFlag;
584 
585         bool hasWork()
586         {
587             return _taskCurrentWorkItem < _taskNumWorkItem;
588         }
589 
590         // Represent the thread-pool state from the user POV
591         enum State
592         {
593             initial,               // tasks can be launched
594             parallelForInProgress, // task were launched, but not waited one
595         }
596         State _state = State.initial;
597 
598         // What worker threads do
599         // MAYDO: threads come here with bad context with struct delegates
600         void workerThreadFunc(void* userData)
601         {
602             while (true)
603             {
604                 int workItem = -1;
605                 {
606                     _workMutex.lock();
607                     scope(exit) _workMutex.unlock();
608 
609                     // Wait for notification
610                     while (!_stopFlag && !hasWork())
611                         _workCondition.wait(&_workMutex);
612 
613                     if (_stopFlag && !hasWork())
614                         return;
615 
616                     assert(hasWork());
617 
618                     // Pick a task and increment counter
619                     workItem = _taskCurrentWorkItem;
620                     _taskCurrentWorkItem++;
621                 }
622 
623                 // Find thread index from user data set by pool
624                 int threadIndex = cast(int)( cast(size_t)(userData) );
625 
626                 // Do the actual task
627                 _taskDelegate(workItem, threadIndex);
628 
629                 // signal completion of one more task
630                 {
631                     _finishMutex.lock();
632                     _taskCompleted++;
633                     _finishMutex.unlock();
634 
635                     _finishCondition.notifyOne(); // wake-up
636                 }
637             }
638         }
639     }
640 }
641 
/// Get the current thread's OS identifier, as an integer.
/// The returned ID is only meant for display. You can't get a `Thread` out of it.
/// Returns: `GetCurrentThreadId()` on Windows; on POSIX, `pthread_self()` reinterpreted
///          as an integer.
public static size_t getCurrentThreadId() nothrow @nogc
{
    version(Windows)
        return cast(size_t) GetCurrentThreadId();
    else version(Posix)
        return cast(size_t) cast(void*) pthread_self();
    else
        static assert(false);
}
657 
unittest
{
    import core.atomic;
    import dplug.core.nogc;

    // Owns a malloc'd ThreadPool and counts how many work items ran.
    struct A
    {
        ThreadPool _pool;
        int _numThreads;

        this(int numThreads, int maxThreads = 0, int stackSize = 0)
        {
            _pool = mallocNew!ThreadPool(numThreads, maxThreads, stackSize);
            _numThreads = _pool.numThreads();
        }

        ~this()
        {
            // The pool comes from mallocNew: it must be both finalized AND freed.
            // A plain `destroy` would only finalize it and leak its memory.
            if (_pool !is null)
            {
                destroyFree(_pool);
                _pool = null;
            }
        }

        void launch(int count, bool async) nothrow @nogc
        {
            if (async)
            {
                _pool.parallelForAsync(count, &loopBody);
                _pool.waitForCompletion();
            }
            else
                _pool.parallelFor(count, &loopBody);
        }

        void loopBody(int workItem, int threadIndex) nothrow @nogc
        {
            bool goodIndex = (threadIndex >= 0) && (threadIndex < _numThreads);
            assert(goodIndex);
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    foreach (numThreads;  [0, 1, 2, 4, 8, 16, 32])
    {
        auto a = A(numThreads);
        a.launch(10, false);
        assert(a.counter == 10);

        a.launch(500, true);
        assert(a.counter == 510);

        a.launch(1, false);
        assert(a.counter == 511);

        a.launch(1, true);
        assert(a.counter == 512);

        a.launch(0, true);
        assert(a.counter == 512);
        a.launch(0, false);
        assert(a.counter == 512);
    }
}
721 
722 // Bonus: Capacity to get the macOS version
723 
724 version(Darwin)
725 {
726 
    /// A macOS version number, e.g. 10.15 or 11.0.
    // Note: the .init value is a large future version (100.0.0), so that a failure to
    // detect the version leads to the newer behaviour.
    struct MacOSVersion
    {
        int major = 100; // eg: major = 10   minor = 7 for 10.7
        int minor = 0;
        int patch = 0;
    }
735 
736     /// Get the macOS version we are running on.
737     /// Note: it only makes sense for macOS, not iOS.
738     /// Note: patch always return zero for now.
739     MacOSVersion getMacOSVersion() nothrow @nogc
740     {
741         char[256] str;
742         size_t size = 256;
743         int ret = sysctlbyname("kern.osrelease", str.ptr, &size, null, 0);
744         MacOSVersion result;
745         if (ret != 0) 
746             return result;
747         int darwinMajor, darwinMinor, darwinPatch;
748         if (3 == sscanf(str.ptr, "%d.%d.%d", &darwinMajor, &darwinMinor, &darwinPatch))
749         {
750             result.patch = 0;
751 
752             switch(darwinMajor)
753             {
754                 case 0: .. case 11:
755                     result.major = 10; // 10.7
756                     result.minor = 7;
757                     break;
758 
759                 case 12: .. case 19:
760                     result.major = 10; // 10.7
761                     result.minor = darwinMajor - 4; // 10.8 to 10.15
762                     break;
763 
764                 case 20:
765                     result.major = 11; // Big Sur
766                     result.minor = 0;
767                     break;
768 
769                 case 21:
770                     result.major = 12; // Monterey
771                     result.minor = 0;
772                     break;
773 
774 
775                 default:
776                     result.major = 100;
777                     result.minor = 0;
778             }
779         }
780         return result;
781     }
782 
783   /*  unittest
784     {
785         MacOSVersion ver = getMacOSVersion();
786         printf("Detected macOS %d.%d.%d\n", ver.major, ver.minor, ver.patch);
787     } */
788 }