hammer2 - more userland msg API work
author Matthew Dillon <dillon@apollo.backplane.com>
Tue, 7 Aug 2012 23:15:48 +0000 (16:15 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Tue, 7 Aug 2012 23:15:48 +0000 (16:15 -0700)
* Simplify the messaging API further.

* Add a state function callback for state machine transitions.

* Open a LNK_CONN transaction on connect, allowing daemon<->daemon spanning
  tree operation (untested).

sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/network.h

index 6dda52b..da2a1db 100644 (file)
@@ -37,9 +37,8 @@
 
 #define SHOW_TAB       2
 
-static void shell_recv(hammer2_iocom_t *iocom);
-static void shell_send(hammer2_iocom_t *iocom);
-static void shell_tty(hammer2_iocom_t *iocom);
+static void shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void shell_ttymsg(hammer2_iocom_t *iocom);
 static void hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 /************************************************************************
@@ -94,13 +93,12 @@ cmd_shell(const char *hostname)
        /*
         * Run the session.  The remote end transmits our prompt.
         */
-       hammer2_iocom_init(&iocom, fd, 0);
+       hammer2_iocom_init(&iocom, fd, 0, NULL, shell_rcvmsg, shell_ttymsg);
        printf("debug: connected\n");
 
        msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
        hammer2_msg_write(&iocom, msg, NULL, NULL, NULL);
-
-       hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty);
+       hammer2_iocom_core(&iocom);
        fprintf(stderr, "debug: disconnected\n");
        close(fd);
        return 0;
@@ -112,68 +110,46 @@ cmd_shell(const char *hostname)
  */
 static
 void
-shell_recv(hammer2_iocom_t *iocom)
+shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       hammer2_msg_t *msg;
-
-       while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
-              (msg = hammer2_ioq_read(iocom)) != NULL) {
-
-               switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
-               case HAMMER2_LNK_ERROR:
-               case HAMMER2_LNK_ERROR | HAMMER2_MSGF_REPLY:
-                       if (msg->any.head.error) {
-                               fprintf(stderr,
-                                       "Link Error: %d\n",
-                                       msg->any.head.error);
-                       } else {
-                               write(1, "debug> ", 7);
-                       }
-                       break;
-               case HAMMER2_DBG_SHELL:
-                       /*
-                        * We send the commands, not accept them.
-                        */
-                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
-                       break;
-               case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
-                       /*
-                        * A reply from the remote is data we copy to stdout.
-                        */
-                       if (msg->aux_size) {
-                               msg->aux_data[msg->aux_size - 1] = 0;
-                               write(1, msg->aux_data, strlen(msg->aux_data));
-                       }
-                       break;
-               default:
-                       fprintf(stderr, "Unknown message: %08x\n",
-                               msg->any.head.cmd);
-                       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
-                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
-                       break;
+       switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
+       case HAMMER2_LNK_ERROR:
+       case HAMMER2_LNK_ERROR | HAMMER2_MSGF_REPLY:
+               if (msg->any.head.error) {
+                       fprintf(stderr,
+                               "Link Error: %d\n",
+                               msg->any.head.error);
+               } else {
+                       write(1, "debug> ", 7);
                }
-               hammer2_state_cleanuprx(iocom, msg);
-       }
-       if (iocom->ioq_rx.error) {
-               fprintf(stderr, "node_master_recv: comm error %d\n",
-                       iocom->ioq_rx.error);
+               break;
+       case HAMMER2_DBG_SHELL:
+               /*
+                * We send the commands, not accept them.
+                */
+               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
+               break;
+       case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
+               /*
+                * A reply from the remote is data we copy to stdout.
+                */
+               if (msg->aux_size) {
+                       msg->aux_data[msg->aux_size - 1] = 0;
+                       write(1, msg->aux_data, strlen(msg->aux_data));
+               }
+               break;
+       default:
+               fprintf(stderr, "Unknown message: %08x\n",
+                       msg->any.head.cmd);
+               assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
+               break;
        }
 }
 
-/*
- * Callback from hammer2_iocom_core() when messages might be transmittable
- * to the socket.
- */
-static
-void
-shell_send(hammer2_iocom_t *iocom)
-{
-       hammer2_iocom_flush1(iocom);
-}
-
 static
 void
