1 /**
2 Mutexes, semaphores and condition variables.
3 
4 Copyright: Sean Kelly 2005 - 2009.
5 Copyright: Guillaume Piolat 2016 - 2018.
6 License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7 */
// This contains parts of druntime's core.sync.mutex, core.sync.semaphore and core.sync.condition,
// modified to make them @nogc nothrow.
10 module dplug.core.sync;
11 
12 import core.time;
13 import core.atomic;
14 
15 import dplug.core.vec;
16 import dplug.core.nogc;
17 
18 import core.stdc.stdio;
19 import core.stdc.stdlib: malloc, free;
20 
// Emulate condition variables on Windows
// (this was originally in Phobos for XP compatibility).
// NOTE(review): this version identifier is not tested anywhere in the visible
// part of this module; the Windows emulation is selected by `version (Windows)`.
version = emulateCondVarWin32;

// Fold every Apple platform into a single `Darwin` version identifier, so the
// rest of the module only has to test `version (Darwin)`.
version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;
33 
34 version( Windows )
35 {
36     import core.sys.windows.windef;
37     import core.sys.windows.winbase;
38 
39     extern (Windows) nothrow @nogc
40     {
41         void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount);
42     }
43 }
44 else version( Darwin )
45 {
46     import core.sys.posix.pthread;
47     import core.sync.config;
48     import core.stdc.errno;
49     import core.sys.posix.time;
50     static if (__VERSION__ < 2084)
51         import core.sys.osx.mach.semaphore; // was removed with DMDFE 2.084
52     else 
53         import core.sys.darwin.mach.semaphore;
54 }
55 else version( Posix )
56 {
57     import core.sync.config;
58     import core.stdc.errno;
59     import core.sys.posix.pthread;
60     import core.sys.posix.semaphore;
61     import core.sys.posix.time;
62 }
63 else
64 {
65     static assert(false, "Platform not supported");
66 }
67 
68 
69 //
70 // MUTEX
71 //
72 
/// Creates a mutex whose native handle is allocated right away.
/// Returns: A newly created `UncheckedMutex`.
UncheckedMutex makeMutex() nothrow @nogc
{
    // The value itself is never read by the constructor: it only selects the
    // private `this(int)` overload.
    enum int unusedCtorArg = 42;
    return UncheckedMutex(unusedCtorArg);
}
78 
// Alignment, in bytes, requested for heap-allocated pthread objects.
// This value is a wild guess: no measurements were done.
private enum int PosixMutexAlignment = 64;

// Spin count handed to InitializeCriticalSectionAndSpinCount.
// Cargo-culted from the spin count used by WTF::Lock.
// See: https://webkit.org/blog/6161/locking-in-webkit/
private enum int CriticalSectionSpinCount = 40;
84 
/// Mutex usable from `nothrow @nogc` code.
///
/// The native object (`CRITICAL_SECTION` on Windows, `pthread_mutex_t` on Posix)
/// is heap-allocated and referenced through a pointer. On Posix it is created
/// with the `PTHREAD_MUTEX_RECURSIVE` type.
///
/// The native object is created either up-front by `makeMutex()`, or on first
/// use by `lockLazy()`. `lock`, `unlock` and `tryLock` assume it already exists.
/// Copying is disabled.
struct UncheckedMutex
{
nothrow @nogc:

    /// Creates the native mutex immediately.
    /// Params:
    ///     dummyArg = Ignored; only selects this constructor.
    private this(int dummyArg)
    {
        assert(!_created);
        lazyThreadSafeInitialization();
    }

    /// Destroys and frees the native mutex, if there is one.
    ~this()
    {
        // RACE CONDITION HERE
        // TODO: not easy to make the mutex destruction thread-safe, because one of the
        // thread must wait. Ignore that for now.

        void* mutexHandle = atomicLoadCompat(_mutex); // Can be the mutex handle, or null.

        if (mutexHandle !is null)
        {
            // Now, the mutex could have been destroyed by another thread already.
            // Make a cas to ensure we are first to attempt it.

            if (casCompat(&_mutex, &mutexHandle, null))
            {
                destroyMutex(mutexHandle);
            }
        }

        _created = 0;
    }

