hammer2 - cluster / hammer2 service daemon work
author Matthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Nov 2012 23:43:28 +0000 (15:43 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Nov 2012 23:43:28 +0000 (15:43 -0800)
* Reformulate the service daemon to use the updated libdmsg API

* Add code to track remote block device SPANs and to interface to the
  local /dev/xdisk to create local block devices for the remotes.

sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_rsa.c
sbin/hammer2/cmd_service.c

index 1d9f3ef..56448a9 100644 (file)
@@ -69,7 +69,7 @@ cmd_shell(const char *hostname)
        fcntl(0, F_SETFL, O_NONBLOCK);
        printf("debug: connected\n");
 
-       msg = dmsg_msg_alloc(iocom.router, 0, DMSG_DBG_SHELL, NULL, NULL);
+       msg = dmsg_msg_alloc(&iocom.circuit0, 0, DMSG_DBG_SHELL, NULL, NULL);
        dmsg_msg_write(msg);
        dmsg_iocom_core(&iocom);
        fprintf(stderr, "debug: disconnected\n");
@@ -153,7 +153,7 @@ shell_ttymsg(dmsg_iocom_t *iocom)
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
-               msg = dmsg_msg_alloc(iocom->router, len, DMSG_DBG_SHELL,
+               msg = dmsg_msg_alloc(&iocom->circuit0, len, DMSG_DBG_SHELL,
                                     NULL, NULL);
                bcopy(buf, msg->aux_data, len);
                dmsg_msg_write(msg);
@@ -168,32 +168,32 @@ shell_ttymsg(dmsg_iocom_t *iocom)
        }
 }
 
-static void shell_span(dmsg_router_t *router, char *cmdbuf);
+static void shell_span(dmsg_circuit_t *circuit, char *cmdbuf);
 
 void
 hammer2_shell_parse(dmsg_msg_t *msg)
 {
-       dmsg_router_t *router = msg->router;
+       dmsg_circuit_t *circuit = msg->circuit;
        char *cmdbuf = msg->aux_data;
        char *cmd = strsep(&cmdbuf, " \t");
 
        if (cmd == NULL || *cmd == 0) {
                ;
        } else if (strcmp(cmd, "span") == 0) {
-               shell_span(router, cmdbuf);
+               shell_span(circuit, cmdbuf);
        } else if (strcmp(cmd, "tree") == 0) {
-               dmsg_shell_tree(router, cmdbuf); /* dump spanning tree */
+               dmsg_shell_tree(circuit, cmdbuf); /* dump spanning tree */
        } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
-               dmsg_router_printf(router, "help            Command help\n");
-               dmsg_router_printf(router, "span <host>     Span to target host\n");
-               dmsg_router_printf(router, "tree            Dump spanning tree\n");
+               dmsg_circuit_printf(circuit, "help            Command help\n");
+               dmsg_circuit_printf(circuit, "span <host>     Span to target host\n");
+               dmsg_circuit_printf(circuit, "tree            Dump spanning tree\n");
        } else {
-               dmsg_router_printf(router, "Unrecognized command: %s\n", cmd);
+               dmsg_circuit_printf(circuit, "Unrecognized command: %s\n", cmd);
        }
 }
 
 static void
-shell_span(dmsg_router_t *router, char *cmdbuf)
+shell_span(dmsg_circuit_t *circuit, char *cmdbuf)
 {
        dmsg_master_service_info_t *info;
        const char *hostname = strsep(&cmdbuf, " \t");
@@ -213,9 +213,9 @@ shell_span(dmsg_router_t *router, char *cmdbuf)
         * Start master service
         */
        if (fd < 0) {
-               dmsg_router_printf(router, "Connection to %s failed\n", hostname);
+               dmsg_circuit_printf(circuit, "Connection to %s failed\n", hostname);
        } else {
-               dmsg_router_printf(router, "Connected to %s\n", hostname);
+               dmsg_circuit_printf(circuit, "Connected to %s\n", hostname);
 
                info = malloc(sizeof(*info));
                bzero(info, sizeof(*info));
index e99c5ea..40ed685 100644 (file)
@@ -71,10 +71,9 @@ cmd_rsainit(const char *dir_path)
        asprintf(&str1, "%s/rsa.prv", dir_path);
        asprintf(&str2, "%s/rsa.pub", dir_path);
 
+       old_umask = umask(077);
        if (stat(str1, &st) < 0) {
-               old_umask = umask(077);
                asprintf(&cmd, "openssl genrsa -out %s 2048", str1);
-               umask(old_umask);
                ecode = system(cmd);
                free(cmd);
                chmod(str1, 0400);
@@ -83,6 +82,7 @@ cmd_rsainit(const char *dir_path)
                                "hammer2 rsainit: private key gen failed\n");
                        free(str2);
                        free(str1);
+                       umask(old_umask);
                        return 1;
                }
                printf("hammer2 rsainit: created %s\n", str1);
@@ -101,12 +101,14 @@ cmd_rsainit(const char *dir_path)
                                "hammer2 rsainit: public key gen failed\n");
                        free(str2);
                        free(str1);