-shell_tty(hammer2_iocom_t *iocom)
+shell_ttymsg(hammer2_iocom_t *iocom)
 {
        hammer2_msg_t *msg;
        char buf[256];
index 848a1e6..76d1153 100644 (file)
 
 static void *master_accept(void *data);
 static void *master_service(void *data);
-static void master_auth_rx(hammer2_iocom_t *iocom);
-static void master_auth_tx(hammer2_iocom_t *iocom);
-static void master_link_rx(hammer2_iocom_t *iocom);
-static void master_link_tx(hammer2_iocom_t *iocom);
+static void master_auth_state(hammer2_iocom_t *iocom);
+static void master_auth_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void master_link_state(hammer2_iocom_t *iocom);
+static void master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 /*
  * Start-up the master listener daemon for the machine.
@@ -163,8 +163,9 @@ master_service(void *data)
        int fd;
 
        fd = (int)(intptr_t)data;
-       hammer2_iocom_init(&iocom, fd, -1);
-       hammer2_iocom_core(&iocom, master_auth_rx, master_auth_tx, NULL);
+       hammer2_iocom_init(&iocom, fd, -1,
+                          master_auth_state, master_auth_rxmsg, NULL);
+       hammer2_iocom_core(&iocom);
 
        fprintf(stderr,
                "iocom on fd %d terminated error rx=%d, tx=%d\n",
@@ -184,22 +185,41 @@ master_service(void *data)
  * message operation.  The connection has already been encrypted at
  * this point.
  */
+static void master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg);
+
+static
+void
+master_auth_state(hammer2_iocom_t *iocom __unused)
+{
+       hammer2_msg_t *msg;
+
+       /*
+        * Transmit LNK_CONN, enabling the SPAN protocol if both sides
+        * agree.
+        *
+        * XXX put additional authentication states here
+        */
+       msg = hammer2_msg_alloc(iocom, 0, HAMMER2_LNK_CONN |
+                                         HAMMER2_MSGF_CREATE);
+       snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*");
+       hammer2_msg_write(iocom, msg, master_auth_conn_rx, NULL, NULL);
+
+       hammer2_iocom_restate(iocom,
+                             master_link_state, master_link_rxmsg, NULL);
+}
+
 static
 void
-master_auth_rx(hammer2_iocom_t *iocom __unused)
+master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg)
 {
-       printf("AUTHRX\n");
-       iocom->recvmsg_callback = master_link_rx;
-       iocom->sendmsg_callback = master_link_tx;
+       if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
+               hammer2_msg_reply(state->iocom, msg, 0);
 }
 
 static
 void
-master_auth_tx(hammer2_iocom_t *iocom __unused)
+master_auth_rxmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg __unused)
 {
-       printf("AUTHTX\n");
-       iocom->recvmsg_callback = master_link_rx;
-       iocom->sendmsg_callback = master_link_tx;
 }
 
 /************************************************************************
@@ -210,67 +230,53 @@ master_auth_tx(hammer2_iocom_t *iocom __unused)
  */
 static
 void
-master_link_rx(hammer2_iocom_t *iocom)
+master_link_state(hammer2_iocom_t *iocom __unused)
 {
-       hammer2_msg_t *msg;
-       hammer2_state_t *state;
-       uint32_t cmd;
-
-       while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
-              (msg = hammer2_ioq_read(iocom)) != NULL) {
-               /*
-                * If the message state has a function established we just
-                * call the function, otherwise we call the appropriate
-                * link-level protocol related to the original command and
-                * let it sort it out.
-                *
-                * Non-transactional one-off messages, on the otherhand,
-                * might have REPLY set.
-                */
-               state = msg->state;
-               if (state) {
-                       cmd = state->msg->any.head.cmd;
-                       fprintf(stderr,
-                               "MSGRX persist=%08x cmd=%08x error %d\n",
-                               cmd, msg->any.head.cmd, msg->any.head.error);
-               } else {
-                       cmd = msg->any.head.cmd;
-                       fprintf(stderr,
-                               "MSGRX persist=-------- cmd=%08x error %d\n",
-                               cmd, msg->any.head.error);
-               }
-               if (state && state->func) {
-                       state->func(state, msg);
-               } else {
-                       switch(cmd & HAMMER2_MSGF_PROTOS) {
-                       case HAMMER2_MSG_PROTO_LNK:
-                               hammer2_msg_lnk(iocom, msg);
-                               break;
-                       case HAMMER2_MSG_PROTO_DBG:
-                               hammer2_msg_dbg(iocom, msg);
-                               break;
-                       default:
-                               hammer2_msg_reply(iocom, msg,
-                                                 HAMMER2_MSG_ERR_UNKNOWN);
-                               break;
-                       }
-               }
-               hammer2_state_cleanuprx(iocom, msg);
-       }
-       if (iocom->ioq_rx.error) {
-               fprintf(stderr,
-                       "master_recv: comm error %d\n",
-                       iocom->ioq_rx.error);
-       }
 }
 
