static void master_link_rx(hammer2_iocom_t *iocom);
static void master_link_tx(hammer2_iocom_t *iocom);
+static void hammer2_lnk_span(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+
/*
* Start-up the master listener daemon for the machine.
*
* AUTHENTICATION *
************************************************************************
*
+ * Callback via hammer2_iocom_core().
+ *
* Additional messaging-based authentication must occur before normal
* message operation. The connection has already been encrypted at
* this point.
iocom->sendmsg_callback = master_link_tx;
}
-/*
- * Callback from hammer2_iocom_core() when messages might be present
- * on the socket.
+/************************************************************************
+ * POST-AUTHENTICATION SERVICE MSGS *
+ ************************************************************************
+ *
+ * Callback via hammer2_iocom_core().
*/
static
void
master_link_rx(hammer2_iocom_t *iocom)
{
hammer2_msg_t *msg;
+ uint32_t cmd;
while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
(msg = hammer2_ioq_read(iocom)) != NULL) {
- fprintf(stderr, "MSG RECEIVED: %08x error %d\n",
- msg->any.head.cmd, msg->any.head.error);
- switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
+ /*
+ * Switch on the transactional cmd, that is the original
+ * msg->any.head.cmd that opened the transaction. The actual
+ * msg might be different. The original msg cannot have
+ * REPLY set by definition (but of course the currenet msg
+ * might), so we don't bother with case statements for REPLY
+ * for command sequences we expet to be transactional.
+ *
+ * Non-transactional one-off messages, on the otherhand,
+ * might have REPLY set.
+ */
+ if (msg->state) {
+ cmd = msg->state->msg->any.head.cmd;
+ fprintf(stderr,
+ "MSGRX persist=%08x cmd=%08x error %d\n",
+ cmd, msg->any.head.cmd, msg->any.head.error);
+ } else {
+ cmd = msg->any.head.cmd;
+ fprintf(stderr,
+ "MSGRX persist=-------- cmd=%08x error %d\n",
+ cmd, msg->any.head.error);
+ }
+
+ switch(cmd & HAMMER2_MSGF_CMDSWMASK) {
case HAMMER2_LNK_ERROR:
+ /*
+ * A non-transactional error is formulated when
+ * the socket or pipe disconnects. Ignore it.
+ */
+ break;
+ case HAMMER2_LNK_SPAN:
+ /*
+ * Messages related to the LNK_SPAN transaction.
+ */
+ hammer2_lnk_span(iocom, msg);
break;
case HAMMER2_DBG_SHELL:
case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
+ /*
+ * Non-transactional DBG messages.
+ */
hammer2_shell_remote(iocom, msg);
break;
default:
{
hammer2_iocom_flush(iocom);
}
+
+/*
+ * Receive a message which is part of a LNK_SPAN transaction. Keep in
+ * mind that only the original CREATE is utilizing the lnk_span message
+ * header.
+ *
+ * We will get called for CREATE, DELETE, and intermediate states (including
+ * errors), and in particular we will get called with an error if the link
+ * is lost in the middle of the transaction.
+ */
+static
+void
+hammer2_lnk_span(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg)
+{
+ char *alloc = NULL;
+
+ switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
+ case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:
+ fprintf(stderr,
+ "LNK_SPAN: %s/%s\n",
+ hammer2_uuid_to_str(&msg->any.lnk_span.pfs_id, &alloc),
+ msg->any.lnk_span.label);
+ free(alloc);
+ break;
+ case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE:
+ fprintf(stderr, "LNK_SPAN: Terminated with error\n");
+ break;
+ default:
+ fprintf(stderr,
+ "LNK_SPAN: Unknown msg %08x\n", msg->any.head.cmd);
+ break;
+ }
+}
* Caller should retry on a read-event when NULL is returned.
*
* If an error occurs during reception a HAMMER2_LNK_ERROR msg will
- * be returned (and the caller must not call us again after that).
+ * be returned for each open transaction, then the ioq and iocom
+ * will be errored out and a non-transactional HAMMER2_LNK_ERROR
+ * msg will be returned as the final message. The caller should not call
+ * us again after the final message is returned.
*/
hammer2_msg_t *
hammer2_ioq_read(hammer2_iocom_t *iocom)
hammer2_ioq_t *ioq = &iocom->ioq_rx;
hammer2_msg_t *msg;
hammer2_msg_hdr_t *head;
+ hammer2_state_t *state;
ssize_t n;
size_t bytes;
size_t nmax;
}
break;
case HAMMER2_MSGQ_STATE_ERROR:
+ /*
+ * Continued calls to drain recorded transactions (returning
+ * a LNK_ERROR for each one), before we return the final
+ * LNK_ERROR.
+ */
+ assert(msg == NULL);
+ break;
default:
/*
* We don't double-return errors, the caller should not
*/
if (ioq->error) {
/*
- * An unrecoverable error occured during processing,
- * return a special error message. Try to leave the
- * ioq state alone for post-mortem debugging.
+ * An unrecoverable error causes all active receive
+ * transactions to be terminated with a LNK_ERROR message.
*
- * Link error messages are returned as one-way messages,
- * so no flags get set. Source and target is 0 (link-level),
- * msgid is 0 (link-level). All we really need to do is
- * set up magic, cmd, and error.
+ * Once all active transactions are exhausted we set the
+ * iocom ERROR flag and return a non-transactional LNK_ERROR
+ * message, which should cause master processing loops to
+ * terminate.
*/
assert(ioq->msg == msg);
- if (msg == NULL)
- msg = hammer2_msg_alloc(iocom, 0, 0);
- else
+ if (msg) {
+ hammer2_msg_free(iocom, msg);
ioq->msg = NULL;
-
- if (msg->aux_data) {
- free(msg->aux_data);
- msg->aux_data = NULL;
- msg->aux_size = 0;
}
+
+ /*
+ * No more I/O read processing
+ */
+ ioq->state = HAMMER2_MSGQ_STATE_ERROR;
+
+ /*
+ * Return LNK_ERROR for any open transaction, and finally
+ * as a non-transactional message when no transactions are
+ * left.
+ */
+ msg = hammer2_msg_alloc(iocom, 0, 0);
bzero(&msg->any.head, sizeof(msg->any.head));
msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
msg->any.head.cmd = HAMMER2_LNK_ERROR;
msg->any.head.error = ioq->error;
- ioq->state = HAMMER2_MSGQ_STATE_ERROR;
- iocom->flags |= HAMMER2_IOCOMF_EOF;
+
+ if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
+ /*
+ * Active transactions are still present. Simulate
+ * the other end sending us a DELETE.
+ */
+ state->txcmd |= HAMMER2_MSGF_DELETE;
+ msg->state = state;
+ msg->any.head.source = state->source;
+ msg->any.head.target = state->target;
+ msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
+ HAMMER2_MSGF_DELETE;
+ } else {
+ /*
+ * No active transactions remain
+ */
+ msg->state = NULL;
+ iocom->flags |= HAMMER2_IOCOMF_EOF;
+ }
} else if (msg == NULL) {
/*
* Insufficient data received to finish building the message,
hammer2_state_t *state;
if ((state = msg->state) == NULL) {
+ /*
+ * Free a non-transactional message, there is no state
+ * to worry about.
+ */
hammer2_msg_free(iocom, msg);
} else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+ /*
+ * Message terminating transaction, destroy the related
+ * state, the original message, and this message (if it
+ * isn't the original message due to a CREATE|DELETE).
+ */
/*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
state->rxcmd |= HAMMER2_MSGF_DELETE;
if (state->txcmd & HAMMER2_MSGF_DELETE) {
}
hammer2_msg_free(iocom, msg);
} else if (state->msg != msg) {
+ /*
+ * Message not terminating transaction, leave state intact
+ * and free message if it isn't the CREATE message.
+ */
hammer2_msg_free(iocom, msg);
}
}
msg = state->msg;
state->msg = NULL;
- free(state);
if (msg)
hammer2_msg_free(iocom, msg);
free(state);