+                       umask(old_umask);
                        return 1;
                }
                printf("hammer2 rsainit: created %s\n", str2);
        } else {
                printf("hammer2 rsainit: both keys already exist\n");
        }
+       umask(old_umask);
        free(str2);
        free(str1);
 
index de396e8..2c8c74d 100644 (file)
 
 #include "hammer2.h"
 
+#include <sys/xdiskioctl.h>
+
 struct diskcon {
        TAILQ_ENTRY(diskcon) entry;
        char    *disk;
 };
 
+struct service_node_opaque {   /* per-node state kept by service_node_handler() */
+       char    cl_label[64];   /* copied from lnk_span.cl_label on NODE ADD */
+       char    fs_label[64];   /* copied from lnk_span.fs_label on NODE ADD */
+       dmsg_media_block_t block; /* copy of lnk_span.media.block; bytes/blksize feed the xdisk attach */
+       int     attached;       /* cleared on NODE DEL; xdisk_disconnect() frees the node when 0 */
+       int     servicing;      /* set by xdisk_reconnect(), cleared by xdisk_disconnect() */
+       int     servicefd;      /* fd handed to the service thread; shutdown() on NODE DEL */
+};
+
 #define WS " \r\n"
 
 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
@@ -51,6 +62,11 @@ static void master_reconnect(const char *mntpt);
 static void disk_reconnect(const char *disk);
 static void disk_disconnect(void *handle);
 static void udev_check_disks(void);
+static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
+
+static void xdisk_reconnect(struct service_node_opaque *info);
+static void xdisk_disconnect(void *handle);
+static void *xdisk_attach_tmpthread(void *data);
 
 /*
  * Start-up the master listener daemon for the machine.
@@ -147,6 +163,8 @@ service_thread(void *data)
        setproctitle("hammer2 master listen");
        pthread_detach(pthread_self());
 
+       dmsg_node_handler = service_node_handler;
+
        /*
         * Start up a thread to handle block device monitoring
         */
@@ -188,6 +206,61 @@ service_thread(void *data)
 }
 
 /*
+ * Node discovery code on received SPANs (or loss of SPANs).  This code
+ * is used to track the availability of remote block devices and install
+ * or deinstall them using the xdisk driver (/dev/xdisk).
+ *
+ * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
+ * the data handed to it.  When opened, a virtual circuit is forged and
+ * maintained to the block device server via DMSG.  Temporary failures
+ * stall the device until successfully reconnected or explicitly destroyed.
+ */
+static
+void
+service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
+{
+       /*
+        * *opaquep is the per-node state slot.  It is NULL until the
+        * first qualifying ADD allocates it and is reset to NULL on DEL.
+        */
+       struct service_node_opaque *info = *opaquep;
+
+       switch(op) {
+       case DMSG_NODEOP_ADD:
+               /* Only SPANs from block device servers are tracked. */
+               if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
+                       break;
+               if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
+                       break;
+               if (info == NULL) {
+                       info = malloc(sizeof(*info));
+                       bzero(info, sizeof(*info));
+                       *opaquep = info;
+               }
+               snprintf(info->cl_label, sizeof(info->cl_label),
+                        "%s", msg->any.lnk_span.cl_label);
+               snprintf(info->fs_label, sizeof(info->fs_label),
+                        "%s", msg->any.lnk_span.fs_label);
+               info->block = msg->any.lnk_span.media.block;
+               fprintf(stderr, "NODE ADD %s serno %s\n",
+                       info->cl_label, info->fs_label);
+
+               /*
+                * Mark the node attached before the service is started.
+                * xdisk_disconnect() frees the node when attached == 0;
+                * without this the node would be freed as soon as the
+                * service exits while *opaquep still references it, and
+                * a later DEL or ADD would touch freed memory.
+                */
+               info->attached = 1;
+               xdisk_reconnect(info);
+               break;
+       case DMSG_NODEOP_DEL:
+               if (info) {
+                       fprintf(stderr, "NODE DEL %s serno %s\n",
+                               info->cl_label, info->fs_label);
+                       pthread_mutex_lock(&diskmtx);
+                       *opaquep = NULL;
+                       info->attached = 0;
+                       /*
+                        * With no service running the node can be freed
+                        * here.  Otherwise shut the service descriptor
+                        * down and leave the free to xdisk_disconnect().
+                        */
+                       if (info->servicing == 0)
+                               free(info);
+                       else
+                               shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
+                       pthread_mutex_unlock(&diskmtx);
+               }
+               break;
+       default:
+               break;
+       }
+}
+
+/*
  * Monitor block devices.  Currently polls every ~10 seconds or so.
  */
 static