    @disable this(this);

    /// Lock mutex, and create it in a thread-safe way if it doesn't exist yet.
    /// This is useful when globals access must be protected by a mutex.
    void lockLazy() @trusted
    {
        lazyThreadSafeInitialization();
        lock();
    }

    /// Lock mutex.
    /// The native mutex must already exist (see `makeMutex` and `lockLazy`).
    /// On Posix, a failing `pthread_mutex_lock` ends in `assert(false)`.
    void lock()
    {
        // No sync needed, else would have used lockLazy()
        version( Windows )
        {
            EnterCriticalSection( cast(CRITICAL_SECTION*) _mutex );
        }
        else version( Posix )
        {

            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_lock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    // undocumented function for internal use
    // Unlocks the mutex. On Posix, a failing `pthread_mutex_unlock` ends in
    // `assert(false)`.
    void unlock()
    {
        version( Windows )
        {
            LeaveCriticalSection( cast(CRITICAL_SECTION*) _mutex );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_unlock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    /// Tries to lock the mutex without blocking.
    /// Returns: `true` if the mutex was acquired, `false` otherwise.
    bool tryLock()
    {
        version( Windows )
        {
            return TryEnterCriticalSection( cast(CRITICAL_SECTION*) _mutex ) != 0;
        }
        else version( Posix )
        {
            int result = assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    return pthread_mutex_trylock(handle);
                })(handleAddr());
            return result == 0;
        }
    }

private:
    // on Windows, this is a CRITICAL_SECTION*.
    // else, this is a pthread_mutex_t*
    void* _mutex;

    // Work-around for Issue 16636
    // https://issues.dlang.org/show_bug.cgi?id=16636
    // Still crash with LDC somehow
    long _created = 0;

    // Allocates and initializes a native mutex.
    // Returns: the new handle, to be released with `destroyMutex`.
    static void* createMutex()
    {
        version( Windows )
        {
            CRITICAL_SECTION* critSec = cast(CRITICAL_SECTION*) malloc(CRITICAL_SECTION.sizeof);
            InitializeCriticalSectionAndSpinCount( critSec, CriticalSectionSpinCount );
            return critSec;
        }
        else version( Posix )
        {
            pthread_mutex_t* pmtx = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) );

            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    pthread_mutexattr_t attr = void;
                    pthread_mutexattr_init( &attr );
                    pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE );

                    version (Darwin)
                    {
                        // Note: disabled since this breaks thread pool.
                        /+
                            // OSX mutexes are fair by default, but this has a cost for contended locks
                            // Disable fairness.
                            // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/
                            enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2;
                            pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT);
                        +/
                    }

                    int res = pthread_mutex_init( handle, &attr );

                    // The attribute object is only needed while initializing the
                    // mutex; release it whether or not the initialization worked.
                    pthread_mutexattr_destroy( &attr );

                    // Fail here rather than later in lock() on a mutex that was
                    // never initialized.
                    if (res != 0)
                        assert(false);

                })(pmtx);
                return pmtx;
        }
        else
            static assert(false);
    }

    // Destroys a native mutex created by `createMutex` and frees its memory.
    // Does nothing when `mutex` is null.
    static void destroyMutex(void* mutex)
    {
        if (mutex is null)
            return;

        // TODO: this should be thread-safe, for example a cas with local variable name or _created

        version( Windows )
        {
            DeleteCriticalSection( cast(CRITICAL_SECTION*) mutex );
            free(mutex);
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    pthread_mutex_destroy(handle);
                })( cast(pthread_mutex_t*) mutex );
            alignedFree(mutex, PosixMutexAlignment);
        }
        else
            static assert(false);
    }

    // Makes sure `_mutex` points to a native mutex, creating one if needed.
    // When several threads get here at once, each creates a candidate and a
    // compare-and-swap decides which one is kept; the others are destroyed.
    void lazyThreadSafeInitialization() @trusted
    {
        // Is there an existing mutex already?
        if (atomicLoadCompat(_mutex) !is null)
            return;

        // Create one mutex.
        void* mtx = createMutex();
        void* p = cast(void*)mtx;
        void** here = &_mutex;
        void* ifThis = null;

        // Try to set _mutex.
        if (!casCompat(here, &ifThis, p))
        {
            // Another thread created _mutex first. Destroy our useless instance.
            destroyMutex(mtx);
        }
    }

