struct xa_softc *xa;
dmsg_blk_error_t status;
kdmsg_state_t *state;
+ kdmsg_circuit_t *circ;
struct bio *bio;
- uint64_t circuit;
int running; /* transaction running */
int waitseq; /* streaming reply */
int done; /* final (transaction closed) */
static int
xdisk_open(struct dev_open_args *ap)
{
- kprintf("XDISK_OPEN\n");
lwkt_gettoken(&xdisk_token);
++xdisk_opencount;
lwkt_reltoken(&xdisk_token);
static int
xdisk_close(struct dev_close_args *ap)
{
- kprintf("XDISK_CLOSE\n");
lwkt_gettoken(&xdisk_token);
--xdisk_opencount;
lwkt_reltoken(&xdisk_token);
char devname[64];
cdev_t dev;
- kprintf("xdisk attach %d %jd/%d %s %s\n",
- xaioc->fd, (intmax_t)xaioc->bytes, xaioc->blksize,
- xaioc->cl_label, xaioc->fs_label);
-
/*
* Normalize ioctl params
*/
++unit;
}
xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
- kprintf("ALLOCATE XA %p\n", xa);
xa->unit = unit;
xa->serializing = 1;
lwkt_token_init(&xa->tok, "xa");
}
TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
}
+
+ /*
+ * (xa) is now serializing.
+ */
xa->xaioc = *xaioc;
xa->attached = 1;
lwkt_reltoken(&xdisk_token);
tsleep(xa, 0, "xadet", hz / 10);
}
if (xa) {
- kprintf("DETACHING XA\n");
kdmsg_iocom_uninit(&xa->iocom);
xa->serializing = 0;
}
{
struct xa_softc *xa = iocom->handle;
- kprintf("XA_EXIT UNIT %d\n", xa->unit);
+ lwkt_gettoken(&xa->tok);
+ lwkt_gettoken(&xdisk_token);
+
+ /*
+ * We must wait for any I/O's to complete to ensure that all
+ * state structure references are cleaned up before returning.
+ */
+ xa->attached = -1; /* force deferral or failure */
+ while (TAILQ_FIRST(&xa->tag_pendq)) {
+ tsleep(xa, 0, "xabiow", hz / 10);
+ }
- if (xa->serializing == 0)
+ /*
+ * All serializing code checks for de-initialization so only
+ * do it if we aren't already serializing.
+ */
+ if (xa->serializing == 0) {
+ xa->serializing = 1;
kdmsg_iocom_uninit(iocom);
+ xa->serializing = 0;
+ }
/*
* If the drive is not in use and no longer attach it can be
* destroyed.
*/
- lwkt_gettoken(&xdisk_token);
xa->attached = 0;
xa_terminate_check(xa);
lwkt_reltoken(&xdisk_token);
+ lwkt_reltoken(&xa->tok);
}
/*
xa_terminate_check(struct xa_softc *xa)
{
xa_tag_t *tag;
+ struct bio *bio;
if (xa->opencnt || xa->attached || xa->serializing)
return;
xa->serializing = 1;
- kprintf("TERMINATE XA %p %d\n", xa, xa->unit);
kdmsg_iocom_uninit(&xa->iocom);
+
+ /*
+ * When destroying an xa make sure all pending I/O (typically
+ * from the disk probe) is done.
+ *
+ * XXX what about new I/O initiated prior to disk_destroy().
+ */
+ while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
+ TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
+ if ((bio = tag->bio) != NULL) {
+ tag->bio = NULL;
+ bio->bio_buf->b_error = ENXIO;
+ bio->bio_buf->b_flags |= B_ERROR;
+ biodone(bio);
+ }
+ TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
+ }
if (xa->dev) {
disk_destroy(&xa->disk);
xa->dev->si_drv1 = NULL;
xa->dev = NULL;
}
- kprintf("REMOVEQ XA %p %d\n", xa, xa->unit);
KKASSERT(xa->opencnt == 0 && xa->attached == 0);
- kprintf("IOCOMUN XA %p %d\n", xa, xa->unit);
while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
tag->xa = NULL;
KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
kfree(xa, M_XDISK);
- kprintf("xa_close: destroy unreferenced disk\n");
}
/*
static void
xa_autodmsg(kdmsg_msg_t *msg)
{
- struct xa_softc *xa = msg->iocom->handle;
+ xa_softc_t *xa = msg->iocom->handle;
+
kdmsg_circuit_t *circ;
kdmsg_circuit_t *cscan;
uint32_t xcmd;
/*
* Track established circuits
*/
- kprintf("XA: Received autodmsg: CREATE+REPLY\n");
circ = msg->state->any.circ;
lwkt_gettoken(&xa->tok);
if (circ->recorded == 0) {
lwkt_reltoken(&xa->tok);
break;
case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
- kprintf("XA: Received autodmsg: DELETE+REPLY\n");
+ /*
+ * Losing virtual circuit. Scan pending tags.
+ */
circ = msg->state->any.circ;
lwkt_gettoken(&xa->tok);
if (circ->recorded) {
*/
if (msg->aux_data) {
msg->aux_data[msg->aux_size - 1] = 0;
- kprintf("DEBUGMSG: %s\n", msg->aux_data);
+ kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
}
break;
default:
tsleep(xa, 0, "xarace", hz / 10);
goto again;
}
+ if (xa->attached == 0) {
+ lwkt_reltoken(&xdisk_token);
+ return ENXIO; /* raced destruction */
+ }
/*
* Serialize initial open
xa->serializing = 1;
lwkt_reltoken(&xdisk_token);
- kprintf("XA OPEN COMMAND\n");
-
tag = xa_setup_cmd(xa, NULL);
if (tag == NULL) {
lwkt_gettoken(&xdisk_token);
lwkt_reltoken(&xdisk_token);
return(ENXIO);
}
- msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+ msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
DMSG_BLK_OPEN | DMSGF_CREATE,
xa_sync_completion, tag);
msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
xa_start(tag, msg);
if (xa_wait(tag, 0) == 0) {
- kprintf("XA OPEN GOOD\n");
xa->keyid = tag->status.keyid;
xa->opentag = tag; /* leave tag open */
xa->serializing = 0;
error = 0;
} else {
- kprintf("XA OPEN BAD\n");
xa_done(tag, 0);
lwkt_gettoken(&xdisk_token);
KKASSERT(xa->opencnt > 0);
xa_tag_t *tag;
struct bio *bio = ap->a_bio;
-#if 0
- bio->bio_buf->b_error = ENXIO;
- bio->bio_buf->b_flags |= B_ERROR;
- biodone(bio);
- return(0);
-#endif
+ /*
+ * Allow potentially temporary link failures to fail the I/Os
+ * only if the device is not open. That is, we allow the disk
+ * probe code prior to mount to fail.
+ */
+ if (xa->attached == 0 && xa->opencnt == 0) {
+ bio->bio_buf->b_error = ENXIO;
+ bio->bio_buf->b_flags |= B_ERROR;
+ biodone(bio);
+ return(0);
+ }
tag = xa_setup_cmd(xa, bio);
if (tag)
* Only get a tag if we have a valid virtual circuit to the server.
*/
lwkt_gettoken(&xa->tok);
- if ((circ = TAILQ_FIRST(&xa->circq)) == NULL) {
+ if ((circ = TAILQ_FIRST(&xa->circq)) == NULL || xa->attached <= 0) {
tag = NULL;
} else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
tag->bio = bio;
- tag->circuit = circ->circ_state->msgid;
+ tag->circ = circ;
+ kdmsg_circ_hold(circ);
+ TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
}
/*
switch(bp->b_cmd) {
case BUF_CMD_READ:
- msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+ msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
DMSG_BLK_READ |
DMSGF_CREATE | DMSGF_DELETE,
xa_bio_completion, tag);
msg->any.blk_read.bytes = bp->b_bcount;
break;
case BUF_CMD_WRITE:
- msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+ msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
DMSG_BLK_WRITE |
DMSGF_CREATE | DMSGF_DELETE,
xa_bio_completion, tag);
msg->aux_size = bp->b_bcount;
break;
case BUF_CMD_FLUSH:
- msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+ msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
DMSG_BLK_FLUSH |
DMSGF_CREATE | DMSGF_DELETE,
xa_bio_completion, tag);
msg->any.blk_flush.bytes = bp->b_bcount;
break;
case BUF_CMD_FREEBLKS:
- msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+ msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
DMSG_BLK_FREEBLKS |
DMSGF_CREATE | DMSGF_DELETE,
xa_bio_completion, tag);
tag->done = 0;
tag->waitseq = 0;
if (msg) {
-#if 0
- lwkt_gettoken(&xa->tok);
- TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
-#endif
tag->state = msg->state;
-#if 0
- lwkt_reltoken(&xa->tok);
-#endif
kdmsg_msg_write(msg);
} else {
xa_done(tag, 1);
KKASSERT(tag->bio == NULL);
tag->done = 1;
+ tag->state = NULL;
lwkt_gettoken(&xa->tok);
if ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
lwkt_reltoken(&xa->tok);
xa_start(tag, NULL);
} else {
+ if (tag->circ) {
+ kdmsg_circ_drop(tag->circ);
+ tag->circ = NULL;
+ }
+ TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
lwkt_reltoken(&xa->tok);
}
tag->status = msg->any.blk_error;
break;
}
- kprintf("XA_SYNC_COMPLETION ERROR %u RESID %u\n",
- tag->status.head.error, tag->status.resid);
if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
kdmsg_msg_reply(msg, 0); /* terminate our side */
tag->done = 1;
tag->status = msg->any.blk_error;
break;
}
- kprintf("XA_BIO_COMPLETION ERROR %u RESID %u\n",
- tag->status.head.error, tag->status.resid);
/*
* Process bio completion
/*
* Handle completion of the transaction. If the bioq is not empty
* we can initiate another bio on the same tag.
+ *
+ * NOTE: Most of our transactions will be single-message
+ * CREATE+DELETEs, so we won't have to terminate the
+ * transaction separately, here. But just in case they
+ * aren't, be sure to terminate the transaction.
*/
handle_done:
- if (msg->any.head.cmd & DMSGF_DELETE)
+ if (msg->any.head.cmd & DMSGF_DELETE) {
xa_done(tag, 1);
+ if ((state->txcmd & DMSGF_DELETE) == 0) {
+ kdmsg_msg_reply(msg, 0);
+ }
+ }
return (0);
}
tag = xa_setup_cmd(xa, NULL);
if (tag == NULL)
break;
- kprintf("xa: Restart BIO %p on %s\n",
- bio, xa->iocom.auto_lnk_conn.fs_label);
TAILQ_REMOVE(&xa->bioq, bio, bio_act);
tag->bio = bio;
xa_start(tag, NULL);
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+/*
+ * TODO: txcmd CREATE state is deferred by txmsgq, need to calculate
+ * a streaming response. See subr_diskiocom()'s diskiodone().
+ */
#include <sys/param.h>
#include <sys/types.h>
#include <sys/kernel.h>
#include <sys/dmsg.h>
RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
+RB_GENERATE(kdmsg_circuit_tree, kdmsg_circuit, rbnode, kdmsg_circuit_cmp);
+
+static int kdmsg_msg_receive_handling(kdmsg_msg_t *msg);
+static int kdmsg_circ_msgrx(kdmsg_msg_t *msg);
+static int kdmsg_state_msgrx(kdmsg_msg_t *msg);
+static int kdmsg_state_msgtx(kdmsg_msg_t *msg);
+static void kdmsg_state_cleanuprx(kdmsg_msg_t *msg);
+static void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
+static void kdmsg_state_abort(kdmsg_state_t *state);
+static void kdmsg_state_free(kdmsg_state_t *state);
-static void kdmsg_circ_free_check(kdmsg_circuit_t *circ);
static void kdmsg_iocom_thread_rd(void *arg);
static void kdmsg_iocom_thread_wr(void *arg);
static int kdmsg_autorxmsg(kdmsg_msg_t *msg);
static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token);
+/*
+ * Acquire an additional reference on a virtual circuit.  The count is
+ * bumped atomically.  Every reference must eventually be released with
+ * kdmsg_circ_drop(), which frees the circuit when the last one goes away.
+ */
+void
+kdmsg_circ_hold(kdmsg_circuit_t *circ)
+{
+ atomic_add_int(&circ->refs, 1);
+}
+
+/*
+ * Release one reference on a virtual circuit.
+ *
+ * When the final reference is released the circuit must already be
+ * fully detached: no span, forged-circuit or received-circuit state may
+ * still point at it, and it must no longer be marked recorded.  It is
+ * then freed back to the malloc type of the iocom that owns it.
+ */
+void
+kdmsg_circ_drop(kdmsg_circuit_t *circ)
+{
+	kdmsg_iocom_t *owner;
+
+	/* Other references remain; nothing more to do. */
+	if (atomic_fetchadd_int(&circ->refs, -1) != 1)
+		return;
+
+	KKASSERT(circ->span_state == NULL &&
+		 circ->circ_state == NULL &&
+		 circ->rcirc_state == NULL &&
+		 circ->recorded == 0);
+	owner = circ->iocom;
+	circ->iocom = NULL;
+	kfree(circ, owner->mmsg);
+}
+
+
/*
* Initialize the roll-up communications structure for a network
* messaging session. This function does not install the socket.
iocom->flags = flags;
lockinit(&iocom->msglk, "h2msg", 0, 0);
TAILQ_INIT(&iocom->msgq);
+ RB_INIT(&iocom->circ_tree);
RB_INIT(&iocom->staterd_tree);
RB_INIT(&iocom->statewr_tree);
}
/*
* Destroy the current connection
*/
+ lockmgr(&iocom->msglk, LK_EXCLUSIVE);
atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
while (iocom->msgrd_td || iocom->msgwr_td) {
wakeup(&iocom->msg_ctl);
- tsleep(iocom, 0, "clstrkl", hz);
+ lksleep(iocom, &iocom->msglk, 0, "clstrkl", hz);
}
/*
iocom->msg_ctl = 0;
iocom->msg_fp = fp;
iocom->msg_seq = 0;
+ iocom->flags &= ~KDMSG_IOCOMF_EXITNOACC;
lwkt_create(kdmsg_iocom_thread_rd, iocom, &iocom->msgrd_td,
NULL, 0, -1, "%s-msgrd", subsysname);
lwkt_create(kdmsg_iocom_thread_wr, iocom, &iocom->msgwr_td,
NULL, 0, -1, "%s-msgwr", subsysname);
+ lockmgr(&iocom->msglk, LK_RELEASE);
}
/*
iocom->auto_callback = auto_callback;
- msg = kdmsg_msg_alloc(iocom, 0,
+ msg = kdmsg_msg_alloc(iocom, NULL,
DMSG_LNK_CONN | DMSGF_CREATE,
kdmsg_lnk_conn_reply, NULL);
iocom->auto_lnk_conn.head = msg->any.head;
kdmsg_msg_t *rmsg;
if (msg->any.head.cmd & DMSGF_CREATE) {
- rmsg = kdmsg_msg_alloc(iocom, 0,
+ rmsg = kdmsg_msg_alloc(iocom, NULL,
DMSG_LNK_SPAN | DMSGF_CREATE,
kdmsg_lnk_span_reply, NULL);
iocom->auto_lnk_span.head = rmsg->any.head;
rmsg->any.lnk_span = iocom->auto_lnk_span;
kdmsg_msg_write(rmsg);
}
+
+ /*
+ * Process shim after the CONN is acknowledged and before the CONN
+ * transaction is deleted. For deletions this gives device drivers
+ * the ability to interlock new operations on the circuit before
+ * it becomes illegal and panics.
+ */
+ if (iocom->auto_callback)
+ iocom->auto_callback(msg);
+
if ((state->txcmd & DMSGF_DELETE) == 0 &&
(msg->any.head.cmd & DMSGF_DELETE)) {
iocom->conn_state = NULL;
kdmsg_msg_reply(msg, 0);
}
- if (iocom->auto_callback)
- iocom->auto_callback(msg);
return (0);
}
int
kdmsg_lnk_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
- /*kdmsg_iocom_t *iocom = state->iocom;*/
+ /*
+ * Reply handler for the LNK_SPAN transaction this side initiates
+ * via kdmsg_msg_alloc().
+ *
+ * Be sure to process shim before terminating the SPAN
+ * transaction. Gives device drivers the ability to
+ * interlock new operations on the circuit before it
+ * becomes illegal and panics.
+ */
+ if (state->iocom->auto_callback)
+ state->iocom->auto_callback(msg);
+ /*
+ * The remote deleted its side of the transaction and we have not
+ * yet deleted ours: reply so our side is terminated as well.
+ */
if ((state->txcmd & DMSGF_DELETE) == 0 &&
(msg->any.head.cmd & DMSGF_DELETE)) {
kdmsg_msg_reply(msg, 0);
}
- if (state->iocom->auto_callback)
- state->iocom->auto_callback(msg);
return (0);
}
/*
* Ask the cluster controller to go away
*/
+ lockmgr(&iocom->msglk, LK_EXCLUSIVE);
atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
while (iocom->msgrd_td || iocom->msgwr_td) {
wakeup(&iocom->msg_ctl);
- tsleep(iocom, 0, "clstrkl", hz);
+ lksleep(iocom, &iocom->msglk, 0, "clstrkl", hz);
}
/*
fdrop(iocom->msg_fp);
iocom->msg_fp = NULL;
}
+ lockmgr(&iocom->msglk, LK_RELEASE);
}
/*
{
kdmsg_iocom_t *iocom = arg;
dmsg_hdr_t hdr;
- kdmsg_msg_t *msg;
+ kdmsg_msg_t *msg = NULL;
kdmsg_state_t *state;
size_t hbytes;
+ size_t abytes;
int error = 0;
while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0) {
break;
}
/* XXX messy: mask cmd to avoid allocating state */
- msg = kdmsg_msg_alloc(iocom, 0,
+ msg = kdmsg_msg_alloc(iocom, NULL,
hdr.cmd & DMSGF_BASECMDMASK,
NULL, NULL);
msg->any.head = hdr;
break;
}
}
- msg->aux_size = hdr.aux_bytes * DMSG_ALIGN;
+ msg->aux_size = hdr.aux_bytes;
if (msg->aux_size > DMSG_AUX_MAX) {
kprintf("kdmsg: illegal msg payload size %zd\n",
msg->aux_size);
break;
}
if (msg->aux_size) {
- msg->aux_data = kmalloc(msg->aux_size, iocom->mmsg,
- M_WAITOK | M_ZERO);
+ abytes = DMSG_DOALIGN(msg->aux_size);
+ msg->aux_data = kmalloc(abytes, iocom->mmsg, M_WAITOK);
msg->flags |= KDMSG_FLAG_AUXALLOC;
error = fp_read(iocom->msg_fp, msg->aux_data,
- msg->aux_size,
- NULL, 1, UIO_SYSSPACE);
+ abytes, NULL, 1, UIO_SYSSPACE);
if (error) {
kprintf("kdmsg: short msg payload received\n");
break;
}
}
- /*
- * State machine tracking, state assignment for msg,
- * returns error and discard status. Errors are fatal
- * to the connection except for EALREADY which forces
- * a discard without execution.
- */
- error = kdmsg_state_msgrx(msg);
- if (error) {
- /*
- * Raw protocol or connection error
- */
- kdmsg_msg_free(msg);
- if (error == EALREADY)
- error = 0;
- } else if (msg->state && msg->state->func) {
- /*
- * Message related to state which already has a
- * handling function installed for it.
- */
- error = msg->state->func(msg->state, msg);
- kdmsg_state_cleanuprx(msg);
- } else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) {
- error = kdmsg_autorxmsg(msg);
- kdmsg_state_cleanuprx(msg);
- } else {
- error = iocom->rcvmsg(msg);
- kdmsg_state_cleanuprx(msg);
- }
+ (void)kdmsg_circ_msgrx(msg);
+ error = kdmsg_msg_receive_handling(msg);
msg = NULL;
}
kprintf("kdmsg: read failed error %d\n", error);
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
- if (msg) {
- if (msg->state && msg->state->msg == msg)
- msg->state->msg = NULL;
+ if (msg)
kdmsg_msg_free(msg);
- }
if ((state = iocom->freerd_state) != NULL) {
iocom->freerd_state = NULL;
kdmsg_msg_t *msg;
kdmsg_state_t *state;
ssize_t res;
+ size_t abytes;
int error = 0;
int retries = 20;
/*
* Dump the message to the pipe or socket.
+ *
+ * We have to clean up the message as if the transmit
+ * succeeded even if it failed.
*/
error = fp_write(iocom->msg_fp, &msg->any,
msg->hdr_size, &res, UIO_SYSSPACE);
if (error || res != msg->hdr_size) {
if (error == 0)
error = EINVAL;
+ kdmsg_state_cleanuptx(msg);
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
break;
}
if (msg->aux_size) {
+ abytes = DMSG_DOALIGN(msg->aux_size);
error = fp_write(iocom->msg_fp,
- msg->aux_data, msg->aux_size,
+ msg->aux_data, abytes,
&res, UIO_SYSSPACE);
- if (error || res != msg->aux_size) {
+ if (error || res != abytes) {
if (error == 0)
error = EINVAL;
+ kdmsg_state_cleanuptx(msg);
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
break;
}
*/
if (error)
kprintf("kdmsg: write failed error %d\n", error);
-
- if (msg) {
- if (msg->state && msg->state->msg == msg)
- msg->state->msg = NULL;
- kdmsg_msg_free(msg);
- }
+ kprintf("thread_wr: Terminating iocom\n");
/*
* Shutdown the socket. This will cause the rx thread to get an
/*
* Simulate received MSGF_DELETE's for any remaining states.
+ * (For remote masters).
+ *
+ * Drain the message queue to handle any device initiated writes
+ * due to state callbacks.
*/
cleanuprd:
+ kdmsg_drain_msgq(iocom);
RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
- if (state->func &&
- (state->rxcmd & DMSGF_DELETE) == 0) {
+ if ((state->rxcmd & DMSGF_DELETE) == 0) {
lockmgr(&iocom->msglk, LK_RELEASE);
- msg = kdmsg_msg_alloc(iocom, state->circuit,
- DMSG_LNK_ERROR,
- NULL, NULL);
- if ((state->rxcmd & DMSGF_CREATE) == 0)
- msg->any.head.cmd |= DMSGF_CREATE;
- msg->any.head.cmd |= DMSGF_DELETE;
- msg->any.head.error = DMSG_ERR_LOSTLINK;
- msg->state = state;
- state->rxcmd = msg->any.head.cmd &
- ~DMSGF_DELETE;
- msg->state->func(state, msg);
- kdmsg_state_cleanuprx(msg);
+ kdmsg_state_abort(state);
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
goto cleanuprd;
}
- if (state->func == NULL) {
- state->flags &= ~KDMSG_STATE_INSERTED;
- RB_REMOVE(kdmsg_state_tree,
- &iocom->staterd_tree, state);
- kdmsg_state_free(state);
- goto cleanuprd;
- }
}
/*
- * NOTE: We have to drain the msgq to handle situations
- * where received states have built up output
- * messages, to avoid creating messages with
- * duplicate CREATE/DELETE flags.
+ * Simulate received MSGF_DELETE's for any remaining states.
+ * (For local masters).
*/
cleanupwr:
kdmsg_drain_msgq(iocom);
RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
- if (state->func &&
- (state->rxcmd & DMSGF_DELETE) == 0) {
+ if ((state->rxcmd & DMSGF_DELETE) == 0) {
lockmgr(&iocom->msglk, LK_RELEASE);
- msg = kdmsg_msg_alloc(iocom, state->circuit,
- DMSG_LNK_ERROR,
- NULL, NULL);
- if ((state->rxcmd & DMSGF_CREATE) == 0)
- msg->any.head.cmd |= DMSGF_CREATE;
- msg->any.head.cmd |= DMSGF_DELETE |
- DMSGF_REPLY;
- msg->any.head.error = DMSG_ERR_LOSTLINK;
- msg->state = state;
- state->rxcmd = msg->any.head.cmd &
- ~DMSGF_DELETE;
- msg->state->func(state, msg);
- kdmsg_state_cleanuprx(msg);
+ kdmsg_state_abort(state);
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
goto cleanupwr;
}
- if (state->func == NULL) {
- state->flags &= ~KDMSG_STATE_INSERTED;
- RB_REMOVE(kdmsg_state_tree,
- &iocom->statewr_tree, state);
- kdmsg_state_free(state);
- goto cleanupwr;
- }
}
- kdmsg_drain_msgq(iocom);
+ /*
+ * Retry until all work is done
+ */
if (--retries == 0)
panic("kdmsg: comm thread shutdown couldn't drain");
- if (RB_ROOT(&iocom->statewr_tree))
- goto cleanupwr;
+ if (TAILQ_FIRST(&iocom->msgq) ||
+ RB_ROOT(&iocom->staterd_tree) ||
+ RB_ROOT(&iocom->statewr_tree)) {
+ goto cleanuprd;
+ }
+ iocom->flags |= KDMSG_IOCOMF_EXITNOACC;
if ((state = iocom->freewr_state) != NULL) {
iocom->freewr_state = NULL;
while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
TAILQ_REMOVE(&iocom->msgq, msg, qentry);
lockmgr(&iocom->msglk, LK_RELEASE);
- if (msg->state && msg->state->msg == msg)
- msg->state->msg = NULL;
if (kdmsg_state_msgtx(msg))
kdmsg_msg_free(msg);
else
}
/*
+ * Dispatch a freshly received message after its low level header has
+ * been validated.
+ *
+ * kdmsg_state_msgrx() first performs state-machine tracking and assigns
+ * msg->state.  If tracking fails the message is freed here; EALREADY
+ * (discard without execution) is reported as success and any other error
+ * is returned to the caller.  Otherwise the message is handed to, in order
+ * of preference, the handler installed on its state, the automatic link
+ * handler (KDMSG_IOCOMF_AUTOANY), or the iocom's rcvmsg callback.  After
+ * that kdmsg_state_cleanuprx() disposes of the message and updates or
+ * frees the state.  Returns the handler's result.
+ */
+static
+int
+kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
+{
+	kdmsg_iocom_t *iocom = msg->iocom;
+	kdmsg_state_t *state;
+	int rc;
+
+	rc = kdmsg_state_msgrx(msg);
+	if (rc != 0) {
+		/* Raw protocol or connection error: drop the message. */
+		kdmsg_msg_free(msg);
+		return ((rc == EALREADY) ? 0 : rc);
+	}
+
+	state = msg->state;
+	if (state != NULL && state->func != NULL) {
+		/* The transaction already has a handler installed. */
+		rc = state->func(state, msg);
+	} else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) {
+		rc = kdmsg_autorxmsg(msg);
+	} else {
+		rc = iocom->rcvmsg(msg);
+	}
+	kdmsg_state_cleanuprx(msg);
+	return (rc);
+}
+
+/*
+ * Resolve the virtual circuit referenced by a freshly received message
+ * (NEEDS WORK).
+ *
+ * A circuit id of 0 in the header means "no circuit" and is a no-op.
+ * Otherwise the id is looked up in the iocom's circuit tree.  When it is
+ * found, the circuit is attached to msg->circ with a reference held; that
+ * reference is released by kdmsg_msg_free().  An unknown id is logged,
+ * together with the ids that do exist, msg->circ is left NULL and EINVAL
+ * is returned.
+ *
+ * The circuit tree is serialized with kdmsg_token, acquired here; msglk
+ * is not required.  The receive thread calls this right before
+ * kdmsg_msg_receive_handling(), and kdmsg_state_msgrx() acquires msglk
+ * exclusively on its own.
+ */
+static
+int
+kdmsg_circ_msgrx(kdmsg_msg_t *msg)
+{
+	kdmsg_circuit_t dummy;
+	kdmsg_circuit_t *circ;
+	int error = 0;
+
+	if (msg->any.head.circuit == 0)
+		return (0);
+
+	dummy.msgid = msg->any.head.circuit;
+	lwkt_gettoken(&kdmsg_token);
+	circ = RB_FIND(kdmsg_circuit_tree, &msg->iocom->circ_tree, &dummy);
+	if (circ != NULL) {
+		msg->circ = circ;
+		kdmsg_circ_hold(circ);
+	} else {
+		kprintf("KDMSG_CIRC_MSGRX CMD %08x: IOCOM %p "
+			"Bad circuit %016jx\n",
+			msg->any.head.cmd,
+			msg->iocom,
+			(uintmax_t)msg->any.head.circuit);
+		kprintf("KDMSG_CIRC_MSGRX: Avail circuits: ");
+		RB_FOREACH(circ, kdmsg_circuit_tree,
+			   &msg->iocom->circ_tree) {
+			kprintf(" %016jx", (uintmax_t)circ->msgid);
+		}
+		kprintf("\n");
+		error = EINVAL;
+	}
+	lwkt_reltoken(&kdmsg_token);
+	return (error);
+}
+
+/*
* Process state tracking for a message after reception, prior to
* execution.
*
* one-off message is a command or reply. For example, one-off replies
* will typically just contain status updates.
*/
+static
int
kdmsg_state_msgrx(kdmsg_msg_t *msg)
{
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
state->msgid = msg->any.head.msgid;
- state->circuit = msg->any.head.circuit;
+ state->circ = msg->circ;
state->iocom = iocom;
if (msg->any.head.cmd & DMSGF_REPLY)
state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree, state);
state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
state->txcmd = DMSGF_REPLY;
state->msgid = msg->any.head.msgid;
- state->circuit = msg->any.head.circuit;
+ if ((state->circ = msg->circ) != NULL)
+ kdmsg_circ_hold(state->circ);
RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state);
state->flags |= KDMSG_STATE_INSERTED;
error = 0;
if (msg->any.head.cmd & DMSGF_ABORT) {
error = EALREADY;
} else {
- kprintf("kdmsg_state_msgrx: no state "
- "for DELETE\n");
+ kprintf("kdmsg_state_msgrx: "
+ "no state for DELETE\n");
error = EINVAL;
}
break;
if (msg->any.head.cmd & DMSGF_ABORT) {
error = EALREADY;
} else {
- kprintf("kdmsg_state_msgrx: state reused "
- "for DELETE\n");
+ kprintf("kdmsg_state_msgrx: "
+ "state reused for DELETE\n");
error = EINVAL;
}
break;
kdmsg_autorxmsg(kdmsg_msg_t *msg)
{
kdmsg_iocom_t *iocom = msg->iocom;
+ kdmsg_circuit_t *circ;
int error = 0;
+ uint32_t cmd;
- switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
+ /*
+ * Process a combination of the transaction command and the message
+ * flags. For the purposes of this routine, the message command is
+ * only relevant when it initiates a transaction (where it is
+ * recorded in icmd).
+ */
+ cmd = (msg->state ? msg->state->icmd : msg->any.head.cmd) &
+ DMSGF_BASECMDMASK;
+ cmd |= msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY);
+
+ switch(cmd) {
case DMSG_LNK_CONN | DMSGF_CREATE:
+ case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
/*
* Received LNK_CONN transaction. Transmit response and
* leave transaction open, which allows the other end to
* start to the SPAN protocol.
+ *
+ * Handle shim after acknowledging the CONN.
+ */
+ if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+ if (iocom->flags & KDMSG_IOCOMF_AUTOCONN) {
+ kdmsg_msg_result(msg, 0);
+ if (iocom->auto_callback)
+ iocom->auto_callback(msg);
+ } else {
+ error = iocom->rcvmsg(msg);
+ }
+ break;
+ }
+ /* fall through */
+ case DMSG_LNK_CONN | DMSGF_DELETE:
+ /*
+ * This message is usually simulated after a link is lost
+ * to clean up the transaction.
*/
if (iocom->flags & KDMSG_IOCOMF_AUTOCONN) {
- kdmsg_msg_result(msg, 0);
if (iocom->auto_callback)
iocom->auto_callback(msg);
+ kdmsg_msg_reply(msg, 0);
} else {
error = iocom->rcvmsg(msg);
}
* but we must leave the transaction open.
*
* If AUTOCIRC is set automatically initiate a virtual circuit
- * to the span. This will attach a kdmsg_circuit to the
- * SPAN state.
+ * to the received span. This will attach a kdmsg_circuit
+ * to the SPAN state. The circuit is lost when the span is
+ * lost.
+ *
+ * Handle shim after acknowledging the SPAN.
*/
if (iocom->flags & KDMSG_IOCOMF_AUTOSPAN) {
if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
}
/* fall through */
case DMSG_LNK_SPAN | DMSGF_DELETE:
+ /*
+ * Process shims (auto_callback) before cleaning up the
+ * circuit structure and closing the transactions. Device
+ * drivers should ensure that the circuit is not used after
+ * the auto_callback() returns.
+ *
+ * Handle shim before closing the SPAN transaction.
+ */
if (iocom->flags & KDMSG_IOCOMF_AUTOSPAN) {
+ if (iocom->auto_callback)
+ iocom->auto_callback(msg);
if (iocom->flags & KDMSG_IOCOMF_AUTOFORGE)
kdmsg_autocirc(msg);
kdmsg_msg_reply(msg, 0);
- if (iocom->auto_callback)
- iocom->auto_callback(msg);
} else {
error = iocom->rcvmsg(msg);
}
* leave the transaction open, allowing the circuit. The
* remote can start issuing commands to us over the circuit
* even before we respond.
- *
- * Theoretically we should track the circuit id but it
- * should not be necessary, kernel devices don't usually
- * care how many circuits are forged to a device, only how
- * many OPENs are active.
*/
if (iocom->flags & KDMSG_IOCOMF_AUTOCIRC) {
- kprintf("kdmsg: CREATE LINK_CIRC\n");
- kdmsg_msg_result(msg, 0);
if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+ circ = kmalloc(sizeof(*circ), iocom->mmsg,
+ M_WAITOK | M_ZERO);
+ msg->state->any.circ = circ;
+ circ->iocom = iocom;
+ circ->rcirc_state = msg->state;
+ kdmsg_circ_hold(circ); /* for rcirc_state */
+ circ->weight = 0;
+ circ->msgid = circ->rcirc_state->msgid;
+ /* XXX no span link for received circuits */
+ kdmsg_circ_hold(circ); /* for circ_state */
+#if 0
+ kprintf("KDMSG VC: RECEIVE CIRC CREATE "
+ "IOCOM %p MSGID %016jx\n",
+ msg->iocom, circ->msgid);
+#endif
+
+ if (RB_INSERT(kdmsg_circuit_tree,
+ &iocom->circ_tree, circ)) {
+ panic("duplicate circuitid allocated");
+ }
+ kdmsg_msg_result(msg, 0);
+
+ /*
+ * Handle shim after adding the circuit and
+ * after acknowledging the CIRC.
+ */
if (iocom->auto_callback)
iocom->auto_callback(msg);
break;
/* fall through */
case DMSG_LNK_CIRC | DMSGF_DELETE:
if (iocom->flags & KDMSG_IOCOMF_AUTOCIRC) {
- kprintf("kdmsg: DELETE LINK_CIRC\n");
+ circ = msg->state->any.circ;
+ if (circ == NULL)
+ break;
+
+ /*
+ * Handle shim before terminating the circuit.
+ */
+#if 0
+ kprintf("KDMSG VC: RECEIVE CIRC DELETE "
+ "IOCOM %p MSGID %016jx\n",
+ msg->iocom, circ->msgid);
+#endif
if (iocom->auto_callback)
iocom->auto_callback(msg);
+
+ KKASSERT(circ->rcirc_state == msg->state);
+ circ->rcirc_state = NULL;
+ msg->state->any.circ = NULL;
+ lwkt_gettoken(&kdmsg_token);
+ RB_REMOVE(kdmsg_circuit_tree, &iocom->circ_tree, circ);
+ lwkt_reltoken(&kdmsg_token);
+ kdmsg_circ_drop(circ); /* for rcirc_state */
kdmsg_msg_reply(msg, 0);
} else {
error = iocom->rcvmsg(msg);
}
/*
- * Handle automatic management of virtual circuits for received SPANs.
+ * Handle automatic forging of virtual circuits based on received SPANs.
+ * (AUTOFORGE). Note that other code handles tracking received circuit
+ * transactions (AUTOCIRC).
*
* We can ignore non-transactions here. Use trans->icmd to test the
* transactional command (once past the CREATE the individual message
* commands are not usually the icmd).
+ *
+ * XXX locks
*/
static
void
/*
* Gaining the SPAN, automatically forge a circuit to the target.
+ *
+ * NOTE!! The shim is not executed until we receive an acknowledgement
+ * to our forged LNK_CIRC (see kdmsg_autocirc_reply()).
*/
if (msg->state->icmd == DMSG_LNK_SPAN &&
(msg->any.head.cmd & DMSGF_CREATE)) {
- kprintf("KDMSG VC: CREATE SPAN->CIRC MSGID %016jx\n",
- (intmax_t)msg->any.head.msgid);
circ = kmalloc(sizeof(*circ), iocom->mmsg, M_WAITOK | M_ZERO);
msg->state->any.circ = circ;
circ->iocom = iocom;
circ->span_state = msg->state;
- xmsg = kdmsg_msg_alloc(iocom, 0,
+ kdmsg_circ_hold(circ); /* for span_state */
+ xmsg = kdmsg_msg_alloc(iocom, NULL,
DMSG_LNK_CIRC | DMSGF_CREATE,
kdmsg_autocirc_reply, circ);
circ->circ_state = xmsg->state;
circ->weight = msg->any.lnk_span.dist;
+ circ->msgid = circ->circ_state->msgid;
+ kdmsg_circ_hold(circ); /* for circ_state */
+#if 0
+ kprintf("KDMSG VC: CREATE SPAN->CIRC IOCOM %p MSGID %016jx\n",
+ msg->iocom, circ->msgid);
+#endif
+
+ if (RB_INSERT(kdmsg_circuit_tree, &iocom->circ_tree, circ))
+ panic("duplicate circuitid allocated");
+
xmsg->any.lnk_circ.target = msg->any.head.msgid;
kdmsg_msg_write(xmsg);
}
if (msg->state->icmd == DMSG_LNK_SPAN &&
(msg->any.head.cmd & DMSGF_DELETE) &&
msg->state->any.circ) {
- kprintf("KDMSG VC: DELETE SPAN->CIRC\n");
circ = msg->state->any.circ;
lwkt_gettoken(&kdmsg_token);
circ->span_state = NULL;
msg->state->any.circ = NULL;
- kdmsg_circ_free_check(circ);
+ RB_REMOVE(kdmsg_circuit_tree, &iocom->circ_tree, circ);
+#if 0
+ kprintf("KDMSG VC: DELETE SPAN->CIRC IOCOM %p MSGID %016jx\n",
+ msg->iocom, (intmax_t)circ->msgid);
+#endif
+ kdmsg_circ_drop(circ); /* for span_state */
lwkt_reltoken(&kdmsg_token);
}
}
kdmsg_circuit_t *circ = state->any.circ;
/*
- * Shim is typically used by the end point to record the circuit
- * on CREATE and erase it on DELETE.
+ * Call shim after receiving an acknowledgement to our forged
+ * circuit and before processing a received termination.
*/
if (iocom->auto_callback)
iocom->auto_callback(msg);
*/
if ((state->txcmd & DMSGF_DELETE) == 0 &&
(msg->any.head.cmd & DMSGF_DELETE)) {
+#if 0
kprintf("KDMSG VC: DELETE CIRC FROM REMOTE\n");
+#endif
lwkt_gettoken(&kdmsg_token);
circ->circ_state = NULL;
state->any.circ = NULL;
- kdmsg_circ_free_check(circ);
+ kdmsg_circ_drop(circ); /* for circ_state */
lwkt_reltoken(&kdmsg_token);
kdmsg_msg_reply(msg, 0);
}
return (0);
}
+/*
+ * Post-receive-handling message and state cleanup. This routine is called
+ * after the state function handling/callback to properly dispose of the
+ * message and update or dispose of the state.
+ */
static
void
-kdmsg_circ_free_check(kdmsg_circuit_t *circ)
-{
- kdmsg_iocom_t *iocom = circ->iocom;
-
- if (circ->span_state == NULL &&
- circ->circ_state == NULL &&
- circ->recorded == 0) {
- circ->iocom = NULL;
- kfree(circ, iocom->mmsg);
- }
-}
-
-void
kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
{
kdmsg_iocom_t *iocom = msg->iocom;
&iocom->staterd_tree, state);
}
state->flags &= ~KDMSG_STATE_INSERTED;
+ if (msg != state->msg)
+ kdmsg_msg_free(msg);
lockmgr(&iocom->msglk, LK_RELEASE);
kdmsg_state_free(state);
} else {
- if (state->msg != msg)
+ if (msg != state->msg)
kdmsg_msg_free(msg);
lockmgr(&iocom->msglk, LK_RELEASE);
}
- } else if (state->msg != msg) {
+ } else if (msg != state->msg) {
kdmsg_msg_free(msg);
}
}
/*
+ * Simulate receiving a message which terminates an active transaction
+ * state.  Our simulated received message must set DELETE and may also
+ * have to set CREATE.  It must also ensure that all fields are set such
+ * that the receive handling code can find the state (kdmsg_state_msgrx())
+ * or an endless loop will ensue.
+ *
+ * This is used when the other end of the link or virtual circuit is dead
+ * so the device driver gets a completed transaction for all pending states.
+ *
+ * The simulated message carries DMSG_ERR_LOSTLINK and copies the REPLY bit
+ * from the state's rxcmd, so kdmsg_state_msgrx() looks the state up in the
+ * same tree that real traffic for it would use.
+ *
+ * kdmsg_msg_alloc() already attaches state->circ to the message and takes
+ * the message's circuit reference, which kdmsg_msg_free() drops exactly
+ * once.  No further hold may be taken here, or the circuit would leak.
+ */
+static
+void
+kdmsg_state_abort(kdmsg_state_t *state)
+{
+	kdmsg_iocom_t *iocom = state->iocom;
+	kdmsg_msg_t *msg;
+
+	msg = kdmsg_msg_alloc(iocom, state->circ,
+			      DMSG_LNK_ERROR,
+			      NULL, NULL);
+	if ((state->rxcmd & DMSGF_CREATE) == 0)
+		msg->any.head.cmd |= DMSGF_CREATE;
+	msg->any.head.cmd |= DMSGF_DELETE | (state->rxcmd & DMSGF_REPLY);
+	msg->any.head.error = DMSG_ERR_LOSTLINK;
+	msg->any.head.msgid = state->msgid;
+	msg->state = state;
+
+	/*
+	 * Link teardown path: the handling result is not propagated.
+	 */
+	kdmsg_msg_receive_handling(msg);
+}
+
+/*
* Process state tracking for a message prior to transmission.
*
- * Called with msglk held and the msg dequeued.
+ * Called with msglk held and the msg dequeued. Returns non-zero if
+ * the message is bad and should be deleted by the caller.
*
* One-off messages are usually with dummy state and msg->state may be NULL
* in this situation.
* May request that caller discard the message by setting *discardp to 1.
* A NULL state may be returned in this case.
*/
+static
int
kdmsg_state_msgtx(kdmsg_msg_t *msg)
{
return (error);
}
+static
void
kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
{
&iocom->statewr_tree, state);
}
state->flags &= ~KDMSG_STATE_INSERTED;
- msg->state = NULL;
+ if (msg != state->msg)
+ kdmsg_msg_free(msg);
lockmgr(&iocom->msglk, LK_RELEASE);
kdmsg_state_free(state);
} else {
- if (state->msg != msg)
+ if (msg != state->msg)
kdmsg_msg_free(msg);
lockmgr(&iocom->msglk, LK_RELEASE);
}
- } else if (state->msg != msg) {
+ } else if (msg != state->msg) {
kdmsg_msg_free(msg);
}
}
+static
void
kdmsg_state_free(kdmsg_state_t *state)
{
msg = state->msg;
state->msg = NULL;
kfree(state, iocom->mmsg);
- if (msg)
+ if (msg) {
+ msg->state = NULL;
kdmsg_msg_free(msg);
+ }
}
kdmsg_msg_t *
-kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit, uint32_t cmd,
+kdmsg_msg_alloc(kdmsg_iocom_t *iocom, kdmsg_circuit_t *circ, uint32_t cmd,
int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
{
kdmsg_msg_t *msg;
msg->hdr_size = hbytes;
msg->iocom = iocom;
msg->any.head.magic = DMSG_HDR_MAGIC;
- msg->any.head.circuit = circuit;
msg->any.head.cmd = cmd;
+ if (circ) {
+ kdmsg_circ_hold(circ);
+ msg->circ = circ;
+ msg->any.head.circuit = circ->msgid;
+ }
if (cmd & DMSGF_CREATE) {
/*
state->any.any = data;
state->msg = msg;
state->msgid = (uint64_t)(uintptr_t)state;
- state->circuit = circuit;
+ state->circ = circ;
state->iocom = iocom;
msg->state = state;
+ if (circ)
+ kdmsg_circ_hold(circ);
/*msg->any.head.msgid = state->msgid;XXX*/
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
msg->any.head.msgid = state->msgid;
lockmgr(&iocom->msglk, LK_RELEASE);
}
-
return (msg);
}
+/*
+ * Allocate a zeroed message belonging to an already-existing transaction.
+ *
+ * The header size is (cmd & DMSGF_SIZE) * DMSG_ALIGN bytes.  The message
+ * is bound to (state) and its iocom.  If the state is associated with a
+ * circuit, the message acquires its own reference on that circuit and the
+ * header's circuit field is set to the circuit's msgid.  The func and data
+ * arguments are currently unused.
+ */
+kdmsg_msg_t *
+kdmsg_msg_alloc_state(kdmsg_state_t *state, uint32_t cmd,
+		      int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
+{
+	kdmsg_iocom_t *iocom = state->iocom;
+	kdmsg_circuit_t *circ;
+	kdmsg_msg_t *nmsg;
+	size_t hdrbytes;
+
+	KKASSERT(iocom != NULL);
+	hdrbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
+	nmsg = kmalloc(offsetof(struct kdmsg_msg, any) + hdrbytes,
+		       iocom->mmsg, M_WAITOK | M_ZERO);
+	nmsg->iocom = iocom;
+	nmsg->state = state;
+	nmsg->hdr_size = hdrbytes;
+	nmsg->any.head.magic = DMSG_HDR_MAGIC;
+	nmsg->any.head.cmd = cmd;
+
+	circ = state->circ;
+	if (circ != NULL) {
+		kdmsg_circ_hold(circ);
+		nmsg->circ = circ;
+		nmsg->any.head.circuit = circ->msgid;
+	}
+	return (nmsg);
+}
+
void
kdmsg_msg_free(kdmsg_msg_t *msg)
{
kfree(msg->aux_data, iocom->mmsg);
msg->flags &= ~KDMSG_FLAG_AUXALLOC;
}
+ if (msg->circ) {
+ kdmsg_circ_drop(msg->circ);
+ msg->circ = NULL;
+ }
+ if (msg->state) {
+ if (msg->state->msg == msg)
+ msg->state->msg = NULL;
+ msg->state = NULL;
+ }
msg->aux_data = NULL;
msg->aux_size = 0;
msg->iocom = NULL;
- msg->any.head.circuit = -1;
kfree(msg, iocom->mmsg);
}
/*
+ * Circuits are tracked in a red-black tree by their circuit id (msgid).
+ */
+int
+kdmsg_circuit_cmp(kdmsg_circuit_t *circ1, kdmsg_circuit_t *circ2)
+{
+	/* three-way compare on msgid: -1, 0 or 1 */
+	if (circ1->msgid == circ2->msgid)
+		return (0);
+	return ((circ1->msgid < circ2->msgid) ? -1 : 1);
+}
+
+/*
* Indexed messages are stored in a red-black tree indexed by their
* msgid. Only persistent messages are indexed.
*/
return(-1);
if (state1->iocom > state2->iocom)
return(1);
- if (state1->circuit < state2->circuit)
+ if (state1->circ < state2->circ)
return(-1);
- if (state1->circuit > state2->circuit)
+ if (state1->circ > state2->circ)
return(1);
if (state1->msgid < state2->msgid)
return(-1);
* between a possibly lost in-transaction message due to
* competing aborts and a real one-off message?)
*/
+ state = NULL;
msg->any.head.msgid = 0;
lockmgr(&iocom->msglk, LK_EXCLUSIVE);
}
/*
- * Finish up the msg fields
+ * With AUTOCIRC and AUTOFORGE it is possible for the circuit to
+ * get ripped out in the rxthread while some other thread is
+ * holding a ref on it inbetween allocating and sending a dmsg.
+ */
+ if (msg->circ && msg->circ->rcirc_state == NULL &&
+ (msg->circ->span_state == NULL || msg->circ->circ_state == NULL)) {
+ kprintf("kdmsg_msg_write: Attempt to write message to "
+ "terminated circuit: msg %08x\n", msg->any.head.cmd);
+ lockmgr(&iocom->msglk, LK_RELEASE);
+ if (kdmsg_state_msgtx(msg)) {
+ if (state == NULL || msg != state->msg)
+ kdmsg_msg_free(msg);
+ } else if ((msg->state->rxcmd & DMSGF_DELETE) == 0) {
+ /* XXX SMP races simulating a response here */
+ kdmsg_state_t *state = msg->state;
+ kdmsg_state_cleanuptx(msg);
+ kdmsg_state_abort(state);
+ } else {
+ kdmsg_state_cleanuptx(msg);
+ }
+ return;
+ }
+
+ /*
+ * This flag is not set until after the tx thread has drained
+ * the txmsgq and simulated responses. After that point the
+ * txthread is dead and can no longer simulate responses.
+ *
+ * Device drivers should never try to send a message once this
+ * flag is set. They should have detected (through the state
+ * closures) that the link is in trouble.
+ */
+ if (iocom->flags & KDMSG_IOCOMF_EXITNOACC) {
+ lockmgr(&iocom->msglk, LK_RELEASE);
+ panic("kdmsg_msg_write: Attempt to write message to "
+ "terminated iocom\n");
+ }
+
+ /*
+ * Finish up the msg fields. Note that msg->aux_size and the
+ * aux_bytes stored in the message header represent the unaligned
+ * (actual) bytes of data, but the buffer is sized to an aligned
+ * size and the CRC is generated over the aligned length.
*/
msg->any.head.salt = /* (random << 8) | */ (iocom->msg_seq & 255);
++iocom->msg_seq;
+ if (msg->aux_data && msg->aux_size) {
+ uint32_t abytes = DMSG_DOALIGN(msg->aux_size);
+
+ msg->any.head.aux_bytes = msg->aux_size;
+ msg->any.head.aux_crc = iscsi_crc32(msg->aux_data, abytes);
+ }
msg->any.head.hdr_crc = 0;
msg->any.head.hdr_crc = iscsi_crc32(msg->any.buf, msg->hdr_size);
}
/* XXX messy mask cmd to avoid allocating state */
- nmsg = kdmsg_msg_alloc(msg->iocom, msg->any.head.circuit,
- cmd & DMSGF_BASECMDMASK,
- NULL, NULL);
- nmsg->any.head.cmd = cmd;
+ nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
nmsg->any.head.error = error;
- nmsg->state = state;
kdmsg_msg_write(nmsg);
}
}
/* XXX messy mask cmd to avoid allocating state */
- nmsg = kdmsg_msg_alloc(msg->iocom, msg->any.head.circuit,
- cmd & DMSGF_BASECMDMASK,
- NULL, NULL);
- nmsg->any.head.cmd = cmd;
+ nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
nmsg->any.head.error = error;
- nmsg->state = state;
kdmsg_msg_write(nmsg);
}
}
/* XXX messy mask cmd to avoid allocating state */
- nmsg = kdmsg_msg_alloc(state->iocom, state->circuit,
- cmd & DMSGF_BASECMDMASK,
- NULL, NULL);
- nmsg->any.head.cmd = cmd;
+ nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
nmsg->any.head.error = error;
- nmsg->state = state;
kdmsg_msg_write(nmsg);
}
}
/* XXX messy mask cmd to avoid allocating state */
- nmsg = kdmsg_msg_alloc(state->iocom, state->circuit,
- cmd & DMSGF_BASECMDMASK,
- NULL, NULL);
- nmsg->any.head.cmd = cmd;
+ nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
nmsg->any.head.error = error;
- nmsg->state = state;
kdmsg_msg_write(nmsg);
}
#include <sys/thread.h>
#include <sys/queue.h>
#include <sys/lock.h>
+#include <sys/stat.h>
#include <sys/uuid.h>
#include <sys/dmsg.h>
#include <sys/msgport2.h>
#include <sys/thread2.h>
+/*
+ * Per-transaction tracking for BLK_OPEN/BLK_CLOSE, stored in
+ * state->any.any by disk_blk_open().
+ */
+struct dios_open {
+	int openrd;	/* successful read-mode opens not yet closed */
+	int openwr;	/* successful write-mode opens not yet closed */
+};
+
+/*
+ * Per-transaction tracking for BLK_READ/WRITE/FLUSH/FREEBLKS, stored in
+ * state->any.any by the disk_blk_*() handlers.
+ */
+struct dios_io {
+	int count;	/* bios issued and not yet completed by diskiodone() */
+	int eof;	/* set when a DELETE was received with an issued command */
+};
+
static MALLOC_DEFINE(M_DMSG_DISK, "dmsg_disk", "disk dmsg");
static int disk_iocom_reconnect(struct disk *dp, struct file *fp);
static int disk_rcvdmsg(kdmsg_msg_t *msg);
+static void disk_blk_open(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_read(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_write(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_flush(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_freeblks(struct disk *dp, kdmsg_msg_t *msg);
+static void diskiodone(struct bio *bio);
+
void
disk_iocom_init(struct disk *dp)
{
{
struct disk *dp = msg->iocom->handle;
- switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
+ /*
+ * Handle debug messages (these might not be in transactions)
+ */
+ switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
case DMSG_DBG_SHELL:
/*
* Execute shell command (not supported atm)
*/
kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
- break;
+ return(0);
case DMSG_DBG_SHELL | DMSGF_REPLY:
if (msg->aux_data) {
msg->aux_data[msg->aux_size - 1] = 0;
kprintf("diskiocom: DEBUGMSG: %s\n", msg->aux_data);
}
+ return(0);
+ }
+
+ /*
+ * All remaining messages must be in a transaction
+ *
+ * NOTE! We are switching on the first message's command. The
+ * actual message command within the transaction may be
+ * different (if streaming within a transaction).
+ */
+ if (msg->state == NULL) {
+ kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+ return(0);
+ }
+
+ switch(msg->state->rxcmd & DMSGF_CMDSWMASK) {
+ case DMSG_BLK_OPEN:
+ case DMSG_BLK_CLOSE:
+ disk_blk_open(dp, msg);
+ break;
+ case DMSG_BLK_READ:
+ disk_blk_read(dp, msg);
+ break;
+ case DMSG_BLK_WRITE:
+ disk_blk_write(dp, msg);
+ break;
+ case DMSG_BLK_FLUSH:
+ disk_blk_flush(dp, msg);
+ break;
+ case DMSG_BLK_FREEBLKS:
+ disk_blk_freeblks(dp, msg);
break;
- case DMSG_BLK_OPEN | DMSGF_CREATE:
- case DMSG_BLK_READ | DMSGF_CREATE:
- case DMSG_BLK_WRITE | DMSGF_CREATE:
- case DMSG_BLK_FLUSH | DMSGF_CREATE:
- case DMSG_BLK_FREEBLKS | DMSGF_CREATE:
default:
- kprintf("diskiocom: DISK ADHOC INPUT %s%d cmd %08x\n",
- dev_dname(dp->d_rawdev), dkunit(dp->d_rawdev),
- msg->any.head.cmd);
- if (msg->any.head.cmd & DMSGF_CREATE)
- kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+ if ((msg->any.head.cmd & DMSGF_REPLY) == 0) {
+ if (msg->any.head.cmd & DMSGF_DELETE)
+ kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+ else
+ kdmsg_msg_result(msg, DMSG_ERR_NOSUPP);
+ }
break;
}
return (0);
}
+
+/*
+ * Handle a message on a BLK_OPEN/BLK_CLOSE transaction.
+ *
+ * BLK_OPEN opens the raw device with the requested modes and counts each
+ * successful open in a struct dios_open hung off state->any.any.
+ * BLK_CLOSE closes only those requested modes that are currently open;
+ * if none of them are open, DMSG_ERR_PARAM is returned and nothing is
+ * closed.  When the message carries DELETE, every open still outstanding
+ * is closed, the tracking structure is freed, and a terminating reply is
+ * sent; otherwise a non-terminating result is sent.
+ */
+static
+void
+disk_blk_open(struct disk *dp, kdmsg_msg_t *msg)
+{
+	struct dios_open *openst;
+	int error = DMSG_ERR_NOSUPP;
+	int fflags;
+
+	openst = msg->state->any.any;
+	if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_OPEN) {
+		if (openst == NULL) {
+			openst = kmalloc(sizeof(*openst), M_DEVBUF,
+					 M_WAITOK | M_ZERO);
+			msg->state->any.any = openst;
+		}
+		fflags = 0;
+		if (msg->any.blk_open.modes & DMSG_BLKOPEN_RD)
+			fflags = FREAD;
+		if (msg->any.blk_open.modes & DMSG_BLKOPEN_WR)
+			fflags |= FWRITE;
+		error = dev_dopen(dp->d_rawdev, fflags, S_IFCHR, proc0.p_ucred);
+		if (error) {
+			error = DMSG_ERR_IO;
+		} else {
+			if (fflags & FREAD)
+				++openst->openrd;
+			if (fflags & FWRITE)
+				++openst->openwr;
+		}
+	}
+	if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_CLOSE &&
+	    openst) {
+		/*
+		 * Only close (and un-count) modes that are actually open.
+		 * Decrementing a zero counter would drive it negative and
+		 * make the DELETE cleanup below issue unmatched
+		 * dev_dclose() calls.
+		 */
+		fflags = 0;
+		if ((msg->any.blk_open.modes & DMSG_BLKOPEN_RD) &&
+		    openst->openrd > 0) {
+			fflags = FREAD;
+		}
+		if ((msg->any.blk_open.modes & DMSG_BLKOPEN_WR) &&
+		    openst->openwr > 0) {
+			fflags |= FWRITE;
+		}
+		if (fflags == 0) {
+			error = DMSG_ERR_PARAM;
+		} else if (dev_dclose(dp->d_rawdev, fflags, S_IFCHR)) {
+			error = DMSG_ERR_IO;
+		} else {
+			error = 0;
+			if (fflags & FREAD)
+				--openst->openrd;
+			if (fflags & FWRITE)
+				--openst->openwr;
+		}
+	}
+	if (msg->any.head.cmd & DMSGF_DELETE) {
+		if (openst) {
+			/* release every open still held by this transaction */
+			while (openst->openrd > 0 && openst->openwr > 0) {
+				--openst->openrd;
+				--openst->openwr;
+				dev_dclose(dp->d_rawdev, FREAD|FWRITE, S_IFCHR);
+			}
+			while (openst->openrd > 0) {
+				--openst->openrd;
+				dev_dclose(dp->d_rawdev, FREAD, S_IFCHR);
+			}
+			while (openst->openwr > 0) {
+				--openst->openwr;
+				dev_dclose(dp->d_rawdev, FWRITE, S_IFCHR);
+			}
+			kfree(openst, M_DEVBUF);
+			msg->state->any.any = NULL;
+		}
+		kdmsg_msg_reply(msg, error);
+	} else {
+		kdmsg_msg_result(msg, error);
+	}
+}
+
+/*
+ * Handle a message on a BLK_READ transaction.
+ *
+ * A DMSG_BLK_READ command whose byte count lies in [DEV_BSIZE, MAXPHYS]
+ * issues an asynchronous read of the raw device; the reply is produced
+ * later by diskiodone().  Any other message, or a bad byte count, is
+ * answered immediately with an error (DMSG_ERR_NOSUPP / DMSG_ERR_PARAM).
+ */
+static
+void
+disk_blk_read(struct disk *dp, kdmsg_msg_t *msg)
+{
+	kdmsg_state_t *state = msg->state;
+	struct dios_io *iost = state->any.any;
+	struct buf *bp;
+	struct bio *bio;
+	int error;
+
+	if ((msg->any.head.cmd & DMSGF_CMDSWMASK) != DMSG_BLK_READ) {
+		/* only DMSG_BLK_READ commands imply read ops */
+		error = DMSG_ERR_NOSUPP;
+	} else if (msg->any.blk_read.bytes < DEV_BSIZE ||
+		   msg->any.blk_read.bytes > MAXPHYS) {
+		error = DMSG_ERR_PARAM;
+	} else {
+		if (iost == NULL) {
+			iost = kmalloc(sizeof(*iost), M_DEVBUF,
+				       M_WAITOK | M_ZERO);
+			state->any.any = iost;
+		}
+		bp = geteblk(msg->any.blk_read.bytes);
+		bio = &bp->b_bio1;
+		bp->b_cmd = BUF_CMD_READ;
+		bp->b_bcount = msg->any.blk_read.bytes;
+		bp->b_resid = bp->b_bcount;
+		bio->bio_offset = msg->any.blk_read.offset;
+		bio->bio_caller_info1.ptr = state;
+		bio->bio_done = diskiodone;
+		/* kdmsg_state_hold(state); */
+
+		atomic_add_int(&iost->count, 1);
+		if (msg->any.head.cmd & DMSGF_DELETE)
+			iost->eof = 1;
+		BUF_KERNPROC(bp);
+		dev_dstrategy(dp->d_rawdev, bio);
+		return;		/* diskiodone() replies */
+	}
+
+	/*
+	 * Immediate error reply.  A DELETE also terminates our side; the
+	 * tracking structure is freed only if no bios are outstanding.
+	 */
+	if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+		kdmsg_msg_result(msg, error);
+		return;
+	}
+	if (iost != NULL && iost->count == 0) {
+		kfree(iost, M_DEVBUF);
+		state->any.any = NULL;
+	}
+	kdmsg_msg_reply(msg, error);
+}
+
+/*
+ * Handle a message on a BLK_WRITE transaction.
+ *
+ * A DMSG_BLK_WRITE command whose byte count lies in [DEV_BSIZE, MAXPHYS]
+ * issues an asynchronous write to the raw device; any part of the range
+ * not covered by msg->aux_size is written as zeros.  The reply is produced
+ * later by diskiodone().  Any other message, or a bad byte count, is
+ * answered immediately with an error (DMSG_ERR_NOSUPP / DMSG_ERR_PARAM).
+ */
+static
+void
+disk_blk_write(struct disk *dp, kdmsg_msg_t *msg)
+{
+	struct dios_io *iost;
+	struct buf *bp;
+	struct bio *bio;
+	int error = DMSG_ERR_NOSUPP;
+	int reterr = 1;
+
+	/*
+	 * Only DMSG_BLK_WRITE commands imply write ops.
+	 */
+	iost = msg->state->any.any;
+	if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_WRITE) {
+		if (msg->any.blk_write.bytes < DEV_BSIZE ||
+		    msg->any.blk_write.bytes > MAXPHYS) {
+			error = DMSG_ERR_PARAM;
+			goto done;
+		}
+		if (iost == NULL) {
+			iost = kmalloc(sizeof(*iost), M_DEVBUF,
+				       M_WAITOK | M_ZERO);
+			msg->state->any.any = iost;
+		}
+
+		/*
+		 * Issue WRITE.  Short data implies zeros.  Try to optimize
+		 * the buffer cache buffer for the case where we can just
+		 * use the message's data pointer.
+		 *
+		 * NOTE(review): in the zero-copy case the bio references
+		 * msg->aux_data until diskiodone() runs, which can be after
+		 * this function returns.  Confirm the receive path keeps
+		 * this msg (and its aux_data) alive until then -- e.g. that
+		 * it is only true when msg == msg->state->msg.
+		 */
+		reterr = 0;
+		if (msg->aux_size >= msg->any.blk_write.bytes)
+			bp = getpbuf(NULL);
+		else
+			bp = geteblk(msg->any.blk_write.bytes);
+		bio = &bp->b_bio1;
+		bp->b_cmd = BUF_CMD_WRITE;
+		bp->b_bcount = msg->any.blk_write.bytes;
+		bp->b_resid = bp->b_bcount;
+		if (msg->aux_size >= msg->any.blk_write.bytes) {
+			bp->b_data = msg->aux_data;
+		} else {
+			bcopy(msg->aux_data, bp->b_data, msg->aux_size);
+			bzero(bp->b_data + msg->aux_size,
+			      msg->any.blk_write.bytes - msg->aux_size);
+		}
+		bio->bio_offset = msg->any.blk_write.offset;
+		bio->bio_caller_info1.ptr = msg->state;
+		bio->bio_done = diskiodone;
+		/* kdmsg_state_hold(msg->state); */
+
+		atomic_add_int(&iost->count, 1);
+		if (msg->any.head.cmd & DMSGF_DELETE)
+			iost->eof = 1;
+		BUF_KERNPROC(bp);
+		dev_dstrategy(dp->d_rawdev, bio);
+	}
+done:
+	/*
+	 * Immediate error reply.  A DELETE also terminates our side; the
+	 * tracking structure is freed only if no bios are outstanding.
+	 */
+	if (reterr) {
+		if (msg->any.head.cmd & DMSGF_DELETE) {
+			if (iost && iost->count == 0) {
+				kfree(iost, M_DEVBUF);
+				msg->state->any.any = NULL;
+			}
+			kdmsg_msg_reply(msg, error);
+		} else {
+			kdmsg_msg_result(msg, error);
+		}
+	}
+}
+
+/*
+ * Handle a message on a BLK_FLUSH transaction.
+ *
+ * A DMSG_BLK_FLUSH command issues an asynchronous BUF_CMD_FLUSH on the raw
+ * device using the message's offset/bytes; the reply is produced later by
+ * diskiodone().  Any other message is answered immediately with
+ * DMSG_ERR_NOSUPP.
+ */
+static
+void
+disk_blk_flush(struct disk *dp, kdmsg_msg_t *msg)
+{
+	struct dios_io *iost;
+	struct buf *bp;
+	struct bio *bio;
+	int error = DMSG_ERR_NOSUPP;
+	int reterr = 1;
+
+	/*
+	 * Only DMSG_BLK_FLUSH commands imply flush ops.
+	 */
+	iost = msg->state->any.any;
+	if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_FLUSH) {
+		if (iost == NULL) {
+			iost = kmalloc(sizeof(*iost), M_DEVBUF,
+				       M_WAITOK | M_ZERO);
+			msg->state->any.any = iost;
+		}
+		reterr = 0;
+		bp = getpbuf(NULL);
+		bio = &bp->b_bio1;
+		bp->b_cmd = BUF_CMD_FLUSH;
+		bp->b_bcount = msg->any.blk_flush.bytes;
+		bp->b_resid = 0;
+		bio->bio_offset = msg->any.blk_flush.offset;
+		bio->bio_caller_info1.ptr = msg->state;
+		bio->bio_done = diskiodone;
+		/* kdmsg_state_hold(msg->state); */
+
+		atomic_add_int(&iost->count, 1);
+		if (msg->any.head.cmd & DMSGF_DELETE)
+			iost->eof = 1;
+		BUF_KERNPROC(bp);
+		dev_dstrategy(dp->d_rawdev, bio);
+	}
+	/*
+	 * Immediate error reply.  A DELETE also terminates our side; the
+	 * tracking structure is freed only if no bios are outstanding.
+	 */
+	if (reterr) {
+		if (msg->any.head.cmd & DMSGF_DELETE) {
+			if (iost && iost->count == 0) {
+				kfree(iost, M_DEVBUF);
+				msg->state->any.any = NULL;
+			}
+			kdmsg_msg_reply(msg, error);
+		} else {
+			kdmsg_msg_result(msg, error);
+		}
+	}
+}
+
+/*
+ * Handle a message on a BLK_FREEBLKS transaction.
+ *
+ * A DMSG_BLK_FREEBLKS command issues an asynchronous BUF_CMD_FREEBLKS on
+ * the raw device using the message's offset/bytes; the reply is produced
+ * later by diskiodone().  Any other message is answered immediately with
+ * DMSG_ERR_NOSUPP.
+ */
+static
+void
+disk_blk_freeblks(struct disk *dp, kdmsg_msg_t *msg)
+{
+	struct dios_io *iost;
+	struct buf *bp;
+	struct bio *bio;
+	int error = DMSG_ERR_NOSUPP;
+	int reterr = 1;
+
+	/*
+	 * Only DMSG_BLK_FREEBLKS commands imply freeblks ops.
+	 */
+	iost = msg->state->any.any;
+	if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_FREEBLKS) {
+		if (iost == NULL) {
+			iost = kmalloc(sizeof(*iost), M_DEVBUF,
+				       M_WAITOK | M_ZERO);
+			msg->state->any.any = iost;
+		}
+		reterr = 0;
+		bp = getpbuf(NULL);
+		bio = &bp->b_bio1;
+		bp->b_cmd = BUF_CMD_FREEBLKS;
+		bp->b_bcount = msg->any.blk_freeblks.bytes;
+		bp->b_resid = 0;
+		bio->bio_offset = msg->any.blk_freeblks.offset;
+		bio->bio_caller_info1.ptr = msg->state;
+		bio->bio_done = diskiodone;
+		/* kdmsg_state_hold(msg->state); */
+
+		atomic_add_int(&iost->count, 1);
+		if (msg->any.head.cmd & DMSGF_DELETE)
+			iost->eof = 1;
+		BUF_KERNPROC(bp);
+		dev_dstrategy(dp->d_rawdev, bio);
+	}
+	/*
+	 * Immediate error reply.  A DELETE also terminates our side; the
+	 * tracking structure is freed only if no bios are outstanding.
+	 */
+	if (reterr) {
+		if (msg->any.head.cmd & DMSGF_DELETE) {
+			if (iost && iost->count == 0) {
+				kfree(iost, M_DEVBUF);
+				msg->state->any.any = NULL;
+			}
+			kdmsg_msg_reply(msg, error);
+		} else {
+			kdmsg_msg_result(msg, error);
+		}
+	}
+}
+
+/*
+ * Completion callback for bios issued by the disk_blk_*() handlers.
+ *
+ * Builds a reply on the originating transaction (the state is stashed in
+ * bio_caller_info1.ptr), sends it, and releases the buffer.  READs return
+ * the buffer contents as aux data.  The reply carries DMSGF_DELETE only
+ * for the last outstanding bio after the remote side has terminated the
+ * transaction (iost->eof); at that point the transaction's struct dios_io
+ * is freed here, mirroring the synchronous error paths of the handlers.
+ */
+static
+void
+diskiodone(struct bio *bio)
+{
+	struct buf *bp = bio->bio_buf;
+	kdmsg_state_t *state = bio->bio_caller_info1.ptr;
+	kdmsg_msg_t *rmsg;
+	struct dios_io *iost = state->any.any;
+	int error;
+	int resid = 0;
+	int bytes;
+	uint32_t cmd;
+	void *data;
+
+	cmd = DMSG_LNK_ERROR;
+	data = NULL;
+	bytes = 0;
+
+	switch(bp->b_cmd) {
+	case BUF_CMD_READ:
+		/* cmd stays DMSG_LNK_ERROR; the read data goes back as aux */
+		data = bp->b_data;
+		bytes = bp->b_bcount;
+		/* fall through */
+	case BUF_CMD_WRITE:
+		if (bp->b_flags & B_ERROR) {
+			error = bp->b_error;
+		} else {
+			error = 0;
+			resid = bp->b_resid;
+		}
+		break;
+	case BUF_CMD_FLUSH:
+	case BUF_CMD_FREEBLKS:
+		if (bp->b_flags & B_ERROR)
+			error = bp->b_error;
+		else
+			error = 0;
+		break;
+	default:
+		panic("diskiodone: Unknown bio cmd = %d\n",
+		      bio->bio_buf->b_cmd);
+		break;	/* NOT REACHED */
+	}
+
+	/*
+	 * Promote LNK_ERROR to BLK_ERROR when there is a non-zero resid so
+	 * the resid can be returned in the extended header.  This applies
+	 * to READs as well; their data is attached as aux data regardless.
+	 */
+	if (resid && cmd == DMSG_LNK_ERROR)
+		cmd = DMSG_BLK_ERROR;
+	/* XXX txcmd is delayed so this won't work for streaming */
+	if ((state->txcmd & DMSGF_CREATE) == 0)	/* assume serialized */
+		cmd |= DMSGF_CREATE;
+	if (iost->eof) {
+		if (atomic_fetchadd_int(&iost->count, -1) == 1)
+			cmd |= DMSGF_DELETE;
+	} else {
+		atomic_add_int(&iost->count, -1);
+	}
+	cmd |= DMSGF_REPLY;
+
+	/*
+	 * Last bio of a transaction the remote side has already terminated:
+	 * no further handler invocation will reference the dios_io, so free
+	 * it now (the handlers' synchronous DELETE paths do the same).
+	 * Without this the structure leaks on every completed transaction.
+	 */
+	if (cmd & DMSGF_DELETE) {
+		state->any.any = NULL;
+		kfree(iost, M_DEVBUF);
+		iost = NULL;
+	}
+
+	/*
+	 * Allocate a basic or extended reply.  Be careful not to populate
+	 * extended header fields unless we allocated an extended reply.
+	 */
+	rmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
+	if (data) {
+		rmsg->aux_data = kmalloc(bytes, state->iocom->mmsg, M_INTWAIT);
+		rmsg->aux_size = bytes;
+		rmsg->flags |= KDMSG_FLAG_AUXALLOC;
+		bcopy(data, rmsg->aux_data, bytes);
+	}
+	rmsg->any.blk_error.head.error = error;
+	if ((cmd & DMSGF_BASECMDMASK) == DMSG_BLK_ERROR)
+		rmsg->any.blk_error.resid = resid;
+	bio->bio_caller_info1.ptr = NULL;
+	/* kdmsg_state_drop(state); */
+	kdmsg_msg_write(rmsg);
+	if (bp->b_flags & B_PAGING) {
+		relpbuf(bio->bio_buf, NULL);
+	} else {
+		bp->b_flags |= B_INVAL | B_AGE;
+		brelse(bp);
+	}
+}