Fix an issue with positive namecache timeouts. Locked children often
[dragonfly.git] / sys / kern / kern_syslink.c
1 /*
2  * Copyright (c) 2006-2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/kern_syslink.c,v 1.12 2007/06/17 21:31:05 dillon Exp $
35  */
36 /*
37  * This module implements the core syslink() system call and provides
38  * glue for kernel syslink frontends and backends, creating a intra-host
39  * communications infrastructure and DMA transport abstraction.
40  */
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/endian.h>
46 #include <sys/malloc.h>
47 #include <sys/alist.h>
48 #include <sys/file.h>
49 #include <sys/proc.h>
50 #include <sys/lock.h>
51 #include <sys/uio.h>
52 #include <sys/objcache.h>
53 #include <sys/queue.h>
54 #include <sys/thread.h>
55 #include <sys/tree.h>
56 #include <sys/sysctl.h>
57 #include <sys/sysproto.h>
58 #include <sys/mbuf.h>
59 #include <sys/socket.h>
60 #include <sys/socketvar.h>
61 #include <sys/socketops.h>
62 #include <sys/sysref.h>
63 #include <sys/syslink.h>
64 #include <sys/syslink_msg.h>
65 #include <netinet/in.h>
66
67 #include <sys/thread2.h>
68 #include <sys/spinlock2.h>
69
70 #include "opt_syslink.h"
71
72 /*
73  * Syslink Connection abstraction
74  */
75 struct slcommon {
76         struct spinlock spin;
77         int     refs;
78 };
79
80 struct sldesc {
81         struct slmsgq   inq;
82         struct slmsg_rb_tree reply_rb_root; /* replies to requests */
83         struct spinlock spin;
84         struct sldesc   *peer;          /* peer syslink, if any */
85         struct file     *xfp;           /* external file pointer */
86         struct slcommon *common;
87         int     flags;
88         int     rwaiters;               /* number of threads waiting */
89         int     wblocked;               /* blocked waiting for us to drain */
90         size_t  cmdbytes;               /* unreplied commands pending */
91         size_t  repbytes;               /* undrained replies pending */
92         int     (*backend_wblocked)(struct sldesc *, int, sl_proto_t);
93         int     (*backend_write)(struct sldesc *, struct slmsg *);
94         void    (*backend_reply)(struct sldesc *,struct slmsg *,struct slmsg *);
95         void    (*backend_dispose)(struct sldesc *, struct slmsg *);
96 };
97
98 #define SLF_RSHUTDOWN   0x0001
99 #define SLF_WSHUTDOWN   0x0002
100
101 static int syslink_cmd_new(struct syslink_info_new *info, int *result);
102 static struct sldesc *allocsldesc(struct slcommon *common);
103 static void setsldescfp(struct sldesc *sl, struct file *fp);
104 static void shutdownsldesc(struct sldesc *sl, int how);
105 static void shutdownsldesc2(struct sldesc *sl, int how);
106 static void sldrop(struct sldesc *sl);
107 static int syslink_validate_msg(struct syslink_msg *msg, int bytes);
108 static int syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
109                                  int swapit, int depth);
110
111 static int backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto);
112 static int backend_write_user(struct sldesc *sl, struct slmsg *slmsg);
113 static void backend_reply_user(struct sldesc *sl, struct slmsg *slcmd,
114                                struct slmsg *slrep);
115 static void backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg);
116
117 static int backend_wblocked_kern(struct sldesc *sl, int nbio, sl_proto_t proto);
118 static int backend_write_kern(struct sldesc *sl, struct slmsg *slmsg);
119 static void backend_reply_kern(struct sldesc *sl, struct slmsg *slcmd,
120                                struct slmsg *slrep);
121 static void backend_dispose_kern(struct sldesc *sl, struct slmsg *slmsg);
122
123 /*
124  * Objcache memory backend
125  *
126  * All three object caches return slmsg structures but each is optimized
127  * for syslink message buffers of varying sizes.  We use the slightly
128  * more complex ctor/dtor API in order to provide ready-to-go slmsg's.
129  */
130
131 static struct objcache *sl_objcache_big;
132 static struct objcache *sl_objcache_small;
133 static struct objcache *sl_objcache_none;
134
135 MALLOC_DEFINE(M_SYSLINK, "syslink", "syslink manager");
136
137 static boolean_t slmsg_ctor(void *data, void *private, int ocflags);
138 static void slmsg_dtor(void *data, void *private);
139
140 static
141 void
142 syslinkinit(void *dummy __unused)
143 {
144         size_t n = sizeof(struct slmsg);
145
146         sl_objcache_none = objcache_create_mbacked(M_SYSLINK, n, 0, 64,
147                                                    slmsg_ctor, slmsg_dtor,
148                                                    &sl_objcache_none);
149         sl_objcache_small= objcache_create_mbacked(M_SYSLINK, n, 0, 64,
150                                                    slmsg_ctor, slmsg_dtor,
151                                                    &sl_objcache_small);
152         sl_objcache_big  = objcache_create_mbacked(M_SYSLINK, n, 0, 16,
153                                                    slmsg_ctor, slmsg_dtor,
154                                                    &sl_objcache_big);
155 }
156
157 static
158 boolean_t
159 slmsg_ctor(void *data, void *private, int ocflags)
160 {
161         struct slmsg *slmsg = data;
162
163         bzero(slmsg, sizeof(*slmsg));
164
165         slmsg->oc = *(struct objcache **)private;
166         if (slmsg->oc == sl_objcache_none) {
167                 slmsg->maxsize = 0;
168         } else if (slmsg->oc == sl_objcache_small) {
169                 slmsg->maxsize = SLMSG_SMALL;
170         } else if (slmsg->oc == sl_objcache_big) {
171                 slmsg->maxsize = SLMSG_BIG;
172         } else {
173                 panic("slmsg_ctor: bad objcache?\n");
174         }
175         if (slmsg->maxsize) {
176                 slmsg->msg = kmalloc(slmsg->maxsize,
177                                      M_SYSLINK, M_WAITOK|M_ZERO);
178         }
179         return(TRUE);
180 }
181
182 static
183 void
184 slmsg_dtor(void *data, void *private)
185 {
186         struct slmsg *slmsg = data;
187
188         if (slmsg->maxsize && slmsg->msg) {
189                 kfree(slmsg->msg, M_SYSLINK);
190                 slmsg->msg = NULL;
191         }
192         slmsg->oc = NULL;
193 }
194
195 SYSINIT(syslink, SI_BOOT2_MACHDEP, SI_ORDER_ANY, syslinkinit, NULL)
196
197 static int rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2);
198 RB_GENERATE2(slmsg_rb_tree, slmsg, rbnode, rb_slmsg_compare,
199              sysid_t, msg->sm_msgid);
200
201 /*
202  * Sysctl elements
203  */
204 static int syslink_enabled;
205 SYSCTL_NODE(_kern, OID_AUTO, syslink, CTLFLAG_RW, 0, "Pipe operation");
206 SYSCTL_INT(_kern_syslink, OID_AUTO, enabled,
207             CTLFLAG_RW, &syslink_enabled, 0, "Enable SYSLINK");
208 static size_t syslink_bufsize = 65536;
209 SYSCTL_UINT(_kern_syslink, OID_AUTO, bufsize,
210             CTLFLAG_RW, &syslink_bufsize, 0, "Maximum buffer size");
211
212 /*
213  * Fileops API - typically used to glue a userland frontend with a
214  *               kernel backend.
215  */
216
217 static int slfileop_read(struct file *fp, struct uio *uio,
218                         struct ucred *cred, int flags);
219 static int slfileop_write(struct file *fp, struct uio *uio,
220                          struct ucred *cred, int flags);
221 static int slfileop_close(struct file *fp);
222 static int slfileop_stat(struct file *fp, struct stat *sb, struct ucred *cred);
223 static int slfileop_shutdown(struct file *fp, int how);
224 static int slfileop_ioctl(struct file *fp, u_long cmd, caddr_t data,
225                          struct ucred *cred);
226 static int slfileop_poll(struct file *fp, int events, struct ucred *cred);
227 static int slfileop_kqfilter(struct file *fp, struct knote *kn);
228
229 static struct fileops syslinkops = {
230     .fo_read =          slfileop_read,
231     .fo_write =         slfileop_write,
232     .fo_ioctl =         slfileop_ioctl,
233     .fo_poll =          slfileop_poll,
234     .fo_kqfilter =      slfileop_kqfilter,
235     .fo_stat =          slfileop_stat,
236     .fo_close =         slfileop_close,
237     .fo_shutdown =      slfileop_shutdown
238 };
239
240 /************************************************************************
241  *                      PRIMARY SYSTEM CALL INTERFACE                   *
242  ************************************************************************
243  *
244  * syslink(int cmd, struct syslink_info *info, size_t bytes)
245  */
246 int
247 sys_syslink(struct syslink_args *uap)
248 {
249         union syslink_info_all info;
250         int error;
251
252         /*
253          * System call is under construction and disabled by default. 
254          * Superuser access is also required for now, but eventually
255          * will not be needed.
256          */
257         if (syslink_enabled == 0)
258                 return (EAUTH);
259         error = suser(curthread);
260         if (error)
261                 return (error);
262
263         /*
264          * Load and validate the info structure.  Unloaded bytes are zerod
265          * out.  The label field must always be 0-filled, even if not used
266          * for a command.
267          */
268         bzero(&info, sizeof(info));
269         if ((unsigned)uap->bytes <= sizeof(info)) {
270                 if (uap->bytes)
271                         error = copyin(uap->info, &info, uap->bytes);
272         } else {
273                 error = EINVAL;
274         }
275         if (error)
276                 return (error);
277
278         /*
279          * Process the command
280          */
281         switch(uap->cmd) {
282         case SYSLINK_CMD_NEW:
283                 error = syslink_cmd_new(&info.cmd_new, &uap->sysmsg_result);
284                 break;
285         default:
286                 error = EINVAL;
287                 break;
288         }
289         if (error == 0 && info.head.wbflag)
290                 copyout(&info, uap->info, uap->bytes);
291         return (error);
292 }
293
294 /*
295  * Create a linked pair of descriptors, like a pipe.
296  */
297 static
298 int
299 syslink_cmd_new(struct syslink_info_new *info, int *result)
300 {
301         struct proc *p = curproc;
302         struct file *fp1;
303         struct file *fp2;
304         struct sldesc *sl;
305         struct sldesc *slpeer;
306         int error;
307         int fd1, fd2;
308
309         error = falloc(p, &fp1, &fd1);
310         if (error)
311                 return(error);
312         error = falloc(p, &fp2, &fd2);
313         if (error) {
314                 fsetfd(p, NULL, fd1);
315                 fdrop(fp1);
316                 return(error);
317         }
318         slpeer = allocsldesc(NULL);
319         slpeer->backend_wblocked = backend_wblocked_user;
320         slpeer->backend_write = backend_write_user;
321         slpeer->backend_reply = backend_reply_user;
322         slpeer->backend_dispose = backend_dispose_user;
323         sl = allocsldesc(slpeer->common);
324         sl->peer = slpeer;
325         sl->backend_wblocked = backend_wblocked_user;
326         sl->backend_write = backend_write_user;
327         sl->backend_reply = backend_reply_user;
328         sl->backend_dispose = backend_dispose_user;
329         slpeer->peer = sl;
330
331         setsldescfp(sl, fp1);
332         setsldescfp(slpeer, fp2);
333
334         fsetfd(p, fp1, fd1);
335         fdrop(fp1);
336         fsetfd(p, fp2, fd2);
337         fdrop(fp2);
338
339         info->head.wbflag = 1;  /* write back */
340         info->fds[0] = fd1;
341         info->fds[1] = fd2;
342
343         return(0);
344 }
345
346 /************************************************************************
347  *                      LOW LEVEL SLDESC SUPPORT                        *
348  ************************************************************************
349  *
350  */
351
352 static
353 struct sldesc *
354 allocsldesc(struct slcommon *common)
355 {
356         struct sldesc *sl;
357
358         sl = kmalloc(sizeof(struct sldesc), M_SYSLINK, M_WAITOK|M_ZERO);
359         if (common == NULL)
360                 common = kmalloc(sizeof(*common), M_SYSLINK, M_WAITOK|M_ZERO);
361         TAILQ_INIT(&sl->inq);           /* incoming requests */
362         RB_INIT(&sl->reply_rb_root);    /* match incoming replies */
363         spin_init(&sl->spin);
364         sl->common = common;
365         ++common->refs;
366         return(sl);
367 }
368
369 static
370 void
371 setsldescfp(struct sldesc *sl, struct file *fp)
372 {
373         sl->xfp = fp;
374         fp->f_type = DTYPE_SYSLINK;
375         fp->f_flag = FREAD | FWRITE;
376         fp->f_ops = &syslinkops;
377         fp->f_data = sl;
378 }
379
380 /*
381  * Red-black tree compare function
382  */
383 static
384 int
385 rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2)
386 {
387         if (msg1->msg->sm_msgid < msg2->msg->sm_msgid)
388                 return(-1);
389         if (msg1->msg->sm_msgid == msg2->msg->sm_msgid)
390                 return(0);
391         return(1);
392 }
393
394 static
395 void
396 shutdownsldesc(struct sldesc *sl, int how)
397 {
398         struct slmsg *slmsg;
399         int rhow;
400
401         shutdownsldesc2(sl, how);
402
403         /*
404          * Return unread and unreplied messages
405          */
406         spin_lock_wr(&sl->spin);
407         while ((slmsg = TAILQ_FIRST(&sl->inq)) != NULL) {
408                 TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
409                 spin_unlock_wr(&sl->spin);
410                 if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
411                         sl->repbytes -= slmsg->maxsize;
412                         slmsg->flags &= ~SLMSGF_ONINQ;
413                         sl->peer->backend_dispose(sl->peer, slmsg);
414                 }
415                 /* leave ONINQ set for commands, it will cleared below */
416                 spin_lock_wr(&sl->spin);
417         }
418         while ((slmsg = RB_ROOT(&sl->reply_rb_root)) != NULL) {
419                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slmsg);
420                 sl->cmdbytes -= slmsg->maxsize;
421                 spin_unlock_wr(&sl->spin);
422                 slmsg->flags &= ~SLMSGF_ONINQ;
423                 sl->peer->backend_reply(sl->peer, slmsg, NULL);
424                 spin_lock_wr(&sl->spin);
425         }
426         spin_unlock_wr(&sl->spin);
427
428         /*
429          * Call shutdown on the peer with the opposite flags
430          */
431         rhow = 0;
432         switch(how) {
433         case SHUT_RD:
434                 rhow = SHUT_WR;
435                 break;
436         case SHUT_WR:
437                 rhow = SHUT_WR;
438                 break;
439         case SHUT_RDWR:
440                 rhow = SHUT_RDWR;
441                 break;
442         }
443         shutdownsldesc2(sl->peer, rhow);
444 }
445
446 static
447 void
448 shutdownsldesc2(struct sldesc *sl, int how)
449 {
450         spin_lock_wr(&sl->spin);
451         switch(how) {
452         case SHUT_RD:
453                 sl->flags |= SLF_RSHUTDOWN;
454                 break;
455         case SHUT_WR:
456                 sl->flags |= SLF_WSHUTDOWN;
457                 break;
458         case SHUT_RDWR:
459                 sl->flags |= SLF_RSHUTDOWN | SLF_WSHUTDOWN;
460                 break;
461         }
462         spin_unlock_wr(&sl->spin);
463
464         /*
465          * Handle signaling on the user side
466          */
467         if (how & SHUT_RD) {
468                 if (sl->rwaiters)
469                         wakeup(&sl->rwaiters);
470         }
471         if (how & SHUT_WR) {
472                 if (sl->wblocked) {
473                         sl->wblocked = 0;       /* race ok */
474                         wakeup(&sl->wblocked);
475                 }
476         }
477 }
478
479 static
480 void
481 sldrop(struct sldesc *sl)
482 {
483         struct sldesc *slpeer;
484
485         spin_lock_wr(&sl->common->spin);
486         if (--sl->common->refs == 0) {
487                 spin_unlock_wr(&sl->common->spin);
488                 if ((slpeer = sl->peer) != NULL) {
489                         sl->peer = NULL;
490                         slpeer->peer = NULL;
491                         slpeer->common = NULL;
492                         KKASSERT(slpeer->xfp == NULL);
493                         KKASSERT(TAILQ_EMPTY(&slpeer->inq));
494                         KKASSERT(RB_EMPTY(&slpeer->reply_rb_root));
495                         kfree(slpeer, M_SYSLINK);
496                 }
497                 KKASSERT(sl->xfp == NULL);
498                 KKASSERT(TAILQ_EMPTY(&sl->inq));
499                 KKASSERT(RB_EMPTY(&sl->reply_rb_root));
500                 kfree(sl->common, M_SYSLINK);
501                 sl->common = NULL;
502                 kfree(sl, M_SYSLINK);
503         } else {
504                 spin_unlock_wr(&sl->common->spin);
505         }
506 }
507
508 /************************************************************************
509  *                              FILEOPS API                             *
510  ************************************************************************
511  *
512  * Implement userland fileops.
513  *
514  * MPSAFE ops
515  */
516 static
517 int
518 slfileop_read(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
519 {
520         struct sldesc *sl = fp->f_data;         /* fp refed on call */
521         struct slmsg *slmsg;
522         struct iovec *iov0;
523         int error;
524         int nbio;
525
526         /*
527          * Kinda messy.  Figure out the non-blocking state
528          */
529         if (flags & O_FBLOCKING)
530                 nbio = 0;
531         else if (flags & O_FNONBLOCKING)
532                 nbio = 1;
533         else if (fp->f_flag & O_NONBLOCK)
534                 nbio = 1;
535         else
536                 nbio = 0;
537
538         /*
539          * Validate the uio
540          */
541         if (uio->uio_iovcnt < 1) {
542                 error = 0;
543                 goto done2;
544         }
545         iov0 = &uio->uio_iov[0];
546
547         /*
548          * Get a message, blocking if necessary.
549          */
550         spin_lock_wr(&sl->spin);
551         while ((slmsg = TAILQ_FIRST(&sl->inq)) == NULL) {
552                 if (sl->flags & SLF_RSHUTDOWN) {
553                         error = 0;
554                         goto done1;
555                 }
556                 if (nbio) {
557                         error = EAGAIN;
558                         goto done1;
559                 }
560                 ++sl->rwaiters;
561                 error = msleep(&sl->rwaiters, &sl->spin, PCATCH, "slrmsg", 0);
562                 --sl->rwaiters;
563                 if (error)
564                         goto done1;
565         }
566
567         /*
568          * We have a message.  If there isn't enough space, return
569          * ENOSPC without dequeueing it.
570          */
571         if (slmsg->msgsize > iov0->iov_len) {
572                 error = ENOSPC;
573                 goto done1;
574         }
575
576         /*
577          * Dequeue the message.  Adjust repbytes immediately.  cmdbytes
578          * are adjusted when the command is replied to, not here.
579          */
580         TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
581         if (slmsg->msg->sm_proto & SM_PROTO_REPLY)
582                 sl->repbytes -= slmsg->maxsize;
583         spin_unlock_wr(&sl->spin);
584
585         /*
586          * Load the message data into the user buffer and clean up.  We
587          * may have to wakeup blocked writers.
588          */
589         if ((error = uiomove((void *)slmsg->msg, slmsg->msgsize, uio)) == 0) {
590                 /* yip yip */
591         }
592
593         if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
594                 /*
595                  * Dispose of any received reply after we've copied it
596                  * to userland.  We don't need the slmsg any more.
597                  */
598                 slmsg->flags &= ~SLMSGF_ONINQ;
599                 sl->peer->backend_dispose(sl->peer, slmsg);
600                 if (sl->wblocked && sl->repbytes < syslink_bufsize) {
601                         sl->wblocked = 0;       /* MP race ok here */
602                         wakeup(&sl->wblocked);
603                 }
604         } else if (error) {
605                 /*
606                  * Reply to a command that we failed to copy to userspace.
607                  */
608                 spin_lock_wr(&sl->spin);
609                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slmsg);
610                 sl->cmdbytes -= slmsg->maxsize;
611                 spin_unlock_wr(&sl->spin);
612                 slmsg->flags &= ~SLMSGF_ONINQ;
613                 sl->peer->backend_reply(sl->peer, slmsg, NULL);
614                 if (sl->wblocked && sl->cmdbytes < syslink_bufsize) {
615                         sl->wblocked = 0;       /* MP race ok here */
616                         wakeup(&sl->wblocked);
617                 }
618         } else {
619                 /*
620                  * Leave the command in the RB tree but clear ONINQ now
621                  * that we have returned it to userland so userland can
622                  * reply to it.
623                  */
624                 slmsg->flags &= ~SLMSGF_ONINQ;
625         }
626         return(error);
627 done1:
628         spin_unlock_wr(&sl->spin);
629 done2:
630         return(error);
631 }
632
633 /*
634  * Userland writes syslink message
635  */
636 static
637 int
638 slfileop_write(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
639 {
640         struct sldesc *sl = fp->f_data;
641         struct slmsg *slmsg;
642         struct slmsg *slcmd;
643         struct syslink_msg sltmp;
644         struct iovec *iov0;
645         sl_proto_t proto;
646         int nbio;
647         int error;
648
649         /*
650          * Kinda messy.  Figure out the non-blocking state
651          */
652         if (flags & O_FBLOCKING)
653                 nbio = 0;
654         else if (flags & O_FNONBLOCKING)
655                 nbio = 1;
656         else if (fp->f_flag & O_NONBLOCK)
657                 nbio = 1;
658         else
659                 nbio = 0;
660
661         /*
662          * Validate the uio
663          */
664         if (uio->uio_iovcnt < 1) {
665                 error = 0;
666                 goto done2;
667         }
668         iov0 = &uio->uio_iov[0];
669         if (iov0->iov_len > SLMSG_BIG) {
670                 error = EFBIG;
671                 goto done2;
672         }
673
674         /*
675          * Handle the buffer-full case.  slpeer cmdbytes is managed
676          * by the backend function, not us so if the callback just
677          * directly implements the message and never adjusts cmdbytes,
678          * we will never sleep here.
679          */
680         if (sl->flags & SLF_WSHUTDOWN) {
681                 error = EPIPE;
682                 goto done2;
683         }
684
685         /*
686          * Only commands can block the pipe, not replies.  Otherwise a
687          * deadlock is possible.
688          */
689         error = copyin(iov0->iov_base, &sltmp, sizeof(sltmp));
690         if (error)
691                 goto done2;
692         if ((proto = sltmp.sm_proto) & SM_PROTO_ENDIAN_REV)
693                 proto = bswap16(proto);
694         error = sl->peer->backend_wblocked(sl->peer, nbio, proto);
695         if (error)
696                 goto done2;
697
698         /*
699          * Allocate a slmsg and load the message.  Note that the bytes
700          * returned to userland only reflects the primary syslink message
701          * and does not include any DMA buffers.
702          */
703         if (iov0->iov_len <= SLMSG_SMALL)
704                 slmsg = objcache_get(sl_objcache_small, M_WAITOK);
705         else
706                 slmsg = objcache_get(sl_objcache_big, M_WAITOK);
707         slmsg->msgsize = iov0->iov_len;
708
709         error = uiomove((void *)slmsg->msg, iov0->iov_len, uio);
710         if (error)
711                 goto done1;
712         error = syslink_validate_msg(slmsg->msg, slmsg->msgsize);
713         if (error)
714                 goto done1;
715
716         /*
717          * Replies have to be matched up against received commands.
718          */
719         if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
720                 spin_lock_wr(&sl->spin);
721                 slcmd = slmsg_rb_tree_RB_LOOKUP(&sl->reply_rb_root,
722                                                 slmsg->msg->sm_msgid);
723                 if (slcmd == NULL || (slcmd->flags & SLMSGF_ONINQ)) {
724                         error = ENOENT;
725                         spin_unlock_wr(&sl->spin);
726                         goto done1;
727                 }
728                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slcmd);
729                 sl->cmdbytes -= slcmd->maxsize;
730                 spin_unlock_wr(&sl->spin);
731                 sl->peer->backend_reply(sl->peer, slcmd, slmsg);
732                 if (sl->wblocked && sl->cmdbytes < syslink_bufsize) {
733                         sl->wblocked = 0;       /* MP race ok here */
734                         wakeup(&sl->wblocked);
735                 }
736                 /* error is 0 */
737         } else {
738                 error = sl->peer->backend_write(sl->peer, slmsg);
739         }
740 done1:
741         if (error)
742                 objcache_put(slmsg->oc, slmsg);
743 done2:
744         return(error);
745 }
746
747 /*
748  * Close a syslink descriptor.
749  *
750  * Disassociate the syslink from the file descriptor and disconnect from
751  * any peer.
752  */
753 static
754 int
755 slfileop_close(struct file *fp)
756 {
757         struct sldesc *sl;
758
759         /*
760          * Disassociate the file pointer.  Take ownership of the ref on the
761          * sldesc.
762          */
763         sl = fp->f_data;
764         fp->f_data = NULL;
765         fp->f_ops = &badfileops;
766         sl->xfp = NULL;
767
768         /*
769          * Shutdown both directions.  The other side will not issue API
770          * calls to us after we've shutdown both directions.
771          */
772         shutdownsldesc(sl, SHUT_RDWR);
773
774         /*
775          * Cleanup
776          */
777         KKASSERT(sl->cmdbytes == 0);
778         KKASSERT(sl->repbytes == 0);
779         sldrop(sl);
780         return(0);
781 }
782
783 static
784 int
785 slfileop_stat (struct file *fp, struct stat *sb, struct ucred *cred)
786 {
787         return(EINVAL);
788 }
789
790 static
791 int
792 slfileop_shutdown (struct file *fp, int how)
793 {
794         shutdownsldesc((struct sldesc *)fp->f_data, how);
795         return(0);
796 }
797
798 static
799 int
800 slfileop_ioctl (struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
801 {
802         return(EINVAL);
803 }
804
805 static
806 int
807 slfileop_poll (struct file *fp, int events, struct ucred *cred)
808 {
809         return(0);
810 }
811
812 static
813 int
814 slfileop_kqfilter(struct file *fp, struct knote *kn)
815 {
816         return(0);
817 }
818
819 /************************************************************************
820  *                          MESSAGE VALIDATION                          *
821  ************************************************************************
822  *
823  * Validate that the syslink message.  Check that all headers and elements
824  * conform.  Correct the endian if necessary.
825  *
826  * NOTE: If reverse endian needs to be corrected, SE_CMDF_UNTRANSLATED
827  * is recursively flipped on all syslink_elm's in the message.  As the
828  * message traverses the mesh, multiple flips may occur.  It is
829  * up to the RPC protocol layer to correct opaque data payloads and
830  * SE_CMDF_UNTRANSLATED prevents the protocol layer from misinterpreting
831  * a command or reply element which has not been endian-corrected.
832  */
833 static
834 int
835 syslink_validate_msg(struct syslink_msg *msg, int bytes)
836 {
837         int aligned_reclen;
838         int swapit;
839         int error;
840
841         /*
842          * The raw message must be properly-aligned.
843          */
844         if (bytes & SL_ALIGNMASK)
845                 return (EINVAL);
846
847         while (bytes) {
848                 /*
849                  * The message must at least contain the msgid, bytes, and
850                  * protoid.
851                  */
852                 if (bytes < SL_MIN_PAD_SIZE)
853                         return (EINVAL);
854
855                 /*
856                  * Fix the endian if it is reversed.
857                  */
858                 if (msg->sm_proto & SM_PROTO_ENDIAN_REV) {
859                         msg->sm_msgid = bswap64(msg->sm_msgid);
860                         msg->sm_sessid = bswap64(msg->sm_sessid);
861                         msg->sm_bytes = bswap16(msg->sm_bytes);
862                         msg->sm_proto = bswap16(msg->sm_proto);
863                         msg->sm_rlabel = bswap32(msg->sm_rlabel);
864                         if (msg->sm_proto & SM_PROTO_ENDIAN_REV)
865                                 return (EINVAL);
866                         swapit = 1;
867                 } else {
868                         swapit = 0;
869                 }
870
871                 /*
872                  * Validate the contents.  For PADs, the entire payload is
873                  * ignored and the minimum message size can be as small as
874                  * 8 bytes.
875                  */
876                 if (msg->sm_proto == SMPROTO_PAD) {
877                         if (msg->sm_bytes < SL_MIN_PAD_SIZE ||
878                             msg->sm_bytes > bytes) {
879                                 return (EINVAL);
880                         }
881                         /* ignore the entire payload, it can be garbage */
882                 } else {
883                         if (msg->sm_bytes < SL_MIN_MSG_SIZE ||
884                             msg->sm_bytes > bytes) {
885                                 return (EINVAL);
886                         }
887                         error = syslink_validate_elm(
888                                     &msg->sm_head,
889                                     msg->sm_bytes - 
890                                         offsetof(struct syslink_msg,
891                                                  sm_head),
892                                     swapit, SL_MAXDEPTH);
893                         if (error)
894                                 return (error);
895                 }
896
897                 /*
898                  * The aligned payload size must be used to locate the
899                  * next syslink_msg in the buffer.
900                  */
901                 aligned_reclen = SL_MSG_ALIGN(msg->sm_bytes);
902                 bytes -= aligned_reclen;
903                 msg = (void *)((char *)msg + aligned_reclen);
904         }
905         return(0);
906 }
907
908 static
909 int
910 syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
911                      int swapit, int depth)
912 {
913         int aligned_reclen;
914
915         /*
916          * If the buffer isn't big enough to fit the header, stop now!
917          */
918         if (bytes < SL_MIN_ELM_SIZE)
919                 return (EINVAL);
920         /*
921          * All syslink_elm headers are recursively endian-adjusted.  Opaque
922          * data payloads are not.
923          */
924         if (swapit) {
925                 elm->se_cmd = bswap16(elm->se_cmd) ^ SE_CMDF_UNTRANSLATED;
926                 elm->se_bytes = bswap16(elm->se_bytes);
927                 elm->se_aux = bswap32(elm->se_aux);
928         }
929
930         /*
931          * Check element size requirements.
932          */
933         if (elm->se_bytes < SL_MIN_ELM_SIZE || elm->se_bytes > bytes)
934                 return (EINVAL);
935
936         /*
937          * Recursively check structured payloads.  A structured payload may
938          * contain as few as 0 recursive elements.
939          */
940         if (elm->se_cmd & SE_CMDF_STRUCTURED) {
941                 if (depth == 0)
942                         return (EINVAL);
943                 bytes -= SL_MIN_ELM_SIZE;
944                 ++elm;
945                 while (bytes > 0) {
946                         if (syslink_validate_elm(elm, bytes, swapit, depth - 1))
947                                 return (EINVAL);
948                         aligned_reclen = SL_MSG_ALIGN(elm->se_bytes);
949                         elm = (void *)((char *)elm + aligned_reclen);
950                         bytes -= aligned_reclen;
951                 }
952         }
953         return(0);
954 }
955
956 /************************************************************************
957  *                  BACKEND FUNCTIONS - USER DESCRIPTOR                 *
958  ************************************************************************
959  *
960  * Peer backend links are primarily used when userland creates a pair
961  * of linked descriptors.
962  */
963
964 /*
965  * Do any required blocking / nbio handling for attempts to write to
966  * a sldesc associated with a user descriptor.
967  */
968 static
969 int
970 backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto)
971 {
972         int error = 0;
973         int *bytesp = (proto & SM_PROTO_REPLY) ? &sl->repbytes : &sl->cmdbytes;
974
975         /*
976          * Block until sufficient data is drained by the target.  It is
977          * ok to have a MP race against cmdbytes.
978          */
979         if (*bytesp >= syslink_bufsize) {
980                 spin_lock_wr(&sl->spin);
981                 while (*bytesp >= syslink_bufsize) {
982                         if (sl->flags & SLF_WSHUTDOWN) {
983                                 error = EPIPE;
984                                 break;
985                         }
986                         if (nbio) {
987                                 error = EAGAIN;
988                                 break;
989                         }
990                         ++sl->wblocked;
991                         error = msleep(&sl->wblocked, &sl->spin,
992                                        PCATCH, "slwmsg", 0);
993                         if (error)
994                                 break;
995                 }
996                 spin_unlock_wr(&sl->spin);
997         }
998         return (error);
999 }
1000
1001 /*
1002  * Unconditionally write a syslink message to the sldesc associated with
1003  * a user descriptor.  Command messages are also placed in a red-black
1004  * tree so their DMA tag (if any) can be accessed and so they can be
1005  * linked to any reply message.
1006  */
1007 static
1008 int
1009 backend_write_user(struct sldesc *sl, struct slmsg *slmsg)
1010 {
1011         int error;
1012
1013         spin_lock_wr(&sl->spin);
1014         if (sl->flags & SLF_RSHUTDOWN) {
1015                 /*
1016                  * Not accepting new messages
1017                  */
1018                 error = EPIPE;
1019         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
1020                 /*
1021                  * Write a reply
1022                  */
1023                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1024                 sl->repbytes += slmsg->maxsize;
1025                 slmsg->flags |= SLMSGF_ONINQ;
1026                 error = 0;
1027         } else if (RB_INSERT(slmsg_rb_tree, &sl->reply_rb_root, slmsg)) {
1028                 /*
1029                  * Write a command, but there was a msgid collision when
1030                  * we tried to insert it into the RB tree.
1031                  */
1032                 error = EEXIST;
1033         } else {
1034                 /*
1035                  * Write a command, successful insertion into the RB tree.
1036                  */
1037                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1038                 sl->cmdbytes += slmsg->maxsize;
1039                 slmsg->flags |= SLMSGF_ONINQ;
1040                 error = 0;
1041         }
1042         spin_unlock_wr(&sl->spin);
1043         if (sl->rwaiters)
1044                 wakeup(&sl->rwaiters);
1045         return(error);
1046 }
1047
1048 /*
1049  * Our peer is replying a command we previously sent it back to us, along
1050  * with the reply message (if not NULL).  We just queue the reply to
1051  * userland and free of the command.
1052  */
1053 static
1054 void
1055 backend_reply_user(struct sldesc *sl, struct slmsg *slcmd, struct slmsg *slrep)
1056 {
1057         int error;
1058
1059         objcache_put(slcmd->oc, slcmd);
1060         if (slrep) {
1061                 spin_lock_wr(&sl->spin);
1062                 if ((sl->flags & SLF_RSHUTDOWN) == 0) {
1063                         TAILQ_INSERT_TAIL(&sl->inq, slrep, tqnode);
1064                         sl->repbytes += slrep->maxsize;
1065                         error = 0;
1066                 } else {
1067                         error = EPIPE;
1068                 }
1069                 spin_unlock_wr(&sl->spin);
1070                 if (error)
1071                         sl->peer->backend_dispose(sl->peer, slrep);
1072                 else if (sl->rwaiters)
1073                         wakeup(&sl->rwaiters);
1074         }
1075 }
1076
1077 static
1078 void
1079 backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg)
1080 {
1081         objcache_put(slmsg->oc, slmsg);
1082 }
1083
1084 /************************************************************************
1085  *                      KERNEL DRIVER OR FILESYSTEM API                 *
1086  ************************************************************************
1087  *
1088  */
1089
1090 /*
1091  * Create a user<->kernel link, returning the user descriptor in *fdp
1092  * and the kernel descriptor in *kslp.  0 is returned on success, and an
1093  * error code is returned on failure.
1094  */
1095 int
1096 syslink_ukbackend(int *fdp, struct sldesc **kslp)
1097 {
1098         struct proc *p = curproc;
1099         struct file *fp;
1100         struct sldesc *usl;
1101         struct sldesc *ksl;
1102         int error;
1103         int fd;
1104
1105         *fdp = -1;
1106         *kslp = NULL;
1107
1108         error = falloc(p, &fp, &fd);
1109         if (error)
1110                 return(error);
1111         usl = allocsldesc(NULL);
1112         usl->backend_wblocked = backend_wblocked_user;
1113         usl->backend_write = backend_write_user;
1114         usl->backend_reply = backend_reply_user;
1115         usl->backend_dispose = backend_dispose_user;
1116
1117         ksl = allocsldesc(usl->common);
1118         ksl->peer = usl;
1119         ksl->backend_wblocked = backend_wblocked_kern;
1120         ksl->backend_write = backend_write_kern;
1121         ksl->backend_reply = backend_reply_kern;
1122         ksl->backend_dispose = backend_dispose_kern;
1123
1124         usl->peer = ksl;
1125
1126         setsldescfp(usl, fp);
1127         fsetfd(p, fp, fd);
1128         fdrop(fp);
1129
1130         *fdp = fd;
1131         *kslp = ksl;
1132         return(0);
1133 }
1134
1135 /*
1136  * Assign a unique message id, issue a syslink message to userland,
1137  * and wait for a reply.
1138  */
1139 int
1140 syslink_kdomsg(struct sldesc *ksl, struct slmsg *slmsg)
1141 {
1142         struct syslink_msg *msg;
1143         int error;
1144
1145         /*
1146          * Finish initializing slmsg and post it to the red-black tree for
1147          * reply matching.  If the message id is already in use we return
1148          * EEXIST, giving the originator the chance to roll a new msgid.
1149          */
1150         msg = slmsg->msg;
1151         slmsg->msgsize = msg->sm_bytes;
1152         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1153                 return (error);
1154         msg->sm_msgid = allocsysid();
1155
1156         /*
1157          * Issue the request and wait for a matching reply or failure,
1158          * then remove the message from the matching tree and return.
1159          */
1160         error = ksl->peer->backend_write(ksl->peer, slmsg);
1161         spin_lock_wr(&ksl->spin);
1162         if (error == 0) {
1163                 while (slmsg->rep == NULL) {
1164                         error = msleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1165                         /* XXX ignore error for now */
1166                 }
1167                 if (slmsg->rep == (struct slmsg *)-1) {
1168                         error = EIO;
1169                         slmsg->rep = NULL;
1170                 } else {
1171                         error = slmsg->rep->msg->sm_head.se_aux;
1172                 }
1173         }
1174         spin_unlock_wr(&ksl->spin);
1175         return(error);
1176 }
1177
1178 struct slmsg *
1179 syslink_kallocmsg(void)
1180 {
1181         return(objcache_get(sl_objcache_small, M_WAITOK));
1182 }
1183
1184 void
1185 syslink_kfreemsg(struct sldesc *ksl, struct slmsg *slmsg)
1186 {
1187         struct slmsg *rep;
1188
1189         if ((rep = slmsg->rep) != NULL) {
1190                 slmsg->rep = NULL;
1191                 ksl->peer->backend_dispose(ksl->peer, rep);
1192         }
1193         objcache_put(slmsg->oc, slmsg);
1194 }
1195
1196 void
1197 syslink_kshutdown(struct sldesc *ksl, int how)
1198 {
1199         shutdownsldesc(ksl, how);
1200 }
1201
1202 void
1203 syslink_kclose(struct sldesc *ksl)
1204 {
1205         shutdownsldesc(ksl, SHUT_RDWR);
1206         sldrop(ksl);
1207 }
1208
1209 /************************************************************************
1210  *                  BACKEND FUNCTIONS FOR KERNEL API                    *
1211  ************************************************************************
1212  *
1213  * These are the backend functions for a sldesc associated with a kernel
1214  * API.
1215  */
1216
1217 /*
1218  * Our peer wants to write a syslink message to us and is asking us to
1219  * block if our input queue is full.  We don't implement command reception
1220  * so don't block right now.
1221  */
1222 static
1223 int
1224 backend_wblocked_kern(struct sldesc *ksl, int nbio, sl_proto_t proto)
1225 {
1226         /* never blocks */
1227         return(0);
1228 }
1229
1230 /*
1231  * Our peer is writing a request to the kernel.  At the moment we do not
1232  * accept commands.
1233  */
1234 static
1235 int
1236 backend_write_kern(struct sldesc *ksl, struct slmsg *slmsg)
1237 {
1238         return(EOPNOTSUPP);
1239 }
1240
1241 /*
1242  * Our peer wants to reply to a syslink message we sent it earlier.  The
1243  * original command (that we passed to our peer), and the peer's reply
1244  * is specified.  If the peer has failed slrep will be NULL.
1245  */
1246 static
1247 void
1248 backend_reply_kern(struct sldesc *ksl, struct slmsg *slcmd, struct slmsg *slrep)
1249 {
1250         spin_lock_wr(&ksl->spin);
1251         if (slrep == NULL) {
1252                 slcmd->rep = (struct slmsg *)-1;
1253         } else {
1254                 slcmd->rep = slrep;
1255         }
1256         spin_unlock_wr(&ksl->spin);
1257         wakeup(slcmd);
1258 }
1259
1260 /*
1261  * Any reply messages we sent to our peer are returned to us for disposal.
1262  * Since we do not currently accept commands from our peer, there will not
1263  * be any replies returned to the peer to dispose of.
1264  */
1265 static
1266 void
1267 backend_dispose_kern(struct sldesc *ksl, struct slmsg *slmsg)
1268 {
1269         panic("backend_dispose_kern: kernel can't accept commands so it "
1270               "certainly did not reply to one!");
1271 }
1272