package:
    version( Posix )
    {
        // Native handle, needed by ConditionVariable.wait.
        pthread_mutex_t* handleAddr() nothrow @nogc
        {
            return cast(pthread_mutex_t*) _mutex;
        }
    }
private:

    // Loads `p` atomically. With front-ends older than 2.094 this is a plain,
    // non-atomic read.
    static void* atomicLoadCompat(ref void* p)
    {
        static if (__VERSION__ < 2094)
        {
            // old compiler, do it incorrectly
            return p;
        }
        else
            return atomicLoad(p);
    }

    // Compare-and-swap: writes `writeThis` to `*here` if it holds `*ifThis`.
    // With front-ends older than 2.094 the compare and the store are two
    // separate, non-atomic steps.
    // Returns: `true` if the store happened.
    static bool casCompat(void** here,
                          void** ifThis,
                          void* writeThis)
    {
        static if (__VERSION__ < 2094)
        {
            // old compiler, do it incorrectly
            if (*here == *ifThis)
            {
                *here = writeThis;
                return true;
            }
            else
                return false;
        }
        else
        {
            return cas(here, ifThis, writeThis);
        }
    }
}
321 
// Lock/unlock and tryLock/unlock round trips on an explicitly created mutex.
unittest
{
    auto mtx = makeMutex();
    for (int round = 0; round < 100; ++round)
    {
        mtx.lock();
        mtx.unlock();

        const bool acquired = mtx.tryLock();
        if (acquired)
            mtx.unlock();
    }

    // Explicit destruction before the end of the scope.
    mtx.destroy();
}
335 
// A mutex that was never constructed explicitly: lockLazy() has to create
// the native handle before locking.
unittest
{
    __gshared UncheckedMutex lazyMutex;

    lazyMutex.lockLazy();
    lazyMutex.unlock();
}
342 
343 
344 
345 //
346 // SEMAPHORE
347 //
348 
/// Creates a semaphore.
/// Params:
///     count = Initial value given to the native semaphore.
/// Returns: A newly created `UncheckedSemaphore`
UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc
{
    return typeof(return)(count);
}
354 
/// Semaphore usable from `nothrow @nogc` code.
///
/// Wraps a Win32 semaphore handle on Windows, a Mach semaphore on Darwin, and
/// a `sem_t` initialized with `sem_init` on other Posix systems.
/// Instances are created with `makeSemaphore`; copying is disabled.
struct UncheckedSemaphore
{
    /// Creates the native semaphore with `count` as initial value.
    /// A creation failure ends in `assert(false)`.
    private this( uint count ) nothrow @nogc
    {
        version( Windows )
        {
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == HANDLE.init )
                assert(false);
        }
        else version( Darwin )
        {
            mach_port_t ownTask = assumeNothrowNoGC(
                ()
                {
                    return mach_task_self();
                })();

            kern_return_t status = assumeNothrowNoGC(
                (mach_port_t owner, semaphore_t* sem, uint initial)
                {
                    return semaphore_create(owner, sem, SYNC_POLICY_FIFO, initial );
                })(ownTask, &m_hndl, count);

            if( status != 0 )
                 assert(false);
        }
        else version( Posix )
        {
            if( sem_init( &m_hndl, 0, count ) != 0 )
                assert(false);
        }
        _created = 1;
    }

    /// Releases the native semaphore, if this instance created one.
    ~this() nothrow @nogc
    {
        // Nothing to release for an instance that never went through the constructor.
        if (!_created)
            return;

        version( Windows )
        {
            BOOL closed = CloseHandle( m_hndl );
            assert( closed, "Unable to destroy semaphore" );
        }
        else version( Darwin )
        {
            mach_port_t ownTask = assumeNothrowNoGC(
                ()
                {
                    return mach_task_self();
                })();

            kern_return_t status = assumeNothrowNoGC(
                (mach_port_t owner, semaphore_t sem)
                {
                    return semaphore_destroy( owner, sem );
                })(ownTask, m_hndl);

            assert( !status, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int status = sem_destroy( &m_hndl );
            assert( !status, "Unable to destroy semaphore" );
        }

        _created = 0;
    }

    @disable this(this);

    /// Blocks until the semaphore can be decremented.
    /// On Darwin and Posix, a wait interrupted with `EINTR` is restarted.
    void wait() nothrow @nogc
    {
        version( Windows )
        {
            DWORD status = WaitForSingleObject( m_hndl, INFINITE );
            assert( status == WAIT_OBJECT_0 );
        }
        else version( Darwin )
        {
            for( ;; )
            {
                auto status = assumeNothrowNoGC(
                    (semaphore_t sem)
                    {
                        return semaphore_wait(sem);
                    })(m_hndl);

                if( status == 0 )
                    return;

                // Only an interrupted wait is retried.
                if( status != KERN_ABORTED || errno != EINTR )
                    assert(false);
            }
        }
        else version( Posix )
        {
            for( ;; )
            {
                auto status = assumeNothrowNoGC(
                    (sem_t* sem)
                    {
                        return sem_wait(sem);
                    })(&m_hndl);

                if( !status )
                    return;

                // Only an interrupted wait is retried.
                if( errno != EINTR )
                    assert(false);
            }
        }
    }

    /// Waits for the semaphore for at most `period`.
    /// Params:
    ///     period = Maximum time to wait; must not be negative.
    /// Returns: `true` if the semaphore was acquired, `false` on timeout.
    bool wait( Duration period ) nothrow @nogc
    {
        assert( !period.isNegative );

        version( Windows )
        {
            // WaitForSingleObject takes a 32-bit millisecond count, so longer
            // periods are consumed in slices of this size.
            auto slice = dur!("msecs")( uint.max - 1 );
            const uint sliceMillis = cast(uint) slice.total!"msecs";

            while( period > slice )
            {
                const DWORD status = WaitForSingleObject( m_hndl, sliceMillis );
                if( status == WAIT_OBJECT_0 )
                    return true;
                if( status != WAIT_TIMEOUT )
                    assert(false);
                period -= slice;
            }

            const DWORD last = WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" );
            if( last == WAIT_OBJECT_0 )
                return true;
            if( last == WAIT_TIMEOUT )
                return false;
            assert(false);
        }
        else version( Darwin )
        {
            mach_timespec_t timeout = void;
            (cast(byte*) &timeout)[0 .. timeout.sizeof] = 0;

            if( period.total!"seconds" > timeout.tv_sec.max )
            {
                // Seconds do not fit in tv_sec: saturate them.
                timeout.tv_sec  = timeout.tv_sec.max;
                timeout.tv_nsec = cast(typeof(timeout.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
            }
            else
            {
                period.split!("seconds", "nsecs")(timeout.tv_sec, timeout.tv_nsec);
            }

            for( ;; )
            {
                auto status = assumeNothrowNoGC(
                            (semaphore_t sem, mach_timespec_t ts)
                            {
                                return semaphore_timedwait(sem, ts);
                            })(m_hndl, timeout);

                if( status == 0 )
                    return true;
                if( status == KERN_OPERATION_TIMED_OUT )
                    return false;

                // Only an interrupted wait is retried.
                if( status != KERN_ABORTED || errno != EINTR )
                     assert(false);
            }
        }
        else version( Posix )
        {
            timespec deadline;

            assumeNothrowNoGC(
                (ref timespec ts, Duration d)
                {
                    mktspec( ts, d );
                })(deadline, period);

            for( ;; )
            {
                if( sem_timedwait( &m_hndl, &deadline ) == 0 )
                    return true;

                const err = errno;
                if( err == ETIMEDOUT )
                    return false;

                // Only an interrupted wait is retried.
                if( err != EINTR )
                    assert(false);
            }
        }
    }

    /// Increments the semaphore by one.
    /// A failure of the native call ends in `assert(false)`.
    void notify()  nothrow @nogc
    {
        version( Windows )
        {
            const released = ReleaseSemaphore( m_hndl, 1, null );
            if( !released )
                assert(false);
        }
        else version( Darwin )
        {
            auto status = assumeNothrowNoGC(
                        (semaphore_t sem)
                        {
                            return semaphore_signal(sem);
                        })(m_hndl);
            if( status != 0 )
                assert(false);
        }
        else version( Posix )
        {
            if( sem_post( &m_hndl ) != 0 )
                assert(false);
        }
    }

    /// Tries to acquire the semaphore without blocking.
    /// Returns: `true` if the semaphore was acquired, `false` otherwise.
    bool tryWait() nothrow @nogc
    {
        version( Windows )
        {
            const DWORD status = WaitForSingleObject( m_hndl, 0 );
            if( status == WAIT_OBJECT_0 )
                return true;
            if( status == WAIT_TIMEOUT )
                return false;
            assert(false);
        }
        else version( Darwin )
        {
            // A timed wait with a zero timeout.
            return wait( dur!"hnsecs"(0) );
        }
        else version( Posix )
        {
            for( ;; )
            {
                if( sem_trywait( &m_hndl ) == 0 )
                    return true;

                const err = errno;
                if( err == EAGAIN )
                    return false;

                // Only an interrupted call is retried.
                if( err != EINTR )
                    assert(false);
            }
        }
    }


private:
    // Native semaphore object.
    version( Windows )
    {
        HANDLE  m_hndl;
    }
    else version( Darwin )
    {
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }

    // Non-zero once the constructor has created the native semaphore.
    ulong _created = 0;
}
624 
625 
// Creates a few semaphores with an initial value of 1 and runs
// wait/notify and tryWait/notify round trips on each.
unittest
{
    for (int instance = 0; instance < 4; ++instance)
    {
        auto sem = makeSemaphore(1);
        for (int round = 0; round < 100; ++round)
        {
            sem.wait();
            sem.notify();

            const bool acquired = sem.tryWait();
            if (acquired)
                sem.notify();
        }
    }
}
640 
641 
642 
643 //
644 // CONDITION VARIABLE
645 //
646 
647 
/// Returns: A newly created, initialized `ConditionVariable`.
ConditionVariable makeConditionVariable() nothrow @nogc
{
    // The value itself is not used by the constructor; it only selects the
    // `this(int)` overload.
    enum int unusedCtorArg = 42;
    return ConditionVariable(unusedCtorArg);
}
652 
/**
* This struct represents a condition variable as conceived by C.A.R. Hoare.  As
* per Mesa type monitors however, "signal" has been replaced with "notify" to
* indicate that control is not transferred to the waiter when a notification
* is sent.
*
* On Posix this wraps a heap-allocated `pthread_cond_t`. On Windows the
* condition variable is emulated with two semaphores, a critical section and
* three counters.
*
* NOTE(review): unlike `UncheckedMutex` and `UncheckedSemaphore`, copying is not
* disabled here. A copy shares the same native handles, and each copy's
* destructor would release them.
*/
struct ConditionVariable
{
public:
nothrow:
@nogc:

    /// Initializes a condition variable.
    /// Params:
    ///     dummy = Ignored; only selects this constructor.
    this(int dummy)
    {
        version( Windows )
        {
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                assert(false);
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                assert(false);

            m_unblockLock = cast(CRITICAL_SECTION*) malloc(CRITICAL_SECTION.sizeof);
            InitializeCriticalSectionAndSpinCount( m_unblockLock, CriticalSectionSpinCount );
        }
        else version( Posix )
        {
            _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) );

            int rc = pthread_cond_init( handleAddr(), null );
            if( rc )
                assert(false);
        }
    }


    /// Releases the native resources, if any were created.
    ~this()
    {
        version( Windows )
        {
            // A default-initialized instance holds null handles: only close
            // what was actually created, and clear it afterwards.
            if (m_blockLock !is null)
            {
                CloseHandle( m_blockLock );
                m_blockLock = null;
            }
            if (m_blockQueue !is null)
            {
                CloseHandle( m_blockQueue );
                m_blockQueue = null;
            }
            if (m_unblockLock !is null)
            {
                DeleteCriticalSection( m_unblockLock );
                free(m_unblockLock);
                m_unblockLock = null;
            }
        }
        else version( Posix )
        {
            if (_handle !is null)
            {
                int rc = pthread_cond_destroy( handleAddr() );
                assert( !rc, "Unable to destroy condition" );
                alignedFree(_handle, PosixMutexAlignment);
                _handle = null;
            }
        }
    }

    /// Wait until notified.
    /// The associated mutex should always be the same for this condition variable.
    /// Params:
    ///     assocMutex = Mutex passed to the native wait (Posix), or unlocked
    ///                  during the wait and locked again before returning (Windows).
    void wait(UncheckedMutex* assocMutex)
    {
        version( Windows )
        {
            timedWait( INFINITE, assocMutex );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() );
            if( rc )
                assert(false);
        }
    }

    /// Notifies one waiter.
    void notifyOne()
    {
        version( Windows )
        {
            notifyImpl( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( handleAddr() );
            if( rc )
                assert(false);
        }
    }


    /// Notifies all waiters.
    void notifyAll()
    {
        version( Windows )
        {
            notifyImpl( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( handleAddr() );
            if( rc )
                assert(false);
        }
    }

    version(Posix)
    {
        /// Returns: The native `pthread_cond_t` pointer.
        pthread_cond_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }


private:
    version( Windows )
    {
        // Waits on the emulated condition variable for at most `timeout`
        // milliseconds (INFINITE for no limit). `assocMutex` is unlocked while
        // waiting and locked again before returning.
        // Returns: `true` if woken by a notification, `false` on timeout.
        bool timedWait( DWORD timeout, UncheckedMutex* assocMutex )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Register as a blocked waiter, under the m_blockLock gate.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            assocMutex.unlock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore returns non-zero on success, so its result is
                // tested like every other ReleaseSemaphore call in this function.
                // WAIT_OBJECT_0 is a WaitForSingleObject result and equals 0.
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            assocMutex.lock();
            return !timedOut;
        }


        // Wakes one blocked waiter, or all of them when `all` is true.
        // Does nothing when no waiter is blocked.
        void notifyImpl( bool all )
        {
            DWORD rc;

            EnterCriticalSection( m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                // An unblocking round is already in progress: add to it.
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                // Start a new unblocking round. The m_blockLock gate taken here
                // is released again by timedWait ("open the gate").
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        CRITICAL_SECTION*   m_unblockLock           = null;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        // Heap-allocated native condition variable, null when not created.
        pthread_cond_t*     _handle;
    }
}
937 
// One thread waits on a condition variable until a flag is set; the test
// thread sets the flag under the mutex, notifies, then joins the waiter.
unittest
{
    import dplug.core.thread;

    auto mutex = makeMutex();
    auto condvar = makeConditionVariable();

    bool finished = false;

    // Launch a thread that waits on this condition.
    // The flag is re-checked in a loop after every wake-up.
    Thread t = launchInAThread(
        () {
            mutex.lock();
            while(!finished)
                condvar.wait(&mutex);
            mutex.unlock();
        });

    // Notify termination: the flag is changed while holding the mutex.
    mutex.lock();
        finished = true;
    mutex.unlock();
    condvar.notifyOne();

    t.join();
}