- Return the real cluster limit used by the objcache
[dragonfly.git] / sys / kern / kern_syslink.c
1 /*
2  * Copyright (c) 2006-2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/kern_syslink.c,v 1.16 2008/10/26 04:29:19 sephe Exp $
35  */
36 /*
37  * This module implements the core syslink() system call and provides
38  * glue for kernel syslink frontends and backends, creating a intra-host
39  * communications infrastructure and DMA transport abstraction.
40  */
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/endian.h>
46 #include <sys/malloc.h>
47 #include <sys/alist.h>
48 #include <sys/file.h>
49 #include <sys/proc.h>
50 #include <sys/lock.h>
51 #include <sys/uio.h>
52 #include <sys/objcache.h>
53 #include <sys/queue.h>
54 #include <sys/thread.h>
55 #include <sys/tree.h>
56 #include <sys/sysctl.h>
57 #include <sys/sysproto.h>
58 #include <sys/mbuf.h>
59 #include <sys/socket.h>
60 #include <sys/socketvar.h>
61 #include <sys/socketops.h>
62 #include <sys/sysref.h>
63 #include <sys/syslink.h>
64 #include <sys/syslink_msg.h>
65 #include <netinet/in.h>
66
67 #include <sys/thread2.h>
68 #include <sys/spinlock2.h>
69 #include <sys/buf2.h>
70
71 #include "opt_syslink.h"
72
73 /*
74  * Syslink Connection abstraction
75  */
76 struct slcommon {
77         struct spinlock spin;
78         int     refs;
79 };
80
81 struct sldesc {
82         struct slmsgq   inq;
83         struct slmsg_rb_tree reply_rb_root; /* replies to requests */
84         struct spinlock spin;
85         struct sldesc   *peer;          /* peer syslink, if any */
86         struct file     *xfp;           /* external file pointer */
87         struct slcommon *common;
88         int     flags;
89         int     rwaiters;               /* number of threads waiting */
90         int     wblocked;               /* blocked waiting for us to drain */
91         size_t  cmdbytes;               /* unreplied commands pending */
92         size_t  repbytes;               /* undrained replies pending */
93         int     (*backend_wblocked)(struct sldesc *, int, sl_proto_t);
94         int     (*backend_write)(struct sldesc *, struct slmsg *);
95         void    (*backend_reply)(struct sldesc *,struct slmsg *,struct slmsg *);
96         void    (*backend_dispose)(struct sldesc *, struct slmsg *);
97 };
98
99 #define SLF_RSHUTDOWN   0x0001
100 #define SLF_WSHUTDOWN   0x0002
101
102 static int syslink_cmd_new(struct syslink_info_new *info, int *result);
103 static struct sldesc *allocsldesc(struct slcommon *common);
104 static void setsldescfp(struct sldesc *sl, struct file *fp);
105 static void shutdownsldesc(struct sldesc *sl, int how);
106 static void shutdownsldesc2(struct sldesc *sl, int how);
107 static void sldrop(struct sldesc *sl);
108 static int syslink_validate_msg(struct syslink_msg *msg, int bytes);
109 static int syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
110                                  int swapit, int depth);
111
112 static int sl_local_mmap(struct slmsg *slmsg, char *base, size_t len);
113 static void sl_local_munmap(struct slmsg *slmsg);
114
115 static int backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto);
116 static int backend_write_user(struct sldesc *sl, struct slmsg *slmsg);
117 static void backend_reply_user(struct sldesc *sl, struct slmsg *slcmd,
118                                struct slmsg *slrep);
119 static void backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg);
120
121 static int backend_wblocked_kern(struct sldesc *sl, int nbio, sl_proto_t proto);
122 static int backend_write_kern(struct sldesc *sl, struct slmsg *slmsg);
123 static void backend_reply_kern(struct sldesc *sl, struct slmsg *slcmd,
124                                struct slmsg *slrep);
125 static void backend_dispose_kern(struct sldesc *sl, struct slmsg *slmsg);
126 static void slmsg_put(struct slmsg *slmsg);
127
128 /*
129  * Objcache memory backend
130  *
131  * All three object caches return slmsg structures but each is optimized
132  * for syslink message buffers of varying sizes.  We use the slightly
133  * more complex ctor/dtor API in order to provide ready-to-go slmsg's.
134  */
135
136 static struct objcache *sl_objcache_big;
137 static struct objcache *sl_objcache_small;
138 static struct objcache *sl_objcache_none;
139
140 MALLOC_DEFINE(M_SYSLINK, "syslink", "syslink manager");
141
142 static boolean_t slmsg_ctor(void *data, void *private, int ocflags);
143 static void slmsg_dtor(void *data, void *private);
144
145 static
146 void
147 syslinkinit(void *dummy __unused)
148 {
149         size_t n = sizeof(struct slmsg);
150
151         sl_objcache_none = objcache_create_mbacked(M_SYSLINK, n, NULL, 64,
152                                                    slmsg_ctor, slmsg_dtor,
153                                                    &sl_objcache_none);
154         sl_objcache_small= objcache_create_mbacked(M_SYSLINK, n, NULL, 64,
155                                                    slmsg_ctor, slmsg_dtor,
156                                                    &sl_objcache_small);
157         sl_objcache_big  = objcache_create_mbacked(M_SYSLINK, n, NULL, 16,
158                                                    slmsg_ctor, slmsg_dtor,
159                                                    &sl_objcache_big);
160 }
161
162 static
163 boolean_t
164 slmsg_ctor(void *data, void *private, int ocflags)
165 {
166         struct slmsg *slmsg = data;
167
168         bzero(slmsg, sizeof(*slmsg));
169
170         slmsg->oc = *(struct objcache **)private;
171         if (slmsg->oc == sl_objcache_none) {
172                 slmsg->maxsize = 0;
173         } else if (slmsg->oc == sl_objcache_small) {
174                 slmsg->maxsize = SLMSG_SMALL;
175         } else if (slmsg->oc == sl_objcache_big) {
176                 slmsg->maxsize = SLMSG_BIG;
177         } else {
178                 panic("slmsg_ctor: bad objcache?\n");
179         }
180         if (slmsg->maxsize) {
181                 slmsg->msg = kmalloc(slmsg->maxsize,
182                                      M_SYSLINK, M_WAITOK|M_ZERO);
183         }
184         xio_init(&slmsg->xio);
185         return(TRUE);
186 }
187
188 static
189 void
190 slmsg_dtor(void *data, void *private)
191 {
192         struct slmsg *slmsg = data;
193
194         if (slmsg->maxsize && slmsg->msg) {
195                 kfree(slmsg->msg, M_SYSLINK);
196                 slmsg->msg = NULL;
197         }
198         slmsg->oc = NULL;
199 }
200
201 SYSINIT(syslink, SI_BOOT2_MACHDEP, SI_ORDER_ANY, syslinkinit, NULL)
202
203 static int rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2);
204 RB_GENERATE2(slmsg_rb_tree, slmsg, rbnode, rb_slmsg_compare,
205              sysid_t, msg->sm_msgid);
206
207 /*
208  * Sysctl elements
209  */
210 static int syslink_enabled;
211 SYSCTL_NODE(_kern, OID_AUTO, syslink, CTLFLAG_RW, 0, "Pipe operation");
212 SYSCTL_INT(_kern_syslink, OID_AUTO, enabled,
213             CTLFLAG_RW, &syslink_enabled, 0, "Enable SYSLINK");
214 static size_t syslink_bufsize = 65536;
215 SYSCTL_UINT(_kern_syslink, OID_AUTO, bufsize,
216             CTLFLAG_RW, &syslink_bufsize, 0, "Maximum buffer size");
217
218 /*
219  * Fileops API - typically used to glue a userland frontend with a
220  *               kernel backend.
221  */
222
223 static int slfileop_read(struct file *fp, struct uio *uio,
224                         struct ucred *cred, int flags);
225 static int slfileop_write(struct file *fp, struct uio *uio,
226                          struct ucred *cred, int flags);
227 static int slfileop_close(struct file *fp);
228 static int slfileop_stat(struct file *fp, struct stat *sb, struct ucred *cred);
229 static int slfileop_shutdown(struct file *fp, int how);
230 static int slfileop_ioctl(struct file *fp, u_long cmd, caddr_t data,
231                          struct ucred *cred);
232 static int slfileop_poll(struct file *fp, int events, struct ucred *cred);
233 static int slfileop_kqfilter(struct file *fp, struct knote *kn);
234
235 static struct fileops syslinkops = {
236     .fo_read =          slfileop_read,
237     .fo_write =         slfileop_write,
238     .fo_ioctl =         slfileop_ioctl,
239     .fo_poll =          slfileop_poll,
240     .fo_kqfilter =      slfileop_kqfilter,
241     .fo_stat =          slfileop_stat,
242     .fo_close =         slfileop_close,
243     .fo_shutdown =      slfileop_shutdown
244 };
245
246 /************************************************************************
247  *                      PRIMARY SYSTEM CALL INTERFACE                   *
248  ************************************************************************
249  *
250  * syslink(int cmd, struct syslink_info *info, size_t bytes)
251  */
252 int
253 sys_syslink(struct syslink_args *uap)
254 {
255         union syslink_info_all info;
256         int error;
257
258         /*
259          * System call is under construction and disabled by default. 
260          * Superuser access is also required for now, but eventually
261          * will not be needed.
262          */
263         if (syslink_enabled == 0)
264                 return (EAUTH);
265         error = suser(curthread);
266         if (error)
267                 return (error);
268
269         /*
270          * Load and validate the info structure.  Unloaded bytes are zerod
271          * out.  The label field must always be 0-filled, even if not used
272          * for a command.
273          */
274         bzero(&info, sizeof(info));
275         if ((unsigned)uap->bytes <= sizeof(info)) {
276                 if (uap->bytes)
277                         error = copyin(uap->info, &info, uap->bytes);
278         } else {
279                 error = EINVAL;
280         }
281         if (error)
282                 return (error);
283
284         /*
285          * Process the command
286          */
287         switch(uap->cmd) {
288         case SYSLINK_CMD_NEW:
289                 error = syslink_cmd_new(&info.cmd_new, &uap->sysmsg_result);
290                 break;
291         default:
292                 error = EINVAL;
293                 break;
294         }
295         if (error == 0 && info.head.wbflag)
296                 copyout(&info, uap->info, uap->bytes);
297         return (error);
298 }
299
300 /*
301  * Create a linked pair of descriptors, like a pipe.
302  */
303 static
304 int
305 syslink_cmd_new(struct syslink_info_new *info, int *result)
306 {
307         struct proc *p = curproc;
308         struct file *fp1;
309         struct file *fp2;
310         struct sldesc *sl;
311         struct sldesc *slpeer;
312         int error;
313         int fd1, fd2;
314
315         error = falloc(p, &fp1, &fd1);
316         if (error)
317                 return(error);
318         error = falloc(p, &fp2, &fd2);
319         if (error) {
320                 fsetfd(p, NULL, fd1);
321                 fdrop(fp1);
322                 return(error);
323         }
324         slpeer = allocsldesc(NULL);
325         slpeer->backend_wblocked = backend_wblocked_user;
326         slpeer->backend_write = backend_write_user;
327         slpeer->backend_reply = backend_reply_user;
328         slpeer->backend_dispose = backend_dispose_user;
329         sl = allocsldesc(slpeer->common);
330         sl->peer = slpeer;
331         sl->backend_wblocked = backend_wblocked_user;
332         sl->backend_write = backend_write_user;
333         sl->backend_reply = backend_reply_user;
334         sl->backend_dispose = backend_dispose_user;
335         slpeer->peer = sl;
336
337         setsldescfp(sl, fp1);
338         setsldescfp(slpeer, fp2);
339
340         fsetfd(p, fp1, fd1);
341         fdrop(fp1);
342         fsetfd(p, fp2, fd2);
343         fdrop(fp2);
344
345         info->head.wbflag = 1;  /* write back */
346         info->fds[0] = fd1;
347         info->fds[1] = fd2;
348
349         return(0);
350 }
351
352 /************************************************************************
353  *                      LOW LEVEL SLDESC SUPPORT                        *
354  ************************************************************************
355  *
356  */
357
358 static
359 struct sldesc *
360 allocsldesc(struct slcommon *common)
361 {
362         struct sldesc *sl;
363
364         sl = kmalloc(sizeof(struct sldesc), M_SYSLINK, M_WAITOK|M_ZERO);
365         if (common == NULL)
366                 common = kmalloc(sizeof(*common), M_SYSLINK, M_WAITOK|M_ZERO);
367         TAILQ_INIT(&sl->inq);           /* incoming requests */
368         RB_INIT(&sl->reply_rb_root);    /* match incoming replies */
369         spin_init(&sl->spin);
370         sl->common = common;
371         ++common->refs;
372         return(sl);
373 }
374
375 static
376 void
377 setsldescfp(struct sldesc *sl, struct file *fp)
378 {
379         sl->xfp = fp;
380         fp->f_type = DTYPE_SYSLINK;
381         fp->f_flag = FREAD | FWRITE;
382         fp->f_ops = &syslinkops;
383         fp->f_data = sl;
384 }
385
386 /*
387  * Red-black tree compare function
388  */
389 static
390 int
391 rb_slmsg_compare(struct slmsg *msg1, struct slmsg *msg2)
392 {
393         if (msg1->msg->sm_msgid < msg2->msg->sm_msgid)
394                 return(-1);
395         if (msg1->msg->sm_msgid == msg2->msg->sm_msgid)
396                 return(0);
397         return(1);
398 }
399
400 static
401 void
402 shutdownsldesc(struct sldesc *sl, int how)
403 {
404         struct slmsg *slmsg;
405         int rhow;
406
407         shutdownsldesc2(sl, how);
408
409         /*
410          * Return unread and unreplied messages
411          */
412         spin_lock_wr(&sl->spin);
413         while ((slmsg = TAILQ_FIRST(&sl->inq)) != NULL) {
414                 TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
415                 spin_unlock_wr(&sl->spin);
416                 if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
417                         sl->repbytes -= slmsg->maxsize;
418                         slmsg->flags &= ~SLMSGF_ONINQ;
419                         sl->peer->backend_dispose(sl->peer, slmsg);
420                 }
421                 /* leave ONINQ set for commands, it will cleared below */
422                 spin_lock_wr(&sl->spin);
423         }
424         while ((slmsg = RB_ROOT(&sl->reply_rb_root)) != NULL) {
425                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slmsg);
426                 sl->cmdbytes -= slmsg->maxsize;
427                 spin_unlock_wr(&sl->spin);
428                 slmsg->flags &= ~SLMSGF_ONINQ;
429                 sl->peer->backend_reply(sl->peer, slmsg, NULL);
430                 spin_lock_wr(&sl->spin);
431         }
432         spin_unlock_wr(&sl->spin);
433
434         /*
435          * Call shutdown on the peer with the opposite flags
436          */
437         rhow = 0;
438         switch(how) {
439         case SHUT_RD:
440                 rhow = SHUT_WR;
441                 break;
442         case SHUT_WR:
443                 rhow = SHUT_WR;
444                 break;
445         case SHUT_RDWR:
446                 rhow = SHUT_RDWR;
447                 break;
448         }
449         shutdownsldesc2(sl->peer, rhow);
450 }
451
452 static
453 void
454 shutdownsldesc2(struct sldesc *sl, int how)
455 {
456         spin_lock_wr(&sl->spin);
457         switch(how) {
458         case SHUT_RD:
459                 sl->flags |= SLF_RSHUTDOWN;
460                 break;
461         case SHUT_WR:
462                 sl->flags |= SLF_WSHUTDOWN;
463                 break;
464         case SHUT_RDWR:
465                 sl->flags |= SLF_RSHUTDOWN | SLF_WSHUTDOWN;
466                 break;
467         }
468         spin_unlock_wr(&sl->spin);
469
470         /*
471          * Handle signaling on the user side
472          */
473         if (how & SHUT_RD) {
474                 if (sl->rwaiters)
475                         wakeup(&sl->rwaiters);
476         }
477         if (how & SHUT_WR) {
478                 if (sl->wblocked) {
479                         sl->wblocked = 0;       /* race ok */
480                         wakeup(&sl->wblocked);
481                 }
482         }
483 }
484
485 static
486 void
487 sldrop(struct sldesc *sl)
488 {
489         struct sldesc *slpeer;
490
491         spin_lock_wr(&sl->common->spin);
492         if (--sl->common->refs == 0) {
493                 spin_unlock_wr(&sl->common->spin);
494                 if ((slpeer = sl->peer) != NULL) {
495                         sl->peer = NULL;
496                         slpeer->peer = NULL;
497                         slpeer->common = NULL;
498                         KKASSERT(slpeer->xfp == NULL);
499                         KKASSERT(TAILQ_EMPTY(&slpeer->inq));
500                         KKASSERT(RB_EMPTY(&slpeer->reply_rb_root));
501                         kfree(slpeer, M_SYSLINK);
502                 }
503                 KKASSERT(sl->xfp == NULL);
504                 KKASSERT(TAILQ_EMPTY(&sl->inq));
505                 KKASSERT(RB_EMPTY(&sl->reply_rb_root));
506                 kfree(sl->common, M_SYSLINK);
507                 sl->common = NULL;
508                 kfree(sl, M_SYSLINK);
509         } else {
510                 spin_unlock_wr(&sl->common->spin);
511         }
512 }
513
514 static
515 void
516 slmsg_put(struct slmsg *slmsg)
517 {
518         if (slmsg->flags & SLMSGF_HASXIO) {
519                 slmsg->flags &= ~SLMSGF_HASXIO;
520                 get_mplock();
521                 xio_release(&slmsg->xio);
522                 rel_mplock();
523         }
524         slmsg->flags &= ~SLMSGF_LINMAP;
525         objcache_put(slmsg->oc, slmsg);
526 }
527
528 /************************************************************************
529  *                              FILEOPS API                             *
530  ************************************************************************
531  *
532  * Implement userland fileops.
533  *
534  * MPSAFE ops
535  */
536 static
537 int
538 slfileop_read(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
539 {
540         struct sldesc *sl = fp->f_data;         /* fp refed on call */
541         struct slmsg *slmsg;
542         struct iovec *iov0;
543         struct iovec *iov1;
544         struct syslink_msg *wmsg;
545         int error;
546         int nbio;
547
548         /*
549          * Kinda messy.  Figure out the non-blocking state
550          */
551         if (flags & O_FBLOCKING)
552                 nbio = 0;
553         else if (flags & O_FNONBLOCKING)
554                 nbio = 1;
555         else if (fp->f_flag & O_NONBLOCK)
556                 nbio = 1;
557         else
558                 nbio = 0;
559
560         /*
561          * Validate the uio.
562          *
563          * iov0 - message buffer
564          * iov1 - DMA buffer or backup buffer
565          */
566         if (uio->uio_iovcnt < 1) {
567                 error = 0;
568                 goto done2;
569         }
570         iov0 = &uio->uio_iov[0];
571         if (uio->uio_iovcnt > 2) {
572                 error = EINVAL;
573                 goto done2;
574         }
575
576         /*
577          * Get a message, blocking if necessary.
578          */
579         spin_lock_wr(&sl->spin);
580         while ((slmsg = TAILQ_FIRST(&sl->inq)) == NULL) {
581                 if (sl->flags & SLF_RSHUTDOWN) {
582                         error = 0;
583                         goto done1;
584                 }
585                 if (nbio) {
586                         error = EAGAIN;
587                         goto done1;
588                 }
589                 ++sl->rwaiters;
590                 error = msleep(&sl->rwaiters, &sl->spin, PCATCH, "slrmsg", 0);
591                 --sl->rwaiters;
592                 if (error)
593                         goto done1;
594         }
595         wmsg = slmsg->msg;
596
597         /*
598          * We have a message and still hold the spinlock.  Make sure the
599          * uio has enough room to hold the message.
600          *
601          * Note that replies do not have XIOs.
602          */
603         if (slmsg->msgsize > iov0->iov_len) {
604                 error = ENOSPC;
605                 goto done1;
606         }
607         if (slmsg->xio.xio_bytes) {
608                 if (uio->uio_iovcnt != 2) {
609                         error = ENOSPC;
610                         goto done1;
611                 }
612                 iov1 = &uio->uio_iov[1];
613                 if (slmsg->xio.xio_bytes > iov1->iov_len) {
614                         error = ENOSPC;
615                         goto done1;
616                 }
617         } else {
618                 iov1 = NULL;
619         }
620
621         /*
622          * Dequeue the message.  Adjust repbytes immediately.  cmdbytes
623          * are adjusted when the command is replied to, not here.
624          */
625         TAILQ_REMOVE(&sl->inq, slmsg, tqnode);
626         if (slmsg->msg->sm_proto & SM_PROTO_REPLY)
627                 sl->repbytes -= slmsg->maxsize;
628         spin_unlock_wr(&sl->spin);
629
630         /*
631          * Load the message data into the user buffer.
632          *
633          * If receiving a command an XIO may exist specifying a DMA buffer.
634          * For commands, if DMAW is set we have to copy or map the buffer
635          * so the caller can access the data being written.  If DMAR is set
636          * we do not have to copy but we still must map the buffer so the
637          * caller can directly fill in the data being requested.
638          */
639         error = uiomove((void *)slmsg->msg, slmsg->msgsize, uio);
640         if (error == 0 && slmsg->xio.xio_bytes &&
641             (wmsg->sm_head.se_cmd & SE_CMDF_REPLY) == 0) {
642                 if (wmsg->sm_head.se_cmd & SE_CMDF_DMAW) {
643                         /*
644                          * Data being passed to caller or being passed in both
645                          * directions, copy or map.
646                          */
647                         get_mplock();
648                         if ((flags & O_MAPONREAD) &&
649                             (slmsg->xio.xio_flags & XIOF_VMLINEAR)) {
650                                 error = sl_local_mmap(slmsg,
651                                                       iov1->iov_base,
652                                                       iov1->iov_len);
653                                 if (error)
654                                 error = xio_copy_xtou(&slmsg->xio, 0,
655                                                       iov1->iov_base,
656                                                       slmsg->xio.xio_bytes);
657                         } else {
658                                 error = xio_copy_xtou(&slmsg->xio, 0,
659                                                       iov1->iov_base,
660                                                       slmsg->xio.xio_bytes);
661                         }
662                         rel_mplock();
663                 } else if (wmsg->sm_head.se_cmd & SE_CMDF_DMAR) {
664                         /*
665                          * Data will be passed back to originator, map
666                          * the buffer if we can, else use the backup
667                          * buffer at the same VA supplied by the caller.
668                          */
669                         get_mplock();
670                         if ((flags & O_MAPONREAD) &&
671                             (slmsg->xio.xio_flags & XIOF_VMLINEAR)) {
672                                 error = sl_local_mmap(slmsg,
673                                                       iov1->iov_base,
674                                                       iov1->iov_len);
675                                 error = 0; /* ignore errors */
676                         }
677                         rel_mplock();
678                 }
679         }
680
681         /*
682          * Clean up.
683          */
684         if (error) {
685                 /*
686                  * Requeue the message if we could not read it successfully
687                  */
688                 spin_lock_wr(&sl->spin);
689                 TAILQ_INSERT_HEAD(&sl->inq, slmsg, tqnode);
690                 slmsg->flags |= SLMSGF_ONINQ;
691                 spin_unlock_wr(&sl->spin);
692         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
693                 /*
694                  * Dispose of any received reply after we've copied it
695                  * to userland.  We don't need the slmsg any more.
696                  */
697                 slmsg->flags &= ~SLMSGF_ONINQ;
698                 sl->peer->backend_dispose(sl->peer, slmsg);
699                 if (sl->wblocked && sl->repbytes < syslink_bufsize) {
700                         sl->wblocked = 0;       /* MP race ok here */
701                         wakeup(&sl->wblocked);
702                 }
703         } else {
704                 /*
705                  * Leave the command in the RB tree but clear ONINQ now
706                  * that we have returned it to userland so userland can
707                  * reply to it.
708                  */
709                 slmsg->flags &= ~SLMSGF_ONINQ;
710         }
711         return(error);
712 done1:
713         spin_unlock_wr(&sl->spin);
714 done2:
715         return(error);
716 }
717
718 /*
719  * Userland writes syslink message (optionally with DMA buffer in iov[1]).
720  */
721 static
722 int
723 slfileop_write(struct file *fp, struct uio *uio, struct ucred *cred, int flags)
724 {
725         struct sldesc *sl = fp->f_data;
726         struct slmsg *slmsg;
727         struct slmsg *slcmd;
728         struct syslink_msg sltmp;
729         struct syslink_msg *wmsg;       /* wire message */
730         struct iovec *iov0;
731         struct iovec *iov1;
732         sl_proto_t proto;
733         int nbio;
734         int error;
735         int xflags;
736
737         /*
738          * Kinda messy.  Figure out the non-blocking state
739          */
740         if (flags & O_FBLOCKING)
741                 nbio = 0;
742         else if (flags & O_FNONBLOCKING)
743                 nbio = 1;
744         else if (fp->f_flag & O_NONBLOCK)
745                 nbio = 1;
746         else
747                 nbio = 0;
748
749         /*
750          * Validate the uio
751          */
752         if (uio->uio_iovcnt < 1) {
753                 error = 0;
754                 goto done2;
755         }
756         iov0 = &uio->uio_iov[0];
757         if (iov0->iov_len > SLMSG_BIG) {
758                 error = EFBIG;
759                 goto done2;
760         }
761         if (uio->uio_iovcnt > 2) {
762                 error = EFBIG;
763                 goto done2;
764         }
765         if (uio->uio_iovcnt > 1) {
766                 iov1 = &uio->uio_iov[1];
767                 if (iov1->iov_len > XIO_INTERNAL_SIZE) {
768                         error = EFBIG;
769                         goto done2;
770                 }
771                 if ((intptr_t)iov1->iov_base & PAGE_MASK) {
772                         error = EINVAL;
773                         goto done2;
774                 }
775         } else {
776                 iov1 = NULL;
777         }
778
779         /*
780          * Handle the buffer-full case.  slpeer cmdbytes is managed
781          * by the backend function, not us so if the callback just
782          * directly implements the message and never adjusts cmdbytes,
783          * we will never sleep here.
784          */
785         if (sl->flags & SLF_WSHUTDOWN) {
786                 error = EPIPE;
787                 goto done2;
788         }
789
790         /*
791          * Only commands can block the pipe, not replies.  Otherwise a
792          * deadlock is possible.
793          */
794         error = copyin(iov0->iov_base, &sltmp, sizeof(sltmp));
795         if (error)
796                 goto done2;
797         if ((proto = sltmp.sm_proto) & SM_PROTO_ENDIAN_REV)
798                 proto = bswap16(proto);
799         error = sl->peer->backend_wblocked(sl->peer, nbio, proto);
800         if (error)
801                 goto done2;
802
803         /*
804          * Allocate a slmsg and load the message.  Note that the bytes
805          * returned to userland only reflects the primary syslink message
806          * and does not include any DMA buffers.
807          */
808         if (iov0->iov_len <= SLMSG_SMALL)
809                 slmsg = objcache_get(sl_objcache_small, M_WAITOK);
810         else
811                 slmsg = objcache_get(sl_objcache_big, M_WAITOK);
812         slmsg->msgsize = iov0->iov_len;
813         wmsg = slmsg->msg;
814
815         error = uiomove((void *)wmsg, iov0->iov_len, uio);
816         if (error)
817                 goto done1;
818         error = syslink_validate_msg(wmsg, slmsg->msgsize);
819         if (error)
820                 goto done1;
821
822         if ((wmsg->sm_head.se_cmd & SE_CMDF_REPLY) == 0) {
823                 /*
824                  * Install the XIO for commands if any DMA flags are set.
825                  *
826                  * XIOF_VMLINEAR requires that the XIO represent a
827                  * contiguous set of pages associated with a single VM
828                  * object (so the reader side can mmap it easily).
829                  *
830                  * XIOF_VMLINEAR might not be set when the kernel sends
831                  * commands to userland so the reader side backs off to
832                  * a backup buffer if it isn't set, but we require it
833                  * for userland writes.
834                  */
835                 xflags = XIOF_VMLINEAR;
836                 if (wmsg->sm_head.se_cmd & SE_CMDF_DMAR)
837                         xflags |= XIOF_READ | XIOF_WRITE;
838                 else if (wmsg->sm_head.se_cmd & SE_CMDF_DMAW)
839                         xflags |= XIOF_READ;
840                 if (xflags && iov1) {
841                         get_mplock();
842                         error = xio_init_ubuf(&slmsg->xio, iov1->iov_base,
843                                               iov1->iov_len, xflags);
844                         rel_mplock();
845                         if (error)
846                                 goto done1;
847                         slmsg->flags |= SLMSGF_HASXIO;
848                 }
849                 error = sl->peer->backend_write(sl->peer, slmsg);
850         } else {
851                 /*
852                  * Replies have to be matched up against received commands.
853                  */
854                 spin_lock_wr(&sl->spin);
855                 slcmd = slmsg_rb_tree_RB_LOOKUP(&sl->reply_rb_root,
856                                                 slmsg->msg->sm_msgid);
857                 if (slcmd == NULL || (slcmd->flags & SLMSGF_ONINQ)) {
858                         error = ENOENT;
859                         spin_unlock_wr(&sl->spin);
860                         goto done1;
861                 }
862                 RB_REMOVE(slmsg_rb_tree, &sl->reply_rb_root, slcmd);
863                 sl->cmdbytes -= slcmd->maxsize;
864                 spin_unlock_wr(&sl->spin);
865
866                 /*
867                  * If the original command specified DMAR, has an xio, and
868                  * our write specifies a DMA buffer, then we can do a
869                  * copyback.  But if we are linearly mapped and the caller
870                  * is using the map base address, then the caller filled in
871                  * the data via the direct memory map and no copyback is
872                  * needed.
873                  */
874                 if ((slcmd->msg->sm_head.se_cmd & SE_CMDF_DMAR) && iov1 &&
875                     (slcmd->flags & SLMSGF_HASXIO) &&
876                     ((slcmd->flags & SLMSGF_LINMAP) == 0 ||
877                      iov1->iov_base != slcmd->vmbase)
878                 ) {
879                         size_t count;
880                         if (iov1->iov_len > slcmd->xio.xio_bytes)
881                                 count = slcmd->xio.xio_bytes;
882                         else
883                                 count = iov1->iov_len;
884                         get_mplock();
885                         error = xio_copy_utox(&slcmd->xio, 0, iov1->iov_base,
886                                               count);
887                         rel_mplock();
888                 }
889
890                 /*
891                  * If we had mapped a DMA buffer, remove it
892                  */
893                 if (slcmd->flags & SLMSGF_LINMAP) {
894                         get_mplock();
895                         sl_local_munmap(slcmd);
896                         rel_mplock();
897                 }
898
899                 /*
900                  * Reply and handle unblocking
901                  */
902                 sl->peer->backend_reply(sl->peer, slcmd, slmsg);
903                 if (sl->wblocked && sl->cmdbytes < syslink_bufsize) {
904                         sl->wblocked = 0;       /* MP race ok here */
905                         wakeup(&sl->wblocked);
906                 }
907
908                 /*
909                  * slmsg has already been dealt with, make sure error is
910                  * 0 so we do not double-free it.
911                  */
912                 error = 0;
913         }
914         /* fall through */
915 done1:
916         if (error)
917                 slmsg_put(slmsg);
918         /* fall through */
919 done2:
920         return(error);
921 }
922
923 /*
924  * Close a syslink descriptor.
925  *
926  * Disassociate the syslink from the file descriptor and disconnect from
927  * any peer.
928  */
929 static
930 int
931 slfileop_close(struct file *fp)
932 {
933         struct sldesc *sl;
934
935         /*
936          * Disassociate the file pointer.  Take ownership of the ref on the
937          * sldesc.
938          */
939         sl = fp->f_data;
940         fp->f_data = NULL;
941         fp->f_ops = &badfileops;
942         sl->xfp = NULL;
943
944         /*
945          * Shutdown both directions.  The other side will not issue API
946          * calls to us after we've shutdown both directions.
947          */
948         shutdownsldesc(sl, SHUT_RDWR);
949
950         /*
951          * Cleanup
952          */
953         KKASSERT(sl->cmdbytes == 0);
954         KKASSERT(sl->repbytes == 0);
955         sldrop(sl);
956         return(0);
957 }
958
959 static
960 int
961 slfileop_stat (struct file *fp, struct stat *sb, struct ucred *cred)
962 {
963         return(EINVAL);
964 }
965
966 static
967 int
968 slfileop_shutdown (struct file *fp, int how)
969 {
970         shutdownsldesc((struct sldesc *)fp->f_data, how);
971         return(0);
972 }
973
974 static
975 int
976 slfileop_ioctl (struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
977 {
978         return(EINVAL);
979 }
980
981 static
982 int
983 slfileop_poll (struct file *fp, int events, struct ucred *cred)
984 {
985         return(0);
986 }
987
988 static
989 int
990 slfileop_kqfilter(struct file *fp, struct knote *kn)
991 {
992         return(0);
993 }
994
995 /************************************************************************
996  *                          LOCAL MEMORY MAPPING                        *
997  ************************************************************************
998  *
999  * This feature is currently not implemented
1000  *
1001  */
1002
1003 static
1004 int
1005 sl_local_mmap(struct slmsg *slmsg, char *base, size_t len)
1006 {
1007         return (EOPNOTSUPP);
1008 }
1009
1010 static
1011 void
1012 sl_local_munmap(struct slmsg *slmsg)
1013 {
1014         /* empty */
1015 }
1016
1017 #if 0
1018
1019 static
1020 int
1021 sl_local_mmap(struct slmsg *slmsg, char *base, size_t len)
1022 {
1023         struct vmspace *vms = curproc->p_vmspace;
1024         vm_offset_t addr = (vm_offset_t)base;
1025
1026         /* XXX  check user address range */
1027         error = vm_map_replace(
1028                         &vma->vm_map,
1029                         (vm_offset_t)base, (vm_offset_t)base + len,
1030                         slmsg->xio.xio_pages[0]->object,
1031                         slmsg->xio.xio_pages[0]->pindex << PAGE_SHIFT,
1032                         VM_PROT_READ|VM_PROT_WRITE,
1033                         VM_PROT_READ|VM_PROT_WRITE,
1034                         MAP_DISABLE_SYNCER);
1035         }
1036         if (error == 0) {
1037                 slmsg->flags |= SLMSGF_LINMAP;
1038                 slmsg->vmbase = base;
1039                 slmsg->vmsize = len;
1040         }
1041         return (error);
1042 }
1043
1044 static
1045 void
1046 sl_local_munmap(struct slmsg *slmsg)
1047 {
1048         if (slmsg->flags & SLMSGF_LINMAP) {
1049                 vm_map_remove(&curproc->p_vmspace->vm_map,
1050                               slmsg->vmbase,
1051                               slmsg->vmbase + slcmd->vmsize);
1052                 slmsg->flags &= ~SLMSGF_LINMAP;
1053         }
1054 }
1055
1056 #endif
1057
1058 /************************************************************************
1059  *                          MESSAGE VALIDATION                          *
1060  ************************************************************************
1061  *
1062  * Validate that the syslink message.  Check that all headers and elements
1063  * conform.  Correct the endian if necessary.
1064  *
1065  * NOTE: If reverse endian needs to be corrected, SE_CMDF_UNTRANSLATED
1066  * is recursively flipped on all syslink_elm's in the message.  As the
1067  * message traverses the mesh, multiple flips may occur.  It is
1068  * up to the RPC protocol layer to correct opaque data payloads and
1069  * SE_CMDF_UNTRANSLATED prevents the protocol layer from misinterpreting
1070  * a command or reply element which has not been endian-corrected.
1071  */
1072 static
1073 int
1074 syslink_validate_msg(struct syslink_msg *msg, int bytes)
1075 {
1076         int aligned_reclen;
1077         int swapit;
1078         int error;
1079
1080         /*
1081          * The raw message must be properly-aligned.
1082          */
1083         if (bytes & SL_ALIGNMASK)
1084                 return (EINVAL);
1085
1086         while (bytes) {
1087                 /*
1088                  * The message must at least contain the msgid, bytes, and
1089                  * protoid.
1090                  */
1091                 if (bytes < SL_MIN_PAD_SIZE)
1092                         return (EINVAL);
1093
1094                 /*
1095                  * Fix the endian if it is reversed.
1096                  */
1097                 if (msg->sm_proto & SM_PROTO_ENDIAN_REV) {
1098                         msg->sm_msgid = bswap64(msg->sm_msgid);
1099                         msg->sm_sessid = bswap64(msg->sm_sessid);
1100                         msg->sm_bytes = bswap16(msg->sm_bytes);
1101                         msg->sm_proto = bswap16(msg->sm_proto);
1102                         msg->sm_rlabel = bswap32(msg->sm_rlabel);
1103                         if (msg->sm_proto & SM_PROTO_ENDIAN_REV)
1104                                 return (EINVAL);
1105                         swapit = 1;
1106                 } else {
1107                         swapit = 0;
1108                 }
1109
1110                 /*
1111                  * Validate the contents.  For PADs, the entire payload is
1112                  * ignored and the minimum message size can be as small as
1113                  * 8 bytes.
1114                  */
1115                 if (msg->sm_proto == SMPROTO_PAD) {
1116                         if (msg->sm_bytes < SL_MIN_PAD_SIZE ||
1117                             msg->sm_bytes > bytes) {
1118                                 return (EINVAL);
1119                         }
1120                         /* ignore the entire payload, it can be garbage */
1121                 } else {
1122                         if (msg->sm_bytes < SL_MIN_MSG_SIZE ||
1123                             msg->sm_bytes > bytes) {
1124                                 return (EINVAL);
1125                         }
1126                         error = syslink_validate_elm(
1127                                     &msg->sm_head,
1128                                     msg->sm_bytes - 
1129                                         offsetof(struct syslink_msg,
1130                                                  sm_head),
1131                                     swapit, SL_MAXDEPTH);
1132                         if (error)
1133                                 return (error);
1134                 }
1135
1136                 /*
1137                  * The aligned payload size must be used to locate the
1138                  * next syslink_msg in the buffer.
1139                  */
1140                 aligned_reclen = SL_MSG_ALIGN(msg->sm_bytes);
1141                 bytes -= aligned_reclen;
1142                 msg = (void *)((char *)msg + aligned_reclen);
1143         }
1144         return(0);
1145 }
1146
1147 static
1148 int
1149 syslink_validate_elm(struct syslink_elm *elm, sl_reclen_t bytes,
1150                      int swapit, int depth)
1151 {
1152         int aligned_reclen;
1153
1154         /*
1155          * If the buffer isn't big enough to fit the header, stop now!
1156          */
1157         if (bytes < SL_MIN_ELM_SIZE)
1158                 return (EINVAL);
1159         /*
1160          * All syslink_elm headers are recursively endian-adjusted.  Opaque
1161          * data payloads are not.
1162          */
1163         if (swapit) {
1164                 elm->se_cmd = bswap16(elm->se_cmd) ^ SE_CMDF_UNTRANSLATED;
1165                 elm->se_bytes = bswap16(elm->se_bytes);
1166                 elm->se_aux = bswap32(elm->se_aux);
1167         }
1168
1169         /*
1170          * Check element size requirements.
1171          */
1172         if (elm->se_bytes < SL_MIN_ELM_SIZE || elm->se_bytes > bytes)
1173                 return (EINVAL);
1174
1175         /*
1176          * Recursively check structured payloads.  A structured payload may
1177          * contain as few as 0 recursive elements.
1178          */
1179         if (elm->se_cmd & SE_CMDF_STRUCTURED) {
1180                 if (depth == 0)
1181                         return (EINVAL);
1182                 bytes -= SL_MIN_ELM_SIZE;
1183                 ++elm;
1184                 while (bytes > 0) {
1185                         if (syslink_validate_elm(elm, bytes, swapit, depth - 1))
1186                                 return (EINVAL);
1187                         aligned_reclen = SL_MSG_ALIGN(elm->se_bytes);
1188                         elm = (void *)((char *)elm + aligned_reclen);
1189                         bytes -= aligned_reclen;
1190                 }
1191         }
1192         return(0);
1193 }
1194
1195 /************************************************************************
1196  *                  BACKEND FUNCTIONS - USER DESCRIPTOR                 *
1197  ************************************************************************
1198  *
1199  * Peer backend links are primarily used when userland creates a pair
1200  * of linked descriptors.
1201  */
1202
1203 /*
1204  * Do any required blocking / nbio handling for attempts to write to
1205  * a sldesc associated with a user descriptor.
1206  */
1207 static
1208 int
1209 backend_wblocked_user(struct sldesc *sl, int nbio, sl_proto_t proto)
1210 {
1211         int error = 0;
1212         int *bytesp = (proto & SM_PROTO_REPLY) ? &sl->repbytes : &sl->cmdbytes;
1213
1214         /*
1215          * Block until sufficient data is drained by the target.  It is
1216          * ok to have a MP race against cmdbytes.
1217          */
1218         if (*bytesp >= syslink_bufsize) {
1219                 spin_lock_wr(&sl->spin);
1220                 while (*bytesp >= syslink_bufsize) {
1221                         if (sl->flags & SLF_WSHUTDOWN) {
1222                                 error = EPIPE;
1223                                 break;
1224                         }
1225                         if (nbio) {
1226                                 error = EAGAIN;
1227                                 break;
1228                         }
1229                         ++sl->wblocked;
1230                         error = msleep(&sl->wblocked, &sl->spin,
1231                                        PCATCH, "slwmsg", 0);
1232                         if (error)
1233                                 break;
1234                 }
1235                 spin_unlock_wr(&sl->spin);
1236         }
1237         return (error);
1238 }
1239
1240 /*
1241  * Unconditionally write a syslink message to the sldesc associated with
1242  * a user descriptor.  Command messages are also placed in a red-black
1243  * tree so their DMA tag (if any) can be accessed and so they can be
1244  * linked to any reply message.
1245  */
1246 static
1247 int
1248 backend_write_user(struct sldesc *sl, struct slmsg *slmsg)
1249 {
1250         int error;
1251
1252         spin_lock_wr(&sl->spin);
1253         if (sl->flags & SLF_RSHUTDOWN) {
1254                 /*
1255                  * Not accepting new messages
1256                  */
1257                 error = EPIPE;
1258         } else if (slmsg->msg->sm_proto & SM_PROTO_REPLY) {
1259                 /*
1260                  * Write a reply
1261                  */
1262                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1263                 sl->repbytes += slmsg->maxsize;
1264                 slmsg->flags |= SLMSGF_ONINQ;
1265                 error = 0;
1266         } else if (RB_INSERT(slmsg_rb_tree, &sl->reply_rb_root, slmsg)) {
1267                 /*
1268                  * Write a command, but there was a msgid collision when
1269                  * we tried to insert it into the RB tree.
1270                  */
1271                 error = EEXIST;
1272         } else {
1273                 /*
1274                  * Write a command, successful insertion into the RB tree.
1275                  */
1276                 TAILQ_INSERT_TAIL(&sl->inq, slmsg, tqnode);
1277                 sl->cmdbytes += slmsg->maxsize;
1278                 slmsg->flags |= SLMSGF_ONINQ;
1279                 error = 0;
1280         }
1281         spin_unlock_wr(&sl->spin);
1282         if (sl->rwaiters)
1283                 wakeup(&sl->rwaiters);
1284         return(error);
1285 }
1286
1287 /*
1288  * Our peer is replying a command we previously sent it back to us, along
1289  * with the reply message (if not NULL).  We just queue the reply to
1290  * userland and free of the command.
1291  */
1292 static
1293 void
1294 backend_reply_user(struct sldesc *sl, struct slmsg *slcmd, struct slmsg *slrep)
1295 {
1296         int error;
1297
1298         slmsg_put(slcmd);
1299         if (slrep) {
1300                 spin_lock_wr(&sl->spin);
1301                 if ((sl->flags & SLF_RSHUTDOWN) == 0) {
1302                         TAILQ_INSERT_TAIL(&sl->inq, slrep, tqnode);
1303                         sl->repbytes += slrep->maxsize;
1304                         error = 0;
1305                 } else {
1306                         error = EPIPE;
1307                 }
1308                 spin_unlock_wr(&sl->spin);
1309                 if (error)
1310                         sl->peer->backend_dispose(sl->peer, slrep);
1311                 else if (sl->rwaiters)
1312                         wakeup(&sl->rwaiters);
1313         }
1314 }
1315
1316 static
1317 void
1318 backend_dispose_user(struct sldesc *sl, struct slmsg *slmsg)
1319 {
1320         slmsg_put(slmsg);
1321 }
1322
1323 /************************************************************************
1324  *                      KERNEL DRIVER OR FILESYSTEM API                 *
1325  ************************************************************************
1326  *
1327  */
1328
1329 /*
1330  * Create a user<->kernel link, returning the user descriptor in *fdp
1331  * and the kernel descriptor in *kslp.  0 is returned on success, and an
1332  * error code is returned on failure.
1333  */
1334 int
1335 syslink_ukbackend(int *fdp, struct sldesc **kslp)
1336 {
1337         struct proc *p = curproc;
1338         struct file *fp;
1339         struct sldesc *usl;
1340         struct sldesc *ksl;
1341         int error;
1342         int fd;
1343
1344         *fdp = -1;
1345         *kslp = NULL;
1346
1347         error = falloc(p, &fp, &fd);
1348         if (error)
1349                 return(error);
1350         usl = allocsldesc(NULL);
1351         usl->backend_wblocked = backend_wblocked_user;
1352         usl->backend_write = backend_write_user;
1353         usl->backend_reply = backend_reply_user;
1354         usl->backend_dispose = backend_dispose_user;
1355
1356         ksl = allocsldesc(usl->common);
1357         ksl->peer = usl;
1358         ksl->backend_wblocked = backend_wblocked_kern;
1359         ksl->backend_write = backend_write_kern;
1360         ksl->backend_reply = backend_reply_kern;
1361         ksl->backend_dispose = backend_dispose_kern;
1362
1363         usl->peer = ksl;
1364
1365         setsldescfp(usl, fp);
1366         fsetfd(p, fp, fd);
1367         fdrop(fp);
1368
1369         *fdp = fd;
1370         *kslp = ksl;
1371         return(0);
1372 }
1373
1374 /*
1375  * Assign a unique message id, issue a syslink message to userland,
1376  * and wait for a reply.
1377  */
1378 int
1379 syslink_kdomsg(struct sldesc *ksl, struct slmsg *slmsg)
1380 {
1381         struct syslink_msg *msg;
1382         int error;
1383
1384         /*
1385          * Finish initializing slmsg and post it to the red-black tree for
1386          * reply matching.  If the message id is already in use we return
1387          * EEXIST, giving the originator the chance to roll a new msgid.
1388          */
1389         msg = slmsg->msg;
1390         slmsg->msgsize = msg->sm_bytes;
1391         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1392                 return (error);
1393         msg->sm_msgid = allocsysid();
1394
1395         /*
1396          * Issue the request and wait for a matching reply or failure,
1397          * then remove the message from the matching tree and return.
1398          */
1399         error = ksl->peer->backend_write(ksl->peer, slmsg);
1400         spin_lock_wr(&ksl->spin);
1401         if (error == 0) {
1402                 while (slmsg->rep == NULL) {
1403                         error = msleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1404                         /* XXX ignore error for now */
1405                 }
1406                 if (slmsg->rep == (struct slmsg *)-1) {
1407                         error = EIO;
1408                         slmsg->rep = NULL;
1409                 } else {
1410                         error = slmsg->rep->msg->sm_head.se_aux;
1411                 }
1412         }
1413         spin_unlock_wr(&ksl->spin);
1414         return(error);
1415 }
1416
1417 /*
1418  * Similar to syslink_kdomsg but return immediately instead of
1419  * waiting for a reply.  The kernel must supply a callback function
1420  * which will be made in the context of the user process replying
1421  * to the message.
1422  */
1423 int
1424 syslink_ksendmsg(struct sldesc *ksl, struct slmsg *slmsg,
1425                  void (*func)(struct slmsg *, void *, int), void *arg)
1426 {
1427         struct syslink_msg *msg;
1428         int error;
1429
1430         /*
1431          * Finish initializing slmsg and post it to the red-black tree for
1432          * reply matching.  If the message id is already in use we return
1433          * EEXIST, giving the originator the chance to roll a new msgid.
1434          */
1435         msg = slmsg->msg;
1436         slmsg->msgsize = msg->sm_bytes;
1437         slmsg->callback_func = func;
1438         slmsg->callback_data = arg;
1439         if ((error = syslink_validate_msg(msg, msg->sm_bytes)) != 0)
1440                 return (error);
1441         msg->sm_msgid = allocsysid();
1442
1443         /*
1444          * Issue the request.  If no error occured the operation will be
1445          * in progress, otherwise the operation is considered to have failed
1446          * and the caller can deallocate the slmsg.
1447          */
1448         error = ksl->peer->backend_write(ksl->peer, slmsg);
1449         return (error);
1450 }
1451
1452 int
1453 syslink_kwaitmsg(struct sldesc *ksl, struct slmsg *slmsg)
1454 {
1455         int error;
1456
1457         spin_lock_wr(&ksl->spin);
1458         while (slmsg->rep == NULL) {
1459                 error = msleep(slmsg, &ksl->spin, 0, "kwtmsg", 0);
1460                 /* XXX ignore error for now */
1461         }
1462         if (slmsg->rep == (struct slmsg *)-1) {
1463                 error = EIO;
1464                 slmsg->rep = NULL;
1465         } else {
1466                 error = slmsg->rep->msg->sm_head.se_aux;
1467         }
1468         spin_unlock_wr(&ksl->spin);
1469         return(error);
1470 }
1471
1472 struct slmsg *
1473 syslink_kallocmsg(void)
1474 {
1475         return(objcache_get(sl_objcache_small, M_WAITOK));
1476 }
1477
1478 void
1479 syslink_kfreemsg(struct sldesc *ksl, struct slmsg *slmsg)
1480 {
1481         struct slmsg *rep;
1482
1483         if ((rep = slmsg->rep) != NULL) {
1484                 slmsg->rep = NULL;
1485                 ksl->peer->backend_dispose(ksl->peer, rep);
1486         }
1487         slmsg->callback_func = NULL;
1488         slmsg_put(slmsg);
1489 }
1490
1491 void
1492 syslink_kshutdown(struct sldesc *ksl, int how)
1493 {
1494         shutdownsldesc(ksl, how);
1495 }
1496
1497 void
1498 syslink_kclose(struct sldesc *ksl)
1499 {
1500         shutdownsldesc(ksl, SHUT_RDWR);
1501         sldrop(ksl);
1502 }
1503
1504 /*
1505  * Associate a DMA buffer with a kernel syslink message prior to it
1506  * being sent to userland.  The DMA buffer is set up from the point
1507  * of view of the target.
1508  */
1509 int
1510 syslink_kdmabuf_pages(struct slmsg *slmsg, struct vm_page **mbase, int npages)
1511 {
1512         int xflags;
1513         int error;
1514
1515         xflags = XIOF_VMLINEAR;
1516         if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAR)
1517                 xflags |= XIOF_READ | XIOF_WRITE;
1518         else if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAW)
1519                 xflags |= XIOF_READ;
1520         error = xio_init_pages(&slmsg->xio, mbase, npages, xflags);
1521         slmsg->flags |= SLMSGF_HASXIO;
1522         return (error);
1523 }
1524
1525 /*
1526  * Associate a DMA buffer with a kernel syslink message prior to it
1527  * being sent to userland.  The DMA buffer is set up from the point
1528  * of view of the target.
1529  */
1530 int
1531 syslink_kdmabuf_data(struct slmsg *slmsg, char *base, int bytes)
1532 {
1533         int xflags;
1534
1535         xflags = XIOF_VMLINEAR;
1536         if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAR)
1537                 xflags |= XIOF_READ | XIOF_WRITE;
1538         else if (slmsg->msg->sm_head.se_cmd & SE_CMDF_DMAW)
1539                 xflags |= XIOF_READ;
1540         xio_init_kbuf(&slmsg->xio, base, bytes);
1541         slmsg->xio.xio_flags |= xflags;
1542         slmsg->flags |= SLMSGF_HASXIO;
1543         return(0);
1544 }
1545
1546 /************************************************************************
1547  *                  BACKEND FUNCTIONS FOR KERNEL API                    *
1548  ************************************************************************
1549  *
1550  * These are the backend functions for a sldesc associated with a kernel
1551  * API.
1552  */
1553
1554 /*
1555  * Our peer wants to write a syslink message to us and is asking us to
1556  * block if our input queue is full.  We don't implement command reception
1557  * so don't block right now.
1558  */
1559 static
1560 int
1561 backend_wblocked_kern(struct sldesc *ksl, int nbio, sl_proto_t proto)
1562 {
1563         /* never blocks */
1564         return(0);
1565 }
1566
1567 /*
1568  * Our peer is writing a request to the kernel.  At the moment we do not
1569  * accept commands.
1570  */
1571 static
1572 int
1573 backend_write_kern(struct sldesc *ksl, struct slmsg *slmsg)
1574 {
1575         return(EOPNOTSUPP);
1576 }
1577
1578 /*
1579  * Our peer wants to reply to a syslink message we sent it earlier.  The
1580  * original command (that we passed to our peer), and the peer's reply
1581  * is specified.  If the peer has failed slrep will be NULL.
1582  */
1583 static
1584 void
1585 backend_reply_kern(struct sldesc *ksl, struct slmsg *slcmd, struct slmsg *slrep)
1586 {
1587         int error;
1588
1589         spin_lock_wr(&ksl->spin);
1590         if (slrep == NULL) {
1591                 slcmd->rep = (struct slmsg *)-1;
1592                 error = EIO;
1593         } else {
1594                 slcmd->rep = slrep;
1595                 error = slrep->msg->sm_head.se_aux;
1596         }
1597         spin_unlock_wr(&ksl->spin);
1598
1599         /*
1600          * Issue callback or wakeup a synchronous waiter.
1601          */
1602         if (slcmd->callback_func) {
1603                 slcmd->callback_func(slcmd, slcmd->callback_data, error);
1604         } else {
1605                 wakeup(slcmd);
1606         }
1607 }
1608
1609 /*
1610  * Any reply messages we sent to our peer are returned to us for disposal.
1611  * Since we do not currently accept commands from our peer, there will not
1612  * be any replies returned to the peer to dispose of.
1613  */
1614 static
1615 void
1616 backend_dispose_kern(struct sldesc *ksl, struct slmsg *slmsg)
1617 {
1618         panic("backend_dispose_kern: kernel can't accept commands so it "
1619               "certainly did not reply to one!");
1620 }
1621