telnet.1: Remove unnecessary .Pp
[dragonfly.git] / sys / kern / kern_syslink.c
1 /*
2  * Copyright (c) 2006-2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/kern_syslink.c,v 1.16 2008/10/26 04:29:19 sephe Exp $
35  */
36 /*
37  * This module implements the core syslink() system call and provides
38  * glue for kernel syslink frontends and backends, creating a intra-host
39  * communications infrastructure and DMA transport abstraction.
40  */
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/endian.h>
46 #include <sys/malloc.h>
47 #include <sys/alist.h>
48 #include <sys/file.h>
49 #include <sys/proc.h>
50 #include <sys/priv.h>
51 #include <sys/lock.h>
52 #include <sys/uio.h>
53 #include <sys/objcache.h>
54 #include <sys/queue.h>
55 #include <sys/thread.h>
56 #include <sys/tree.h>
57 #include <sys/sysctl.h>
58 #include <sys/sysproto.h>
59 #include <sys/mbuf.h>
60 #include <sys/socket.h>
61 #include <sys/socketvar.h>
62 #include <sys/socketops.h>
63 #include <sys/sysref.h>
64 #include <sys/syslink.h>
65 #include <sys/syslink_msg.h>
66 #include <netinet/in.h>
67
68 #include <sys/thread2.h>
69 #include <sys/spinlock2.h>
70 #include <sys/buf2.h>
71
72 #include "opt_syslink.h"
73
74 /*
75  * Syslink Connection abstraction
76  */
77 struct slcommon {
78         struct spinlock spin;
79         int     refs;
80 };
81
82 struct sldesc {
83         struct slmsgq   inq;
84         struct slmsg_rb_tree reply_rb_root; /* replies to requests */
85         struct spinlock spin;
86         struct sldesc   *peer;          /* peer syslink, if any */
87         struct file     *xfp;           /* external file pointer */
88         struct slcommon *common;
89         int     flags;
90         int     rwaiters;               /* number of threads waiting */
91         int     wblocked;               /* blocked waiting for us to drain */
92         size_t  cmdbytes;               /* unreplied commands pending */
93         size_t  repbytes;               /* undrained replies pending */
94         int     (*backend_wblocked)(struct sldesc *, int, sl_proto_t);
95         int     (*backend_write)(struct sldesc *, struct slmsg *);
96         void    (*backend_reply)(struct sldesc *,struct slmsg *,struct slmsg *);
97         void    (*backend_dispose)(struct sldesc *, struct slmsg *);
98 };
99
100 #define SLF_RSHUTDOWN   0x0001
101 #define SLF_WSHUTDOWN   0x0002
102
103 static int syslink_cmd_new(struct syslink_info_new *info, int *result);
104 static struct sldesc *allocsldesc(struct slcommon *common);
105 static void setsldescfp(struct sldesc *sl, struct file *fp);
106 static void shutdownsldesc(struct sldesc *sl, int how);
107 static void shutdownsldesc2(struct sldesc *sl, int how);
108 static void sldrop(struct sldesc *sl);
109 static int syslink_validate_msg(struct syslink_msg *msg, int bytes);
110 static int syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
111                                  int swapit, int depth);
112
113 static int sl_local_mmap(struct slmsg *slmsg, char *base, size_t len);
114 static void sl_local_munmap(struct slmsg *slmsg);
115
116 static int backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto);
117 static int backend_write_user(struct sldesc *sl, struct slmsg *slmsg);
118 static void backend_reply_user(struct sldesc *sl, struct slmsg *slcmd,
119                                struct slmsg *slrep);
120 static void backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg);
121
122 static int backend_wblocked_kern(struct sldesc *sl, int nbio, sl_proto_t proto);
123 static int backend_write_kern(struct sldesc *sl, struct slmsg *slmsg);
124 static void backend_reply_kern(struct sldesc *sl, struct slmsg *slcmd,
125                                struct slmsg *slrep);
126 static void backend_dispose_kern(struct sldesc *sl, struct slmsg *slmsg);
127 static void slmsg_put(struct slmsg *slmsg);
128
129 /*
130  * Objcache memory backend
131  *
132  * All three object caches return slmsg structures but each is optimized
133  * for syslink message buffers of varying sizes.  We use the slightly
134  * more complex ctor/dtor API in order to provide ready-to-go slmsg's.
135  */
136
137 static struct objcache *sl_objcache_big;
138 static struct objcache *sl_objcache_small;
139 static struct objcache *sl_objcache_none;
140
141 MALLOC_DEFINE(M_SYSLINK, "syslink", "syslink manager");
142
143 static boolean_t slmsg_ctor(void *data, void *private, int ocflags);
144 static void slmsg_dtor(void *data, void *private);
145
146 static
147 void
148 syslinkinit(void *dummy __unused)
149 {
150         size_t n = sizeof(struct slmsg);
151
152         sl_objcache_none = objcache_create_mbacked(M_SYSLINK, n, NULL, 64,
153                                                    slmsg_ctor, slmsg_dtor,
154                                                    &sl_objcache_none);
155         sl_objcache_small= objcache_create_mbacked(M_SYSLINK, n, NULL, 64,
156                                                    slmsg_ctor, slmsg_dtor,
157                                                    &sl_objcache_small);
158         sl_objcache_big  = objcache_create_mbacked(M_SYSLINK, n, NULL, 16,
159                                                    slmsg_ctor, slmsg_dtor,
160                                                    &sl_objcache_big);
161 }
162
163 static
164 boolean_t
165 slmsg_ctor(void *data, void *private, int ocflags)
166 {
167         struct slmsg *slmsg = data;
168
169         bzero(slmsg, sizeof(*slmsg));
170
171         slmsg->oc = *(struct objcache **)private;
172         if (slmsg->oc == sl_objcache_none) {
173                 slmsg->maxsize = 0;
174         } else if (slmsg->oc == sl_objcache_small) {
175                 slmsg->maxsize = SLMSG_SMALL;
176         } else if (slmsg->oc == sl_objcache_big) {
177                 slmsg->maxsize = SLMSG_BIG;
178         } else {
179                 panic("slmsg_ctor: bad objcache?\n");
180         }
181         if (slmsg->maxsize) {
182                 slmsg->msg = kmalloc(slmsg->maxsize,
183                                      M_SYSLINK, M_WAITOK|M_ZERO);
184         }
185         xio_init(&slmsg->xio);
186         return(TRUE);
187 }
188
189 static
190 void
191 slmsg_dtor(void *data, void *private)
192 {
193         struct slmsg *slmsg = data;
194
195         if (slmsg->maxsize && slmsg->msg) {
196                 kfree(slmsg->msg, M_SYSLINK);
197                 slmsg->msg = NULL;
198         }
199         slmsg->oc = NULL;
200 }
201
202 SYSINIT(syslink, SI_BOOT2_MACHDEP, SI_ORDER_ANY, syslinkinit, NULL)
203
204 static int rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2);
205 RB_GENERATE2(slmsg_rb_tree, slmsg, rbnode, rb_slmsg_compare,
206              sysid_t, msg->sm_msgid);
207
208 /*
209  * Sysctl elements
210  */
211 static int syslink_enabled;
212 SYSCTL_NODE(_kern, OID_AUTO, syslink, CTLFLAG_RW, 0, "Pipe operation");
213 SYSCTL_INT(_kern_syslink, OID_AUTO, enabled,
214             CTLFLAG_RW, &syslink_enabled, 0, "Enable SYSLINK");
215 static size_t syslink_bufsize = 65536;
216 SYSCTL_UINT(_kern_syslink, OID_AUTO, bufsize,
217             CTLFLAG_RW, &syslink_bufsize, 0, "Maximum buffer size");
218
219 /*
220  * Fileops API - typically used to glue a userland frontend with a
221  *               kernel backend.
222  */
223
224 static int slfileop_read(struct file *fp, struct uio *uio,
225                         struct ucred *cred, int flags);
226 static int slfileop_write(struct file *fp, struct uio *uio,
227                          struct ucred *cred, int flags);
228 static int slfileop_close(struct file *fp);
229 static int slfileop_stat(struct file *fp, struct stat *sb, struct ucred *cred);
230 static int slfileop_shutdown(struct file *fp, int how);
231 static int slfileop_ioctl(struct file *fp, u_long cmd, caddr_t data,
232                          struct ucred *cred);
233 static int slfileop_poll(struct file *fp, int events, struct ucred *cred);
234 static int slfileop_kqfilter(struct file *fp, struct knote *kn);
235
236 static struct fileops syslinkops = {
237     .fo_read =          slfileop_read,
238     .fo_write =         slfileop_write,
239     .fo_ioctl =         slfileop_ioctl,
240     .fo_poll =          slfileop_poll,
241     .fo_kqfilter =      slfileop_kqfilter,
242     .fo_stat =          slfileop_stat,
243     .fo_close =         slfileop_close,
244     .fo_shutdown =      slfileop_shutdown
245 };
246
247 /************************************************************************
248  *                      PRIMARY SYSTEM CALL INTERFACE                   *
249  ************************************************************************
250  *
251  * syslink(int cmd, struct syslink_info *info, size_t bytes)
252  */
253 int
254 sys_syslink(struct syslink_args *uap)
255 {
256         union syslink_info_all info;
257         int error;
258
259         /*
260          * System call is under construction and disabled by default. 
261          * Superuser access is also required for now, but eventually
262          * will not be needed.
263          */
264         if (syslink_enabled == 0)
265                 return (EAUTH);
266         error = priv_check(curthread, PRIV_ROOT);
267         if (error)
268                 return (error);
269
270         /*
271          * Load and validate the info structure.  Unloaded bytes are zerod
272          * out.  The label field must always be 0-filled, even if not used
273          * for a command.
274          */
275         bzero(&info, sizeof(info));
276         if ((unsigned)uap->bytes <= sizeof(info)) {
277                 if (uap->bytes)
278                         error = copyin(uap->info, &info, uap->bytes);
279         } else {
280                 error = EINVAL;
281         }
282         if (error)
283                 return (error);
284
285         /*
286          * Process the command
287          */
288         switch(uap->cmd) {
289         case SYSLINK_CMD_NEW:
290                 error = syslink_cmd_new(&info.cmd_new, &uap->sysmsg_result);
291                 break;
292         default:
293                 error = EINVAL;
294                 break;
295         }
296         if (error == 0 && info.head.wbflag)
297                 copyout(&info, uap->info, uap->bytes);
298         return (error);
299 }
300
301 /*
302  * Create a linked pair of descriptors, like a pipe.
303  */
304 static
305 int
306 syslink_cmd_new(struct syslink_info_new *info, int *result)
307 {
308         struct proc *p = curproc;
309         struct file *fp1;
310         struct file *fp2;
311         struct sldesc *sl;
312         struct sldesc *slpeer;
313         int error;
314         int fd1, fd2;
315
316         error = falloc(p, &fp1, &fd1);
317         if (error)
318                 return(error);
319         error = falloc(p, &fp2, &fd2);
320         if (error) {
321                 fsetfd(p, NULL, fd1);
322                 fdrop(fp1);
323                 return(error);
324         }
325         slpeer = allocsldesc(NULL);
326         slpeer->backend_wblocked = backend_wblocked_user;
327         slpeer->backend_write = backend_write_user;
328         slpeer->backend_reply = backend_reply_user;
329         slpeer->backend_dispose = backend_dispose_user;
330         sl = allocsldesc(slpeer->common);
331         sl->peer = slpeer;
332         sl->backend_wblocked = backend_wblocked_user;
333         sl->backend_write = backend_write_user;
334         sl->backend_reply = backend_reply_user;
335         sl->backend_dispose = backend_dispose_user;
336         slpeer->peer = sl;
337
338         setsldescfp(sl, fp1);
339         setsldescfp(slpeer, fp2);
340
341         fsetfd(p, fp1, fd1);
342         fdrop(fp1);
343         fsetfd(p, fp2, fd2);
344         fdrop(fp2);
345
346         info->head.wbflag = 1;  /* write back */
347         info->fds[0] = fd1;
348         info->fds[1] = fd2;
349
350         return(0);
351 }
352
353 /************************************************************************
354  *                      LOW LEVEL SLDESC SUPPORT                        *
355  ************************************************************************
356  *
357  */
358
359 static
360 struct sldesc *
361 allocsldesc(struct slcommon *common)
362 {
363         struct sldesc *sl;
364
365         sl = kmalloc(sizeof(struct sldesc), M_SYSLINK, M_WAITOK|M_ZERO);
366         if (common == NULL)
367                 common = kmalloc(sizeof(*common), M_SYSLINK, M_WAITOK|M_ZERO);
368         TAILQ_INIT(&sl->inq);           /* incoming requests */
369         RB_INIT(&sl->reply_rb_root);    /* match incoming replies */
370         spin_init(&sl->spin);
371         sl->common = common;
372         ++common->refs;
373         return(sl);
374 }
375
376 static
377 void
378 setsldescfp(struct sldesc *sl, struct file *fp)
379 {
380         sl->xfp = fp;
381         fp->f_type = DTYPE_SYSLINK;
382         fp->f_flag = FREAD | FWRITE;
383         fp->f_ops = &syslinkops;
384         fp->f_data = sl;
385 }
386
387 /*
388  * Red-black tree compare function
389  */
390 static
391 int
392 rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2)
393 {
394         if (msg1->msg->sm_msgid < msg2->msg->sm_msgid)
395                 return(-1);
396         if (msg1->msg->sm_msgid == msg2->msg->sm_msgid)
397                 return(0);
398         return(1);
399 }
400
401 static
402 void
403 shutdownsldesc(struct sldesc *sl, int how)
404 {
405         struct slmsg *slmsg;
406         int rhow;
407
408         shutdownsldesc2(sl, how);
409
410         /*
411          * Return unread and unreplied messages
412          */
413         spin_lock_wr(&sl->spin);
414         while ((slmsg = TAILQ_FIRST(&sl->inq)) != NULL) {
415                 TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
416                 spin_unlock_wr(&sl->spin);
417                 if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
418                         sl->repbytes -= slmsg->maxsize;
419                         slmsg->flags &= ~SLMSGF_ONINQ;
420                         sl->peer->backend_dispose(sl->peer, slmsg);
421                 }
422                 /* leave ONINQ set for commands, it will cleared below */
423                 spin_lock_wr(&sl->spin);
424         }
425         while ((slmsg = RB_ROOT(&sl->reply_rb_root)) != NULL) {
426                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slmsg);
427                 sl->cmdbytes -= slmsg->maxsize;
428                 spin_unlock_wr(&sl->spin);
429                 slmsg->flags &= ~SLMSGF_ONINQ;
430                 sl->peer->backend_reply(sl->peer, slmsg, NULL);
431                 spin_lock_wr(&sl->spin);
432         }
433         spin_unlock_wr(&sl->spin);
434
435         /*
436          * Call shutdown on the peer with the opposite flags
437          */
438         rhow = 0;
439         switch(how) {
440         case SHUT_RD:
441                 rhow = SHUT_WR;
442                 break;
443         case SHUT_WR:
444                 rhow = SHUT_WR;
445                 break;
446         case SHUT_RDWR:
447                 rhow = SHUT_RDWR;
448                 break;
449         }
450         shutdownsldesc2(sl->peer, rhow);
451 }
452
453 static
454 void
455 shutdownsldesc2(struct sldesc *sl, int how)
456 {
457         spin_lock_wr(&sl->spin);
458         switch(how) {
459         case SHUT_RD:
460                 sl->flags |= SLF_RSHUTDOWN;
461                 break;
462         case SHUT_WR:
463                 sl->flags |= SLF_WSHUTDOWN;
464                 break;
465         case SHUT_RDWR:
466                 sl->flags |= SLF_RSHUTDOWN | SLF_WSHUTDOWN;
467                 break;
468         }
469         spin_unlock_wr(&sl->spin);
470
471         /*
472          * Handle signaling on the user side
473          */
474         if (how & SHUT_RD) {
475                 if (sl->rwaiters)
476                         wakeup(&sl->rwaiters);
477         }
478         if (how & SHUT_WR) {
479                 if (sl->wblocked) {
480                         sl->wblocked = 0;       /* race ok */
481                         wakeup(&sl->wblocked);
482                 }
483         }
484 }
485
486 static
487 void
488 sldrop(struct sldesc *sl)
489 {
490         struct sldesc *slpeer;
491
492         spin_lock_wr(&sl->common->spin);
493         if (--sl->common->refs == 0) {
494                 spin_unlock_wr(&sl->common->spin);
495                 if ((slpeer = sl->peer) != NULL) {
496                         sl->peer = NULL;
497                         slpeer->peer = NULL;
498                         slpeer->common = NULL;
499                         KKASSERT(slpeer->xfp == NULL);
500                         KKASSERT(TAILQ_EMPTY(&slpeer->inq));
501                         KKASSERT(RB_EMPTY(&slpeer->reply_rb_root));
502                         kfree(slpeer, M_SYSLINK);
503                 }
504                 KKASSERT(sl->xfp == NULL);
505                 KKASSERT(TAILQ_EMPTY(&sl->inq));
506                 KKASSERT(RB_EMPTY(&sl->reply_rb_root));
507                 kfree(sl->common, M_SYSLINK);
508                 sl->common = NULL;
509                 kfree(sl, M_SYSLINK);
510         } else {
511                 spin_unlock_wr(&sl->common->spin);
512         }
513 }
514
515 static
516 void
517 slmsg_put(struct slmsg *slmsg)
518 {
519         if (slmsg->flags & SLMSGF_HASXIO) {
520                 slmsg->flags &= ~SLMSGF_HASXIO;
521                 get_mplock();
522                 xio_release(&slmsg->xio);
523                 rel_mplock();
524         }
525         slmsg->flags &= ~SLMSGF_LINMAP;
526         objcache_put(slmsg->oc, slmsg);
527 }
528
529 /************************************************************************
530  *                              FILEOPS API                             *
531  ************************************************************************
532  *
533  * Implement userland fileops.
534  *
535  * MPSAFE ops
536  */
537 static
538 int
539 slfileop_read(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
540 {
541         struct sldesc *sl = fp->f_data;         /* fp refed on call */
542         struct slmsg *slmsg;
543         struct iovec *iov0;
544         struct iovec *iov1;
545         struct syslink_msg *wmsg;
546         int error;
547         int nbio;
548
549         /*
550          * Kinda messy.  Figure out the non-blocking state
551          */
552         if (flags & O_FBLOCKING)
553                 nbio = 0;
554         else if (flags & O_FNONBLOCKING)
555                 nbio = 1;
556         else if (fp->f_flag & O_NONBLOCK)
557                 nbio = 1;
558         else
559                 nbio = 0;
560
561         /*
562          * Validate the uio.
563          *
564          * iov0 - message buffer
565          * iov1 - DMA buffer or backup buffer
566          */
567         if (uio->uio_iovcnt < 1) {
568                 error = 0;
569                 goto done2;
570         }
571         iov0 = &uio->uio_iov[0];
572         if (uio->uio_iovcnt > 2) {
573                 error = EINVAL;
574                 goto done2;
575         }
576
577         /*
578          * Get a message, blocking if necessary.
579          */
580         spin_lock_wr(&sl->spin);
581         while ((slmsg = TAILQ_FIRST(&sl->inq)) == NULL) {
582                 if (sl->flags & SLF_RSHUTDOWN) {
583                         error = 0;
584                         goto done1;
585                 }
586                 if (nbio) {
587                         error = EAGAIN;
588                         goto done1;
589                 }
590                 ++sl->rwaiters;
591                 error = msleep(&sl->rwaiters, &sl->spin, PCATCH, "slrmsg", 0);
592                 --sl->rwaiters;
593                 if (error)
594                         goto done1;
595         }
596         wmsg = slmsg->msg;
597
598         /*
599          * We have a message and still hold the spinlock.  Make sure the
600          * uio has enough room to hold the message.
601          *
602          * Note that replies do not have XIOs.
603          */
604         if (slmsg->msgsize > iov0->iov_len) {
605                 error = ENOSPC;
606                 goto done1;
607         }
608         if (slmsg->xio.xio_bytes) {
609                 if (uio->uio_iovcnt != 2) {
610                         error = ENOSPC;
611                         goto done1;
612                 }
613                 iov1 = &uio->uio_iov[1];
614                 if (slmsg->xio.xio_bytes > iov1->iov_len) {
615                         error = ENOSPC;
616                         goto done1;
617                 }
618         } else {
619                 iov1 = NULL;
620         }
621
622         /*
623          * Dequeue the message.  Adjust repbytes immediately.  cmdbytes
624          * are adjusted when the command is replied to, not here.
625          */
626         TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
627         if (slmsg->msg->sm_proto & SM_PROTO_REPLY)
628                 sl->repbytes -= slmsg->maxsize;
629         spin_unlock_wr(&sl->spin);
630
631         /*
632          * Load the message data into the user buffer.
633          *
634          * If receiving a command an XIO may exist specifying a DMA buffer.
635          * For commands, if DMAW is set we have to copy or map the buffer
636          * so the caller can access the data being written.  If DMAR is set
637          * we do not have to copy but we still must map the buffer so the
638          * caller can directly fill in the data being requested.
639          */
640         error = uiomove((void *)slmsg->msg, slmsg->msgsize, uio);
641         if (error == 0 && slmsg->xio.xio_bytes &&
642             (wmsg->sm_head.se_cmd & SE_CMDF_REPLY) == 0) {
643                 if (wmsg->sm_head.se_cmd & SE_CMDF_DMAW) {
644                         /*
645                          * Data being passed to caller or being passed in both
646                          * directions, copy or map.
647                          */
648                         get_mplock();
649                         if ((flags & O_MAPONREAD) &&
650                             (slmsg->xio.xio_flags & XIOF_VMLINEAR)) {
651                                 error = sl_local_mmap(slmsg,
652                                                       iov1->iov_base,
653                                                       iov1->iov_len);
654                                 if (error)
655                                 error = xio_copy_xtou(&slmsg->xio, 0,
656                                                       iov1->iov_base,
657                                                       slmsg->xio.xio_bytes);
658                         } else {
659                                 error = xio_copy_xtou(&slmsg->xio, 0,
660                                                       iov1->iov_base,
661                                                       slmsg->xio.xio_bytes);
662                         }
663                         rel_mplock();
664                 } else if (wmsg->sm_head.se_cmd & SE_CMDF_DMAR) {
665                         /*
666                          * Data will be passed back to originator, map
667                          * the buffer if we can, else use the backup
668                          * buffer at the same VA supplied by the caller.
669                          */
670                         get_mplock();
671                         if ((flags & O_MAPONREAD) &&
672                             (slmsg->xio.xio_flags & XIOF_VMLINEAR)) {
673                                 error = sl_local_mmap(slmsg,
674                                                       iov1->iov_base,
675                                                       iov1->iov_len);
676                                 error = 0; /* ignore errors */
677                         }
678                         rel_mplock();
679                 }
680         }
681
682         /*
683          * Clean up.
684          */
685         if (error) {
686                 /*
687                  * Requeue the message if we could not read it successfully
688                  */
689                 spin_lock_wr(&sl->spin);
690                 TAILQ_INSERT_HEAD(&sl->inq, slmsg, tqnode);
691                 slmsg->flags |= SLMSGF_ONINQ;
692                 spin_unlock_wr(&sl->spin);
693         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
694                 /*
695                  * Dispose of any received reply after we've copied it
696                  * to userland.  We don't need the slmsg any more.
697                  */
698                 slmsg->flags &= ~SLMSGF_ONINQ;
699                 sl->peer->backend_dispose(sl->peer, slmsg);
700                 if (sl->wblocked && sl->repbytes < syslink_bufsize) {
701                         sl->wblocked = 0;       /* MP race ok here */
702                         wakeup(&sl->wblocked);
703                 }
704         } else {
705                 /*
706                  * Leave the command in the RB tree but clear ONINQ now
707                  * that we have returned it to userland so userland can
708                  * reply to it.
709                  */
710                 slmsg->flags &= ~SLMSGF_ONINQ;
711         }
712         return(error);
713 done1:
714         spin_unlock_wr(&sl->spin);
715 done2:
716         return(error);
717 }
718
719 /*
720  * Userland writes syslink message (optionally with DMA buffer in iov[1]).
721  */
722 static
723 int
724 slfileop_write(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
725 {
726         struct sldesc *sl = fp->f_data;
727         struct slmsg *slmsg;
728         struct slmsg *slcmd;
729         struct syslink_msg sltmp;
730         struct syslink_msg *wmsg;       /* wire message */
731         struct iovec *iov0;
732         struct iovec *iov1;
733         sl_proto_t proto;
734         int nbio;
735         int error;
736         int xflags;
737
738         /*
739          * Kinda messy.  Figure out the non-blocking state
740          */
741         if (flags & O_FBLOCKING)
742                 nbio = 0;
743         else if (flags & O_FNONBLOCKING)
744                 nbio = 1;
745         else if (fp->f_flag & O_NONBLOCK)
746                 nbio = 1;
747         else
748                 nbio = 0;
749
750         /*
751          * Validate the uio
752          */
753         if (uio->uio_iovcnt < 1) {
754                 error = 0;
755                 goto done2;
756         }
757         iov0 = &uio->uio_iov[0];
758         if (iov0->iov_len > SLMSG_BIG) {
759                 error = EFBIG;
760                 goto done2;
761         }
762         if (uio->uio_iovcnt > 2) {
763                 error = EFBIG;
764                 goto done2;
765         }
766         if (uio->uio_iovcnt > 1) {
767                 iov1 = &uio->uio_iov[1];
768                 if (iov1->iov_len > XIO_INTERNAL_SIZE) {
769                         error = EFBIG;
770                         goto done2;
771                 }
772                 if ((intptr_t)iov1->iov_base & PAGE_MASK) {
773                         error = EINVAL;
774                         goto done2;
775                 }
776         } else {
777                 iov1 = NULL;
778         }
779
780         /*
781          * Handle the buffer-full case.  slpeer cmdbytes is managed
782          * by the backend function, not us so if the callback just
783          * directly implements the message and never adjusts cmdbytes,
784          * we will never sleep here.
785          */
786         if (sl->flags & SLF_WSHUTDOWN) {
787                 error = EPIPE;
788                 goto done2;
789         }
790
791         /*
792          * Only commands can block the pipe, not replies.  Otherwise a
793          * deadlock is possible.
794          */
795         error = copyin(iov0->iov_base, &sltmp, sizeof(sltmp));
796         if (error)
797                 goto done2;
798         if ((proto = sltmp.sm_proto) & SM_PROTO_ENDIAN_REV)
799                 proto = bswap16(proto);
800         error = sl->peer->backend_wblocked(sl->peer, nbio, proto);
801         if (error)
802                 goto done2;
803
804         /*
805          * Allocate a slmsg and load the message.  Note that the bytes
806          * returned to userland only reflects the primary syslink message
807          * and does not include any DMA buffers.
808          */
809         if (iov0->iov_len <= SLMSG_SMALL)
810                 slmsg = objcache_get(sl_objcache_small, M_WAITOK);
811         else
812                 slmsg = objcache_get(sl_objcache_big, M_WAITOK);
813         slmsg->msgsize = iov0->iov_len;
814         wmsg = slmsg->msg;
815
816         error = uiomove((void *)wmsg, iov0->iov_len, uio);
817         if (error)
818                 goto done1;
819         error = syslink_validate_msg(wmsg, slmsg->msgsize);
820         if (error)
821                 goto done1;
822
823         if ((wmsg->sm_head.se_cmd & SE_CMDF_REPLY) == 0) {
824                 /*
825                  * Install the XIO for commands if any DMA flags are set.
826                  *
827                  * XIOF_VMLINEAR requires that the XIO represent a
828                  * contiguous set of pages associated with a single VM
829                  * object (so the reader side can mmap it easily).
830                  *
831                  * XIOF_VMLINEAR might not be set when the kernel sends
832                  * commands to userland so the reader side backs off to
833                  * a backup buffer if it isn't set, but we require it
834                  * for userland writes.
835                  */
836                 xflags = XIOF_VMLINEAR;
837                 if (wmsg->sm_head.se_cmd & SE_CMDF_DMAR)
838                         xflags |= XIOF_READ | XIOF_WRITE;
839                 else if (wmsg->sm_head.se_cmd & SE_CMDF_DMAW)
840                         xflags |= XIOF_READ;
841                 if (xflags && iov1) {
842                         get_mplock();
843                         error = xio_init_ubuf(&slmsg->xio, iov1->iov_base,
844                                               iov1->iov_len, xflags);
845                         rel_mplock();
846                         if (error)
847                                 goto done1;
848                         slmsg->flags |= SLMSGF_HASXIO;
849                 }
850                 error = sl->peer->backend_write(sl->peer, slmsg);
851         } else {
852                 /*
853                  * Replies have to be matched up against received commands.
854                  */
855                 spin_lock_wr(&sl->spin);
856                 slcmd = slmsg_rb_tree_RB_LOOKUP(&sl->reply_rb_root,
857                                                 slmsg->msg->sm_msgid);
858                 if (slcmd == NULL || (slcmd->flags & SLMSGF_ONINQ)) {
859                         error = ENOENT;
860                         spin_unlock_wr(&sl->spin);
861                         goto done1;
862                 }
863                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slcmd);
864                 sl->cmdbytes -= slcmd->maxsize;
865                 spin_unlock_wr(&sl->spin);
866
867                 /*
868                  * If the original command specified DMAR, has an xio, and
869                  * our write specifies a DMA buffer, then we can do a
870                  * copyback.  But if we are linearly mapped and the caller
871                  * is using the map base address, then the caller filled in
872                  * the data via the direct memory map and no copyback is
873                  * needed.
874                  */
875                 if ((slcmd->msg->sm_head.se_cmd & SE_CMDF_DMAR) && iov1 &&
876                     (slcmd->flags & SLMSGF_HASXIO) &&
877                     ((slcmd->flags & SLMSGF_LINMAP) == 0 ||
878                      iov1->iov_base != slcmd->vmbase)
879                 ) {
880                         size_t count;
881                         if (iov1->iov_len > slcmd->xio.xio_bytes)
882                                 count = slcmd->xio.xio_bytes;
883                         else
884                                 count = iov1->iov_len;
885                         get_mplock();
886                         error = xio_copy_utox(&slcmd->xio, 0, iov1->iov_base,
887                                               count);
888                         rel_mplock();
889                 }
890
891                 /*
892                  * If we had mapped a DMA buffer, remove it
893                  */
894                 if (slcmd->flags & SLMSGF_LINMAP) {
895                         get_mplock();
896                         sl_local_munmap(slcmd);
897                         rel_mplock();
898                 }
899
900                 /*
901                  * Reply and handle unblocking
902                  */
903                 sl->peer->backend_reply(sl->peer, slcmd, slmsg);
904                 if (sl->wblocked && sl->cmdbytes < syslink_bufsize) {
905                         sl->wblocked = 0;       /* MP race ok here */
906                         wakeup(&sl->wblocked);
907                 }
908
909                 /*
910                  * slmsg has already been dealt with, make sure error is
911                  * 0 so we do not double-free it.
912                  */
913                 error = 0;
914         }
915         /* fall through */
916 done1:
917         if (error)
918                 slmsg_put(slmsg);
919         /* fall through */
920 done2:
921         return(error);
922 }
923
924 /*
925  * Close a syslink descriptor.
926  *
927  * Disassociate the syslink from the file descriptor and disconnect from
928  * any peer.
929  */
930 static
931 int
932 slfileop_close(struct file *fp)
933 {
934         struct sldesc *sl;
935
936         /*
937          * Disassociate the file pointer.  Take ownership of the ref on the
938          * sldesc.
939          */
940         sl = fp->f_data;
941         fp->f_data = NULL;
942         fp->f_ops = &badfileops;
943         sl->xfp = NULL;
944
945         /*
946          * Shutdown both directions.  The other side will not issue API
947          * calls to us after we've shutdown both directions.
948          */
949         shutdownsldesc(sl, SHUT_RDWR);
950
951         /*
952          * Cleanup
953          */
954         KKASSERT(sl->cmdbytes == 0);
955         KKASSERT(sl->repbytes == 0);
956         sldrop(sl);
957         return(0);
958 }
959
960 static
961 int
962 slfileop_stat (struct file *fp, struct stat *sb, struct ucred *cred)
963 {
964         return(EINVAL);
965 }
966
967 static
968 int
969 slfileop_shutdown (struct file *fp, int how)
970 {
971         shutdownsldesc((struct sldesc *)fp->f_data, how);
972         return(0);
973 }
974
975 static
976 int
977 slfileop_ioctl (struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
978 {
979         return(EINVAL);
980 }
981
982 static
983 int
984 slfileop_poll (struct file *fp, int events, struct ucred *cred)
985 {
986         return(0);
987 }
988
989 static
990 int
991 slfileop_kqfilter(struct file *fp, struct knote *kn)
992 {
993         return(0);
994 }
995
996 /************************************************************************
997  *                          LOCAL MEMORY MAPPING                        *
998  ************************************************************************
999  *
1000  * This feature is currently not implemented
1001  *
1002  */
1003
1004 static
1005 int
1006 sl_local_mmap(struct slmsg *slmsg, char *base, size_t len)
1007 {
1008         return (EOPNOTSUPP);
1009 }
1010
1011 static
1012 void
1013 sl_local_munmap(struct slmsg *slmsg)
1014 {
1015         /* empty */
1016 }
1017
1018 #if 0
1019
1020 static
1021 int
1022 sl_local_mmap(struct slmsg *slmsg, char *base, size_t len)
1023 {
1024         struct vmspace *vms = curproc->p_vmspace;
1025         vm_offset_t addr = (vm_offset_t)base;
1026
1027         /* XXX  check user address range */
1028         error = vm_map_replace(
1029                         &vma->vm_map,
1030                         (vm_offset_t)base, (vm_offset_t)base + len,
1031                         slmsg->xio.xio_pages[0]->object,
1032                         slmsg->xio.xio_pages[0]->pindex << PAGE_SHIFT,
1033                         VM_PROT_READ|VM_PROT_WRITE,
1034                         VM_PROT_READ|VM_PROT_WRITE,
1035                         MAP_DISABLE_SYNCER);
1036         }
1037         if (error == 0) {
1038                 slmsg->flags |= SLMSGF_LINMAP;
1039                 slmsg->vmbase = base;
1040                 slmsg->vmsize = len;
1041         }
1042         return (error);
1043 }
1044
1045 static
1046 void
1047 sl_local_munmap(struct slmsg *slmsg)
1048 {
1049         if (slmsg->flags & SLMSGF_LINMAP) {
1050                 vm_map_remove(&curproc->p_vmspace->vm_map,
1051                               slmsg->vmbase,
1052                               slmsg->vmbase + slcmd->vmsize);
1053                 slmsg->flags &= ~SLMSGF_LINMAP;
1054         }
1055 }
1056
1057 #endif
1058
1059 /************************************************************************
1060  *                          MESSAGE VALIDATION                          *
1061  ************************************************************************
1062  *
1063  * Validate that the syslink message.  Check that all headers and elements
1064  * conform.  Correct the endian if necessary.
1065  *
1066  * NOTE: If reverse endian needs to be corrected, SE_CMDF_UNTRANSLATED
1067  * is recursively flipped on all syslink_elm's in the message.  As the
1068  * message traverses the mesh, multiple flips may occur.  It is
1069  * up to the RPC protocol layer to correct opaque data payloads and
1070  * SE_CMDF_UNTRANSLATED prevents the protocol layer from misinterpreting
1071  * a command or reply element which has not been endian-corrected.
1072  */
1073 static
1074 int
1075 syslink_validate_msg(struct syslink_msg *msg, int bytes)
1076 {
1077         int aligned_reclen;
1078         int swapit;
1079         int error;
1080
1081         /*
1082          * The raw message must be properly-aligned.
1083          */
1084         if (bytes & SL_ALIGNMASK)
1085                 return (EINVAL);
1086
1087         while (bytes) {
1088                 /*
1089                  * The message must at least contain the msgid, bytes, and
1090                  * protoid.
1091                  */
1092                 if (bytes < SL_MIN_PAD_SIZE)
1093                         return (EINVAL);
1094
1095                 /*
1096                  * Fix the endian if it is reversed.
1097                  */
1098                 if (msg->sm_proto & SM_PROTO_ENDIAN_REV) {
1099                         msg->sm_msgid = bswap64(msg->sm_msgid);
1100                         msg->sm_sessid = bswap64(msg->sm_sessid);
1101                         msg->sm_bytes = bswap16(msg->sm_bytes);
1102                         msg->sm_proto = bswap16(msg->sm_proto);
1103                         msg->sm_rlabel = bswap32(msg->sm_rlabel);
1104                         if (msg->sm_proto & SM_PROTO_ENDIAN_REV)
1105                                 return (EINVAL);
1106                         swapit = 1;
1107                 } else {
1108                         swapit = 0;
1109                 }
1110
1111                 /*
1112                  * Validate the contents.  For PADs, the entire payload is
1113                  * ignored and the minimum message size can be as small as
1114                  * 8 bytes.
1115                  */
1116                 if (msg->sm_proto == SMPROTO_PAD) {
1117                         if (msg->sm_bytes < SL_MIN_PAD_SIZE ||
1118                             msg->sm_bytes > bytes) {
1119                                 return (EINVAL);
1120                         }
1121                         /* ignore the entire payload, it can be garbage */
1122                 } else {
1123                         if (msg->sm_bytes < SL_MIN_MSG_SIZE ||
1124                             msg->sm_bytes > bytes) {
1125                                 return (EINVAL);
1126                         }
1127                         error = syslink_validate_elm(
1128                                     &msg->sm_head,
1129                                     msg->sm_bytes - 
1130                                         offsetof(struct syslink_msg,
1131                                                  sm_head),
1132                                     swapit, SL_MAXDEPTH);
1133                         if (error)
1134                                 return (error);
1135                 }
1136
1137                 /*
1138                  * The aligned payload size must be used to locate the
1139                  * next syslink_msg in the buffer.
1140                  */
1141                 aligned_reclen = SL_MSG_ALIGN(msg->sm_bytes);
1142                 bytes -= aligned_reclen;
1143                 msg = (void *)((char *)msg + aligned_reclen);
1144         }
1145         return(0);
1146 }
1147
1148 static
1149 int
1150 syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
1151                      int swapit, int depth)
1152 {
1153         int aligned_reclen;
1154
1155         /*
1156          * If the buffer isn't big enough to fit the header, stop now!
1157          */
1158         if (bytes < SL_MIN_ELM_SIZE)
1159                 return (EINVAL);
1160         /*
1161          * All syslink_elm headers are recursively endian-adjusted.  Opaque
1162          * data payloads are not.
1163          */
1164         if (swapit) {
1165                 elm->se_cmd = bswap16(elm->se_cmd) ^ SE_CMDF_UNTRANSLATED;
1166                 elm->se_bytes = bswap16(elm->se_bytes);
1167                 elm->se_aux = bswap32(elm->se_aux);
1168         }
1169
1170         /*
1171          * Check element size requirements.
1172          */
1173         if (elm->se_bytes < SL_MIN_ELM_SIZE || elm->se_bytes > bytes)
1174                 return (EINVAL);
1175
1176         /*
1177          * Recursively check structured payloads.  A structured payload may
1178          * contain as few as 0 recursive elements.
1179          */
1180         if (elm->se_cmd & SE_CMDF_STRUCTURED) {
1181                 if (depth == 0)
1182                         return (EINVAL);
1183                 bytes -= SL_MIN_ELM_SIZE;
1184                 ++elm;
1185                 while (bytes > 0) {
1186                         if (syslink_validate_elm(elm, bytes, swapit, depth - 1))
1187                                 return (EINVAL);
1188                         aligned_reclen = SL_MSG_ALIGN(elm->se_bytes);
1189                         elm = (void *)((char *)elm + aligned_reclen);
1190                         bytes -= aligned_reclen;
1191                 }
1192         }
1193         return(0);
1194 }
1195
1196 /************************************************************************
1197  *                  BACKEND FUNCTIONS - USER DESCRIPTOR                 *
1198  ************************************************************************
1199  *
1200  * Peer backend links are primarily used when userland creates a pair
1201  * of linked descriptors.
1202  */
1203
1204 /*
1205  * Do any required blocking / nbio handling for attempts to write to
1206  * a sldesc associated with a user descriptor.
1207  */
1208 static
1209 int
1210 backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto)
1211 {
1212         int error = 0;
1213         int *bytesp = (proto & SM_PROTO_REPLY) ? &sl->repbytes : &sl->cmdbytes;
1214
1215         /*
1216          * Block until sufficient data is drained by the target.  It is
1217          * ok to have a MP race against cmdbytes.
1218          */
1219         if (*bytesp >= syslink_bufsize) {
1220                 spin_lock_wr(&sl->spin);
1221                 while (*bytesp >= syslink_bufsize) {
1222                         if (sl->flags & SLF_WSHUTDOWN) {
1223                                 error = EPIPE;
1224                                 break;
1225                         }
1226                         if (nbio) {
1227                                 error = EAGAIN;
1228                                 break;
1229                         }
1230                         ++sl->wblocked;
1231                         error = msleep(&sl->wblocked, &sl->spin,
1232                                        PCATCH, "slwmsg", 0);
1233                         if (error)
1234                                 break;
1235                 }
1236                 spin_unlock_wr(&sl->spin);
1237         }
1238         return (error);
1239 }
1240
1241 /*
1242  * Unconditionally write a syslink message to the sldesc associated with
1243  * a user descriptor.  Command messages are also placed in a red-black
1244  * tree so their DMA tag (if any) can be accessed and so they can be
1245  * linked to any reply message.
1246  */
1247 static
1248 int
1249 backend_write_user(struct sldesc *sl, struct slmsg *slmsg)
1250 {
1251         int error;
1252
1253         spin_lock_wr(&sl->spin);
1254         if (sl->flags & SLF_RSHUTDOWN) {
1255                 /*
1256                  * Not accepting new messages
1257                  */
1258                 error = EPIPE;
1259         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
1260                 /*
1261                  * Write a reply
1262                  */
1263                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1264                 sl->repbytes += slmsg->maxsize;
1265                 slmsg->flags |= SLMSGF_ONINQ;
1266                 error = 0;
1267         } else if (RB_INSERT(slmsg_rb_tree, &sl->reply_rb_root, slmsg)) {
1268                 /*
1269                  * Write a command, but there was a msgid collision when
1270                  * we tried to insert it into the RB tree.
1271                  */
1272                 error = EEXIST;
1273         } else {
1274                 /*
1275                  * Write a command, successful insertion into the RB tree.
1276                  */
1277                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1278                 sl->cmdbytes += slmsg->maxsize;
1279                 slmsg->flags |= SLMSGF_ONINQ;
1280                 error = 0;
1281         }
1282         spin_unlock_wr(&sl->spin);
1283         if (sl->rwaiters)
1284                 wakeup(&sl->rwaiters);
1285         return(error);
1286 }
1287
1288 /*
1289  * Our peer is replying a command we previously sent it back to us, along
1290  * with the reply message (if not NULL).  We just queue the reply to
1291  * userland and free of the command.
1292  */
1293 static
1294 void
1295 backend_reply_user(struct sldesc *sl, struct slmsg *slcmd, struct slmsg *slrep)
1296 {
1297         int error;
1298
1299         slmsg_put(slcmd);
1300         if (slrep) {
1301                 spin_lock_wr(&sl->spin);
1302                 if ((sl->flags & SLF_RSHUTDOWN) == 0) {
1303                         TAILQ_INSERT_TAIL(&sl->inq, slrep, tqnode);
1304                         sl->repbytes += slrep->maxsize;
1305                         error = 0;
1306                 } else {
1307                         error = EPIPE;
1308                 }
1309                 spin_unlock_wr(&sl->spin);
1310                 if (error)
1311                         sl->peer->backend_dispose(sl->peer, slrep);
1312                 else if (sl->rwaiters)
1313                         wakeup(&sl->rwaiters);
1314         }
1315 }
1316
1317 static
1318 void
1319 backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg)
1320 {
1321         slmsg_put(slmsg);
1322 }
1323
1324 /************************************************************************
1325  *                      KERNEL DRIVER OR FILESYSTEM API                 *
1326  ************************************************************************
1327  *
1328  */
1329
1330 /*
1331  * Create a user<->kernel link, returning the user descriptor in *fdp
1332  * and the kernel descriptor in *kslp.  0 is returned on success, and an
1333  * error code is returned on failure.
1334  */
1335 int
1336 syslink_ukbackend(int *fdp, struct sldesc **kslp)
1337 {
1338         struct proc *p = curproc;
1339         struct file *fp;
1340         struct sldesc *usl;
1341         struct sldesc *ksl;
1342         int error;
1343         int fd;
1344
1345         *fdp = -1;
1346         *kslp = NULL;
1347
1348         error = falloc(p, &fp, &fd);
1349         if (error)
1350                 return(error);
1351         usl = allocsldesc(NULL);
1352         usl->backend_wblocked = backend_wblocked_user;
1353         usl->backend_write = backend_write_user;
1354         usl->backend_reply = backend_reply_user;
1355         usl->backend_dispose = backend_dispose_user;
1356
1357         ksl = allocsldesc(usl->common);
1358         ksl->peer = usl;
1359         ksl->backend_wblocked = backend_wblocked_kern;
1360         ksl->backend_write = backend_write_kern;
1361         ksl->backend_reply = backend_reply_kern;
1362         ksl->backend_dispose = backend_dispose_kern;
1363
1364         usl->peer = ksl;
1365
1366         setsldescfp(usl, fp);
1367         fsetfd(p, fp, fd);
1368         fdrop(fp);
1369
1370         *fdp = fd;
1371         *kslp = ksl;
1372         return(0);
1373 }
1374
1375 /*
1376  * Assign a unique message id, issue a syslink message to userland,
1377  * and wait for a reply.
1378  */
1379 int
1380 syslink_kdomsg(struct sldesc *ksl, struct slmsg *slmsg)
1381 {
1382         struct syslink_msg *msg;
1383         int error;
1384
1385         /*
1386          * Finish initializing slmsg and post it to the red-black tree for
1387          * reply matching.  If the message id is already in use we return
1388          * EEXIST, giving the originator the chance to roll a new msgid.
1389          */
1390         msg = slmsg->msg;
1391         slmsg->msgsize = msg->sm_bytes;
1392         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1393                 return (error);
1394         msg->sm_msgid = allocsysid();
1395
1396         /*
1397          * Issue the request and wait for a matching reply or failure,
1398          * then remove the message from the matching tree and return.
1399          */
1400         error = ksl->peer->backend_write(ksl->peer, slmsg);
1401         spin_lock_wr(&ksl->spin);
1402         if (error == 0) {
1403                 while (slmsg->rep == NULL) {
1404                         error = msleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1405                         /* XXX ignore error for now */
1406                 }
1407                 if (slmsg->rep == (struct slmsg *)-1) {
1408                         error = EIO;
1409                         slmsg->rep = NULL;
1410                 } else {
1411                         error = slmsg->rep->msg->sm_head.se_aux;
1412                 }
1413         }
1414         spin_unlock_wr(&ksl->spin);
1415         return(error);
1416 }
1417
1418 /*
1419  * Similar to syslink_kdomsg but return immediately instead of
1420  * waiting for a reply.  The kernel must supply a callback function
1421  * which will be made in the context of the user process replying
1422  * to the message.
1423  */
1424 int
1425 syslink_ksendmsg(struct sldesc *ksl, struct slmsg *slmsg,
1426                  void (*func)(struct slmsg *, void *, int), void *arg)
1427 {
1428         struct syslink_msg *msg;
1429         int error;
1430
1431         /*
1432          * Finish initializing slmsg and post it to the red-black tree for
1433          * reply matching.  If the message id is already in use we return
1434          * EEXIST, giving the originator the chance to roll a new msgid.
1435          */
1436         msg = slmsg->msg;
1437         slmsg->msgsize = msg->sm_bytes;
1438         slmsg->callback_func = func;
1439         slmsg->callback_data = arg;
1440         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1441                 return (error);
1442         msg->sm_msgid = allocsysid();
1443
1444         /*
1445          * Issue the request.  If no error occured the operation will be
1446          * in progress, otherwise the operation is considered to have failed
1447          * and the caller can deallocate the slmsg.
1448          */
1449         error = ksl->peer->backend_write(ksl->peer, slmsg);
1450         return (error);
1451 }
1452
1453 int
1454 syslink_kwaitmsg(struct sldesc *ksl, struct slmsg *slmsg)
1455 {
1456         int error;
1457
1458         spin_lock_wr(&ksl->spin);
1459         while (slmsg->rep == NULL) {
1460                 error = msleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1461                 /* XXX ignore error for now */
1462         }
1463         if (slmsg->rep == (struct slmsg *)-1) {
1464                 error = EIO;
1465                 slmsg->rep = NULL;
1466         } else {
1467                 error = slmsg->rep->msg->sm_head.se_aux;
1468         }
1469         spin_unlock_wr(&ksl->spin);
1470         return(error);
1471 }
1472
1473 struct slmsg *
1474 syslink_kallocmsg(void)
1475 {
1476         return(objcache_get(sl_objcache_small, M_WAITOK));
1477 }
1478
1479 void
1480 syslink_kfreemsg(struct sldesc *ksl, struct slmsg *slmsg)
1481 {
1482         struct slmsg *rep;
1483
1484         if ((rep = slmsg->rep) != NULL) {
1485                 slmsg->rep = NULL;
1486                 ksl->peer->backend_dispose(ksl->peer, rep);
1487         }
1488         slmsg->callback_func = NULL;
1489         slmsg_put(slmsg);
1490 }
1491
1492 void
1493 syslink_kshutdown(struct sldesc *ksl, int how)
1494 {
1495         shutdownsldesc(ksl, how);
1496 }
1497
1498 void
1499 syslink_kclose(struct sldesc *ksl)
1500 {
1501         shutdownsldesc(ksl, SHUT_RDWR);
1502         sldrop(ksl);
1503 }
1504
1505 /*
1506  * Associate a DMA buffer with a kernel syslink message prior to it
1507  * being sent to userland.  The DMA buffer is set up from the point
1508  * of view of the target.
1509  */
1510 int
1511 syslink_kdmabuf_pages(struct slmsg *slmsg, struct vm_page **mbase, int npages)
1512 {
1513         int xflags;
1514         int error;
1515
1516         xflags = XIOF_VMLINEAR;
1517         if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAR)
1518                 xflags |= XIOF_READ | XIOF_WRITE;
1519         else if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAW)
1520                 xflags |= XIOF_READ;
1521         error = xio_init_pages(&slmsg->xio, mbase, npages, xflags);
1522         slmsg->flags |= SLMSGF_HASXIO;
1523         return (error);
1524 }
1525
1526 /*
1527  * Associate a DMA buffer with a kernel syslink message prior to it
1528  * being sent to userland.  The DMA buffer is set up from the point
1529  * of view of the target.
1530  */
1531 int
1532 syslink_kdmabuf_data(struct slmsg *slmsg, char *base, int bytes)
1533 {
1534         int xflags;
1535
1536         xflags = XIOF_VMLINEAR;
1537         if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAR)
1538                 xflags |= XIOF_READ | XIOF_WRITE;
1539         else if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAW)
1540                 xflags |= XIOF_READ;
1541         xio_init_kbuf(&slmsg->xio, base, bytes);
1542         slmsg->xio.xio_flags |= xflags;
1543         slmsg->flags |= SLMSGF_HASXIO;
1544         return(0);
1545 }
1546
1547 /************************************************************************
1548  *                  BACKEND FUNCTIONS FOR KERNEL API                    *
1549  ************************************************************************
1550  *
1551  * These are the backend functions for a sldesc associated with a kernel
1552  * API.
1553  */
1554
1555 /*
1556  * Our peer wants to write a syslink message to us and is asking us to
1557  * block if our input queue is full.  We don't implement command reception
1558  * so don't block right now.
1559  */
1560 static
1561 int
1562 backend_wblocked_kern(struct sldesc *ksl, int nbio, sl_proto_t proto)
1563 {
1564         /* never blocks */
1565         return(0);
1566 }
1567
1568 /*
1569  * Our peer is writing a request to the kernel.  At the moment we do not
1570  * accept commands.
1571  */
1572 static
1573 int
1574 backend_write_kern(struct sldesc *ksl, struct slmsg *slmsg)
1575 {
1576         return(EOPNOTSUPP);
1577 }
1578
1579 /*
1580  * Our peer wants to reply to a syslink message we sent it earlier.  The
1581  * original command (that we passed to our peer), and the peer's reply
1582  * is specified.  If the peer has failed slrep will be NULL.
1583  */
1584 static
1585 void
1586 backend_reply_kern(struct sldesc *ksl, struct slmsg *slcmd, struct slmsg *slrep)
1587 {
1588         int error;
1589
1590         spin_lock_wr(&ksl->spin);
1591         if (slrep == NULL) {
1592                 slcmd->rep = (struct slmsg *)-1;
1593                 error = EIO;
1594         } else {
1595                 slcmd->rep = slrep;
1596                 error = slrep->msg->sm_head.se_aux;
1597         }
1598         spin_unlock_wr(&ksl->spin);
1599
1600         /*
1601          * Issue callback or wakeup a synchronous waiter.
1602          */
1603         if (slcmd->callback_func) {
1604                 slcmd->callback_func(slcmd, slcmd->callback_data, error);
1605         } else {
1606                 wakeup(slcmd);
1607         }
1608 }
1609
1610 /*
1611  * Any reply messages we sent to our peer are returned to us for disposal.
1612  * Since we do not currently accept commands from our peer, there will not
1613  * be any replies returned to the peer to dispose of.
1614  */
1615 static
1616 void
1617 backend_dispose_kern(struct sldesc *ksl, struct slmsg *slmsg)
1618 {
1619         panic("backend_dispose_kern: kernel can't accept commands so it "
1620               "certainly did not reply to one!");
1621 }
1622