Rename msleep() to ssleep().
[dragonfly.git] / sys / kern / kern_syslink.c
1 /*
2  * Copyright (c) 2006-2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/kern_syslink.c,v 1.16 2008/10/26 04:29:19 sephe Exp $
35  */
36 /*
37  * This module implements the core syslink() system call and provides
38  * glue for kernel syslink frontends and backends, creating a intra-host
39  * communications infrastructure and DMA transport abstraction.
40  */
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/endian.h>
46 #include <sys/malloc.h>
47 #include <sys/alist.h>
48 #include <sys/file.h>
49 #include <sys/proc.h>
50 #include <sys/priv.h>
51 #include <sys/lock.h>
52 #include <sys/uio.h>
53 #include <sys/objcache.h>
54 #include <sys/queue.h>
55 #include <sys/thread.h>
56 #include <sys/tree.h>
57 #include <sys/sysctl.h>
58 #include <sys/sysproto.h>
59 #include <sys/mbuf.h>
60 #include <sys/socket.h>
61 #include <sys/socketvar.h>
62 #include <sys/socketops.h>
63 #include <sys/sysref.h>
64 #include <sys/syslink.h>
65 #include <sys/syslink_msg.h>
66 #include <netinet/in.h>
67
68 #include <sys/thread2.h>
69 #include <sys/spinlock2.h>
70 #include <sys/buf2.h>
71
72 #include "opt_syslink.h"
73
74 /*
75  * Syslink Connection abstraction
76  */
77 struct slcommon {
78         struct spinlock spin;
79         int     refs;
80 };
81
82 struct sldesc {
83         struct slmsgq   inq;
84         struct slmsg_rb_tree reply_rb_root; /* replies to requests */
85         struct spinlock spin;
86         struct sldesc   *peer;          /* peer syslink, if any */
87         struct file     *xfp;           /* external file pointer */
88         struct slcommon *common;
89         int     flags;
90         int     rwaiters;               /* number of threads waiting */
91         int     wblocked;               /* blocked waiting for us to drain */
92         size_t  cmdbytes;               /* unreplied commands pending */
93         size_t  repbytes;               /* undrained replies pending */
94         int     (*backend_wblocked)(struct sldesc *, int, sl_proto_t);
95         int     (*backend_write)(struct sldesc *, struct slmsg *);
96         void    (*backend_reply)(struct sldesc *,struct slmsg *,struct slmsg *);
97         void    (*backend_dispose)(struct sldesc *, struct slmsg *);
98 };
99
100 #define SLF_RSHUTDOWN   0x0001
101 #define SLF_WSHUTDOWN   0x0002
102
103 static int syslink_cmd_new(struct syslink_info_new *info, int *result);
104 static struct sldesc *allocsldesc(struct slcommon *common);
105 static void setsldescfp(struct sldesc *sl, struct file *fp);
106 static void shutdownsldesc(struct sldesc *sl, int how);
107 static void shutdownsldesc2(struct sldesc *sl, int how);
108 static void sldrop(struct sldesc *sl);
109 static int syslink_validate_msg(struct syslink_msg *msg, int bytes);
110 static int syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
111                                  int swapit, int depth);
112
113 static int sl_local_mmap(struct slmsg *slmsg, char *base, size_t len);
114 static void sl_local_munmap(struct slmsg *slmsg);
115
116 static int backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto);
117 static int backend_write_user(struct sldesc *sl, struct slmsg *slmsg);
118 static void backend_reply_user(struct sldesc *sl, struct slmsg *slcmd,
119                                struct slmsg *slrep);
120 static void backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg);
121
122 static int backend_wblocked_kern(struct sldesc *sl, int nbio, sl_proto_t proto);
123 static int backend_write_kern(struct sldesc *sl, struct slmsg *slmsg);
124 static void backend_reply_kern(struct sldesc *sl, struct slmsg *slcmd,
125                                struct slmsg *slrep);
126 static void backend_dispose_kern(struct sldesc *sl, struct slmsg *slmsg);
127 static void slmsg_put(struct slmsg *slmsg);
128
129 /*
130  * Objcache memory backend
131  *
132  * All three object caches return slmsg structures but each is optimized
133  * for syslink message buffers of varying sizes.  We use the slightly
134  * more complex ctor/dtor API in order to provide ready-to-go slmsg's.
135  */
136
137 static struct objcache *sl_objcache_big;
138 static struct objcache *sl_objcache_small;
139 static struct objcache *sl_objcache_none;
140
141 MALLOC_DEFINE(M_SYSLINK, "syslink", "syslink manager");
142
143 static boolean_t slmsg_ctor(void *data, void *private, int ocflags);
144 static void slmsg_dtor(void *data, void *private);
145
146 static
147 void
148 syslinkinit(void *dummy __unused)
149 {
150         size_t n = sizeof(struct slmsg);
151
152         sl_objcache_none = objcache_create_mbacked(M_SYSLINK, n, NULL, 64,
153                                                    slmsg_ctor, slmsg_dtor,
154                                                    &sl_objcache_none);
155         sl_objcache_small= objcache_create_mbacked(M_SYSLINK, n, NULL, 64,
156                                                    slmsg_ctor, slmsg_dtor,
157                                                    &sl_objcache_small);
158         sl_objcache_big  = objcache_create_mbacked(M_SYSLINK, n, NULL, 16,
159                                                    slmsg_ctor, slmsg_dtor,
160                                                    &sl_objcache_big);
161 }
162
163 static
164 boolean_t
165 slmsg_ctor(void *data, void *private, int ocflags)
166 {
167         struct slmsg *slmsg = data;
168
169         bzero(slmsg, sizeof(*slmsg));
170
171         slmsg->oc = *(struct objcache **)private;
172         if (slmsg->oc == sl_objcache_none) {
173                 slmsg->maxsize = 0;
174         } else if (slmsg->oc == sl_objcache_small) {
175                 slmsg->maxsize = SLMSG_SMALL;
176         } else if (slmsg->oc == sl_objcache_big) {
177                 slmsg->maxsize = SLMSG_BIG;
178         } else {
179                 panic("slmsg_ctor: bad objcache?\n");
180         }
181         if (slmsg->maxsize) {
182                 slmsg->msg = kmalloc(slmsg->maxsize,
183                                      M_SYSLINK, M_WAITOK|M_ZERO);
184         }
185         xio_init(&slmsg->xio);
186         return(TRUE);
187 }
188
189 static
190 void
191 slmsg_dtor(void *data, void *private)
192 {
193         struct slmsg *slmsg = data;
194
195         if (slmsg->maxsize && slmsg->msg) {
196                 kfree(slmsg->msg, M_SYSLINK);
197                 slmsg->msg = NULL;
198         }
199         slmsg->oc = NULL;
200 }
201
202 SYSINIT(syslink, SI_BOOT2_MACHDEP, SI_ORDER_ANY, syslinkinit, NULL)
203
204 static int rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2);
205 RB_GENERATE2(slmsg_rb_tree, slmsg, rbnode, rb_slmsg_compare,
206              sysid_t, msg->sm_msgid);
207
208 /*
209  * Sysctl elements
210  */
211 static int syslink_enabled;
212 SYSCTL_NODE(_kern, OID_AUTO, syslink, CTLFLAG_RW, 0, "Pipe operation");
213 SYSCTL_INT(_kern_syslink, OID_AUTO, enabled,
214             CTLFLAG_RW, &syslink_enabled, 0, "Enable SYSLINK");
215 static size_t syslink_bufsize = 65536;
216 SYSCTL_UINT(_kern_syslink, OID_AUTO, bufsize,
217             CTLFLAG_RW, &syslink_bufsize, 0, "Maximum buffer size");
218
219 /*
220  * Fileops API - typically used to glue a userland frontend with a
221  *               kernel backend.
222  */
223
224 static int slfileop_read(struct file *fp, struct uio *uio,
225                         struct ucred *cred, int flags);
226 static int slfileop_write(struct file *fp, struct uio *uio,
227                          struct ucred *cred, int flags);
228 static int slfileop_close(struct file *fp);
229 static int slfileop_stat(struct file *fp, struct stat *sb, struct ucred *cred);
230 static int slfileop_shutdown(struct file *fp, int how);
231 static int slfileop_ioctl(struct file *fp, u_long cmd, caddr_t data,
232                          struct ucred *cred);
233 static int slfileop_poll(struct file *fp, int events, struct ucred *cred);
234 static int slfileop_kqfilter(struct file *fp, struct knote *kn);
235
236 static struct fileops syslinkops = {
237     .fo_read =          slfileop_read,
238     .fo_write =         slfileop_write,
239     .fo_ioctl =         slfileop_ioctl,
240     .fo_poll =          slfileop_poll,
241     .fo_kqfilter =      slfileop_kqfilter,
242     .fo_stat =          slfileop_stat,
243     .fo_close =         slfileop_close,
244     .fo_shutdown =      slfileop_shutdown
245 };
246
247 /************************************************************************
248  *                      PRIMARY SYSTEM CALL INTERFACE                   *
249  ************************************************************************
250  *
251  * syslink(int cmd, struct syslink_info *info, size_t bytes)
252  */
253 int
254 sys_syslink(struct syslink_args *uap)
255 {
256         union syslink_info_all info;
257         int error;
258
259         /*
260          * System call is under construction and disabled by default. 
261          * Superuser access is also required for now, but eventually
262          * will not be needed.
263          */
264         if (syslink_enabled == 0)
265                 return (EAUTH);
266         error = priv_check(curthread, PRIV_ROOT);
267         if (error)
268                 return (error);
269
270         /*
271          * Load and validate the info structure.  Unloaded bytes are zerod
272          * out.  The label field must always be 0-filled, even if not used
273          * for a command.
274          */
275         bzero(&info, sizeof(info));
276         if ((unsigned)uap->bytes <= sizeof(info)) {
277                 if (uap->bytes)
278                         error = copyin(uap->info, &info, uap->bytes);
279         } else {
280                 error = EINVAL;
281         }
282         if (error)
283                 return (error);
284
285         /*
286          * Process the command
287          */
288         switch(uap->cmd) {
289         case SYSLINK_CMD_NEW:
290                 error = syslink_cmd_new(&info.cmd_new, &uap->sysmsg_result);
291                 break;
292         default:
293                 error = EINVAL;
294                 break;
295         }
296         if (error == 0 && info.head.wbflag)
297                 copyout(&info, uap->info, uap->bytes);
298         return (error);
299 }
300
301 /*
302  * Create a linked pair of descriptors, like a pipe.
303  */
304 static
305 int
306 syslink_cmd_new(struct syslink_info_new *info, int *result)
307 {
308         struct proc *p = curproc;
309         struct file *fp1;
310         struct file *fp2;
311         struct sldesc *sl;
312         struct sldesc *slpeer;
313         int error;
314         int fd1, fd2;
315
316         error = falloc(p, &fp1, &fd1);
317         if (error)
318                 return(error);
319         error = falloc(p, &fp2, &fd2);
320         if (error) {
321                 fsetfd(p, NULL, fd1);
322                 fdrop(fp1);
323                 return(error);
324         }
325         slpeer = allocsldesc(NULL);
326         slpeer->backend_wblocked = backend_wblocked_user;
327         slpeer->backend_write = backend_write_user;
328         slpeer->backend_reply = backend_reply_user;
329         slpeer->backend_dispose = backend_dispose_user;
330         sl = allocsldesc(slpeer->common);
331         sl->peer = slpeer;
332         sl->backend_wblocked = backend_wblocked_user;
333         sl->backend_write = backend_write_user;
334         sl->backend_reply = backend_reply_user;
335         sl->backend_dispose = backend_dispose_user;
336         slpeer->peer = sl;
337
338         setsldescfp(sl, fp1);
339         setsldescfp(slpeer, fp2);
340
341         fsetfd(p, fp1, fd1);
342         fdrop(fp1);
343         fsetfd(p, fp2, fd2);
344         fdrop(fp2);
345
346         info->head.wbflag = 1;  /* write back */
347         info->fds[0] = fd1;
348         info->fds[1] = fd2;
349
350         return(0);
351 }
352
353 /************************************************************************
354  *                      LOW LEVEL SLDESC SUPPORT                        *
355  ************************************************************************
356  *
357  */
358
359 static
360 struct sldesc *
361 allocsldesc(struct slcommon *common)
362 {
363         struct sldesc *sl;
364
365         sl = kmalloc(sizeof(struct sldesc), M_SYSLINK, M_WAITOK|M_ZERO);
366         if (common == NULL)
367                 common = kmalloc(sizeof(*common), M_SYSLINK, M_WAITOK|M_ZERO);
368         TAILQ_INIT(&sl->inq);           /* incoming requests */
369         RB_INIT(&sl->reply_rb_root);    /* match incoming replies */
370         spin_init(&sl->spin);
371         sl->common = common;
372         ++common->refs;
373         return(sl);
374 }
375
376 static
377 void
378 setsldescfp(struct sldesc *sl, struct file *fp)
379 {
380         sl->xfp = fp;
381         fp->f_type = DTYPE_SYSLINK;
382         fp->f_flag = FREAD | FWRITE;
383         fp->f_ops = &syslinkops;
384         fp->f_data = sl;
385 }
386
387 /*
388  * Red-black tree compare function
389  */
390 static
391 int
392 rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2)
393 {
394         if (msg1->msg->sm_msgid < msg2->msg->sm_msgid)
395                 return(-1);
396         if (msg1->msg->sm_msgid == msg2->msg->sm_msgid)
397                 return(0);
398         return(1);
399 }
400
401 static
402 void
403 shutdownsldesc(struct sldesc *sl, int how)
404 {
405         struct slmsg *slmsg;
406         int rhow;
407
408         shutdownsldesc2(sl, how);
409
410         /*
411          * Return unread and unreplied messages
412          */
413         spin_lock_wr(&sl->spin);
414         while ((slmsg = TAILQ_FIRST(&sl->inq)) != NULL) {
415                 TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
416                 spin_unlock_wr(&sl->spin);
417                 if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
418                         sl->repbytes -= slmsg->maxsize;
419                         slmsg->flags &= ~SLMSGF_ONINQ;
420                         sl->peer->backend_dispose(sl->peer, slmsg);
421                 }
422                 /* leave ONINQ set for commands, it will cleared below */
423                 spin_lock_wr(&sl->spin);
424         }
425         while ((slmsg = RB_ROOT(&sl->reply_rb_root)) != NULL) {
426                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slmsg);
427                 sl->cmdbytes -= slmsg->maxsize;
428                 spin_unlock_wr(&sl->spin);
429                 slmsg->flags &= ~SLMSGF_ONINQ;
430                 sl->peer->backend_reply(sl->peer, slmsg, NULL);
431                 spin_lock_wr(&sl->spin);
432         }
433         spin_unlock_wr(&sl->spin);
434
435         /*
436          * Call shutdown on the peer with the opposite flags
437          */
438         rhow = 0;
439         switch(how) {
440         case SHUT_RD:
441                 rhow = SHUT_WR;
442                 break;
443         case SHUT_WR:
444                 rhow = SHUT_WR;
445                 break;
446         case SHUT_RDWR:
447                 rhow = SHUT_RDWR;
448                 break;
449         }
450         shutdownsldesc2(sl->peer, rhow);
451 }
452
453 static
454 void
455 shutdownsldesc2(struct sldesc *sl, int how)
456 {
457         spin_lock_wr(&sl->spin);
458         switch(how) {
459         case SHUT_RD:
460                 sl->flags |= SLF_RSHUTDOWN;
461                 break;
462         case SHUT_WR:
463                 sl->flags |= SLF_WSHUTDOWN;
464                 break;
465         case SHUT_RDWR:
466                 sl->flags |= SLF_RSHUTDOWN | SLF_WSHUTDOWN;
467                 break;
468         }
469         spin_unlock_wr(&sl->spin);
470
471         /*
472          * Handle signaling on the user side
473          */
474         if (how & SHUT_RD) {
475                 if (sl->rwaiters)
476                         wakeup(&sl->rwaiters);
477         }
478         if (how & SHUT_WR) {
479                 if (sl->wblocked) {
480                         sl->wblocked = 0;       /* race ok */
481                         wakeup(&sl->wblocked);
482                 }
483         }
484 }
485
486 static
487 void
488 sldrop(struct sldesc *sl)
489 {
490         struct sldesc *slpeer;
491
492         spin_lock_wr(&sl->common->spin);
493         if (--sl->common->refs == 0) {
494                 spin_unlock_wr(&sl->common->spin);
495                 if ((slpeer = sl->peer) != NULL) {
496                         sl->peer = NULL;
497                         slpeer->peer = NULL;
498                         slpeer->common = NULL;
499                         KKASSERT(slpeer->xfp == NULL);
500                         KKASSERT(TAILQ_EMPTY(&slpeer->inq));
501                         KKASSERT(RB_EMPTY(&slpeer->reply_rb_root));
502                         kfree(slpeer, M_SYSLINK);
503                 }
504                 KKASSERT(sl->xfp == NULL);
505                 KKASSERT(TAILQ_EMPTY(&sl->inq));
506                 KKASSERT(RB_EMPTY(&sl->reply_rb_root));
507                 kfree(sl->common, M_SYSLINK);
508                 sl->common = NULL;
509                 kfree(sl, M_SYSLINK);
510         } else {
511                 spin_unlock_wr(&sl->common->spin);
512         }
513 }
514
515 static
516 void
517 slmsg_put(struct slmsg *slmsg)
518 {
519         if (slmsg->flags & SLMSGF_HASXIO) {
520                 slmsg->flags &= ~SLMSGF_HASXIO;
521                 get_mplock();
522                 xio_release(&slmsg->xio);
523                 rel_mplock();
524         }
525         slmsg->flags &= ~SLMSGF_LINMAP;
526         objcache_put(slmsg->oc, slmsg);
527 }
528
529 /************************************************************************
530  *                              FILEOPS API                             *
531  ************************************************************************
532  *
533  * Implement userland fileops.
534  *
535  * MPSAFE ops
536  */
537 static
538 int
539 slfileop_read(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
540 {
541         struct sldesc *sl = fp->f_data;         /* fp refed on call */
542         struct slmsg *slmsg;
543         struct iovec *iov0;
544         struct iovec *iov1;
545         struct syslink_msg *wmsg;
546         int error;
547         int nbio;
548
549         /*
550          * Kinda messy.  Figure out the non-blocking state
551          */
552         if (flags & O_FBLOCKING)
553                 nbio = 0;
554         else if (flags & O_FNONBLOCKING)
555                 nbio = 1;
556         else if (fp->f_flag & O_NONBLOCK)
557                 nbio = 1;
558         else
559                 nbio = 0;
560
561         /*
562          * Validate the uio.
563          *
564          * iov0 - message buffer
565          * iov1 - DMA buffer or backup buffer
566          */
567         if (uio->uio_iovcnt < 1) {
568                 error = 0;
569                 goto done2;
570         }
571         iov0 = &uio->uio_iov[0];
572         if (uio->uio_iovcnt > 2) {
573                 error = EINVAL;
574                 goto done2;
575         }
576
577         /*
578          * Get a message, blocking if necessary.
579          */
580         spin_lock_wr(&sl->spin);
581         while ((slmsg = TAILQ_FIRST(&sl->inq)) == NULL) {
582                 if (sl->flags & SLF_RSHUTDOWN) {
583                         error = 0;
584                         goto done1;
585                 }
586                 if (nbio) {
587                         error = EAGAIN;
588                         goto done1;
589                 }
590                 ++sl->rwaiters;
591                 error = ssleep(&sl->rwaiters, &sl->spin, PCATCH, "slrmsg", 0);
592                 --sl->rwaiters;
593                 if (error)
594                         goto done1;
595         }
596         wmsg = slmsg->msg;
597
598         /*
599          * We have a message and still hold the spinlock.  Make sure the
600          * uio has enough room to hold the message.
601          *
602          * Note that replies do not have XIOs.
603          */
604         if (slmsg->msgsize > iov0->iov_len) {
605                 error = ENOSPC;
606                 goto done1;
607         }
608         if (slmsg->xio.xio_bytes) {
609                 if (uio->uio_iovcnt != 2) {
610                         error = ENOSPC;
611                         goto done1;
612                 }
613                 iov1 = &uio->uio_iov[1];
614                 if (slmsg->xio.xio_bytes > iov1->iov_len) {
615                         error = ENOSPC;
616                         goto done1;
617                 }
618         } else {
619                 iov1 = NULL;
620         }
621
622         /*
623          * Dequeue the message.  Adjust repbytes immediately.  cmdbytes
624          * are adjusted when the command is replied to, not here.
625          */
626         TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
627         if (slmsg->msg->sm_proto & SM_PROTO_REPLY)
628                 sl->repbytes -= slmsg->maxsize;
629         spin_unlock_wr(&sl->spin);
630
631         /*
632          * Load the message data into the user buffer.
633          *
634          * If receiving a command an XIO may exist specifying a DMA buffer.
635          * For commands, if DMAW is set we have to copy or map the buffer
636          * so the caller can access the data being written.  If DMAR is set
637          * we do not have to copy but we still must map the buffer so the
638          * caller can directly fill in the data being requested.
639          */
640         error = uiomove((void *)slmsg->msg, slmsg->msgsize, uio);
641         if (error == 0 && slmsg->xio.xio_bytes &&
642             (wmsg->sm_head.se_cmd & SE_CMDF_REPLY) == 0) {
643                 if (wmsg->sm_head.se_cmd & SE_CMDF_DMAW) {
644                         /*
645                          * Data being passed to caller or being passed in both
646                          * directions, copy or map.
647                          */
648                         get_mplock();
649                         if ((flags & O_MAPONREAD) &&
650                             (slmsg->xio.xio_flags & XIOF_VMLINEAR)) {
651                                 error = sl_local_mmap(slmsg,
652                                                       iov1->iov_base,
653                                                       iov1->iov_len);
654                                 if (error)
655                                 error = xio_copy_xtou(&slmsg->xio, 0,
656                                                       iov1->iov_base,
657                                                       slmsg->xio.xio_bytes);
658                         } else {
659                                 error = xio_copy_xtou(&slmsg->xio, 0,
660                                                       iov1->iov_base,
661                                                       slmsg->xio.xio_bytes);
662                         }
663                         rel_mplock();
664                 } else if (wmsg->sm_head.se_cmd & SE_CMDF_DMAR) {
665                         /*
666                          * Data will be passed back to originator, map
667                          * the buffer if we can, else use the backup
668                          * buffer at the same VA supplied by the caller.
669                          */
670                         get_mplock();
671                         if ((flags & O_MAPONREAD) &&
672                             (slmsg->xio.xio_flags & XIOF_VMLINEAR)) {
673                                 error = sl_local_mmap(slmsg,
674                                                       iov1->iov_base,
675                                                       iov1->iov_len);
676                                 error = 0; /* ignore errors */
677                         }
678                         rel_mplock();
679                 }
680         }
681
682         /*
683          * Clean up.
684          */
685         if (error) {
686                 /*
687                  * Requeue the message if we could not read it successfully
688                  */
689                 spin_lock_wr(&sl->spin);
690                 TAILQ_INSERT_HEAD(&sl->inq, slmsg, tqnode);
691                 slmsg->flags |= SLMSGF_ONINQ;
692                 spin_unlock_wr(&sl->spin);
693         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
694                 /*
695                  * Dispose of any received reply after we've copied it
696                  * to userland.  We don't need the slmsg any more.
697                  */
698                 slmsg->flags &= ~SLMSGF_ONINQ;
699                 sl->peer->backend_dispose(sl->peer, slmsg);
700                 if (sl->wblocked && sl->repbytes < syslink_bufsize) {
701                         sl->wblocked = 0;       /* MP race ok here */
702                         wakeup(&sl->wblocked);
703                 }
704         } else {
705                 /*
706                  * Leave the command in the RB tree but clear ONINQ now
707                  * that we have returned it to userland so userland can
708                  * reply to it.
709                  */
710                 slmsg->flags &= ~SLMSGF_ONINQ;
711         }
712         return(error);
713 done1:
714         spin_unlock_wr(&sl->spin);
715 done2:
716         return(error);
717 }
718
719 /*
720  * Userland writes syslink message (optionally with DMA buffer in iov[1]).
721  */
722 static
723 int
724 slfileop_write(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
725 {
726         struct sldesc *sl = fp->f_data;
727         struct slmsg *slmsg;
728         struct slmsg *slcmd;
729         struct syslink_msg sltmp;
730         struct syslink_msg *wmsg;       /* wire message */
731         struct iovec *iov0;
732         struct iovec *iov1;
733         sl_proto_t proto;
734         int nbio;
735         int error;
736         int xflags;
737
738         /*
739          * Kinda messy.  Figure out the non-blocking state
740          */
741         if (flags & O_FBLOCKING)
742                 nbio = 0;
743         else if (flags & O_FNONBLOCKING)
744                 nbio = 1;
745         else if (fp->f_flag & O_NONBLOCK)
746                 nbio = 1;
747         else
748                 nbio = 0;
749
750         /*
751          * Validate the uio
752          */
753         if (uio->uio_iovcnt < 1) {
754                 error = 0;
755                 goto done2;
756         }
757         iov0 = &uio->uio_iov[0];
758         if (iov0->iov_len > SLMSG_BIG) {
759                 error = EFBIG;
760                 goto done2;
761         }
762         if (uio->uio_iovcnt > 2) {
763                 error = EFBIG;
764                 goto done2;
765         }
766         if (uio->uio_iovcnt > 1) {
767                 iov1 = &uio->uio_iov[1];
768                 if (iov1->iov_len > XIO_INTERNAL_SIZE) {
769                         error = EFBIG;
770                         goto done2;
771                 }
772                 if ((intptr_t)iov1->iov_base & PAGE_MASK) {
773                         error = EINVAL;
774                         goto done2;
775                 }
776         } else {
777                 iov1 = NULL;
778         }
779
780         /*
781          * Handle the buffer-full case.  slpeer cmdbytes is managed
782          * by the backend function, not us so if the callback just
783          * directly implements the message and never adjusts cmdbytes,
784          * we will never sleep here.
785          */
786         if (sl->flags & SLF_WSHUTDOWN) {
787                 error = EPIPE;
788                 goto done2;
789         }
790
791         /*
792          * Only commands can block the pipe, not replies.  Otherwise a
793          * deadlock is possible.
794          */
795         error = copyin(iov0->iov_base, &sltmp, sizeof(sltmp));
796         if (error)
797                 goto done2;
798         if ((proto = sltmp.sm_proto) & SM_PROTO_ENDIAN_REV)
799                 proto = bswap16(proto);
800         error = sl->peer->backend_wblocked(sl->peer, nbio, proto);
801         if (error)
802                 goto done2;
803
804         /*
805          * Allocate a slmsg and load the message.  Note that the bytes
806          * returned to userland only reflects the primary syslink message
807          * and does not include any DMA buffers.
808          */
809         if (iov0->iov_len <= SLMSG_SMALL)
810                 slmsg = objcache_get(sl_objcache_small, M_WAITOK);
811         else
812                 slmsg = objcache_get(sl_objcache_big, M_WAITOK);
813         slmsg->msgsize = iov0->iov_len;
814         wmsg = slmsg->msg;
815
816         error = uiomove((void *)wmsg, iov0->iov_len, uio);
817         if (error)
818                 goto done1;
819         error = syslink_validate_msg(wmsg, slmsg->msgsize);
820         if (error)
821                 goto done1;
822
823         if ((wmsg->sm_head.se_cmd & SE_CMDF_REPLY) == 0) {
824                 /*
825                  * Install the XIO for commands if any DMA flags are set.
826                  *
827                  * XIOF_VMLINEAR requires that the XIO represent a
828                  * contiguous set of pages associated with a single VM
829                  * object (so the reader side can mmap it easily).
830                  *
831                  * XIOF_VMLINEAR might not be set when the kernel sends
832                  * commands to userland so the reader side backs off to
833                  * a backup buffer if it isn't set, but we require it
834                  * for userland writes.
835                  */
836                 xflags = XIOF_VMLINEAR;
837                 if (wmsg->sm_head.se_cmd & SE_CMDF_DMAR)
838                         xflags |= XIOF_READ | XIOF_WRITE;
839                 else if (wmsg->sm_head.se_cmd & SE_CMDF_DMAW)
840                         xflags |= XIOF_READ;
841                 if (xflags && iov1) {
842                         get_mplock();
843                         error = xio_init_ubuf(&slmsg->xio, iov1->iov_base,
844                                               iov1->iov_len, xflags);
845                         rel_mplock();
846                         if (error)
847                                 goto done1;
848                         slmsg->flags |= SLMSGF_HASXIO;
849                 }
850                 error = sl->peer->backend_write(sl->peer, slmsg);
851         } else {
852                 /*
853                  * Replies have to be matched up against received commands.
854                  */
855                 spin_lock_wr(&sl->spin);
856                 slcmd = slmsg_rb_tree_RB_LOOKUP(&sl->reply_rb_root,
857                                                 slmsg->msg->sm_msgid);
858                 if (slcmd == NULL || (slcmd->flags & SLMSGF_ONINQ)) {
859                         error = ENOENT;
860                         spin_unlock_wr(&sl->spin);
861                         goto done1;
862                 }
863                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slcmd);
864                 sl->cmdbytes -= slcmd->maxsize;
865                 spin_unlock_wr(&sl->spin);
866
867                 /*
868                  * If the original command specified DMAR, has an xio, and
869                  * our write specifies a DMA buffer, then we can do a
870                  * copyback.  But if we are linearly mapped and the caller
871                  * is using the map base address, then the caller filled in
872                  * the data via the direct memory map and no copyback is
873                  * needed.
874                  */
875                 if ((slcmd->msg->sm_head.se_cmd & SE_CMDF_DMAR) && iov1 &&
876                     (slcmd->flags & SLMSGF_HASXIO) &&
877                     ((slcmd->flags & SLMSGF_LINMAP) == 0 ||
878                      iov1->iov_base != slcmd->vmbase)
879                 ) {
880                         size_t count;
881                         if (iov1->iov_len > slcmd->xio.xio_bytes)
882                                 count = slcmd->xio.xio_bytes;
883                         else
884                                 count = iov1->iov_len;
885                         get_mplock();
886                         error = xio_copy_utox(&slcmd->xio, 0, iov1->iov_base,
887                                               count);
888                         rel_mplock();
889                 }
890
891                 /*
892                  * If we had mapped a DMA buffer, remove it
893                  */
894                 if (slcmd->flags & SLMSGF_LINMAP) {
895                         get_mplock();
896                         sl_local_munmap(slcmd);
897                         rel_mplock();
898                 }
899
900                 /*
901                  * Reply and handle unblocking
902                  */
903                 sl->peer->backend_reply(sl->peer, slcmd, slmsg);
904                 if (sl->wblocked && sl->cmdbytes < syslink_bufsize) {
905                         sl->wblocked = 0;       /* MP race ok here */
906                         wakeup(&sl->wblocked);
907                 }
908
909                 /*
910                  * slmsg has already been dealt with, make sure error is
911                  * 0 so we do not double-free it.
912                  */
913                 error = 0;
914         }
915         /* fall through */
916 done1:
917         if (error)
918                 slmsg_put(slmsg);
919         /* fall through */
920 done2:
921         return(error);
922 }
923
924 /*
925  * Close a syslink descriptor.
926  *
927  * Disassociate the syslink from the file descriptor and disconnect from
928  * any peer.
929  */
930 static
931 int
932 slfileop_close(struct file *fp)
933 {
934         struct sldesc *sl;
935
936         /*
937          * Disassociate the file pointer.  Take ownership of the ref on the
938          * sldesc.
939          */
940         sl = fp->f_data;
941         fp->f_data = NULL;
942         fp->f_ops = &badfileops;
943         sl->xfp = NULL;
944
945         /*
946          * Shutdown both directions.  The other side will not issue API
947          * calls to us after we've shutdown both directions.
948          */
949         shutdownsldesc(sl, SHUT_RDWR);
950
951         /*
952          * Cleanup
953          */
954         KKASSERT(sl->cmdbytes == 0);
955         KKASSERT(sl->repbytes == 0);
956         sldrop(sl);
957         return(0);
958 }
959
960 /*
961  * MPSAFE
962  */
963 static
964 int
965 slfileop_stat (struct file *fp, struct stat *sb, struct ucred *cred)
966 {
967         return(EINVAL);
968 }
969
970 static
971 int
972 slfileop_shutdown (struct file *fp, int how)
973 {
974         shutdownsldesc((struct sldesc *)fp->f_data, how);
975         return(0);
976 }
977
978 static
979 int
980 slfileop_ioctl (struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
981 {
982         return(EINVAL);
983 }
984
985 static
986 int
987 slfileop_poll (struct file *fp, int events, struct ucred *cred)
988 {
989         return(0);
990 }
991
992 static
993 int
994 slfileop_kqfilter(struct file *fp, struct knote *kn)
995 {
996         return(0);
997 }
998
999 /************************************************************************
1000  *                          LOCAL MEMORY MAPPING                        *
1001  ************************************************************************
1002  *
1003  * This feature is currently not implemented
1004  *
1005  */
1006
1007 static
1008 int
1009 sl_local_mmap(struct slmsg *slmsg, char *base, size_t len)
1010 {
1011         return (EOPNOTSUPP);
1012 }
1013
1014 static
1015 void
1016 sl_local_munmap(struct slmsg *slmsg)
1017 {
1018         /* empty */
1019 }
1020
1021 #if 0
1022
1023 static
1024 int
1025 sl_local_mmap(struct slmsg *slmsg, char *base, size_t len)
1026 {
1027         struct vmspace *vms = curproc->p_vmspace;
1028         vm_offset_t addr = (vm_offset_t)base;
1029
1030         /* XXX  check user address range */
1031         error = vm_map_replace(
1032                         &vma->vm_map,
1033                         (vm_offset_t)base, (vm_offset_t)base + len,
1034                         slmsg->xio.xio_pages[0]->object,
1035                         slmsg->xio.xio_pages[0]->pindex << PAGE_SHIFT,
1036                         VM_PROT_READ|VM_PROT_WRITE,
1037                         VM_PROT_READ|VM_PROT_WRITE,
1038                         MAP_DISABLE_SYNCER);
1039         }
1040         if (error == 0) {
1041                 slmsg->flags |= SLMSGF_LINMAP;
1042                 slmsg->vmbase = base;
1043                 slmsg->vmsize = len;
1044         }
1045         return (error);
1046 }
1047
1048 static
1049 void
1050 sl_local_munmap(struct slmsg *slmsg)
1051 {
1052         if (slmsg->flags & SLMSGF_LINMAP) {
1053                 vm_map_remove(&curproc->p_vmspace->vm_map,
1054                               slmsg->vmbase,
1055                               slmsg->vmbase + slcmd->vmsize);
1056                 slmsg->flags &= ~SLMSGF_LINMAP;
1057         }
1058 }
1059
1060 #endif
1061
1062 /************************************************************************
1063  *                          MESSAGE VALIDATION                          *
1064  ************************************************************************
1065  *
1066  * Validate that the syslink message.  Check that all headers and elements
1067  * conform.  Correct the endian if necessary.
1068  *
1069  * NOTE: If reverse endian needs to be corrected, SE_CMDF_UNTRANSLATED
1070  * is recursively flipped on all syslink_elm's in the message.  As the
1071  * message traverses the mesh, multiple flips may occur.  It is
1072  * up to the RPC protocol layer to correct opaque data payloads and
1073  * SE_CMDF_UNTRANSLATED prevents the protocol layer from misinterpreting
1074  * a command or reply element which has not been endian-corrected.
1075  */
1076 static
1077 int
1078 syslink_validate_msg(struct syslink_msg *msg, int bytes)
1079 {
1080         int aligned_reclen;
1081         int swapit;
1082         int error;
1083
1084         /*
1085          * The raw message must be properly-aligned.
1086          */
1087         if (bytes & SL_ALIGNMASK)
1088                 return (EINVAL);
1089
1090         while (bytes) {
1091                 /*
1092                  * The message must at least contain the msgid, bytes, and
1093                  * protoid.
1094                  */
1095                 if (bytes < SL_MIN_PAD_SIZE)
1096                         return (EINVAL);
1097
1098                 /*
1099                  * Fix the endian if it is reversed.
1100                  */
1101                 if (msg->sm_proto & SM_PROTO_ENDIAN_REV) {
1102                         msg->sm_msgid = bswap64(msg->sm_msgid);
1103                         msg->sm_sessid = bswap64(msg->sm_sessid);
1104                         msg->sm_bytes = bswap16(msg->sm_bytes);
1105                         msg->sm_proto = bswap16(msg->sm_proto);
1106                         msg->sm_rlabel = bswap32(msg->sm_rlabel);
1107                         if (msg->sm_proto & SM_PROTO_ENDIAN_REV)
1108                                 return (EINVAL);
1109                         swapit = 1;
1110                 } else {
1111                         swapit = 0;
1112                 }
1113
1114                 /*
1115                  * Validate the contents.  For PADs, the entire payload is
1116                  * ignored and the minimum message size can be as small as
1117                  * 8 bytes.
1118                  */
1119                 if (msg->sm_proto == SMPROTO_PAD) {
1120                         if (msg->sm_bytes < SL_MIN_PAD_SIZE ||
1121                             msg->sm_bytes > bytes) {
1122                                 return (EINVAL);
1123                         }
1124                         /* ignore the entire payload, it can be garbage */
1125                 } else {
1126                         if (msg->sm_bytes < SL_MIN_MSG_SIZE ||
1127                             msg->sm_bytes > bytes) {
1128                                 return (EINVAL);
1129                         }
1130                         error = syslink_validate_elm(
1131                                     &msg->sm_head,
1132                                     msg->sm_bytes - 
1133                                         offsetof(struct syslink_msg,
1134                                                  sm_head),
1135                                     swapit, SL_MAXDEPTH);
1136                         if (error)
1137                                 return (error);
1138                 }
1139
1140                 /*
1141                  * The aligned payload size must be used to locate the
1142                  * next syslink_msg in the buffer.
1143                  */
1144                 aligned_reclen = SL_MSG_ALIGN(msg->sm_bytes);
1145                 bytes -= aligned_reclen;
1146                 msg = (void *)((char *)msg + aligned_reclen);
1147         }
1148         return(0);
1149 }
1150
1151 static
1152 int
1153 syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
1154                      int swapit, int depth)
1155 {
1156         int aligned_reclen;
1157
1158         /*
1159          * If the buffer isn't big enough to fit the header, stop now!
1160          */
1161         if (bytes < SL_MIN_ELM_SIZE)
1162                 return (EINVAL);
1163         /*
1164          * All syslink_elm headers are recursively endian-adjusted.  Opaque
1165          * data payloads are not.
1166          */
1167         if (swapit) {
1168                 elm->se_cmd = bswap16(elm->se_cmd) ^ SE_CMDF_UNTRANSLATED;
1169                 elm->se_bytes = bswap16(elm->se_bytes);
1170                 elm->se_aux = bswap32(elm->se_aux);
1171         }
1172
1173         /*
1174          * Check element size requirements.
1175          */
1176         if (elm->se_bytes < SL_MIN_ELM_SIZE || elm->se_bytes > bytes)
1177                 return (EINVAL);
1178
1179         /*
1180          * Recursively check structured payloads.  A structured payload may
1181          * contain as few as 0 recursive elements.
1182          */
1183         if (elm->se_cmd & SE_CMDF_STRUCTURED) {
1184                 if (depth == 0)
1185                         return (EINVAL);
1186                 bytes -= SL_MIN_ELM_SIZE;
1187                 ++elm;
1188                 while (bytes > 0) {
1189                         if (syslink_validate_elm(elm, bytes, swapit, depth - 1))
1190                                 return (EINVAL);
1191                         aligned_reclen = SL_MSG_ALIGN(elm->se_bytes);
1192                         elm = (void *)((char *)elm + aligned_reclen);
1193                         bytes -= aligned_reclen;
1194                 }
1195         }
1196         return(0);
1197 }
1198
1199 /************************************************************************
1200  *                  BACKEND FUNCTIONS - USER DESCRIPTOR                 *
1201  ************************************************************************
1202  *
1203  * Peer backend links are primarily used when userland creates a pair
1204  * of linked descriptors.
1205  */
1206
1207 /*
1208  * Do any required blocking / nbio handling for attempts to write to
1209  * a sldesc associated with a user descriptor.
1210  */
1211 static
1212 int
1213 backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto)
1214 {
1215         int error = 0;
1216         int *bytesp = (proto & SM_PROTO_REPLY) ? &sl->repbytes : &sl->cmdbytes;
1217
1218         /*
1219          * Block until sufficient data is drained by the target.  It is
1220          * ok to have a MP race against cmdbytes.
1221          */
1222         if (*bytesp >= syslink_bufsize) {
1223                 spin_lock_wr(&sl->spin);
1224                 while (*bytesp >= syslink_bufsize) {
1225                         if (sl->flags & SLF_WSHUTDOWN) {
1226                                 error = EPIPE;
1227                                 break;
1228                         }
1229                         if (nbio) {
1230                                 error = EAGAIN;
1231                                 break;
1232                         }
1233                         ++sl->wblocked;
1234                         error = ssleep(&sl->wblocked, &sl->spin,
1235                                        PCATCH, "slwmsg", 0);
1236                         if (error)
1237                                 break;
1238                 }
1239                 spin_unlock_wr(&sl->spin);
1240         }
1241         return (error);
1242 }
1243
1244 /*
1245  * Unconditionally write a syslink message to the sldesc associated with
1246  * a user descriptor.  Command messages are also placed in a red-black
1247  * tree so their DMA tag (if any) can be accessed and so they can be
1248  * linked to any reply message.
1249  */
1250 static
1251 int
1252 backend_write_user(struct sldesc *sl, struct slmsg *slmsg)
1253 {
1254         int error;
1255
1256         spin_lock_wr(&sl->spin);
1257         if (sl->flags & SLF_RSHUTDOWN) {
1258                 /*
1259                  * Not accepting new messages
1260                  */
1261                 error = EPIPE;
1262         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
1263                 /*
1264                  * Write a reply
1265                  */
1266                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1267                 sl->repbytes += slmsg->maxsize;
1268                 slmsg->flags |= SLMSGF_ONINQ;
1269                 error = 0;
1270         } else if (RB_INSERT(slmsg_rb_tree, &sl->reply_rb_root, slmsg)) {
1271                 /*
1272                  * Write a command, but there was a msgid collision when
1273                  * we tried to insert it into the RB tree.
1274                  */
1275                 error = EEXIST;
1276         } else {
1277                 /*
1278                  * Write a command, successful insertion into the RB tree.
1279                  */
1280                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1281                 sl->cmdbytes += slmsg->maxsize;
1282                 slmsg->flags |= SLMSGF_ONINQ;
1283                 error = 0;
1284         }
1285         spin_unlock_wr(&sl->spin);
1286         if (sl->rwaiters)
1287                 wakeup(&sl->rwaiters);
1288         return(error);
1289 }
1290
1291 /*
1292  * Our peer is replying a command we previously sent it back to us, along
1293  * with the reply message (if not NULL).  We just queue the reply to
1294  * userland and free of the command.
1295  */
1296 static
1297 void
1298 backend_reply_user(struct sldesc *sl, struct slmsg *slcmd, struct slmsg *slrep)
1299 {
1300         int error;
1301
1302         slmsg_put(slcmd);
1303         if (slrep) {
1304                 spin_lock_wr(&sl->spin);
1305                 if ((sl->flags & SLF_RSHUTDOWN) == 0) {
1306                         TAILQ_INSERT_TAIL(&sl->inq, slrep, tqnode);
1307                         sl->repbytes += slrep->maxsize;
1308                         error = 0;
1309                 } else {
1310                         error = EPIPE;
1311                 }
1312                 spin_unlock_wr(&sl->spin);
1313                 if (error)
1314                         sl->peer->backend_dispose(sl->peer, slrep);
1315                 else if (sl->rwaiters)
1316                         wakeup(&sl->rwaiters);
1317         }
1318 }
1319
1320 static
1321 void
1322 backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg)
1323 {
1324         slmsg_put(slmsg);
1325 }
1326
1327 /************************************************************************
1328  *                      KERNEL DRIVER OR FILESYSTEM API                 *
1329  ************************************************************************
1330  *
1331  */
1332
1333 /*
1334  * Create a user<->kernel link, returning the user descriptor in *fdp
1335  * and the kernel descriptor in *kslp.  0 is returned on success, and an
1336  * error code is returned on failure.
1337  */
1338 int
1339 syslink_ukbackend(int *fdp, struct sldesc **kslp)
1340 {
1341         struct proc *p = curproc;
1342         struct file *fp;
1343         struct sldesc *usl;
1344         struct sldesc *ksl;
1345         int error;
1346         int fd;
1347
1348         *fdp = -1;
1349         *kslp = NULL;
1350
1351         error = falloc(p, &fp, &fd);
1352         if (error)
1353                 return(error);
1354         usl = allocsldesc(NULL);
1355         usl->backend_wblocked = backend_wblocked_user;
1356         usl->backend_write = backend_write_user;
1357         usl->backend_reply = backend_reply_user;
1358         usl->backend_dispose = backend_dispose_user;
1359
1360         ksl = allocsldesc(usl->common);
1361         ksl->peer = usl;
1362         ksl->backend_wblocked = backend_wblocked_kern;
1363         ksl->backend_write = backend_write_kern;
1364         ksl->backend_reply = backend_reply_kern;
1365         ksl->backend_dispose = backend_dispose_kern;
1366
1367         usl->peer = ksl;
1368
1369         setsldescfp(usl, fp);
1370         fsetfd(p, fp, fd);
1371         fdrop(fp);
1372
1373         *fdp = fd;
1374         *kslp = ksl;
1375         return(0);
1376 }
1377
1378 /*
1379  * Assign a unique message id, issue a syslink message to userland,
1380  * and wait for a reply.
1381  */
1382 int
1383 syslink_kdomsg(struct sldesc *ksl, struct slmsg *slmsg)
1384 {
1385         struct syslink_msg *msg;
1386         int error;
1387
1388         /*
1389          * Finish initializing slmsg and post it to the red-black tree for
1390          * reply matching.  If the message id is already in use we return
1391          * EEXIST, giving the originator the chance to roll a new msgid.
1392          */
1393         msg = slmsg->msg;
1394         slmsg->msgsize = msg->sm_bytes;
1395         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1396                 return (error);
1397         msg->sm_msgid = allocsysid();
1398
1399         /*
1400          * Issue the request and wait for a matching reply or failure,
1401          * then remove the message from the matching tree and return.
1402          */
1403         error = ksl->peer->backend_write(ksl->peer, slmsg);
1404         spin_lock_wr(&ksl->spin);
1405         if (error == 0) {
1406                 while (slmsg->rep == NULL) {
1407                         error = ssleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1408                         /* XXX ignore error for now */
1409                 }
1410                 if (slmsg->rep == (struct slmsg *)-1) {
1411                         error = EIO;
1412                         slmsg->rep = NULL;
1413                 } else {
1414                         error = slmsg->rep->msg->sm_head.se_aux;
1415                 }
1416         }
1417         spin_unlock_wr(&ksl->spin);
1418         return(error);
1419 }
1420
1421 /*
1422  * Similar to syslink_kdomsg but return immediately instead of
1423  * waiting for a reply.  The kernel must supply a callback function
1424  * which will be made in the context of the user process replying
1425  * to the message.
1426  */
1427 int
1428 syslink_ksendmsg(struct sldesc *ksl, struct slmsg *slmsg,
1429                  void (*func)(struct slmsg *, void *, int), void *arg)
1430 {
1431         struct syslink_msg *msg;
1432         int error;
1433
1434         /*
1435          * Finish initializing slmsg and post it to the red-black tree for
1436          * reply matching.  If the message id is already in use we return
1437          * EEXIST, giving the originator the chance to roll a new msgid.
1438          */
1439         msg = slmsg->msg;
1440         slmsg->msgsize = msg->sm_bytes;
1441         slmsg->callback_func = func;
1442         slmsg->callback_data = arg;
1443         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1444                 return (error);
1445         msg->sm_msgid = allocsysid();
1446
1447         /*
1448          * Issue the request.  If no error occured the operation will be
1449          * in progress, otherwise the operation is considered to have failed
1450          * and the caller can deallocate the slmsg.
1451          */
1452         error = ksl->peer->backend_write(ksl->peer, slmsg);
1453         return (error);
1454 }
1455
1456 int
1457 syslink_kwaitmsg(struct sldesc *ksl, struct slmsg *slmsg)
1458 {
1459         int error;
1460
1461         spin_lock_wr(&ksl->spin);
1462         while (slmsg->rep == NULL) {
1463                 error = ssleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1464                 /* XXX ignore error for now */
1465         }
1466         if (slmsg->rep == (struct slmsg *)-1) {
1467                 error = EIO;
1468                 slmsg->rep = NULL;
1469         } else {
1470                 error = slmsg->rep->msg->sm_head.se_aux;
1471         }
1472         spin_unlock_wr(&ksl->spin);
1473         return(error);
1474 }
1475
1476 struct slmsg *
1477 syslink_kallocmsg(void)
1478 {
1479         return(objcache_get(sl_objcache_small, M_WAITOK));
1480 }
1481
1482 void
1483 syslink_kfreemsg(struct sldesc *ksl, struct slmsg *slmsg)
1484 {
1485         struct slmsg *rep;
1486
1487         if ((rep = slmsg->rep) != NULL) {
1488                 slmsg->rep = NULL;
1489                 ksl->peer->backend_dispose(ksl->peer, rep);
1490         }
1491         slmsg->callback_func = NULL;
1492         slmsg_put(slmsg);
1493 }
1494
1495 void
1496 syslink_kshutdown(struct sldesc *ksl, int how)
1497 {
1498         shutdownsldesc(ksl, how);
1499 }
1500
1501 void
1502 syslink_kclose(struct sldesc *ksl)
1503 {
1504         shutdownsldesc(ksl, SHUT_RDWR);
1505         sldrop(ksl);
1506 }
1507
1508 /*
1509  * Associate a DMA buffer with a kernel syslink message prior to it
1510  * being sent to userland.  The DMA buffer is set up from the point
1511  * of view of the target.
1512  */
1513 int
1514 syslink_kdmabuf_pages(struct slmsg *slmsg, struct vm_page **mbase, int npages)
1515 {
1516         int xflags;
1517         int error;
1518
1519         xflags = XIOF_VMLINEAR;
1520         if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAR)
1521                 xflags |= XIOF_READ | XIOF_WRITE;
1522         else if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAW)
1523                 xflags |= XIOF_READ;
1524         error = xio_init_pages(&slmsg->xio, mbase, npages, xflags);
1525         slmsg->flags |= SLMSGF_HASXIO;
1526         return (error);
1527 }
1528
1529 /*
1530  * Associate a DMA buffer with a kernel syslink message prior to it
1531  * being sent to userland.  The DMA buffer is set up from the point
1532  * of view of the target.
1533  */
1534 int
1535 syslink_kdmabuf_data(struct slmsg *slmsg, char *base, int bytes)
1536 {
1537         int xflags;
1538
1539         xflags = XIOF_VMLINEAR;
1540         if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAR)
1541                 xflags |= XIOF_READ | XIOF_WRITE;
1542         else if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAW)
1543                 xflags |= XIOF_READ;
1544         xio_init_kbuf(&slmsg->xio, base, bytes);
1545         slmsg->xio.xio_flags |= xflags;
1546         slmsg->flags |= SLMSGF_HASXIO;
1547         return(0);
1548 }
1549
1550 /************************************************************************
1551  *                  BACKEND FUNCTIONS FOR KERNEL API                    *
1552  ************************************************************************
1553  *
1554  * These are the backend functions for a sldesc associated with a kernel
1555  * API.
1556  */
1557
1558 /*
1559  * Our peer wants to write a syslink message to us and is asking us to
1560  * block if our input queue is full.  We don't implement command reception
1561  * so don't block right now.
1562  */
1563 static
1564 int
1565 backend_wblocked_kern(struct sldesc *ksl, int nbio, sl_proto_t proto)
1566 {
1567         /* never blocks */
1568         return(0);
1569 }
1570
1571 /*
1572  * Our peer is writing a request to the kernel.  At the moment we do not
1573  * accept commands.
1574  */
1575 static
1576 int
1577 backend_write_kern(struct sldesc *ksl, struct slmsg *slmsg)
1578 {
1579         return(EOPNOTSUPP);
1580 }
1581
1582 /*
1583  * Our peer wants to reply to a syslink message we sent it earlier.  The
1584  * original command (that we passed to our peer), and the peer's reply
1585  * is specified.  If the peer has failed slrep will be NULL.
1586  */
1587 static
1588 void
1589 backend_reply_kern(struct sldesc *ksl, struct slmsg *slcmd, struct slmsg *slrep)
1590 {
1591         int error;
1592
1593         spin_lock_wr(&ksl->spin);
1594         if (slrep == NULL) {
1595                 slcmd->rep = (struct slmsg *)-1;
1596                 error = EIO;
1597         } else {
1598                 slcmd->rep = slrep;
1599                 error = slrep->msg->sm_head.se_aux;
1600         }
1601         spin_unlock_wr(&ksl->spin);
1602
1603         /*
1604          * Issue callback or wakeup a synchronous waiter.
1605          */
1606         if (slcmd->callback_func) {
1607                 slcmd->callback_func(slcmd, slcmd->callback_data, error);
1608         } else {
1609                 wakeup(slcmd);
1610         }
1611 }
1612
1613 /*
1614  * Any reply messages we sent to our peer are returned to us for disposal.
1615  * Since we do not currently accept commands from our peer, there will not
1616  * be any replies returned to the peer to dispose of.
1617  */
1618 static
1619 void
1620 backend_dispose_kern(struct sldesc *ksl, struct slmsg *slmsg)
1621 {
1622         panic("backend_dispose_kern: kernel can't accept commands so it "
1623               "certainly did not reply to one!");
1624 }
1625