/**
* Threads and thread-pool.
*
* Copyright: Copyright Sean Kelly 2005 - 2012.
* Copyright: Copyright (c) 2009-2011, David Simcha.
* Copyright: Copyright Guillaume Piolat 2016.
* License: Distributed under the
*      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
*    (See accompanying file LICENSE)
*/
module dplug.core.thread;

import core.stdc.stdlib;
import core.stdc.stdio;

import dplug.core.nogc;
import dplug.core.lockedqueue;
import dplug.core.sync;

version(Posix)
    import core.sys.posix.pthread;
else version(Windows)
{
    import core.stdc.stdint : uintptr_t;
    import core.sys.windows.windef;
    import core.sys.windows.winbase;
    import core.thread;

    // Signature of a thread entry point as expected by `_beginthreadex`
    // (see `windowsThreadEntryPoint` below).
    extern (Windows) alias btex_fptr = uint function(void*) ;

    // C runtime thread creation function, declared by hand here.
    // Used by `Thread.start` on Windows; returns 0 on failure.
    extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
}
else
    static assert(false, "Platform not supported");

// `Darwin` is set for every Apple OS, so code below can test a single version identifier.
version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

version(Darwin)
{
    // Declared manually; used by `getTotalNumberOfCPUs` and `getMacOSVersion`.
    extern(C) nothrow @nogc
    int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
}

// Uncomment to replace `ThreadPool` with a fake implementation that runs every work item
// synchronously on the calling thread (makes measuring CPU time easier).
//debug = threadPoolIsActuallySynchronous;


/// Legacy thread function
alias ThreadDelegate = void delegate() nothrow @nogc;

/// Thread function with user data, used eg. in thread pool.
alias ThreadDelegateUser = void delegate(void* userData) nothrow @nogc;


/// Creates a `Thread` that will call `callback()` once started.
/// The OS thread is not created until `start()` is called on the result.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    return Thread(callback, stackSize);
}

/// Creates a `Thread` that will call `callback(userData)` once started.
/// The OS thread is not created until `start()` is called on the result.
Thread makeThread(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null) nothrow @nogc
{
    return Thread(callback, stackSize, userData);
}

/// Optimistic thread, failure not supported: every OS or allocation error ends in `assert(false)`.
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a thread with user data. Thread is not created until `start` has been called.
    ///
    /// Params:
    ///     callback  = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///     userData  = a pointer to be passed to thread delegate
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have additional context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _context = allocContext();
        _context.callback = callback;
        _context.callbackUser = null;
        _context.userData = null; // malloc'd memory: never leave a field uninitialized
    }

    ///ditto
    this(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null)
    {
        _stackSize = stackSize;
        _context = allocContext();
        _context.callback = null;
        _context.callbackUser = callback;
        _context.userData = userData;
    }

    /// Frees the context shared with the OS thread.
    /// Warning: the OS thread reads that context when it runs, so a started `Thread`
    /// should be `join`ed before being destroyed.
    ~this()
    {
        if (_context !is null)
        {
            free(_context);
            _context = null;
        }
    }

    @disable this(this);

    /// Starts the thread. Threads are created suspended. This function can
    /// only be called once.
    void start()
    {
        // A default-initialized `Thread` has no context: the entry point would dereference null.
        assert(_context !is null);

        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, _context);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }
        else version(Windows)
        {

            uint dummy;

            _id = cast(HANDLE) _beginthreadex(null,
                                             cast(uint)_stackSize,
                                             &windowsThreadEntryPoint,
                                             _context,
                                             CREATE_SUSPENDED,
                                             &dummy);
            if (cast(size_t)_id == 0)
                assert(false);
            if (ResumeThread(_id) == -1)
                assert(false);
        }
        else
            static assert(false);
    }

    /// Wait for that thread termination
    /// Again, this function can be called only once.
    /// This actually releases the thread resource.
    void join()
    {
        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }
    }

    /// Returns: the OS thread handle (`pthread_t` on POSIX, `HANDLE` on Windows) as an opaque pointer.
    void* getThreadID()
    {
        version(Posix) return cast(void*)_id;
        else version(Windows) return cast(void*)_id;
        else assert(false);
    }

private:
    version(Posix)
    {
        pthread_t _id;
    }
    else version(Windows)
    {
        HANDLE _id;
    }
    else
        static assert(false);

    // Thread context given to OS thread creation function need to have a constant address
    // since there are no guarantees the `Thread` struct will be at the same address.
    static struct CreateContext
    {
    nothrow:
    @nogc:
        ThreadDelegate callback;
        ThreadDelegateUser callbackUser;
        void* userData;
        void call()
        {
            if (callback !is null)
                callback();
            else
                callbackUser(userData);
        }
    }
    CreateContext* _context;

    size_t _stackSize;

    // Allocates a `CreateContext` on the C heap. Out-of-memory is not supported, like
    // every other failure of this struct.
    static CreateContext* allocContext()
    {
        CreateContext* context = cast(CreateContext*) malloc(CreateContext.sizeof);
        if (context is null)
            assert(false);
        return context;
    }
}