@@ -311,7 +384,7 @@ master_reconnect(const char *mntpt)
 }
 
 /*
- * Reconnect a physical disk to the mesh.
+ * Reconnect a physical disk service to the mesh.
  */
 static
 void
@@ -332,6 +405,8 @@ disk_reconnect(const char *disk)
         */
        if (strncmp(disk, "md", 2) == 0)
                return;
+       if (strncmp(disk, "xa", 2) == 0)
+               return;
 
        /*
         * Check if already connected
@@ -404,3 +479,87 @@ disk_disconnect(void *handle)
        free(dc->disk);
        free(dc);
 }
+
+/*
+ * [re]connect a remote disk service to the local system via /dev/xdisk.
+ *
+ * A pipe is created.  pipefds[1] is given to a dmsg_master_service thread
+ * whose exit callback is xdisk_disconnect() with the node as its handle;
+ * the node is marked servicing and remembers that descriptor.  pipefds[0]
+ * is placed in a malloc'd xdisk_attach_ioctl which is handed to
+ * xdisk_attach_tmpthread(); this function does not free that structure.
+ *
+ * Nothing is returned.  A pipe() failure is reported on stderr and the
+ * node is left untouched.
+ */
+static
+void
+xdisk_reconnect(struct service_node_opaque *xdisk)
+{
+       struct xdisk_attach_ioctl *xaioc;
+       dmsg_master_service_info_t *info;
+       pthread_t thread;
+       int pipefds[2];
+
+       if (pipe(pipefds) < 0) {
+               fprintf(stderr, "reconnect %s: pipe() failed\n",
+                       xdisk->cl_label);
+               return;
+       }
+
+       info = malloc(sizeof(*info));
+       bzero(info, sizeof(*info));
+       info->fd = pipefds[1];
+       info->detachme = 1;
+       info->dbgmsg_callback = hammer2_shell_parse;
+       info->exit_callback = xdisk_disconnect;
+       info->handle = xdisk;
+       xdisk->servicing = 1;
+       xdisk->servicefd = info->fd;
+       pthread_create(&thread, NULL, dmsg_master_service, info);
+
+       /*
+        * We have to run the attach in its own pthread because it will
+        * synchronously interact with the messaging subsystem over the
+        * pipe.  If we do it here we will deadlock.
+        */
+       xaioc = malloc(sizeof(*xaioc));
+       /*
+        * Zero the whole structure (sizeof(*xaioc), not the size of the
+        * pointer) so fields not assigned below are not handed to the
+        * ioctl uninitialized.
+        */
+       bzero(xaioc, sizeof(*xaioc));
+       snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
+                "%s", xdisk->cl_label);
+       snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
+                "X-%s", xdisk->fs_label);
+       xaioc->bytes = xdisk->block.bytes;
+       xaioc->blksize = xdisk->block.blksize;
+       xaioc->fd = pipefds[0];
+
+       pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
+}
+
+/*
+ * Temporary thread body which attaches a remote disk through /dev/xdisk.
+ *
+ * data is the malloc'd struct xdisk_attach_ioctl prepared by
+ * xdisk_reconnect().  The thread detaches itself, issues XDISKIOCATTACH,
+ * then closes the descriptor carried in xaioc->fd and frees the
+ * structure.  Failures are reported on stderr.  Always returns NULL.
+ */
+static
+void *
+xdisk_attach_tmpthread(void *data)
+{
+       struct xdisk_attach_ioctl *xaioc = data;
+       int fd;
+
+       pthread_detach(pthread_self());
+
+       fd = open("/dev/xdisk", O_RDWR, 0600);
+       if (fd < 0) {
+               /* No control device: do not issue the ioctl on fd -1. */
+               fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
+       } else if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
+               fprintf(stderr, "reconnect %s: xdisk attach failed\n",
+                       xaioc->cl_label);
+       }
+       close(xaioc->fd);
+       if (fd >= 0)
+               close(fd);
+       /* Nothing else holds a reference to the request structure. */
+       free(xaioc);
+       return (NULL);
+}
+
+/*
+ * Exit callback installed by xdisk_reconnect() for the service thread.
+ * handle is the struct service_node_opaque being serviced.  Clears the
+ * servicing flag under diskmtx and frees the node if it is no longer
+ * marked attached.
+ */
+static
+void
+xdisk_disconnect(void *handle)
+{
+       struct service_node_opaque *node = handle;
+       int detached;
+
+       assert(node->servicing == 1);
+
+       pthread_mutex_lock(&diskmtx);
+       node->servicing = 0;
+       detached = (node->attached == 0);
+       if (detached)
+               free(node);
+       pthread_mutex_unlock(&diskmtx);
+}