-/*
- * Callback from hammer2_iocom_core() when messages might be transmittable
- * to the socket.
- */
 static
 void
-master_link_tx(hammer2_iocom_t *iocom)
+master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       hammer2_iocom_flush1(iocom);
+       hammer2_state_t *state;
+       uint32_t cmd;
+
+       /*
+        * If the message state has a function established we just
+        * call the function, otherwise we call the appropriate
+        * link-level protocol related to the original command and
+        * let it sort it out.
+        *
+        * Non-transactional one-off messages, on the other hand,
+        * might have REPLY set.
+        */
+       state = msg->state;
+       if (state) {
+               cmd = state->msg->any.head.cmd;
+               fprintf(stderr,
+                       "MSGRX persist=%08x cmd=%08x error %d\n",
+                       cmd, msg->any.head.cmd, msg->any.head.error);
+       } else {
+               cmd = msg->any.head.cmd;
+               fprintf(stderr,
+                       "MSGRX persist=-------- cmd=%08x error %d\n",
+                       cmd, msg->any.head.error);
+       }
+       if (state && state->func) {
+               assert(state->func != NULL);
+               state->func(state, msg);
+       } else {
+               switch(cmd & HAMMER2_MSGF_PROTOS) {
+               case HAMMER2_MSG_PROTO_LNK:
+                       hammer2_msg_lnk(iocom, msg);
+                       break;
+               case HAMMER2_MSG_PROTO_DBG:
+                       hammer2_msg_dbg(iocom, msg);
+                       break;
+               default:
+                       hammer2_msg_reply(iocom, msg,
+                                         HAMMER2_MSG_ERR_UNKNOWN);
+                       break;
+               }
+       }
 }
index 62a8833..bdf72a4 100644 (file)
@@ -123,7 +123,14 @@ int cmd_rsadec(const char **keys, int nkeys);
 void hammer2_bswap_head(hammer2_msg_hdr_t *head);
 void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
-void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd);
+void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
+                       void (*state_func)(hammer2_iocom_t *),
+                       void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *),
+                       void (*altmsg_func)(hammer2_iocom_t *));
+void hammer2_iocom_restate(hammer2_iocom_t *iocom,
+                       void (*state_func)(hammer2_iocom_t *),
+                       void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *),
+                       void (*altmsg_func)(hammer2_iocom_t *));
 void hammer2_iocom_done(hammer2_iocom_t *iocom);
 hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size,
                        uint32_t cmd);
@@ -135,10 +142,7 @@ void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
 
 void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
-void hammer2_iocom_core(hammer2_iocom_t *iocom,
-                       void (*iocom_recvmsg)(hammer2_iocom_t *),
-                       void (*iocom_sendmsg)(hammer2_iocom_t *),
-                       void (*iocom_altmsg)(hammer2_iocom_t *));
+void hammer2_iocom_core(hammer2_iocom_t *iocom);
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
 void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                        void (*func)(hammer2_state_t *, hammer2_msg_t *),
index e406821..393933f 100644 (file)
@@ -71,13 +71,23 @@ hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 }
 
 /*
- * Initialize a low-level communications channel
+ * Initialize a low-level communications channel.
+ *
+ * NOTE: The state_func() is called at least once from the loop and can be
+ *      re-armed via hammer2_iocom_restate().
  */
 void
-hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
+hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
+                  void (*state_func)(hammer2_iocom_t *),
+                  void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
+                  void (*altmsg_func)(hammer2_iocom_t *))
 {
        bzero(iocom, sizeof(*iocom));
 
+       iocom->state_callback = state_func;
+       iocom->rcvmsg_callback = rcvmsg_func;
+       iocom->altmsg_callback = altmsg_func;
+
        pthread_mutex_init(&iocom->mtx, NULL);
        RB_INIT(&iocom->staterd_tree);
        RB_INIT(&iocom->statewr_tree);
@@ -88,6 +98,8 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = HAMMER2_IOCOMF_RREQ;
+       if (state_func)
+               iocom->flags |= HAMMER2_IOCOMF_SWORK;
        hammer2_ioq_init(iocom, &iocom->ioq_rx);
        hammer2_ioq_init(iocom, &iocom->ioq_tx);
        if (pipe(iocom->wakeupfds) < 0)
@@ -113,6 +125,27 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
 #endif
 }
 
