iostat - add read/write details to output
[dragonfly.git] / contrib / bind / lib / isc / rwlock.c
1 /*
2  * Copyright (C) 2004, 2005, 2007, 2009  Internet Systems Consortium, Inc. ("ISC")
3  * Copyright (C) 1998-2001, 2003  Internet Software Consortium.
4  *
5  * Permission to use, copy, modify, and/or distribute this software for any
6  * purpose with or without fee is hereby granted, provided that the above
7  * copyright notice and this permission notice appear in all copies.
8  *
9  * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
10  * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
11  * AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
12  * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13  * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
14  * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15  * PERFORMANCE OF THIS SOFTWARE.
16  */
17
18 /* $Id: rwlock.c,v 1.44.128.2 2009/01/19 23:47:03 tbox Exp $ */
19
20 /*! \file */
21
22 #include <config.h>
23
24 #include <stddef.h>
25
26 #include <isc/atomic.h>
27 #include <isc/magic.h>
28 #include <isc/msgs.h>
29 #include <isc/platform.h>
30 #include <isc/rwlock.h>
31 #include <isc/util.h>
32
/*
 * Magic value stored in rwl->magic by isc_rwlock_init() and cleared again by
 * isc_rwlock_destroy().  VALID_RWLOCK() is the validity check used in the
 * REQUIRE()s at the top of the lock operations below.
 */
33 #define RWLOCK_MAGIC            ISC_MAGIC('R', 'W', 'L', 'k')
34 #define VALID_RWLOCK(rwl)       ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
35
36 #ifdef ISC_PLATFORM_USETHREADS
37
/*
 * Fallback quotas: isc_rwlock_init() substitutes these when it is passed a
 * quota of 0.  The #ifndef guards let a build define different values before
 * this point.
 */