version(Posix)
{
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        Thread.CreateContext* context = cast(Thread.CreateContext*)(threadContext);
        context.call();
        return null;
    }
}

version(Windows)
{
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        Thread.CreateContext* context = cast(Thread.CreateContext*)(threadContext);
        context.call();
        return 0;
    }
}

unittest
{
    int outerInt = 0;

    class A
    {
    nothrow @nogc:
        this()
        {
            t = makeThread(&f);
            t.start();
        }

        void join()
        {
            t.join();
        }

        void f()
        {
            outerInt = 1;
            innerInt = 2;

            // verify this
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread t;
    }

    auto a = new A;
    a.t.join();
    assert(a.innerInt == 2);
    a.destroy();
    assert(outerInt == 1);
}

/// Launch a function in a newly created thread, which is destroyed afterwards.
/// Return the thread so that you can call `.join()` on it.
Thread launchInAThread(ThreadDelegate dg, size_t stackSize = 0) nothrow @nogc
{
    Thread t = makeThread(dg, stackSize);
    t.start();
    return t;
}

//
// Thread-pool
//

/// Returns: Number of CPUs, always at least 1.
/// On Darwin this is the "machdep.cpu.core_count" sysctl, with "hw.ncpu" as a fallback
/// when that query fails; on other POSIX systems, the number of online processors.
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs = 0;

    version(Windows)
    {
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(Darwin)
    {
        uint ans = 0;
        size_t len = uint.sizeof;
        if (0 != sysctlbyname("machdep.cpu.core_count\0".ptr, &ans, &len, null, 0) || ans == 0)
        {
            // Fallback on the standard BSD "hw.ncpu" query.
            ans = 0;
            len = uint.sizeof;
            if (0 != sysctlbyname("hw.ncpu\0".ptr, &ans, &len, null, 0))
                ans = 0;
        }
        procs = cast(int) ans;
    }
    else version(Posix)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN); // -1 on failure
    }
    else
        static assert(false, "OS unsupported");

    // Never report less than one CPU: `ThreadPool` asserts it creates at least one thread.
    if (procs < 1)
        procs = 1;
    return procs;
}

/// Task run by a `ThreadPool`: called once per work item with the item index and the
/// index of the pool thread running it (0 .. numThreads-1).
alias ThreadPoolDelegate = void delegate(int workItem, int threadIndex) nothrow @nogc;


