1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
4 * Copyright (c) 2009, Sun Microsystems, Inc.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
39 * svc.c, Server-side remote procedure call interface.
41 * There are two sets of procedures here. The xprt routines are
42 * for handling transport handles. The svc routines handle the
43 * list of service routines.
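 *
 * A typical consumer drives this interface roughly as follows (a
 * minimal sketch; the transport constructor and the MYPROG/MYVERS
 * constants and myprog_dispatch handler are hypothetical):
 *
 *	SVCPOOL *pool = svcpool_create("myrpc", sysctl_base);
 *	SVCXPRT *xprt = ...create a transport bound to pool...;
 *	svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *	svc_run(pool);		(blocks; worker threads execute requests)
 *	...later, from another context: svc_exit(pool);...
 *	svcpool_destroy(pool);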
45 * Copyright (C) 1984, Sun Microsystems, Inc.
48 #include <sys/param.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
54 #include <sys/mutex.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
61 #include <sys/ucred.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
67 #include <rpc/rpc_com.h>
69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 static void svcpool_cleanup(SVCPOOL *pool);
80 /* *************** SVCXPRT related stuff **************** */
82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
93 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
95 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
97 pool->sp_state = SVCPOOL_INIT;
99 TAILQ_INIT(&pool->sp_callouts);
100 TAILQ_INIT(&pool->sp_lcallouts);
101 pool->sp_minthreads = 1;
102 pool->sp_maxthreads = 1;
103 pool->sp_groupcount = 1;
104 for (g = 0; g < SVC_MAXGROUPS; g++) {
105 grp = &pool->sp_groups[g];
106 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
108 grp->sg_state = SVCPOOL_ACTIVE;
109 TAILQ_INIT(&grp->sg_xlist);
110 TAILQ_INIT(&grp->sg_active);
111 LIST_INIT(&grp->sg_idlethreads);
112 grp->sg_minthreads = 1;
113 grp->sg_maxthreads = 1;
117 * Don't use more than a quarter of mbuf clusters. Nota bene:
118 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119 * on LP64 architectures, so cast to u_long to avoid undefined
120 * behavior. (ILP32 architectures cannot have nmbclusters
121 * large enough to overflow for other reasons.)
123 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
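	/*
	 * Worked example (illustrative): with nmbclusters = 1M and
	 * MCLBYTES = 2KB, the u_long product is 2GB (it would have
	 * overflowed a 32-bit int), giving sp_space_high = 512MB and
	 * sp_space_low = two thirds of that, roughly 341MB.
	 */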
126 sysctl_ctx_init(&pool->sp_sysctl);
128 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129 "minthreads", CTLTYPE_INT | CTLFLAG_RW,
130 pool, 0, svcpool_minthread_sysctl, "I",
131 "Minimal number of threads");
132 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133 "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
134 pool, 0, svcpool_maxthread_sysctl, "I",
135 "Maximal number of threads");
136 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 "threads", CTLTYPE_INT | CTLFLAG_RD,
138 pool, 0, svcpool_threads_sysctl, "I",
139 "Current number of threads");
140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142 "Number of thread groups");
144 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 "request_space_used", CTLFLAG_RD,
146 &pool->sp_space_used,
147 "Space in parsed but not handled requests.");
149 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 "request_space_used_highest", CTLFLAG_RD,
151 &pool->sp_space_used_highest,
152 "Highest space used since reboot.");
154 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155 "request_space_high", CTLFLAG_RW,
156 &pool->sp_space_high,
157 "Maximum space in parsed but not handled requests.");
159 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160 "request_space_low", CTLFLAG_RW,
162 "Low water mark for request space.");
164 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165 "request_space_throttled", CTLFLAG_RD,
166 &pool->sp_space_throttled, 0,
167 "Whether nfs requests are currently throttled");
169 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170 "request_space_throttle_count", CTLFLAG_RD,
171 &pool->sp_space_throttle_count, 0,
172 "Count of times throttling based on request space has occurred");
179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
180 * the pool data structures.
183 svcpool_cleanup(SVCPOOL *pool)
186 SVCXPRT *xprt, *nxprt;
187 struct svc_callout *s;
188 struct svc_loss_callout *sl;
189 struct svcxprt_list cleanup;
192 TAILQ_INIT(&cleanup);
194 for (g = 0; g < SVC_MAXGROUPS; g++) {
195 grp = &pool->sp_groups[g];
196 mtx_lock(&grp->sg_lock);
197 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198 xprt_unregister_locked(xprt);
199 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
201 mtx_unlock(&grp->sg_lock);
203 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
207 mtx_lock(&pool->sp_lock);
208 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
209 mtx_unlock(&pool->sp_lock);
210 svc_unreg(pool, s->sc_prog, s->sc_vers);
211 mtx_lock(&pool->sp_lock);
213 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
214 mtx_unlock(&pool->sp_lock);
215 svc_loss_unreg(pool, sl->slc_dispatch);
216 mtx_lock(&pool->sp_lock);
218 mtx_unlock(&pool->sp_lock);
222 svcpool_destroy(SVCPOOL *pool)
227 svcpool_cleanup(pool);
229 for (g = 0; g < SVC_MAXGROUPS; g++) {
230 grp = &pool->sp_groups[g];
231 mtx_destroy(&grp->sg_lock);
233 mtx_destroy(&pool->sp_lock);
236 replay_freecache(pool->sp_rcache);
238 sysctl_ctx_free(&pool->sp_sysctl);
243 * Similar to svcpool_destroy(), except that it does not destroy the actual
244 * data structures. As such, "pool" may be used again.
247 svcpool_close(SVCPOOL *pool)
252 svcpool_cleanup(pool);
254 /* Now, initialize the pool's state for a fresh svc_run() call. */
255 mtx_lock(&pool->sp_lock);
256 pool->sp_state = SVCPOOL_INIT;
257 mtx_unlock(&pool->sp_lock);
258 for (g = 0; g < SVC_MAXGROUPS; g++) {
259 grp = &pool->sp_groups[g];
260 mtx_lock(&grp->sg_lock);
261 grp->sg_state = SVCPOOL_ACTIVE;
262 mtx_unlock(&grp->sg_lock);
267 * Sysctl handler to get the present thread count on a pool
270 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
273 int threads, error, g;
	pool = oidp->oid_arg1;
	threads = 0;
277 mtx_lock(&pool->sp_lock);
278 for (g = 0; g < pool->sp_groupcount; g++)
279 threads += pool->sp_groups[g].sg_threadcount;
280 mtx_unlock(&pool->sp_lock);
281 error = sysctl_handle_int(oidp, &threads, 0, req);
286 * Sysctl handler to set the minimum thread count on a pool
289 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
292 int newminthreads, error, g;
294 pool = oidp->oid_arg1;
295 newminthreads = pool->sp_minthreads;
296 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
297 if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
300 mtx_lock(&pool->sp_lock);
301 pool->sp_minthreads = newminthreads;
302 for (g = 0; g < pool->sp_groupcount; g++) {
303 pool->sp_groups[g].sg_minthreads = max(1,
304 pool->sp_minthreads / pool->sp_groupcount);
306 mtx_unlock(&pool->sp_lock);
312 * Sysctl handler to set the maximum thread count on a pool
315 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
318 int newmaxthreads, error, g;
320 pool = oidp->oid_arg1;
321 newmaxthreads = pool->sp_maxthreads;
322 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
323 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
326 mtx_lock(&pool->sp_lock);
327 pool->sp_maxthreads = newmaxthreads;
328 for (g = 0; g < pool->sp_groupcount; g++) {
329 pool->sp_groups[g].sg_maxthreads = max(1,
330 pool->sp_maxthreads / pool->sp_groupcount);
332 mtx_unlock(&pool->sp_lock);
338 * Activate a transport handle.
341 xprt_register(SVCXPRT *xprt)
343 SVCPOOL *pool = xprt->xp_pool;
348 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
349 xprt->xp_group = grp = &pool->sp_groups[g];
350 mtx_lock(&grp->sg_lock);
351 xprt->xp_registered = TRUE;
352 xprt->xp_active = FALSE;
353 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
354 mtx_unlock(&grp->sg_lock);
358 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
363 xprt_unregister_locked(SVCXPRT *xprt)
365 SVCGROUP *grp = xprt->xp_group;
367 mtx_assert(&grp->sg_lock, MA_OWNED);
368 KASSERT(xprt->xp_registered == TRUE,
369 ("xprt_unregister_locked: not registered"));
370 xprt_inactive_locked(xprt);
371 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
372 xprt->xp_registered = FALSE;
376 xprt_unregister(SVCXPRT *xprt)
378 SVCGROUP *grp = xprt->xp_group;
380 mtx_lock(&grp->sg_lock);
381 if (xprt->xp_registered == FALSE) {
382 /* Already unregistered by another thread */
383 mtx_unlock(&grp->sg_lock);
386 xprt_unregister_locked(xprt);
387 mtx_unlock(&grp->sg_lock);
393 * Attempt to assign a service thread to this transport.
396 xprt_assignthread(SVCXPRT *xprt)
398 SVCGROUP *grp = xprt->xp_group;
401 mtx_assert(&grp->sg_lock, MA_OWNED);
402 st = LIST_FIRST(&grp->sg_idlethreads);
404 LIST_REMOVE(st, st_ilink);
406 xprt->xp_thread = st;
408 cv_signal(&st->st_cond);
412 * See if we can create a new thread. The
413 * actual thread creation happens in
414 * svc_run_internal because our locking state
415 * is poorly defined (we are typically called
416 * from a socket upcall). Don't create more
417 * than one thread per second.
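	 * The handshake with svc_run_internal() is: we mark the group
	 * SVCPOOL_THREADWANTED here; a running service thread notices
	 * that, moves the group to SVCPOOL_THREADSTARTING and calls
	 * svc_new_thread(); the newly created thread then resets the
	 * state to SVCPOOL_ACTIVE.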
419 if (grp->sg_state == SVCPOOL_ACTIVE
420 && grp->sg_lastcreatetime < time_uptime
421 && grp->sg_threadcount < grp->sg_maxthreads) {
422 grp->sg_state = SVCPOOL_THREADWANTED;
429 xprt_active(SVCXPRT *xprt)
431 SVCGROUP *grp = xprt->xp_group;
433 mtx_lock(&grp->sg_lock);
435 if (!xprt->xp_registered) {
437 * Race with xprt_unregister - we lose.
439 mtx_unlock(&grp->sg_lock);
443 if (!xprt->xp_active) {
444 xprt->xp_active = TRUE;
445 if (xprt->xp_thread == NULL) {
446 if (!svc_request_space_available(xprt->xp_pool) ||
447 !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
453 mtx_unlock(&grp->sg_lock);
457 xprt_inactive_locked(SVCXPRT *xprt)
459 SVCGROUP *grp = xprt->xp_group;
461 mtx_assert(&grp->sg_lock, MA_OWNED);
462 if (xprt->xp_active) {
463 if (xprt->xp_thread == NULL)
464 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
465 xprt->xp_active = FALSE;
470 xprt_inactive(SVCXPRT *xprt)
472 SVCGROUP *grp = xprt->xp_group;
474 mtx_lock(&grp->sg_lock);
475 xprt_inactive_locked(xprt);
476 mtx_unlock(&grp->sg_lock);
 * Variant of xprt_inactive() for use only when we are sure that the
 * port is assigned to a thread. For example, within receive handlers.
484 xprt_inactive_self(SVCXPRT *xprt)
487 KASSERT(xprt->xp_thread != NULL,
488 ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
489 xprt->xp_active = FALSE;
493 * Add a service program to the callout list.
 * The dispatch routine will be called when an rpc request for this
495 * program number comes in.
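 *
 * Example (a sketch; the MYPROG/MYPROG_V1 constants and
 * myprog_dispatch routine are hypothetical):
 *
 *	if (!svc_reg(xprt, MYPROG, MYPROG_V1, myprog_dispatch, nconf))
 *		...registration failed; tear the transport down...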
498 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
499 void (*dispatch)(struct svc_req *, SVCXPRT *),
500 const struct netconfig *nconf)
502 SVCPOOL *pool = xprt->xp_pool;
503 struct svc_callout *s;
507 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
509 if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
512 } else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
515 } /* must have been created with svc_raw_create */
516 if ((netid == NULL) && (flag == 1)) {
520 mtx_lock(&pool->sp_lock);
521 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
524 if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* they are registering another xprt */
526 mtx_unlock(&pool->sp_lock);
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
539 s->sc_dispatch = dispatch;
541 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
543 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
544 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
547 mtx_unlock(&pool->sp_lock);
548 /* now register the information with the local binder service */
551 struct netconfig tnc;
554 nb.buf = &xprt->xp_ltaddr;
555 nb.len = xprt->xp_ltaddr.ss_len;
556 dummy = rpcb_set(prog, vers, &tnc, &nb);
563 * Remove a service program from the callout list.
566 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
568 struct svc_callout *s;
570 /* unregister the information anyway */
571 (void) rpcb_unset(prog, vers, NULL);
572 mtx_lock(&pool->sp_lock);
573 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
574 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
577 mem_free(s, sizeof (struct svc_callout));
579 mtx_unlock(&pool->sp_lock);
583 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some port in this pool dies.
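 *
 * Example (a sketch; myprog_connloss is a hypothetical handler that
 * tears down per-connection state):
 *
 *	svc_loss_reg(xprt, myprog_connloss);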
587 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
589 SVCPOOL *pool = xprt->xp_pool;
590 struct svc_loss_callout *s;
592 mtx_lock(&pool->sp_lock);
593 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
594 if (s->slc_dispatch == dispatch)
598 mtx_unlock(&pool->sp_lock);
601 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
603 mtx_unlock(&pool->sp_lock);
606 s->slc_dispatch = dispatch;
607 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
608 mtx_unlock(&pool->sp_lock);
613 * Remove a service connection loss program from the callout list.
616 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
618 struct svc_loss_callout *s;
620 mtx_lock(&pool->sp_lock);
621 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
622 if (s->slc_dispatch == dispatch) {
623 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
628 mtx_unlock(&pool->sp_lock);
631 /* ********************** CALLOUT list related stuff ************* */
 * Search the callout list for a program number; return the matching
 * callout entry.
637 static struct svc_callout *
638 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
640 struct svc_callout *s;
642 mtx_assert(&pool->sp_lock, MA_OWNED);
643 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
644 if (s->sc_prog == prog && s->sc_vers == vers
645 && (netid == NULL || s->sc_netid == NULL ||
646 strcmp(netid, s->sc_netid) == 0))
653 /* ******************* REPLY GENERATION ROUTINES ************ */
656 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
659 SVCXPRT *xprt = rqstp->rq_xprt;
662 if (rqstp->rq_args) {
663 m_freem(rqstp->rq_args);
664 rqstp->rq_args = NULL;
667 if (xprt->xp_pool->sp_rcache)
668 replay_setreply(xprt->xp_pool->sp_rcache,
669 rply, svc_getrpccaller(rqstp), body);
671 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
674 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
675 if (rqstp->rq_addr) {
676 free(rqstp->rq_addr, M_SONAME);
677 rqstp->rq_addr = NULL;
684 * Send a reply to an rpc request
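 *
 * A dispatch routine typically decodes its arguments with
 * svc_getargs(), performs the call, replies with svc_sendreply() or
 * one of the svcerr_*() routines, and finishes with svc_freereq().
 * A minimal sketch (the myprog_* types and XDR routines are
 * hypothetical):
 *
 *	struct myprog_args args;
 *	struct myprog_res res;
 *
 *	if (!svc_getargs(rqstp, (xdrproc_t) xdr_myprog_args, &args)) {
 *		svcerr_decode(rqstp);
 *	} else {
 *		res = ...perform the call on args...;
 *		svc_sendreply(rqstp, (xdrproc_t) xdr_myprog_res, &res);
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_myprog_args, &args);
 *	}
 *	svc_freereq(rqstp);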
687 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
694 rply.rm_xid = rqstp->rq_xid;
695 rply.rm_direction = REPLY;
696 rply.rm_reply.rp_stat = MSG_ACCEPTED;
697 rply.acpted_rply.ar_verf = rqstp->rq_verf;
698 rply.acpted_rply.ar_stat = SUCCESS;
699 rply.acpted_rply.ar_results.where = NULL;
700 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
702 m = m_getcl(M_WAITOK, MT_DATA, 0);
703 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
704 ok = xdr_results(&xdrs, xdr_location);
708 return (svc_sendreply_common(rqstp, &rply, m));
716 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
720 rply.rm_xid = rqstp->rq_xid;
721 rply.rm_direction = REPLY;
722 rply.rm_reply.rp_stat = MSG_ACCEPTED;
723 rply.acpted_rply.ar_verf = rqstp->rq_verf;
724 rply.acpted_rply.ar_stat = SUCCESS;
725 rply.acpted_rply.ar_results.where = NULL;
726 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
728 return (svc_sendreply_common(rqstp, &rply, m));
732 * No procedure error reply
735 svcerr_noproc(struct svc_req *rqstp)
737 SVCXPRT *xprt = rqstp->rq_xprt;
740 rply.rm_xid = rqstp->rq_xid;
741 rply.rm_direction = REPLY;
742 rply.rm_reply.rp_stat = MSG_ACCEPTED;
743 rply.acpted_rply.ar_verf = rqstp->rq_verf;
744 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
746 if (xprt->xp_pool->sp_rcache)
747 replay_setreply(xprt->xp_pool->sp_rcache,
748 &rply, svc_getrpccaller(rqstp), NULL);
750 svc_sendreply_common(rqstp, &rply, NULL);
754 * Can't decode args error reply
757 svcerr_decode(struct svc_req *rqstp)
759 SVCXPRT *xprt = rqstp->rq_xprt;
762 rply.rm_xid = rqstp->rq_xid;
763 rply.rm_direction = REPLY;
764 rply.rm_reply.rp_stat = MSG_ACCEPTED;
765 rply.acpted_rply.ar_verf = rqstp->rq_verf;
766 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
768 if (xprt->xp_pool->sp_rcache)
769 replay_setreply(xprt->xp_pool->sp_rcache,
770 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
772 svc_sendreply_common(rqstp, &rply, NULL);
779 svcerr_systemerr(struct svc_req *rqstp)
781 SVCXPRT *xprt = rqstp->rq_xprt;
784 rply.rm_xid = rqstp->rq_xid;
785 rply.rm_direction = REPLY;
786 rply.rm_reply.rp_stat = MSG_ACCEPTED;
787 rply.acpted_rply.ar_verf = rqstp->rq_verf;
788 rply.acpted_rply.ar_stat = SYSTEM_ERR;
790 if (xprt->xp_pool->sp_rcache)
791 replay_setreply(xprt->xp_pool->sp_rcache,
792 &rply, svc_getrpccaller(rqstp), NULL);
794 svc_sendreply_common(rqstp, &rply, NULL);
798 * Authentication error reply
801 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
803 SVCXPRT *xprt = rqstp->rq_xprt;
806 rply.rm_xid = rqstp->rq_xid;
807 rply.rm_direction = REPLY;
808 rply.rm_reply.rp_stat = MSG_DENIED;
809 rply.rjcted_rply.rj_stat = AUTH_ERROR;
810 rply.rjcted_rply.rj_why = why;
812 if (xprt->xp_pool->sp_rcache)
813 replay_setreply(xprt->xp_pool->sp_rcache,
814 &rply, svc_getrpccaller(rqstp), NULL);
816 svc_sendreply_common(rqstp, &rply, NULL);
820 * Auth too weak error reply
823 svcerr_weakauth(struct svc_req *rqstp)
826 svcerr_auth(rqstp, AUTH_TOOWEAK);
830 * Program unavailable error reply
833 svcerr_noprog(struct svc_req *rqstp)
835 SVCXPRT *xprt = rqstp->rq_xprt;
838 rply.rm_xid = rqstp->rq_xid;
839 rply.rm_direction = REPLY;
840 rply.rm_reply.rp_stat = MSG_ACCEPTED;
841 rply.acpted_rply.ar_verf = rqstp->rq_verf;
842 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
844 if (xprt->xp_pool->sp_rcache)
845 replay_setreply(xprt->xp_pool->sp_rcache,
846 &rply, svc_getrpccaller(rqstp), NULL);
848 svc_sendreply_common(rqstp, &rply, NULL);
852 * Program version mismatch error reply
855 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
857 SVCXPRT *xprt = rqstp->rq_xprt;
860 rply.rm_xid = rqstp->rq_xid;
861 rply.rm_direction = REPLY;
862 rply.rm_reply.rp_stat = MSG_ACCEPTED;
863 rply.acpted_rply.ar_verf = rqstp->rq_verf;
864 rply.acpted_rply.ar_stat = PROG_MISMATCH;
865 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
866 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
868 if (xprt->xp_pool->sp_rcache)
869 replay_setreply(xprt->xp_pool->sp_rcache,
870 &rply, svc_getrpccaller(rqstp), NULL);
872 svc_sendreply_common(rqstp, &rply, NULL);
876 * Allocate a new server transport structure. All fields are
877 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
887 xprt = mem_alloc(sizeof(SVCXPRT));
888 ext = mem_alloc(sizeof(SVCXPRT_EXT));
890 refcount_init(&xprt->xp_refs, 1);
896 * Free a server transport structure.
899 svc_xprt_free(SVCXPRT *xprt)
902 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
903 mem_free(xprt, sizeof(SVCXPRT));
906 /* ******************* SERVER INPUT STUFF ******************* */
909 * Read RPC requests from a transport and queue them to be
910 * executed. We handle authentication and replay cache replies here.
911 * Actually dispatching the RPC is deferred till svc_executereq.
913 static enum xprt_stat
914 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
916 SVCPOOL *pool = xprt->xp_pool;
920 struct svc_loss_callout *s;
	/* now receive msgs from xprt (support batch calls) */
924 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
926 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
927 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
928 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
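	/*
	 * rq_credarea is carved into three MAX_AUTH_BYTES regions: the
	 * wire credential, the wire verifier, and the cooked credential
	 * that the authentication flavor produces for the application.
	 */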
929 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
933 * Handle replays and authenticate before queuing the
934 * request to be executed.
938 if (pool->sp_rcache) {
939 struct rpc_msg repmsg;
940 struct mbuf *repbody;
941 enum replay_state rs;
942 rs = replay_find(pool->sp_rcache, &msg,
943 svc_getrpccaller(r), &repmsg, &repbody);
948 SVC_REPLY(xprt, &repmsg, r->rq_addr,
949 repbody, &r->rq_reply_seq);
951 free(r->rq_addr, M_SONAME);
963 r->rq_xid = msg.rm_xid;
964 r->rq_prog = msg.rm_call.cb_prog;
965 r->rq_vers = msg.rm_call.cb_vers;
966 r->rq_proc = msg.rm_call.cb_proc;
967 r->rq_size = sizeof(*r) + m_length(args, NULL);
969 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
971 * RPCSEC_GSS uses this return code
972 * for requests that form part of its
973 * context establishment protocol and
974 * should not be dispatched to the
977 if (why != RPCSEC_GSS_NODISPATCH)
982 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
988 * Everything checks out, return request to caller.
998 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
999 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1000 (*s->slc_dispatch)(xprt);
1001 xprt_unregister(xprt);
1008 svc_executereq(struct svc_req *rqstp)
1010 SVCXPRT *xprt = rqstp->rq_xprt;
1011 SVCPOOL *pool = xprt->xp_pool;
1014 rpcvers_t high_vers;
1015 struct svc_callout *s;
	/* now match message with a registered service */
1019 low_vers = (rpcvers_t) -1L;
1020 high_vers = (rpcvers_t) 0L;
1021 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1022 if (s->sc_prog == rqstp->rq_prog) {
1023 if (s->sc_vers == rqstp->rq_vers) {
1025 * We hand ownership of r to the
 * dispatch method - they must call
 * svc_freereq.
1029 (*s->sc_dispatch)(rqstp, xprt);
1031 } /* found correct version */
1033 if (s->sc_vers < low_vers)
1034 low_vers = s->sc_vers;
1035 if (s->sc_vers > high_vers)
1036 high_vers = s->sc_vers;
1037 } /* found correct program */
 * if we got here, the program or version
 * is not served, so report the appropriate error
1045 svcerr_progvers(rqstp, low_vers, high_vers);
1047 svcerr_noprog(rqstp);
1053 svc_checkidle(SVCGROUP *grp)
1055 SVCXPRT *xprt, *nxprt;
1057 struct svcxprt_list cleanup;
1059 TAILQ_INIT(&cleanup);
1060 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1062 * Only some transports have idle timers. Don't time
1063 * something out which is just waking up.
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;
1068 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1069 if (time_uptime > timo) {
1070 xprt_unregister_locked(xprt);
1071 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1075 mtx_unlock(&grp->sg_lock);
1076 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1079 mtx_lock(&grp->sg_lock);
1083 svc_assign_waiting_sockets(SVCPOOL *pool)
1089 for (g = 0; g < pool->sp_groupcount; g++) {
1090 grp = &pool->sp_groups[g];
1091 mtx_lock(&grp->sg_lock);
1092 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1093 if (xprt_assignthread(xprt))
1094 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1098 mtx_unlock(&grp->sg_lock);
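/*
 * Request-space accounting (overview): sp_space_used counts the bytes
 * held by requests that have been parsed but not yet executed.
 * Crossing sp_space_high turns throttling on; draining back below
 * sp_space_low turns it off and re-arms any transports that were left
 * waiting on the active lists.
 */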
1103 svc_change_space_used(SVCPOOL *pool, long delta)
1105 unsigned long value;
1107 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1109 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1110 pool->sp_space_throttled = TRUE;
1111 pool->sp_space_throttle_count++;
1113 if (value > pool->sp_space_used_highest)
1114 pool->sp_space_used_highest = value;
1116 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1117 pool->sp_space_throttled = FALSE;
1118 svc_assign_waiting_sockets(pool);
1124 svc_request_space_available(SVCPOOL *pool)
	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
1133 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1135 SVCPOOL *pool = grp->sg_pool;
1136 SVCTHREAD *st, *stpref;
1138 enum xprt_stat stat;
1139 struct svc_req *rqstp;
1144 st = mem_alloc(sizeof(*st));
1145 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1148 STAILQ_INIT(&st->st_reqs);
1149 cv_init(&st->st_cond, "rpcsvc");
1151 mtx_lock(&grp->sg_lock);
1154 * If we are a new thread which was spawned to cope with
1155 * increased load, set the state back to SVCPOOL_ACTIVE.
1157 if (grp->sg_state == SVCPOOL_THREADSTARTING)
1158 grp->sg_state = SVCPOOL_ACTIVE;
1160 while (grp->sg_state != SVCPOOL_CLOSING) {
1162 * Create new thread if requested.
1164 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1165 grp->sg_state = SVCPOOL_THREADSTARTING;
1166 grp->sg_lastcreatetime = time_uptime;
1167 mtx_unlock(&grp->sg_lock);
1168 svc_new_thread(grp);
1169 mtx_lock(&grp->sg_lock);
1174 * Check for idle transports once per second.
1176 if (time_uptime > grp->sg_lastidlecheck) {
1177 grp->sg_lastidlecheck = time_uptime;
1184 * Enforce maxthreads count.
1186 if (grp->sg_threadcount > grp->sg_maxthreads)
1190 * Before sleeping, see if we can find an
 * active transport which isn't being serviced
 * by a thread.
1194 if (svc_request_space_available(pool) &&
1195 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1196 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1198 xprt->xp_thread = st;
1203 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster ||
			    grp->sg_threadcount > grp->sg_minthreads)
1206 error = cv_timedwait_sig(&st->st_cond,
1207 &grp->sg_lock, 5 * hz);
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
1211 if (st->st_xprt == NULL)
1212 LIST_REMOVE(st, st_ilink);
1215 * Reduce worker thread count when idle.
1217 if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
				    && !st->st_xprt)
					break;
1223 } else if (error != 0) {
1224 KASSERT(error == EINTR || error == ERESTART,
1225 ("non-signal error %d", error));
1226 mtx_unlock(&grp->sg_lock);
1229 if (P_SHOULDSTOP(p) ||
1230 (p->p_flag & P_TOTAL_STOP) != 0) {
1231 thread_suspend_check(0);
1233 mtx_lock(&grp->sg_lock);
1237 mtx_lock(&grp->sg_lock);
1243 mtx_unlock(&grp->sg_lock);
1246 * Drain the transport socket and queue up any RPCs.
1248 xprt->xp_lastactive = time_uptime;
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
1253 stat = svc_getreq(xprt, &rqstp);
1255 svc_change_space_used(pool, rqstp->rq_size);
1257 * See if the application has a preference
1258 * for some other thread.
1260 if (pool->sp_assign) {
1261 stpref = pool->sp_assign(st, rqstp);
1262 rqstp->rq_thread = stpref;
				STAILQ_INSERT_TAIL(&stpref->st_reqs,
				    rqstp, rq_link);
1265 mtx_unlock(&stpref->st_lock);
1269 rqstp->rq_thread = st;
				STAILQ_INSERT_TAIL(&st->st_reqs,
				    rqstp, rq_link);
1274 } while (rqstp == NULL && stat == XPRT_MOREREQS
1275 && grp->sg_state != SVCPOOL_CLOSING);
1278 * Move this transport to the end of the active list to
1279 * ensure fairness when multiple transports are active.
1280 * If this was the last queued request, svc_getreq will end
1281 * up calling xprt_inactive to remove from the active list.
1283 mtx_lock(&grp->sg_lock);
1284 xprt->xp_thread = NULL;
1286 if (xprt->xp_active) {
1287 if (!svc_request_space_available(pool) ||
1288 !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
1292 mtx_unlock(&grp->sg_lock);
1296 * Execute what we have queued.
1298 mtx_lock(&st->st_lock);
1299 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1300 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1301 mtx_unlock(&st->st_lock);
1302 sz = (long)rqstp->rq_size;
1303 svc_executereq(rqstp);
1304 svc_change_space_used(pool, -sz);
1305 mtx_lock(&st->st_lock);
1307 mtx_unlock(&st->st_lock);
1308 mtx_lock(&grp->sg_lock);
1316 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1317 mtx_destroy(&st->st_lock);
1318 cv_destroy(&st->st_cond);
1319 mem_free(st, sizeof(*st));
1321 grp->sg_threadcount--;
1324 mtx_unlock(&grp->sg_lock);
1328 svc_thread_start(void *arg)
1331 svc_run_internal((SVCGROUP *) arg, FALSE);
1336 svc_new_thread(SVCGROUP *grp)
1338 SVCPOOL *pool = grp->sg_pool;
1341 mtx_lock(&grp->sg_lock);
1342 grp->sg_threadcount++;
1343 mtx_unlock(&grp->sg_lock);
1344 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1345 "%s: service", pool->sp_name);
1349 svc_run(SVCPOOL *pool)
1358 snprintf(td->td_name, sizeof(td->td_name),
1359 "%s: master", pool->sp_name);
1360 pool->sp_state = SVCPOOL_ACTIVE;
1363 /* Choose group count based on number of threads and CPUs. */
1364 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1365 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
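	/*
	 * Worked example (illustrative): with sp_maxthreads = 64 on a
	 * 32-CPU machine this yields min(64 / 2, 32) / 6 = 5 groups
	 * (clamped to [1, SVC_MAXGROUPS]); each group below then gets
	 * 64 / 5 = 12 as its sg_maxthreads.
	 */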
1366 for (g = 0; g < pool->sp_groupcount; g++) {
1367 grp = &pool->sp_groups[g];
1368 grp->sg_minthreads = max(1,
1369 pool->sp_minthreads / pool->sp_groupcount);
1370 grp->sg_maxthreads = max(1,
1371 pool->sp_maxthreads / pool->sp_groupcount);
1372 grp->sg_lastcreatetime = time_uptime;
	/* Start the threads; the calling thread becomes group 0's master. */
1376 pool->sp_groups[0].sg_threadcount++;
1377 for (g = 0; g < pool->sp_groupcount; g++) {
1378 grp = &pool->sp_groups[g];
1379 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1380 svc_new_thread(grp);
1382 svc_run_internal(&pool->sp_groups[0], TRUE);
1384 /* Waiting for threads to stop. */
1385 for (g = 0; g < pool->sp_groupcount; g++) {
1386 grp = &pool->sp_groups[g];
1387 mtx_lock(&grp->sg_lock);
1388 while (grp->sg_threadcount > 0)
1389 msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1390 mtx_unlock(&grp->sg_lock);
1395 svc_exit(SVCPOOL *pool)
1401 pool->sp_state = SVCPOOL_CLOSING;
1402 for (g = 0; g < pool->sp_groupcount; g++) {
1403 grp = &pool->sp_groups[g];
1404 mtx_lock(&grp->sg_lock);
1405 if (grp->sg_state != SVCPOOL_CLOSING) {
1406 grp->sg_state = SVCPOOL_CLOSING;
1407 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1408 cv_signal(&st->st_cond);
1410 mtx_unlock(&grp->sg_lock);
1415 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
	m = rqstp->rq_args;
	rqstp->rq_args = NULL;
1424 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1425 stat = xargs(&xdrs, args);
1432 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1436 if (rqstp->rq_addr) {
1437 free(rqstp->rq_addr, M_SONAME);
1438 rqstp->rq_addr = NULL;
1441 xdrs.x_op = XDR_FREE;
1442 return (xargs(&xdrs, args));
1446 svc_freereq(struct svc_req *rqstp)
1451 st = rqstp->rq_thread;
1455 pool->sp_done(st, rqstp);
1458 if (rqstp->rq_auth.svc_ah_ops)
1459 SVCAUTH_RELEASE(&rqstp->rq_auth);
1461 if (rqstp->rq_xprt) {
1462 SVC_RELEASE(rqstp->rq_xprt);
1466 free(rqstp->rq_addr, M_SONAME);
1469 m_freem(rqstp->rq_args);