1 /** 2 Mutexes, semaphores and condition variables. 3 4 Copyright: Sean Kelly 2005 - 2009. 5 Copyright: Guillaume Piolat 2016 - 2018. 6 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 */ 8 // This contains part of druntime's core.sys.mutex, core.sys.semaphore core.sys.condition and 9 // Modified to make it @nogc nothrow 10 module dplug.core.sync; 11 12 import core.time; 13 import core.atomic; 14 15 import dplug.core.vec; 16 import dplug.core.nogc; 17 18 import core.stdc.stdio; 19 import core.stdc.stdlib: malloc, free; 20 21 // emulate conditions variable on Windows 22 // (this was originally in Phobos for XP compatibility) 23 version = emulateCondVarWin32; 24 25 version (OSX) 26 version = Darwin; 27 else version (iOS) 28 version = Darwin; 29 else version (TVOS) 30 version = Darwin; 31 else version (WatchOS) 32 version = Darwin; 33 34 version( Windows ) 35 { 36 import core.sys.windows.windef; 37 import core.sys.windows.winbase; 38 39 extern (Windows) nothrow @nogc 40 { 41 void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount); 42 } 43 } 44 else version( Darwin ) 45 { 46 import core.sys.posix.pthread; 47 import core.sync.config; 48 import core.stdc.errno; 49 import core.sys.posix.time; 50 static if (__VERSION__ < 2084) 51 import core.sys.osx.mach.semaphore; // was removed with DMDFE 2.084 52 else 53 import core.sys.darwin.mach.semaphore; 54 } 55 else version( Posix ) 56 { 57 import core.sync.config; 58 import core.stdc.errno; 59 import core.sys.posix.pthread; 60 import core.sys.posix.semaphore; 61 import core.sys.posix.time; 62 } 63 else 64 { 65 static assert(false, "Platform not supported"); 66 } 67 68 69 // 70 // MUTEX 71 // 72 73 /// Returns: A newly created `UnchekedMutex`. 74 UncheckedMutex makeMutex() nothrow @nogc 75 { 76 return UncheckedMutex(42); 77 } 78 79 private enum PosixMutexAlignment = 64; // Wild guess, no measurements done 80 81 // Cargo-culting the spin-count in WTF::Lock 82 // See: https://webkit.org/blog/6161/locking-in-webkit/ 83 private enum CriticalSectionSpinCount = 40; 84 85 struct UncheckedMutex 86 { 87 nothrow @nogc: 88 89 private this(int dummyArg) 90 { 91 assert(!_created); 92 lazyThreadSafeInitialization(); 93 } 94 95 ~this() 96 { 97 // RACE CONDITION HERE 98 // TODO: not easy to make the mutex destruction thread-safe, because one of the 99 // thread must wait. Ignore that for now. 100 101 void* mutexHandle = atomicLoadCompat(_mutex); // Can be the mutex handle, or null. 102 103 if (mutexHandle !is null) 104 { 105 // Now, the mutex could have been destroyed by another thread already. 106 // Make a cas to ensure we are first to attempt it. 107 108 if (casCompat(&_mutex, &mutexHandle, null)) 109 { 110 destroyMutex(mutexHandle); 111 } 112 } 113 114 _created = 0; 115 } 116 117 @disable this(this); 118 119 /// Lock mutex, and create it in a thread-safe way if it doens't exist yet. 120 /// This is useful when globals access must be protected by a mutex. 121 void lockLazy() @trusted 122 { 123 lazyThreadSafeInitialization(); 124 lock(); 125 } 126 127 /// Lock mutex 128 void lock() 129 { 130 // No sync needed, else would have used lockLazy() 131 version( Windows ) 132 { 133 EnterCriticalSection( cast(CRITICAL_SECTION*) _mutex ); 134 } 135 else version( Posix ) 136 { 137 138 assumeNothrowNoGC( 139 (pthread_mutex_t* handle) 140 { 141 int res = pthread_mutex_lock(handle); 142 if (res != 0) 143 assert(false); 144 })(handleAddr()); 145 } 146 } 147 148 // undocumented function for internal use 149 void unlock() 150 { 151 version( Windows ) 152 { 153 LeaveCriticalSection( cast(CRITICAL_SECTION*) _mutex ); 154 } 155 else version( Posix ) 156 { 157 assumeNothrowNoGC( 158 (pthread_mutex_t* handle) 159 { 160 int res = pthread_mutex_unlock(handle); 161 if (res != 0) 162 assert(false); 163 })(handleAddr()); 164 } 165 } 166 167 bool tryLock() 168 { 169 version( Windows ) 170 { 171 return TryEnterCriticalSection( cast(CRITICAL_SECTION*) _mutex ) != 0; 172 } 173 else version( Posix ) 174 { 175 int result = assumeNothrowNoGC( 176 (pthread_mutex_t* handle) 177 { 178 return pthread_mutex_trylock(handle); 179 })(handleAddr()); 180 return result == 0; 181 } 182 } 183 184 private: 185 // on Windows, this is a CRITICAL_SECTION*. 186 // else, this is a pthread_mutex_t* 187 void* _mutex; 188 189 // Work-around for Issue 16636 190 // https://issues.dlang.org/show_bug.cgi?id=16636 191 // Still crash with LDC somehow 192 long _created = 0; 193 194 static void* createMutex() 195 { 196 version( Windows ) 197 { 198 CRITICAL_SECTION* critSec = cast(CRITICAL_SECTION*) malloc(CRITICAL_SECTION.sizeof); 199 InitializeCriticalSectionAndSpinCount( critSec, CriticalSectionSpinCount ); 200 return critSec; 201 } 202 else version( Posix ) 203 { 204 pthread_mutex_t* pmtx = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) ); 205 206 assumeNothrowNoGC( 207 (pthread_mutex_t* handle) 208 { 209 pthread_mutexattr_t attr = void; 210 pthread_mutexattr_init( &attr ); 211 pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE ); 212 213 version (Darwin) 214 { 215 // Note: disabled since this breaks thread pool. 216 /+ 217 // OSX mutexes are fair by default, but this has a cost for contended locks 218 // Disable fairness. 219 // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/ 220 enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2; 221 pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT); 222 +/ 223 } 224 225 pthread_mutex_init( handle, &attr ); 226 227 })(pmtx); 228 return pmtx; 229 } 230 else 231 static assert(false); 232 } 233 234 static void destroyMutex(void* mutex) 235 { 236 if (mutex is null) 237 return; 238 239 // TODO: this should be thread-safe, for example a cas with local variable name or _created 240 241 version( Windows ) 242 { 243 DeleteCriticalSection( cast(CRITICAL_SECTION*) mutex ); 244 free(mutex); 245 } 246 else version( Posix ) 247 { 248 assumeNothrowNoGC( 249 (pthread_mutex_t* handle) 250 { 251 pthread_mutex_destroy(handle); 252 })( cast(pthread_mutex_t*) mutex ); 253 alignedFree(mutex, PosixMutexAlignment); 254 } 255 else 256 static assert(false); 257 } 258 259 void lazyThreadSafeInitialization() @trusted 260 { 261 // Is there an existing mutex already? 262 if (atomicLoadCompat(_mutex) !is null) 263 return; 264 265 // Create one mutex. 266 void* mtx = createMutex(); 267 void* p = cast(void*)mtx; 268 void** here = &_mutex; 269 void* ifThis = null; 270 271 // Try to set _mutex. 272 if (!casCompat(here, &ifThis, p)) 273 { 274 // Another thread created _mutex first. Destroy our useless instance. 275 destroyMutex(mtx); 276 } 277 } 278 279 package: 280 version( Posix ) 281 { 282 pthread_mutex_t* handleAddr() nothrow @nogc 283 { 284 return cast(pthread_mutex_t*) _mutex; 285 } 286 } 287 private: 288 289 static void* atomicLoadCompat(ref void* p) 290 { 291 static if (__VERSION__ < 2094) 292 { 293 // old compiler, do it incorrectly 294 return p; 295 } 296 else 297 return atomicLoad(p); 298 } 299 300 static bool casCompat(void** here, 301 void** ifThis, 302 void* writeThis) 303 { 304 static if (__VERSION__ < 2094) 305 { 306 // old compiler, do it incorrectly 307 if (*here == *ifThis) 308 { 309 *here = writeThis; 310 return true; 311 } 312 else 313 return false; 314 } 315 else 316 { 317 return cas(here, ifThis, writeThis); 318 } 319 } 320 } 321 322 unittest 323 { 324 UncheckedMutex mutex = makeMutex(); 325 foreach(i; 0..100) 326 { 327 mutex.lock(); 328 mutex.unlock(); 329 330 if (mutex.tryLock) 331 mutex.unlock(); 332 } 333 mutex.destroy(); 334 } 335 336 unittest 337 { 338 __gshared UncheckedMutex mutex2; 339 mutex2.lockLazy(); 340 mutex2.unlock(); 341 } 342 343 344 345 // 346 // SEMAPHORE 347 // 348 349 /// Returns: A newly created `UncheckedSemaphore` 350 UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc 351 { 352 return UncheckedSemaphore(count); 353 } 354 355 struct UncheckedSemaphore 356 { 357 private this( uint count ) nothrow @nogc 358 { 359 version( Windows ) 360 { 361 m_hndl = CreateSemaphoreA( null, count, int.max, null ); 362 if( m_hndl == m_hndl.init ) 363 assert(false); 364 } 365 else version( Darwin ) 366 { 367 mach_port_t task = assumeNothrowNoGC( 368 () 369 { 370 return mach_task_self(); 371 })(); 372 373 kern_return_t rc = assumeNothrowNoGC( 374 (mach_port_t t, semaphore_t* handle, uint count) 375 { 376 return semaphore_create(t, handle, SYNC_POLICY_FIFO, count ); 377 })(task, &m_hndl, count); 378 379 if( rc ) 380 assert(false); 381 } 382 else version( Posix ) 383 { 384 int rc = sem_init( &m_hndl, 0, count ); 385 if( rc ) 386 assert(false); 387 } 388 _created = 1; 389 } 390 391 ~this() nothrow @nogc 392 { 393 if (_created) 394 { 395 version( Windows ) 396 { 397 BOOL rc = CloseHandle( m_hndl ); 398 assert( rc, "Unable to destroy semaphore" ); 399 } 400 else version( Darwin ) 401 { 402 mach_port_t task = assumeNothrowNoGC( 403 () 404 { 405 return mach_task_self(); 406 })(); 407 408 kern_return_t rc = assumeNothrowNoGC( 409 (mach_port_t t, semaphore_t handle) 410 { 411 return semaphore_destroy( t, handle ); 412 })(task, m_hndl); 413 414 assert( !rc, "Unable to destroy semaphore" ); 415 } 416 else version( Posix ) 417 { 418 int rc = sem_destroy( &m_hndl ); 419 assert( !rc, "Unable to destroy semaphore" ); 420 } 421 _created = 0; 422 } 423 } 424 425 @disable this(this); 426 427 void wait() nothrow @nogc 428 { 429 version( Windows ) 430 { 431 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); 432 assert( rc == WAIT_OBJECT_0 ); 433 } 434 else version( Darwin ) 435 { 436 while( true ) 437 { 438 auto rc = assumeNothrowNoGC( 439 (semaphore_t handle) 440 { 441 return semaphore_wait(handle); 442 })(m_hndl); 443 if( !rc ) 444 return; 445 if( rc == KERN_ABORTED && errno == EINTR ) 446 continue; 447 assert(false); 448 } 449 } 450 else version( Posix ) 451 { 452 while( true ) 453 { 454 if (!assumeNothrowNoGC( 455 (sem_t* handle) 456 { 457 return sem_wait(handle); 458 })(&m_hndl)) 459 return; 460 if( errno != EINTR ) 461 assert(false); 462 } 463 } 464 } 465 466 bool wait( Duration period ) nothrow @nogc 467 { 468 assert( !period.isNegative ); 469 470 version( Windows ) 471 { 472 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 473 474 while( period > maxWaitMillis ) 475 { 476 auto rc = WaitForSingleObject( m_hndl, cast(uint) 477 maxWaitMillis.total!"msecs" ); 478 switch( rc ) 479 { 480 case WAIT_OBJECT_0: 481 return true; 482 case WAIT_TIMEOUT: 483 period -= maxWaitMillis; 484 continue; 485 default: 486 assert(false); 487 } 488 } 489 switch( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) ) 490 { 491 case WAIT_OBJECT_0: 492 return true; 493 case WAIT_TIMEOUT: 494 return false; 495 default: 496 assert(false); 497 } 498 } 499 else version( Darwin ) 500 { 501 mach_timespec_t t = void; 502 (cast(byte*) &t)[0 .. t.sizeof] = 0; 503 504 if( period.total!"seconds" > t.tv_sec.max ) 505 { 506 t.tv_sec = t.tv_sec.max; 507 t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs; 508 } 509 else 510 period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec); 511 while( true ) 512 { 513 auto rc = assumeNothrowNoGC( 514 (semaphore_t handle, mach_timespec_t t) 515 { 516 return semaphore_timedwait(handle, t); 517 })(m_hndl, t); 518 if( !rc ) 519 return true; 520 if( rc == KERN_OPERATION_TIMED_OUT ) 521 return false; 522 if( rc != KERN_ABORTED || errno != EINTR ) 523 assert(false); 524 } 525 } 526 else version( Posix ) 527 { 528 timespec t; 529 530 assumeNothrowNoGC( 531 (ref timespec t, Duration period) 532 { 533 mktspec( t, period ); 534 })(t, period); 535 536 while( true ) 537 { 538 if (! ((sem_t* handle, timespec* t) 539 { 540 return sem_timedwait(handle, t); 541 })(&m_hndl, &t)) 542 return true; 543 if( errno == ETIMEDOUT ) 544 return false; 545 if( errno != EINTR ) 546 assert(false); 547 } 548 } 549 } 550 551 void notify() nothrow @nogc 552 { 553 version( Windows ) 554 { 555 if( !ReleaseSemaphore( m_hndl, 1, null ) ) 556 assert(false); 557 } 558 else version( Darwin ) 559 { 560 auto rc = assumeNothrowNoGC( 561 (semaphore_t handle) 562 { 563 return semaphore_signal(handle); 564 })(m_hndl); 565 if( rc ) 566 assert(false); 567 } 568 else version( Posix ) 569 { 570 int rc = sem_post( &m_hndl ); 571 if( rc ) 572 assert(false); 573 } 574 } 575 576 bool tryWait() nothrow @nogc 577 { 578 version( Windows ) 579 { 580 switch( WaitForSingleObject( m_hndl, 0 ) ) 581 { 582 case WAIT_OBJECT_0: 583 return true; 584 case WAIT_TIMEOUT: 585 return false; 586 default: 587 assert(false); 588 } 589 } 590 else version( Darwin ) 591 { 592 return wait( dur!"hnsecs"(0) ); 593 } 594 else version( Posix ) 595 { 596 while( true ) 597 { 598 if( !sem_trywait( &m_hndl ) ) 599 return true; 600 if( errno == EAGAIN ) 601 return false; 602 if( errno != EINTR ) 603 assert(false); 604 } 605 } 606 } 607 608 609 private: 610 version( Windows ) 611 { 612 HANDLE m_hndl; 613 } 614 else version( Darwin ) 615 { 616 semaphore_t m_hndl; 617 } 618 else version( Posix ) 619 { 620 sem_t m_hndl; 621 } 622 ulong _created = 0; 623 } 624 625 626 unittest 627 { 628 foreach(j; 0..4) 629 { 630 UncheckedSemaphore semaphore = makeSemaphore(1); 631 foreach(i; 0..100) 632 { 633 semaphore.wait(); 634 semaphore.notify(); 635 if (semaphore.tryWait()) 636 semaphore.notify(); 637 } 638 } 639 } 640 641 642 643 // 644 // CONDITION VARIABLE 645 // 646 647 648 ConditionVariable makeConditionVariable() nothrow @nogc 649 { 650 return ConditionVariable(42); 651 } 652 653 /** 654 * This struct represents a condition variable as conceived by C.A.R. Hoare. As 655 * per Mesa type monitors however, "signal" has been replaced with "notify" to 656 * indicate that control is not transferred to the waiter when a notification 657 * is sent. 658 */ 659 struct ConditionVariable 660 { 661 public: 662 nothrow: 663 @nogc: 664 665 /// Initializes a condition variable. 666 this(int dummy) 667 { 668 version( Windows ) 669 { 670 m_blockLock = CreateSemaphoreA( null, 1, 1, null ); 671 if( m_blockLock == m_blockLock.init ) 672 assert(false); 673 m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); 674 if( m_blockQueue == m_blockQueue.init ) 675 assert(false); 676 677 m_unblockLock = cast(CRITICAL_SECTION*) malloc(CRITICAL_SECTION.sizeof); 678 InitializeCriticalSectionAndSpinCount( m_unblockLock, CriticalSectionSpinCount ); 679 } 680 else version( Posix ) 681 { 682 _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) ); 683 684 int rc = pthread_cond_init( handleAddr(), null ); 685 if( rc ) 686 assert(false); 687 } 688 } 689 690 691 ~this() 692 { 693 version( Windows ) 694 { 695 CloseHandle( m_blockLock ); 696 CloseHandle( m_blockQueue ); 697 if (m_unblockLock !is null) 698 { 699 DeleteCriticalSection( m_unblockLock ); 700 free(m_unblockLock); 701 m_unblockLock = null; 702 } 703 } 704 else version( Posix ) 705 { 706 if (_handle !is null) 707 { 708 int rc = pthread_cond_destroy( handleAddr() ); 709 assert( !rc, "Unable to destroy condition" ); 710 alignedFree(_handle, PosixMutexAlignment); 711 _handle = null; 712 } 713 } 714 } 715 716 /// Wait until notified. 717 /// The associated mutex should always be the same for this condition variable. 718 void wait(UncheckedMutex* assocMutex) 719 { 720 version( Windows ) 721 { 722 timedWait( INFINITE, assocMutex ); 723 } 724 else version( Posix ) 725 { 726 int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() ); 727 if( rc ) 728 assert(false); 729 } 730 } 731 732 /// Notifies one waiter. 733 void notifyOne() 734 { 735 version( Windows ) 736 { 737 notifyImpl( false ); 738 } 739 else version( Posix ) 740 { 741 int rc = pthread_cond_signal( handleAddr() ); 742 if( rc ) 743 assert(false); 744 } 745 } 746 747 748 /// Notifies all waiters. 749 void notifyAll() 750 { 751 version( Windows ) 752 { 753 notifyImpl( true ); 754 } 755 else version( Posix ) 756 { 757 int rc = pthread_cond_broadcast( handleAddr() ); 758 if( rc ) 759 assert(false); 760 } 761 } 762 763 version(Posix) 764 { 765 pthread_cond_t* handleAddr() nothrow @nogc 766 { 767 return _handle; 768 } 769 } 770 771 772 private: 773 version( Windows ) 774 { 775 bool timedWait( DWORD timeout, UncheckedMutex* assocMutex ) 776 { 777 int numSignalsLeft; 778 int numWaitersGone; 779 DWORD rc; 780 781 rc = WaitForSingleObject( m_blockLock, INFINITE ); 782 assert( rc == WAIT_OBJECT_0 ); 783 784 m_numWaitersBlocked++; 785 786 rc = ReleaseSemaphore( m_blockLock, 1, null ); 787 assert( rc ); 788 789 assocMutex.unlock(); 790 791 rc = WaitForSingleObject( m_blockQueue, timeout ); 792 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); 793 bool timedOut = (rc == WAIT_TIMEOUT); 794 795 EnterCriticalSection( m_unblockLock ); 796 797 if( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) 798 { 799 if ( timedOut ) 800 { 801 // timeout (or canceled) 802 if( m_numWaitersBlocked != 0 ) 803 { 804 m_numWaitersBlocked--; 805 // do not unblock next waiter below (already unblocked) 806 numSignalsLeft = 0; 807 } 808 else 809 { 810 // spurious wakeup pending!! 811 m_numWaitersGone = 1; 812 } 813 } 814 if( --m_numWaitersToUnblock == 0 ) 815 { 816 if( m_numWaitersBlocked != 0 ) 817 { 818 // open the gate 819 rc = ReleaseSemaphore( m_blockLock, 1, null ); 820 assert( rc ); 821 // do not open the gate below again 822 numSignalsLeft = 0; 823 } 824 else if( (numWaitersGone = m_numWaitersGone) != 0 ) 825 { 826 m_numWaitersGone = 0; 827 } 828 } 829 } 830 else if( ++m_numWaitersGone == int.max / 2 ) 831 { 832 // timeout/canceled or spurious event :-) 833 rc = WaitForSingleObject( m_blockLock, INFINITE ); 834 assert( rc == WAIT_OBJECT_0 ); 835 // something is going on here - test of timeouts? 836 m_numWaitersBlocked -= m_numWaitersGone; 837 rc = ReleaseSemaphore( m_blockLock, 1, null ); 838 assert( rc == WAIT_OBJECT_0 ); 839 m_numWaitersGone = 0; 840 } 841 842 LeaveCriticalSection( m_unblockLock ); 843 844 if( numSignalsLeft == 1 ) 845 { 846 // better now than spurious later (same as ResetEvent) 847 for( ; numWaitersGone > 0; --numWaitersGone ) 848 { 849 rc = WaitForSingleObject( m_blockQueue, INFINITE ); 850 assert( rc == WAIT_OBJECT_0 ); 851 } 852 // open the gate 853 rc = ReleaseSemaphore( m_blockLock, 1, null ); 854 assert( rc ); 855 } 856 else if( numSignalsLeft != 0 ) 857 { 858 // unblock next waiter 859 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 860 assert( rc ); 861 } 862 assocMutex.lock(); 863 return !timedOut; 864 } 865 866 867 void notifyImpl( bool all ) 868 { 869 DWORD rc; 870 871 EnterCriticalSection( m_unblockLock ); 872 873 if( m_numWaitersToUnblock != 0 ) 874 { 875 if( m_numWaitersBlocked == 0 ) 876 { 877 LeaveCriticalSection( m_unblockLock ); 878 return; 879 } 880 if( all ) 881 { 882 m_numWaitersToUnblock += m_numWaitersBlocked; 883 m_numWaitersBlocked = 0; 884 } 885 else 886 { 887 m_numWaitersToUnblock++; 888 m_numWaitersBlocked--; 889 } 890 LeaveCriticalSection( m_unblockLock ); 891 } 892 else if( m_numWaitersBlocked > m_numWaitersGone ) 893 { 894 rc = WaitForSingleObject( m_blockLock, INFINITE ); 895 assert( rc == WAIT_OBJECT_0 ); 896 if( 0 != m_numWaitersGone ) 897 { 898 m_numWaitersBlocked -= m_numWaitersGone; 899 m_numWaitersGone = 0; 900 } 901 if( all ) 902 { 903 m_numWaitersToUnblock = m_numWaitersBlocked; 904 m_numWaitersBlocked = 0; 905 } 906 else 907 { 908 m_numWaitersToUnblock = 1; 909 m_numWaitersBlocked--; 910 } 911 LeaveCriticalSection( m_unblockLock ); 912 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 913 assert( rc ); 914 } 915 else 916 { 917 LeaveCriticalSection( m_unblockLock ); 918 } 919 } 920 921 922 // NOTE: This implementation uses Algorithm 8c as described here: 923 // http://groups.google.com/group/comp.programming.threads/ 924 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a 925 HANDLE m_blockLock; // auto-reset event (now semaphore) 926 HANDLE m_blockQueue; // auto-reset event (now semaphore) 927 CRITICAL_SECTION* m_unblockLock = null; // internal mutex/CS 928 int m_numWaitersGone = 0; 929 int m_numWaitersBlocked = 0; 930 int m_numWaitersToUnblock = 0; 931 } 932 else version( Posix ) 933 { 934 pthread_cond_t* _handle; 935 } 936 } 937 938 unittest 939 { 940 import dplug.core.thread; 941 942 auto mutex = makeMutex(); 943 auto condvar = makeConditionVariable(); 944 945 bool finished = false; 946 947 // Launch a thread that wait on this condition 948 Thread t = launchInAThread( 949 () { 950 mutex.lock(); 951 while(!finished) 952 condvar.wait(&mutex); 953 mutex.unlock(); 954 }); 955 956 // Notify termination 957 mutex.lock(); 958 finished = true; 959 mutex.unlock(); 960 condvar.notifyOne(); 961 962 t.join(); 963 }