debug(threadPoolIsActuallySynchronous)
{
    /// Fake synchronous version of the thread pool
    /// For measurement purpose, makes it easier to measure actual CPU time spent.
    class ThreadPool
    {
    public:
    nothrow:
    @nogc:

        enum constantThreadId = 0;

        this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
        {
        }

        ~this()
        {
        }

        void parallelFor(int count, scope ThreadPoolDelegate dg)
        {
            foreach(i; 0..count)
                dg(cast(int)i, constantThreadId);
        }

        void parallelForAsync(int count, scope ThreadPoolDelegate dg)
        {
            foreach(i; 0..count)
                dg(cast(int)i, constantThreadId);
        }

        /// Wait for completion of the previous parallelFor, if any.
        // It's always safe to call this function before doing another parallelFor.
        void waitForCompletion()
        {
        }

        int numThreads() pure const
        {
            return 1;
        }
    }
}
else
{

    /// Rewrite of the ThreadPool using condition variables.
    /// FUTURE: this could be speed-up by using futures. Description of the task
    ///         and associated condition+mutex would go in an external struct.
    /// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
    ///       multiple threads at once.
    class ThreadPool
    {
    public:
    nothrow:
    @nogc:

        /// Creates a thread-pool.
        /// Params:
        ///     numThreads = Number of threads to create (0 = auto).
        ///     maxThreads = A maximum number of threads to create (0 = none).
        ///     stackSize = Stack size to create threads with (0 = auto).
        this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
        {
            // Create sync first
            _workMutex = makeMutex();
            _workCondition = makeConditionVariable();

            _finishMutex = makeMutex();
            _finishCondition = makeConditionVariable();

            // Create threads
            if (numThreads == 0)
                numThreads = getTotalNumberOfCPUs();

            // Limit number of threads eventually (this is done to give other software some leeway
            // in a soft real-time OS)
            if (maxThreads != 0)
            {
                if (numThreads > maxThreads)
                    numThreads = maxThreads;
            }

            assert(numThreads >= 1);

            _threads = mallocSlice!Thread(numThreads);
            foreach(size_t threadIndex, ref thread; _threads)
            {
                // Pass the index of the thread through user data, so that it can be passed to the task in
                // case these task need thread-local buffers.
                void* userData = cast(void*)(threadIndex);
                thread = makeThread(&workerThreadFunc, stackSize, userData);
            }

            // Start the threads only once every one of them has been constructed.
            foreach(ref thread; _threads)
            {
                thread.start();
            }
        }

        /// Destroys a thread-pool.
        /// All worker threads are stopped and joined before their resources are released.
        ~this()
        {
            if (_threads !is null)
            {
                assert(_state == State.initial);

                // Put the threadpool in stop state
                _workMutex.lock();
                _stopFlag = true;
                _workMutex.unlock();

                // Notify all workers
                _workCondition.notifyAll();

                // Wait for each thread termination
                foreach(ref thread; _threads)
                    thread.join();

                // Destroys each thread
                foreach(ref thread; _threads)
                    thread.destroy();
                freeSlice(_threads);
                _threads = null;
                destroy(_workMutex);
            }
        }

        /// Calls the delegate in parallel, with 0..count as index.
        /// Immediate waiting for completion.
        /// If there is only one task, it is run directly on this thread.
        /// IMPORTANT to be reentrant there! widget drawn alone can then launch same threadpool.
        void parallelFor(int count, scope ThreadPoolDelegate dg)
        {
            assert(_state == State.initial);

            // Do not launch worker threads for one work-item, not worth it.
            // (but it is worth it in async).
            if (count == 1)
            {
                int dummythreadID = 0; // it should not matter which is passed as long as it's a valid ID.
                dg(0, dummythreadID);
                return;
            }

            // Unleash parallel threads.
            parallelForAsync(count, dg);

            // Wait for completion immediately.
            waitForCompletion();
        }

        /// Same, but does not wait for completion.
        /// You cannot have 2 concurrent parallelFor for the same thread-pool.
        /// A `count` of zero or less does nothing.
        void parallelForAsync(int count, scope ThreadPoolDelegate dg)
        {
            assert(_state == State.initial);

            // No work item (or a meaningless negative count): don't wake up any worker.
            if (count <= 0)
                return;

            // At this point we assume all worker threads are waiting for messages

            // Sets the current task
            _workMutex.lock();

            _taskDelegate = dg;       // immutable during this parallelFor
            _taskNumWorkItem = count; // immutable during this parallelFor
            _taskCurrentWorkItem = 0;
            _taskCompleted = 0;

            _workMutex.unlock();

            if (cast(size_t)count >= _threads.length)
            {
                // At least one work item per thread: wake up all threads.
                _workCondition.notifyAll();
            }
            else
            {
                // Less tasks than threads in the pool: only wake-up some threads.
                for (int t = 0; t < count; ++t)
                    _workCondition.notifyOne();
            }

            _state = State.parallelForInProgress;
        }

        /// Wait for completion of the previous parallelFor, if any.
        // It's always safe to call this function before doing another parallelFor.
        void waitForCompletion()
        {
            if (_state == State.initial)
                return; // that means that parallel threads were not launched

            assert(_state == State.parallelForInProgress);

            _finishMutex.lock();
            scope(exit) _finishMutex.unlock();

            // FUTURE: order thread will be waken up multiple times
            //         (one for every completed task)
            //         maybe that can be optimized
            while (_taskCompleted < _taskNumWorkItem)
            {
                _finishCondition.wait(&_finishMutex);
            }

            _state = State.initial;
        }

        /// Returns: number of worker threads in the pool.
        int numThreads() pure const
        {
            return cast(int)_threads.length;
        }

    private:
        Thread[] _threads = null;

        // Used to signal more work
        UncheckedMutex _workMutex;
        ConditionVariable _workCondition;

        // Used to signal completion
        UncheckedMutex _finishMutex;
        ConditionVariable _finishCondition;

        // These fields represent the current task group (ie. a parallelFor)
        ThreadPoolDelegate _taskDelegate;
        int _taskNumWorkItem;     // total number of tasks in this task group
        int _taskCurrentWorkItem; // next work item to hand out (protected by _workMutex)
        int _taskCompleted;       // number of work items already completed (protected by _finishMutex)

        bool _stopFlag;

        bool hasWork()
        {
            return _taskCurrentWorkItem < _taskNumWorkItem;
        }

        // Represent the thread-pool state from the user POV
        enum State
        {
            initial,               // tasks can be launched
            parallelForInProgress, // task were launched, but not waited one
        }
        State _state = State.initial;

        // What worker threads do
        // MAYDO: threads come here with bad context with struct delegates
        void workerThreadFunc(void* userData)
        {
            // Find thread index from user data set by pool; it never changes, decode it once.
            int threadIndex = cast(int)( cast(size_t)(userData) );

            while (true)
            {
                int workItem = -1;
                {
                    _workMutex.lock();
                    scope(exit) _workMutex.unlock();

                    // Wait for notification
                    while (!_stopFlag && !hasWork())
                        _workCondition.wait(&_workMutex);

                    // Remaining work items are still processed after a stop request.
                    if (_stopFlag && !hasWork())
                        return;

                    assert(hasWork());

                    // Pick a task and increment counter
                    workItem = _taskCurrentWorkItem;
                    _taskCurrentWorkItem++;
                }

                // Do the actual task
                _taskDelegate(workItem, threadIndex);

                // signal completion of one more task
                {
                    _finishMutex.lock();
                    _taskCompleted++;
                    _finishMutex.unlock();

                    _finishCondition.notifyOne(); // wake-up
                }
            }
        }
    }
}