38 #ifndef RWLOCK_DEFAULT_READ_QUOTA
39 #define RWLOCK_DEFAULT_READ_QUOTA 4
40 #endif
41
42 #ifndef RWLOCK_DEFAULT_WRITE_QUOTA
43 #define RWLOCK_DEFAULT_WRITE_QUOTA 4
44 #endif
45
46 #ifdef ISC_RWLOCK_TRACE
47 #include <stdio.h>              /* Required for fprintf/stderr. */
48 #include <isc/thread.h>         /* Required for isc_thread_self(). */
49
50 static void
51 print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
52         fprintf(stderr,
53                 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
54                                ISC_MSG_PRINTLOCK,
55                                "rwlock %p thread %lu %s(%s): %s, %u active, "
56                                "%u granted, %u rwaiting, %u wwaiting\n"),
57                 rwl, isc_thread_self(), operation,
58                 (type == isc_rwlocktype_read ?
59                  isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
60                                 ISC_MSG_READ, "read") :
61                  isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
62                                 ISC_MSG_WRITE, "write")),
63                 (rwl->type == isc_rwlocktype_read ?
64                  isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
65                                 ISC_MSG_READING, "reading") :
66                  isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
67                                 ISC_MSG_WRITING, "writing")),
68                 rwl->active, rwl->granted, rwl->readers_waiting,
69                 rwl->writers_waiting);
70 }
71 #endif
72
73 isc_result_t
74 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
75                 unsigned int write_quota)
76 {
77         isc_result_t result;
78
79         REQUIRE(rwl != NULL);
80
81         /*
82          * In case there's trouble initializing, we zero magic now.  If all
83          * goes well, we'll set it to RWLOCK_MAGIC.
84          */
85         rwl->magic = 0;
86
87 #if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
88         rwl->write_requests = 0;
89         rwl->write_completions = 0;
90         rwl->cnt_and_flag = 0;
91         rwl->readers_waiting = 0;
92         rwl->write_granted = 0;
93         if (read_quota != 0) {
94                 UNEXPECTED_ERROR(__FILE__, __LINE__,
95                                  "read quota is not supported");
96         }
97         if (write_quota == 0)
98                 write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
99         rwl->write_quota = write_quota;
100 #else
101         rwl->type = isc_rwlocktype_read;
102         rwl->original = isc_rwlocktype_none;
103         rwl->active = 0;
104         rwl->granted = 0;
105         rwl->readers_waiting = 0;
106         rwl->writers_waiting = 0;
107         if (read_quota == 0)
108                 read_quota = RWLOCK_DEFAULT_READ_QUOTA;
109         rwl->read_quota = read_quota;
110         if (write_quota == 0)
111                 write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
112         rwl->write_quota = write_quota;
113 #endif
114
115         result = isc_mutex_init(&rwl->lock);
116         if (result != ISC_R_SUCCESS)
117                 return (result);
118
119         result = isc_condition_init(&rwl->readable);
120         if (result != ISC_R_SUCCESS) {
121                 UNEXPECTED_ERROR(__FILE__, __LINE__,
122                                  "isc_condition_init(readable) %s: %s",
123                                  isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
124                                                 ISC_MSG_FAILED, "failed"),
125                                  isc_result_totext(result));
126                 result = ISC_R_UNEXPECTED;
127                 goto destroy_lock;
128         }
129         result = isc_condition_init(&rwl->writeable);
130         if (result != ISC_R_SUCCESS) {
131                 UNEXPECTED_ERROR(__FILE__, __LINE__,
132                                  "isc_condition_init(writeable) %s: %s",
133                                  isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
134                                                 ISC_MSG_FAILED, "failed"),
135                                  isc_result_totext(result));
136                 result = ISC_R_UNEXPECTED;
137                 goto destroy_rcond;
138         }
139
140         rwl->magic = RWLOCK_MAGIC;
141
142         return (ISC_R_SUCCESS);
143
144   destroy_rcond:
145         (void)isc_condition_destroy(&rwl->readable);
146   destroy_lock:
147         DESTROYLOCK(&rwl->lock);
148
149         return (result);
150 }
151
152 void
153 isc_rwlock_destroy(isc_rwlock_t *rwl) {
154         REQUIRE(VALID_RWLOCK(rwl));
155
156 #if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
157         REQUIRE(rwl->write_requests == rwl->write_completions &&
158                 rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
159 #else
160         LOCK(&rwl->lock);
161         REQUIRE(rwl->active == 0 &&
162                 rwl->readers_waiting == 0 &&
163                 rwl->writers_waiting == 0);
164         UNLOCK(&rwl->lock);
165 #endif
166
167         rwl->magic = 0;
168         (void)isc_condition_destroy(&rwl->readable);
169         (void)isc_condition_destroy(&rwl->writeable);
170         DESTROYLOCK(&rwl->lock);
171 }
172
173 #if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
174
175 /*
176  * When some architecture-dependent atomic operations are available,
177  * rwlock can be more efficient than the generic algorithm defined below.
178  * The basic algorithm is described in the following URL:
179  *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
180  *
181  * The key is to use the following integer variables modified atomically:
182  *   write_requests, write_completions, and cnt_and_flag.
183  *
184  * write_requests and write_completions act as a waiting queue for writers
185  * in order to ensure the FIFO order.  Both variables begin with the initial
186  * value of 0.  When a new writer tries to get a write lock, it increments
187  * write_requests and gets the previous value of the variable as a "ticket".
188  * When write_completions reaches the ticket number, the new writer can start
189  * writing.  When the writer completes its work, it increments
190  * write_completions so that another new writer can start working.  If the
191  * write_requests is not equal to write_completions, it means a writer is now
192  * working or waiting.  In this case, new readers cannot start reading, or
193  * in other words, this algorithm basically prefers writers.
194  *
195  * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
196  * variable is a kind of structure with two members: writer_flag (1 bit) and
197  * reader_count (31 bits).  The writer_flag shows whether a writer is working,
198  * and the reader_count shows the number of readers currently working or almost
199  * ready for working.  A writer who has the current "ticket" tries to get the
200  * lock by exclusively setting the writer_flag to 1, provided that the whole
201  * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
202  * a new reader tries to increment the "reader_count" field provided that
203  * the writer_flag is 0 (meaning there is no writer working).
204  *
205  * If some of the above operations fail, the reader or the writer sleeps
206  * until the related condition changes.  When a working reader or writer
207  * completes its work, some readers or writers are sleeping, and the condition
208  * that suspended the reader or writer has changed, it wakes up the sleeping
209  * readers or writers.
210  *
211  * As already noted, this algorithm basically prefers writers.  In order to
212  * prevent readers from starving, however, the algorithm also introduces the
213  * "writer quota" (Q).  When Q consecutive writers have completed their work,
214  * suspending readers, the last writer will wake up the readers, even if a new
215  * writer is waiting.
216  *
217  * Implementation specific note: due to the combination of atomic operations
218  * and a mutex lock, ordering between the atomic operation and locks can be
219  * very sensitive in some cases.  In particular, it is generally very important
220  * to check the atomic variable that requires a reader or writer to sleep after
221  * locking the mutex and before actually sleeping; otherwise, it could be very
222  * likely to cause a deadlock.  For example, assume "var" is a variable
223  * atomically modified, then the corresponding code would be:
224  *      if (var == need_sleep) {
225  *              LOCK(lock);
226  *              if (var == need_sleep)
227  *                      WAIT(cond, lock);
228  *              UNLOCK(lock);
229  *      }
230  * The second check is important, since "var" is protected by the atomic
231  * operation, not by the mutex, and can be changed just before sleeping.
232  * (The first "if" could be omitted, but this is also important in order to
233  * make the code efficient by avoiding the use of the mutex unless it is
234  * really necessary.)
235  */
236
/*
 * Layout of cnt_and_flag: the lowest bit is the writer flag and the remaining
 * bits hold the reader count, so one reader is accounted for by adding
 * READER_INCR.
 */
237 #define WRITER_ACTIVE   0x1
238 #define READER_INCR     0x2
239
240 isc_result_t
241 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
242         isc_int32_t cntflag;
243
244         REQUIRE(VALID_RWLOCK(rwl));
245
246 #ifdef ISC_RWLOCK_TRACE
247         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
248                                   ISC_MSG_PRELOCK, "prelock"), rwl, type);
249 #endif
250
251         if (type == isc_rwlocktype_read) {
252                 if (rwl->write_requests != rwl->write_completions) {
253                         /* there is a waiting or active writer */
254                         LOCK(&rwl->lock);
255                         if (rwl->write_requests != rwl->write_completions) {
256                                 rwl->readers_waiting++;
257                                 WAIT(&rwl->readable, &rwl->lock);
258                                 rwl->readers_waiting--;
259                         }
260                         UNLOCK(&rwl->lock);
261                 }
262
263                 cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
264                 while (1) {
265                         if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
266                                 break;
267
268                         /* A writer is still working */
269                         LOCK(&rwl->lock);
270                         rwl->readers_waiting++;
271                         if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
272                                 WAIT(&rwl->readable, &rwl->lock);
273                         rwl->readers_waiting--;
274                         UNLOCK(&rwl->lock);
275
276                         /*
277                          * Typically, the reader should be able to get a lock
278                          * at this stage:
279                          *   (1) there should have been no pending writer when
280                          *       the reader was trying to increment the
281                          *       counter; otherwise, the writer should be in
282                          *       the waiting queue, preventing the reader from
283                          *       proceeding to this point.
284                          *   (2) once the reader increments the counter, no
285                          *       more writer can get a lock.
286                          * Still, it is possible another writer can work at
287                          * this point, e.g. in the following scenario:
288                          *   A previous writer unlocks the writer lock.
289                          *   This reader proceeds to point (1).
290                          *   A new writer appears, and gets a new lock before
291                          *   the reader increments the counter.
292                          *   The reader then increments the counter.
293                          *   The previous writer notices there is a waiting
294                          *   reader who is almost ready, and wakes it up.
295                          * So, the reader needs to confirm whether it can now
296                          * read explicitly (thus we loop).  Note that this is
297                          * not an infinite process, since the reader has
298                          * incremented the counter at this point.
299                          */
300                 }
301
302                 /*
303                  * If we are temporarily preferred to writers due to the writer
304                  * quota, reset the condition (race among readers doesn't
305                  * matter).
306                  */
307                 rwl->write_granted = 0;
308         } else {
309                 isc_int32_t prev_writer;
310
311                 /* enter the waiting queue, and wait for our turn */
312                 prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
313                 while (rwl->write_completions != prev_writer) {
314                         LOCK(&rwl->lock);
315                         if (rwl->write_completions != prev_writer) {
316                                 WAIT(&rwl->writeable, &rwl->lock);
317                                 UNLOCK(&rwl->lock);
318                                 continue;
319                         }
320                         UNLOCK(&rwl->lock);
321                         break;
322                 }
323
324                 while (1) {
325                         cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
326                                                      WRITER_ACTIVE);
327                         if (cntflag == 0)
328                                 break;
329
330                         /* Another active reader or writer is working. */
331                         LOCK(&rwl->lock);
332                         if (rwl->cnt_and_flag != 0)
333                                 WAIT(&rwl->writeable, &rwl->lock);
334                         UNLOCK(&rwl->lock);
335                 }
336
337                 INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
338                 rwl->write_granted++;
339         }
340
341 #ifdef ISC_RWLOCK_TRACE
342         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
343                                   ISC_MSG_POSTLOCK, "postlock"), rwl, type);
344 #endif
345
346         return (ISC_R_SUCCESS);
347 }
348
349 isc_result_t
350 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
351         isc_int32_t cntflag;
352
353         REQUIRE(VALID_RWLOCK(rwl));
354
355 #ifdef ISC_RWLOCK_TRACE
356         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
357                                   ISC_MSG_PRELOCK, "prelock"), rwl, type);
358 #endif
359
360         if (type == isc_rwlocktype_read) {
361                 /* If a writer is waiting or working, we fail. */
362                 if (rwl->write_requests != rwl->write_completions)
363                         return (ISC_R_LOCKBUSY);
364
365                 /* Otherwise, be ready for reading. */
366                 cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
367                 if ((cntflag & WRITER_ACTIVE) != 0) {
368                         /*
369                          * A writer is working.  We lose, and cancel the read
370                          * request.
371                          */
372                         cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
373                                                   -READER_INCR);
374                         /*
375                          * If no other readers are waiting and we've suspended
376                          * new writers in this short period, wake them up.
377                          */
378                         if (cntflag == READER_INCR &&
379                             rwl->write_completions != rwl->write_requests) {
380                                 LOCK(&rwl->lock);
381                                 BROADCAST(&rwl->writeable);
382                                 UNLOCK(&rwl->lock);
383                         }
384
385                         return (ISC_R_LOCKBUSY);
386                 }
387         } else {
388                 /* Try locking without entering the waiting queue. */
389                 cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
390                                              WRITER_ACTIVE);
391                 if (cntflag != 0)
392                         return (ISC_R_LOCKBUSY);
393
394                 /*
395                  * XXXJT: jump into the queue, possibly breaking the writer
396                  * order.
397                  */
398                 (void)isc_atomic_xadd(&rwl->write_completions, -1);
399
400                 rwl->write_granted++;
401         }
402
403 #ifdef ISC_RWLOCK_TRACE
404         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
405                                   ISC_MSG_POSTLOCK, "postlock"), rwl, type);
406 #endif
407
408         return (ISC_R_SUCCESS);
409 }
410
411 isc_result_t
412 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
413         isc_int32_t prevcnt;
414
415         REQUIRE(VALID_RWLOCK(rwl));
416
417         /* Try to acquire write access. */
418         prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
419                                      READER_INCR, WRITER_ACTIVE);
420         /*
421          * There must have been no writer, and there must have been at least
422          * one reader.
423          */
424         INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
425                (prevcnt & ~WRITER_ACTIVE) != 0);
426
427         if (prevcnt == READER_INCR) {
428                 /*
429                  * We are the only reader and have been upgraded.
430                  * Now jump into the head of the writer waiting queue.
431                  */
432                 (void)isc_atomic_xadd(&rwl->write_completions, -1);
433         } else
434                 return (ISC_R_LOCKBUSY);
435
436         return (ISC_R_SUCCESS);
437
438 }
439
440 void
441 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
442         isc_int32_t prev_readers;
443
444         REQUIRE(VALID_RWLOCK(rwl));
445
446         /* Become an active reader. */
447         prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
448         /* We must have been a writer. */
449         INSIST((prev_readers & WRITER_ACTIVE) != 0);
450
451         /* Complete write */
452         (void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
453         (void)isc_atomic_xadd(&rwl->write_completions, 1);
454
455         /* Resume other readers */
456         LOCK(&rwl->lock);
457         if (rwl->readers_waiting > 0)
458                 BROADCAST(&rwl->readable);
459         UNLOCK(&rwl->lock);
460 }
461
462 isc_result_t
463 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
464         isc_int32_t prev_cnt;
465
466         REQUIRE(VALID_RWLOCK(rwl));
467
468 #ifdef ISC_RWLOCK_TRACE
469         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
470                                   ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
471 #endif
472
473         if (type == isc_rwlocktype_read) {
474                 prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);
475
476                 /*
477                  * If we're the last reader and any writers are waiting, wake
478                  * them up.  We need to wake up all of them to ensure the
479                  * FIFO order.
480                  */
481                 if (prev_cnt == READER_INCR &&
482                     rwl->write_completions != rwl->write_requests) {
483                         LOCK(&rwl->lock);
484                         BROADCAST(&rwl->writeable);
485                         UNLOCK(&rwl->lock);
486                 }
487         } else {
488                 isc_boolean_t wakeup_writers = ISC_TRUE;
489
490                 /*
491                  * Reset the flag, and (implicitly) tell other writers
492                  * we are done.
493                  */
494                 (void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
495                 (void)isc_atomic_xadd(&rwl->write_completions, 1);
496
497                 if (rwl->write_granted >= rwl->write_quota ||
498                     rwl->write_requests == rwl->write_completions ||
499                     (rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
500                         /*
501                          * We have passed the write quota, no writer is
502                          * waiting, or some readers are almost ready, pending
503                          * possible writers.  Note that the last case can
504                          * happen even if write_requests != write_completions
505                          * (which means a new writer in the queue), so we need
506                          * to catch the case explicitly.
507                          */
508                         LOCK(&rwl->lock);
509                         if (rwl->readers_waiting > 0) {
510                                 wakeup_writers = ISC_FALSE;
511                                 BROADCAST(&rwl->readable);
512                         }
513                         UNLOCK(&rwl->lock);
514                 }
515
516                 if (rwl->write_requests != rwl->write_completions &&
517                     wakeup_writers) {
518                         LOCK(&rwl->lock);
519                         BROADCAST(&rwl->writeable);
520                         UNLOCK(&rwl->lock);
521                 }
522         }
523
524 #ifdef ISC_RWLOCK_TRACE
525         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
526                                   ISC_MSG_POSTUNLOCK, "postunlock"),
527                    rwl, type);
528 #endif
529
530         return (ISC_R_SUCCESS);
531 }
532
533 #else /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
534
535 static isc_result_t
536 doit(isc_rwlock_t *rwl, isc_rwlocktype_t type, isc_boolean_t nonblock) {
537         isc_boolean_t skip = ISC_FALSE;
538         isc_boolean_t done = ISC_FALSE;
539         isc_result_t result = ISC_R_SUCCESS;
540
541         REQUIRE(VALID_RWLOCK(rwl));
542
543         LOCK(&rwl->lock);
544
545 #ifdef ISC_RWLOCK_TRACE
546         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
547                                   ISC_MSG_PRELOCK, "prelock"), rwl, type);
548 #endif
549
550         if (type == isc_rwlocktype_read) {
551                 if (rwl->readers_waiting != 0)
552                         skip = ISC_TRUE;
553                 while (!done) {
554                         if (!skip &&
555                             ((rwl->active == 0 ||
556                               (rwl->type == isc_rwlocktype_read &&
557                                (rwl->writers_waiting == 0 ||
558                                 rwl->granted < rwl->read_quota)))))
559                         {
560                                 rwl->type = isc_rwlocktype_read;
561                                 rwl->active++;
562                                 rwl->granted++;
563                                 done = ISC_TRUE;
564                         } else if (nonblock) {
565                                 result = ISC_R_LOCKBUSY;
566                                 done = ISC_TRUE;
567                         } else {
568                                 skip = ISC_FALSE;
569                                 rwl->readers_waiting++;
570                                 WAIT(&rwl->readable, &rwl->lock);
571                                 rwl->readers_waiting--;
572                         }
573                 }
574         } else {
575                 if (rwl->writers_waiting != 0)
576                         skip = ISC_TRUE;
577                 while (!done) {
578                         if (!skip && rwl->active == 0) {
579                                 rwl->type = isc_rwlocktype_write;
580                                 rwl->active = 1;
581                                 rwl->granted++;
582                                 done = ISC_TRUE;
583                         } else if (nonblock) {
584                                 result = ISC_R_LOCKBUSY;
585                                 done = ISC_TRUE;
586                         } else {
587                                 skip = ISC_FALSE;
588                                 rwl->writers_waiting++;
589                                 WAIT(&rwl->writeable, &rwl->lock);
590                                 rwl->writers_waiting--;
591                         }
592                 }
593         }
594
595 #ifdef ISC_RWLOCK_TRACE
596         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
597                                   ISC_MSG_POSTLOCK, "postlock"), rwl, type);
598 #endif
599
600         UNLOCK(&rwl->lock);
601
602         return (result);
603 }
604
605 isc_result_t
606 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
607         return (doit(rwl, type, ISC_FALSE));
608 }
609
610 isc_result_t
611 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
612         return (doit(rwl, type, ISC_TRUE));
613 }
614
615 isc_result_t
616 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
617         isc_result_t result = ISC_R_SUCCESS;
618
619         REQUIRE(VALID_RWLOCK(rwl));
620         LOCK(&rwl->lock);
621         REQUIRE(rwl->type == isc_rwlocktype_read);
622         REQUIRE(rwl->active != 0);
623
624         /* If we are the only reader then succeed. */
625         if (rwl->active == 1) {
626                 rwl->original = (rwl->original == isc_rwlocktype_none) ?
627                                 isc_rwlocktype_read : isc_rwlocktype_none;
628                 rwl->type = isc_rwlocktype_write;
629         } else
630                 result = ISC_R_LOCKBUSY;
631
632         UNLOCK(&rwl->lock);
633         return (result);
634 }
635
636 void
637 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
638
639         REQUIRE(VALID_RWLOCK(rwl));
640         LOCK(&rwl->lock);
641         REQUIRE(rwl->type == isc_rwlocktype_write);
642         REQUIRE(rwl->active == 1);
643
644         rwl->type = isc_rwlocktype_read;
645         rwl->original = (rwl->original == isc_rwlocktype_none) ?
646                         isc_rwlocktype_write : isc_rwlocktype_none;
647         /*
648          * Resume processing any read request that were blocked when
649          * we upgraded.
650          */
651         if (rwl->original == isc_rwlocktype_none &&
652             (rwl->writers_waiting == 0 || rwl->granted < rwl->read_quota) &&
653             rwl->readers_waiting > 0)
654                 BROADCAST(&rwl->readable);
655
656         UNLOCK(&rwl->lock);
657 }
658
659 isc_result_t
660 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
661
662         REQUIRE(VALID_RWLOCK(rwl));
663         LOCK(&rwl->lock);
664         REQUIRE(rwl->type == type);
665
666         UNUSED(type);
667
668 #ifdef ISC_RWLOCK_TRACE
669         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
670                                   ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
671 #endif
672
673         INSIST(rwl->active > 0);
674         rwl->active--;
675         if (rwl->active == 0) {
676                 if (rwl->original != isc_rwlocktype_none) {
677                         rwl->type = rwl->original;
678                         rwl->original = isc_rwlocktype_none;
679                 }
680                 if (rwl->type == isc_rwlocktype_read) {
681                         rwl->granted = 0;
682                         if (rwl->writers_waiting > 0) {
683                                 rwl->type = isc_rwlocktype_write;
684                                 SIGNAL(&rwl->writeable);
685                         } else if (rwl->readers_waiting > 0) {
686                                 /* Does this case ever happen? */
687                                 BROADCAST(&rwl->readable);
688                         }
689                 } else {
690                         if (rwl->readers_waiting > 0) {
691                                 if (rwl->writers_waiting > 0 &&
692                                     rwl->granted < rwl->write_quota) {
693                                         SIGNAL(&rwl->writeable);
694                                 } else {
695                                         rwl->granted = 0;
696                                         rwl->type = isc_rwlocktype_read;
697                                         BROADCAST(&rwl->readable);
698                                 }
699                         } else if (rwl->writers_waiting > 0) {
700                                 rwl->granted = 0;
701                                 SIGNAL(&rwl->writeable);
702                         } else {
703                                 rwl->granted = 0;
704                         }
705                 }
706         }
707         INSIST(rwl->original == isc_rwlocktype_none);
708
709 #ifdef ISC_RWLOCK_TRACE
710         print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
711                                   ISC_MSG_POSTUNLOCK, "postunlock"),
712                    rwl, type);
713 #endif
714
715         UNLOCK(&rwl->lock);
716
717         return (ISC_R_SUCCESS);
718 }
719
720 #endif /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
721 #else /* ISC_PLATFORM_USETHREADS */
722
723 isc_result_t
724 isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
725                 unsigned int write_quota)
726 {
727         REQUIRE(rwl != NULL);
728
729         UNUSED(read_quota);
730         UNUSED(write_quota);
731
732         rwl->type = isc_rwlocktype_read;
733         rwl->active = 0;
734         rwl->magic = RWLOCK_MAGIC;
735
736         return (ISC_R_SUCCESS);
737 }
738
739 isc_result_t
740 isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
741         REQUIRE(VALID_RWLOCK(rwl));
742
743         if (type == isc_rwlocktype_read) {
744                 if (rwl->type != isc_rwlocktype_read && rwl->active != 0)
745                         return (ISC_R_LOCKBUSY);
746                 rwl->type = isc_rwlocktype_read;
747                 rwl->active++;
748         } else {
749                 if (rwl->active != 0)
750                         return (ISC_R_LOCKBUSY);
751                 rwl->type = isc_rwlocktype_write;
752                 rwl->active = 1;
753         }
754         return (ISC_R_SUCCESS);
755 }
756
757 isc_result_t
758 isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
759         return (isc_rwlock_lock(rwl, type));
760 }
761
762 isc_result_t
763 isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
764         isc_result_t result = ISC_R_SUCCESS;
765
766         REQUIRE(VALID_RWLOCK(rwl));
767         REQUIRE(rwl->type == isc_rwlocktype_read);
768         REQUIRE(rwl->active != 0);
769
770         /* If we are the only reader then succeed. */
771         if (rwl->active == 1)
772                 rwl->type = isc_rwlocktype_write;
773         else
774                 result = ISC_R_LOCKBUSY;
775         return (result);
776 }
777
778 void
779 isc_rwlock_downgrade(isc_rwlock_t *rwl) {
780
781         REQUIRE(VALID_RWLOCK(rwl));
782         REQUIRE(rwl->type == isc_rwlocktype_write);
783         REQUIRE(rwl->active == 1);
784
785         rwl->type = isc_rwlocktype_read;
786 }
787
788 isc_result_t
789 isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
790         REQUIRE(VALID_RWLOCK(rwl));
791         REQUIRE(rwl->type == type);
792
793         UNUSED(type);
794
795         INSIST(rwl->active > 0);
796         rwl->active--;
797
798         return (ISC_R_SUCCESS);
799 }
800
801 void
802 isc_rwlock_destroy(isc_rwlock_t *rwl) {
803         REQUIRE(rwl != NULL);
804         REQUIRE(rwl->active == 0);
805         rwl->magic = 0;
806 }
807
808 #endif /* ISC_PLATFORM_USETHREADS */