+/*
+ * May only be called from a callback from iocom_core.
+ *
+ * Adjust state machine functions and set flags so that the new
+ * state_func, if any, is called at least once.
+ */
+void
+hammer2_iocom_restate(hammer2_iocom_t *iocom,
+                  void (*state_func)(hammer2_iocom_t *),
+                  void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
+                  void (*altmsg_func)(hammer2_iocom_t *))
+{
+       iocom->state_callback = state_func;
+       iocom->rcvmsg_callback = rcvmsg_func;
+       iocom->altmsg_callback = altmsg_func;
+       if (state_func)
+               iocom->flags |= HAMMER2_IOCOMF_SWORK;
+       else
+               iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
+}
+
 /*
  * Cleanup a terminating iocom.
  *
@@ -233,27 +266,22 @@ hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
  * Thread localized, iocom->mtx not held.
  */
 void
-hammer2_iocom_core(hammer2_iocom_t *iocom,
-                  void (*recvmsg_func)(hammer2_iocom_t *),
-                  void (*sendmsg_func)(hammer2_iocom_t *),
-                  void (*altmsg_func)(hammer2_iocom_t *))
+hammer2_iocom_core(hammer2_iocom_t *iocom)
 {
        struct pollfd fds[3];
        char dummybuf[256];
+       hammer2_msg_t *msg;
        int timeout;
        int count;
        int wi; /* wakeup pipe */
        int si; /* socket */
        int ai; /* alt bulk path socket */
 
-       iocom->recvmsg_callback = recvmsg_func;
-       iocom->sendmsg_callback = sendmsg_func;
-       iocom->altmsg_callback = altmsg_func;
-
        while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
                if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
                                     HAMMER2_IOCOMF_WWORK |
                                     HAMMER2_IOCOMF_PWORK |
+                                    HAMMER2_IOCOMF_SWORK |
                                     HAMMER2_IOCOMF_ARWORK |
                                     HAMMER2_IOCOMF_AWWORK)) == 0) {
                        /*
@@ -322,6 +350,11 @@ hammer2_iocom_core(hammer2_iocom_t *iocom,
                        iocom->flags |= HAMMER2_IOCOMF_PWORK;
                }
 
+               if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
+                       iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
+                       iocom->state_callback(iocom);
+               }
+
                /*
                 * Pending message queues from other threads wake us up
                 * with a write to the wakeupfds[] pipe.  We have to clear
@@ -333,22 +366,31 @@ hammer2_iocom_core(hammer2_iocom_t *iocom,
                        iocom->flags |= HAMMER2_IOCOMF_RWORK;
                        iocom->flags |= HAMMER2_IOCOMF_WWORK;
                        if (TAILQ_FIRST(&iocom->txmsgq))
-                               iocom->sendmsg_callback(iocom);
+                               hammer2_iocom_flush1(iocom);
                }
 
                /*
                 * Message write sequencing
                 */
                if (iocom->flags & HAMMER2_IOCOMF_WWORK)
-                       iocom->sendmsg_callback(iocom);
+                       hammer2_iocom_flush1(iocom);
 
                /*
                 * Message read sequencing.  Run this after the write
                 * sequencing in case the write sequencing allowed another
                 * auto-DELETE to occur on the read side.
                 */
-               if (iocom->flags & HAMMER2_IOCOMF_RWORK)
-                       iocom->recvmsg_callback(iocom);
+               if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
+                       while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
+                              (msg = hammer2_ioq_read(iocom)) != NULL) {
+                               fprintf(stderr,
+                                       "receive msg cmd=%08x msgid=%016jx\n",
+                                       msg->any.head.cmd,
+                                       (intmax_t)msg->any.head.msgid);
+                               iocom->rcvmsg_callback(iocom, msg);
+                               hammer2_state_cleanuprx(iocom, msg);
+                       }
+               }
 
                if (iocom->flags & HAMMER2_IOCOMF_ARWORK)
                        iocom->altmsg_callback(iocom);
@@ -766,6 +808,7 @@ again:
                msg->any.head.error = ioq->error;
 
                pthread_mutex_lock(&iocom->mtx);