/// Get the current thread OS handle.
/// The returned ID is just used for display. You can't get a `Thread` out of it.
public static size_t getCurrentThreadId() nothrow @nogc
{
    version(Windows)
    {
        return cast(size_t) GetCurrentThreadId();
    }
    else version(Posix)
    {
        return cast(size_t)cast(void*)pthread_self();
    }
    else
        static assert(false);
}

unittest
{
    import core.atomic;
    import dplug.core.nogc;

    struct A
    {
        ThreadPool _pool;
        int _numThreads;

        this(int numThreads, int maxThreads = 0, int stackSize = 0)
        {
            _pool = mallocNew!ThreadPool(numThreads, maxThreads, stackSize);
            _numThreads = _pool.numThreads();
        }

        ~this()
        {
            _pool.destroy();
        }

        void launch(int count, bool async) nothrow @nogc
        {
            if (async)
            {
                _pool.parallelForAsync(count, &loopBody);
                _pool.waitForCompletion();
            }
            else
                _pool.parallelFor(count, &loopBody);
        }

        void loopBody(int workItem, int threadIndex) nothrow @nogc
        {
            bool goodIndex = (threadIndex >= 0) && (threadIndex < _numThreads);
            assert(goodIndex);
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    foreach (numThreads; [0, 1, 2, 4, 8, 16, 32])
    {
        auto a = A(numThreads);
        a.launch(10, false);
        assert(a.counter == 10);

        a.launch(500, true);
        assert(a.counter == 510);

        a.launch(1, false);
        assert(a.counter == 511);

        a.launch(1, true);
        assert(a.counter == 512);

        a.launch(0, true);
        assert(a.counter == 512);
        a.launch(0, false);
        assert(a.counter == 512);
    }
}

// Bonus: Capacity to get the macOS version

version(Darwin)
{

    // Note: .init value is a large future version (100.0.0), so that failure to detect version
    // leads to newer behaviour.
    struct MacOSVersion
    {
        int major = 100; // eg: major = 10 minor = 7 for 10.7
        int minor = 0;
        int patch = 0;
    }

    /// Get the macOS version we are running on, derived from the Darwin kernel release
    /// ("kern.osrelease" sysctl).
    /// Note: it only makes sense for macOS, not iOS.
    /// Note: patch always return zero for now, and minor is only known for 10.x releases.
    /// Returns: `MacOSVersion.init` (100.0.0) if detection fails or the kernel is newer
    ///          than the versions known here.
    MacOSVersion getMacOSVersion() nothrow @nogc
    {
        char[256] str;
        size_t size = 256;
        int ret = sysctlbyname("kern.osrelease", str.ptr, &size, null, 0);
        MacOSVersion result;
        if (ret != 0)
            return result;
        int darwinMajor, darwinMinor, darwinPatch;
        if (3 == sscanf(str.ptr, "%d.%d.%d", &darwinMajor, &darwinMinor, &darwinPatch))
        {
            result.patch = 0;

            switch(darwinMajor)
            {
                case 0: .. case 11:
                    result.major = 10; // 10.7 (older releases are also reported as 10.7)
                    result.minor = 7;
                    break;

                case 12: .. case 19:
                    result.major = 10;
                    result.minor = darwinMajor - 4; // 10.8 to 10.15
                    break;

                case 20: .. case 24:
                    // 20 = Big Sur (11), 21 = Monterey (12), 22 = Ventura (13),
                    // 23 = Sonoma (14), 24 = Sequoia (15)
                    result.major = darwinMajor - 9;
                    result.minor = 0;
                    break;

                case 25:
                    result.major = 26; // Tahoe: macOS numbering jumped from 15 to 26
                    result.minor = 0;
                    break;

                default:
                    result.major = 100;
                    result.minor = 0;
            }
        }
        return result;
    }

    /* unittest
    {
        MacOSVersion ver = getMacOSVersion();
        printf("Detected macOS %d.%d.%d\n", ver.major, ver.minor, ver.patch);
    } */
}