+               fprintf(stderr, "CHECK REMAINING RXMSGS\n");
                if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
                        /*
                         * Active remote transactions are still present.
@@ -790,7 +833,7 @@ again:
                         * Simulate the other end sending us a DELETE.
                         */
                        if (state->rxcmd & HAMMER2_MSGF_DELETE) {
-                               fprintf(stderr, "SIMULATE DELETION WCONT\n");
+                               fprintf(stderr, "SIMULATE DELETION WCONT STATE->txcmd = %08x rxcmd = %08x msgid=%016jx\n", state->txcmd, state->rxcmd, state->msgid );
                                hammer2_msg_free(iocom, msg);
                                msg = NULL;
                        } else {
@@ -1397,13 +1440,6 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 
        dummy.msgid = msg->any.head.msgid;
        dummy.spanid = msg->any.head.spanid;
-#if 0
-       iocom_printf(iocom, msg->any.head.cmd,
-                    "received msg %08x msgid %jx spanid=%jx\n",
-                     msg->any.head.cmd,
-                     (intmax_t)msg->any.head.msgid,
-                     (intmax_t)msg->any.head.spanid);
-#endif
        pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
@@ -1435,9 +1471,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 * New persistant command received.
                 */
                if (state) {
-                       iocom_printf(iocom, msg->any.head.cmd,
-                                    "hammer2_state_msgrx: "
-                                    "duplicate transaction\n");
+                       fprintf(stderr, "hammer2_state_msgrx: "
+                                       "duplicate transaction\n");
                        error = HAMMER2_IOQ_ERROR_TRANS;
                        break;
                }
@@ -1466,9 +1501,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
                                error = HAMMER2_IOQ_ERROR_EALREADY;
                        } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgrx: "
-                                            "no state for DELETE\n");
+                               fprintf(stderr, "hammer2_state_msgrx: "
+                                               "no state for DELETE\n");
                                error = HAMMER2_IOQ_ERROR_TRANS;
                        }
                        break;
@@ -1482,9 +1516,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
                                error = HAMMER2_IOQ_ERROR_EALREADY;
                        } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgrx: "
-                                            "state reused for DELETE\n");
+                               fprintf(stderr, "hammer2_state_msgrx: "
+                                               "state reused for DELETE\n");
                                error = HAMMER2_IOQ_ERROR_TRANS;
                        }
                        break;
@@ -1512,10 +1545,11 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 * persistent state message should already exist.
                 */
                if (state == NULL) {
-                       iocom_printf(iocom, msg->any.head.cmd,
-                                    "hammer2_state_msgrx: "
-                                    "no state match for REPLY cmd=%08x\n",
-                                    msg->any.head.cmd);
+                       fprintf(stderr,
+                               "hammer2_state_msgrx: no state match for REPLY"
+                               " cmd=%08x msgid=%016jx\n",
+                               msg->any.head.cmd,
+                               (intmax_t)msg->any.head.msgid);
                        error = HAMMER2_IOQ_ERROR_TRANS;
                        break;
                }
@@ -1533,10 +1567,9 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
                                error = HAMMER2_IOQ_ERROR_EALREADY;
                        } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgrx: "
-                                            "no state match for "
-                                            "REPLY|DELETE\n");
+                               fprintf(stderr, "hammer2_state_msgrx: "
+                                               "no state match for "
+                                               "REPLY|DELETE\n");
                                error = HAMMER2_IOQ_ERROR_TRANS;
                        }
                        break;
@@ -1551,9 +1584,9 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
                                error = HAMMER2_IOQ_ERROR_EALREADY;
                        } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgrx: "
-                                            "state reused for REPLY|DELETE\n");
+                               fprintf(stderr, "hammer2_state_msgrx: "
+                                               "state reused for "
+                                               "REPLY|DELETE\n");
                                error = HAMMER2_IOQ_ERROR_TRANS;
                        }
                        break;
index 39c7689..6147f6f 100644 (file)
@@ -229,8 +229,9 @@ struct hammer2_iocom {
        hammer2_msg_queue_t freeq;              /* free msgs hdr only */
        hammer2_msg_queue_t freeq_aux;          /* free msgs w/aux_data */
        struct hammer2_address_queue  addrq;    /* source/target addrs */
-       void    (*recvmsg_callback)(struct hammer2_iocom *);
-       void    (*sendmsg_callback)(struct hammer2_iocom *);
+       void    (*state_callback)(struct hammer2_iocom *);
+       void    (*rcvmsg_callback)(struct hammer2_iocom *,
+                                  struct hammer2_msg *);
        void    (*altmsg_callback)(struct hammer2_iocom *);
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
@@ -255,4 +256,5 @@ typedef struct hammer2_iocom hammer2_iocom_t;
 #define HAMMER2_IOCOMF_PWORK   0x00000020      /* immediate work pending */
 #define HAMMER2_IOCOMF_ARWORK  0x00000040      /* immediate work pending */
 #define HAMMER2_IOCOMF_AWWORK  0x00000080      /* immediate work pending */
-#define HAMMER2_IOCOMF_CRYPTED 0x00000100      /* encrypt enabled */
+#define HAMMER2_IOCOMF_SWORK   0x00000100      /* immediate work pending */
+#define HAMMER2_IOCOMF_CRYPTED 0x00000200      /* encrypt enabled */