int
cmd_shell(const char *hostname)
{
- struct sockaddr_in lsin;
struct hammer2_iocom iocom;
hammer2_msg_t *msg;
- struct hostent *hen;
int fd;
/*
- * Acquire socket and set options
- */
- if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- fprintf(stderr, "cmd_debug: socket(): %s\n",
- strerror(errno));
- return 1;
- }
-
- /*
* Connect to the target
*/
- bzero(&lsin, sizeof(lsin));
- lsin.sin_family = AF_INET;
- lsin.sin_addr.s_addr = 0;
- lsin.sin_port = htons(HAMMER2_LISTEN_PORT);
-
- if (hostname) {
- hen = gethostbyname2(hostname, AF_INET);
- if (hen == NULL) {
- if (inet_pton(AF_INET, hostname, &lsin.sin_addr) != 1) {
- fprintf(stderr,
- "Cannot resolve %s\n", hostname);
- return 1;
- }
- } else {
- bcopy(hen->h_addr, &lsin.sin_addr, hen->h_length);
- }
- }
- if (connect(fd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
- close(fd);
- fprintf(stderr, "debug: connect failed: %s\n",
- strerror(errno));
- return 0;
- }
+ fd = hammer2_connect(hostname);
+ if (fd < 0)
+ return 1;
/*
* Run the session. The remote end transmits our prompt.
*/
hammer2_iocom_init(&iocom, fd, 0, NULL, shell_rcvmsg, shell_ttymsg);
+ fcntl(0, F_SETFL, O_NONBLOCK);
printf("debug: connected\n");
msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
bcopy(buf, msg->aux_data, len);
hammer2_msg_write(iocom, msg, NULL, NULL, NULL);
- } else {
+ } else if (feof(stdin)) {
/*
* Set EOF flag without setting any error code for normal
* EOF.
*/
iocom->flags |= HAMMER2_IOCOMF_EOF;
+ } else {
+ clearerr(stdin);
}
}
if (cmd == NULL || *cmd == 0) {
;
+ } else if (strcmp(cmd, "span") == 0) {
+ const char *hostname = strsep(&cmdbuf, " \t");
+ pthread_t thread;
+ int fd;
+
+ /*
+ * Connect to the target
+ */
+ if (hostname == NULL) {
+ fd = -1;
+ } else {
+ fd = hammer2_connect(hostname);
+ }
+
+ /*
+ * Start master service
+ */
+ if (fd < 0) {
+ iocom_printf(iocom, 0, "Connection to %s failed\n",
+ hostname);
+ } else {
+ iocom_printf(iocom, 0, "Connected to %s\n", hostname);
+ pthread_create(&thread, NULL,
+ master_service, (void *)(intptr_t)fd);
+ /*pthread_join(thread, &res);*/
+ }
} else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
iocom_printf(iocom, 0, "help Command help\n");
+ iocom_printf(iocom, 0, "span <host> Span to target host\n");
} else {
iocom_printf(iocom, 0, "Unrecognized command: %s\n", cmd);
}
}
/************************************************************************
+ * DEBUGSPAN *
+ ************************************************************************
+ *
+ * Connect to the target manually (not via the cluster list embedded in
+ * a hammer2 filesystem) and initiate the SPAN protocol.
+ */
+/*
+ * Connect to hostname with hammer2_connect() and run master_service() on
+ * the resulting descriptor in a new thread, waiting for that thread to
+ * finish.
+ *
+ * Returns 0 once the service thread has been joined, 1 if the connection
+ * or the thread could not be established.
+ */
+int
+cmd_debugspan(const char *hostname)
+{
+	pthread_t thread;
+	int fd;
+	int error;
+	void *res;
+
+	/*
+	 * Connect to the target
+	 */
+	fd = hammer2_connect(hostname);
+	if (fd < 0)
+		return 1;
+
+	printf("debugspan: connected to %s, starting CONN/SPAN\n", hostname);
+
+	/*
+	 * pthread_create() returns an error number (it does not set errno).
+	 * If the thread never started nobody else holds the descriptor, so
+	 * release it here and do not join an uninitialized thread handle.
+	 */
+	error = pthread_create(&thread, NULL, master_service,
+			       (void *)(intptr_t)fd);
+	if (error) {
+		fprintf(stderr, "debugspan: pthread_create(): %s\n",
+			strerror(error));
+		close(fd);
+		return 1;
+	}
+	pthread_join(thread, &res);
+	return(0);
+}
+
+/************************************************************************
* SHOW *
************************************************************************/
#include "hammer2.h"
static void *master_accept(void *data);
-static void *master_service(void *data);
static void master_auth_state(hammer2_iocom_t *iocom);
static void master_auth_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
static void master_link_state(hammer2_iocom_t *iocom);
/*
* Service an accepted connection (runs as a pthread)
+ *
+ * (also called from a couple of other places)
*/
-static
void *
master_service(void *data)
{
#include "hammer2.h"
/*
+ * Setup crypto for pthreads
+ */
+static pthread_mutex_t *crypto_locks;
+int crypto_count;
+
+/*
+ * Thread-id callback registered with CRYPTO_set_id_callback(): reports
+ * the calling thread's pthread_self() value converted to unsigned long.
+ */
+static
+unsigned long
+hammer2_crypto_id_callback(void)
+{
+	pthread_t self = pthread_self();
+
+	return ((unsigned long)(uintptr_t)self);
+}
+
+/*
+ * Locking callback registered with CRYPTO_set_locking_callback().
+ *
+ * type selects one of the crypto_count mutexes in crypto_locks[]; the
+ * mutex is locked when mode has CRYPTO_LOCK set and unlocked otherwise.
+ * file and line are unused.
+ */
+static
+void
+hammer2_crypto_locking_callback(int mode, int type,
+				const char *file __unused, int line __unused)
+{
+	pthread_mutex_t *mtx;
+
+	assert(type >= 0 && type < crypto_count);
+	mtx = &crypto_locks[type];
+
+	if ((mode & CRYPTO_LOCK) == 0)
+		pthread_mutex_unlock(mtx);
+	else
+		pthread_mutex_lock(mtx);
+}
+
+/*
+ * Allocate and initialize one mutex per lock reported by
+ * CRYPTO_num_locks(), then register the thread-id and locking callbacks
+ * with the crypto library.
+ *
+ * Exits the program if the lock array cannot be allocated.
+ */
+void
+hammer2_crypto_setup(void)
+{
+	int i;
+
+	crypto_count = CRYPTO_num_locks();
+	crypto_locks = calloc(crypto_count, sizeof(crypto_locks[0]));
+
+	/* calloc(0, ...) may legitimately return NULL, so only fail if n > 0 */
+	if (crypto_locks == NULL && crypto_count > 0) {
+		fprintf(stderr, "hammer2_crypto_setup: out of memory\n");
+		exit(1);
+	}
+
+	/*
+	 * Zero-filled storage is not a portable initialized mutex; set each
+	 * one up explicitly before the callbacks can be invoked.
+	 */
+	for (i = 0; i < crypto_count; ++i)
+		pthread_mutex_init(&crypto_locks[i], NULL);
+
+	CRYPTO_set_id_callback(hammer2_crypto_id_callback);
+	CRYPTO_set_locking_callback(hammer2_crypto_locking_callback);
+}
+
+/*
* Synchronously negotiate crypto for a new session. This must occur
* within 10 seconds or the connection is error'd out.
*
int
hammer2_crypto_encrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
- struct iovec *iov, int n)
+ struct iovec *iov, int n, size_t *nmaxp)
{
int p_len;
int i;
continue;
}
p_len -= already;
+ p_len &= ~HAMMER2_AES_KEY_MASK;
if (p_len > nmax)
p_len = nmax;
EVP_EncryptUpdate(&ioq->ctx,
}
iov[0].iov_base = ioq->buf + ioq->fifo_beg;
iov[0].iov_len = ioq->fifo_cdx - ioq->fifo_beg;
+ *nmaxp = (size_t)(ioq->fifo_cdx - ioq->fifo_beg);
return (1);
}
int cmd_stat(int ac, const char **av);
int cmd_leaf(const char *sel_path);
int cmd_shell(const char *hostname);
+int cmd_debugspan(const char *hostname);
int cmd_show(const char *devpath);
int cmd_rsainit(const char *dir_path);
int cmd_rsaenc(const char **keys, int nkeys);
/*
* Crypto functions
*/
+void hammer2_crypto_setup(void);
void hammer2_crypto_negotiate(hammer2_iocom_t *iocom);
void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
void hammer2_crypto_decrypt_aux(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
hammer2_msg_t *msg, int already);
int hammer2_crypto_encrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
- struct iovec *iov, int n);
+ struct iovec *iov, int n, size_t *nmaxp);
void hammer2_crypto_encrypt_wrote(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
int nact);
const char *hammer2_iptype_to_str(uint8_t type);
const char *hammer2_pfstype_to_str(uint8_t type);
const char *sizetostr(hammer2_off_t size);
+int hammer2_connect(const char *hostname);
+
+void *master_service(void *data);
void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
void iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...);
/* */
/*****************************************************************/
-static uint32_t crc32Table[256] = {
+static const uint32_t crc32Table[256] = {
0x00000000L, 0xF26B8303L, 0xE13B70F7L, 0x1350F3F4L,
0xC79A971FL, 0x35F1141CL, 0x26A1E7E8L, 0xD4CA64EBL,
0x8AD958CFL, 0x78B2DBCCL, 0x6BE22838L, 0x9989AB3BL,
srandomdev();
signal(SIGPIPE, SIG_IGN);
+ hammer2_crypto_setup();
/*
* Core options
usage(1);
}
ecode = cmd_remote_connect(sel_path, av[1]);
+ } else if (strcmp(av[0], "debugspan") == 0) {
+ /*
+ * Debug connection to the target hammer2 service and run
+ * the CONN/SPAN protocol.
+ */
+ if (ac < 2) {
+ fprintf(stderr, "debugspan: requires hostname\n");
+ usage(1);
+ }
+ ecode = cmd_debugspan(av[1]);
} else if (strcmp(av[0], "disconnect") == 0) {
/*
* Remove cluster connection
" stat [<path>] Return inode quota & config\n"
" leaf Start pfs leaf daemon\n"
" shell [<host>] Connect to debug shell\n"
+ " debugspan <target> Connect to target, run CONN/SPAN\n"
" rsainit Initialize rsa fields\n"
" show devpath Raw hammer2 media dump\n"
);
}
}
- if (iocom->flags & HAMMER2_IOCOMF_ARWORK)
+ if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
+ iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
iocom->altmsg_callback(iocom);
+ }
}
}
}
/*
- * Finally allocate the message and copy the core header
- * to the embedded extended header.
- *
- * Initialize msg->aux_size to 0 and use it to track
- * the amount of data copied from the stream.
+ * Allocate the message, the next state will fill it in.
*/
msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
ioq->msg = msg;
/*
- * We are either done or we fall-through
- */
- if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
- bcopy(head, &msg->any.head, sizeof(msg->any.head));
- ioq->fifo_beg += ioq->hbytes;
- break;
- }
-
- /*
* Fall through to the next state. Make sure that the
* extended header does not straddle the end of the buffer.
* We still want to issue larger reads into our buffer,
assert(msg != NULL);
if (bytes < ioq->hbytes) {
n = read(iocom->sock_fd,
- msg->any.buf + ioq->fifo_end,
+ ioq->buf + ioq->fifo_end,
nmax);
if (n <= 0) {
if (n == 0) {
head->hdr_crc = 0;
if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
ioq->error = HAMMER2_IOQ_ERROR_XCRC;
+ fprintf(stderr, "XCRC FAILED %08x %08x\n",
+ xcrc32, hammer2_icrc32(head, ioq->hbytes));
+ assert(0);
break;
}
head->hdr_crc = xcrc32;
hammer2_ioq_t *ioq = &iocom->ioq_tx;
hammer2_msg_t *msg;
ssize_t nmax;
+ ssize_t omax;
ssize_t nact;
struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
size_t hbytes;
size_t abytes;
- int hoff;
- int aoff;
+ size_t hoff;
+ size_t aoff;
int n;
if (ioq->error) {
/*
* Pump messages out the connection by building an iovec.
+ *
+ * ioq->hbytes/ioq->abytes tracks how much of the first message
+ * in the queue has been successfully written out, so we can
+ * resume writing.
*/
n = 0;
nmax = 0;
+ hoff = ioq->hbytes;
+ aoff = ioq->abytes;
TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
- hoff = 0;
hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
HAMMER2_MSG_ALIGN;
- aoff = 0;
abytes = msg->aux_size;
- if (n == 0) {
- hoff += ioq->hbytes;
- aoff += ioq->abytes;
- }
- if (hbytes - hoff > 0) {
+ assert(hoff <= hbytes && aoff <= abytes);
+
+ if (hoff < hbytes) {
iov[n].iov_base = (char *)&msg->any.head + hoff;
iov[n].iov_len = hbytes - hoff;
nmax += hbytes - hoff;
if (n == HAMMER2_IOQ_MAXIOVEC)
break;
}
- if (abytes - aoff > 0) {
+ if (aoff < abytes) {
assert(msg->aux_data != NULL);
- iov[n].iov_base = msg->aux_data + aoff;
+ iov[n].iov_base = (char *)msg->aux_data + aoff;
iov[n].iov_len = abytes - aoff;
nmax += abytes - aoff;
++n;
if (n == HAMMER2_IOQ_MAXIOVEC)
break;
}
+ hoff = 0;
+ aoff = 0;
}
if (n == 0)
return;
* data into the fifo and adjust the iov as necessary. If
* encryption is disabled the iov is left alone.
*
- * hammer2_crypto_encrypt_wrote()
+ * May return a smaller iov (thus a smaller n), with aggregated
+ * chunks. May reduce nmax to what fits in the FIFO.
*/
- n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
+ omax = nmax;
+ n = hammer2_crypto_encrypt(iocom, ioq, iov, n, &nmax);
/*
* Execute the writev() then figure out what happened.
}
/*
- * Indicate bytes written successfully. If we were unable to
- * write the entire iov array then set WREQ to wait for more
- * socket buffer space.
+ * Indicate bytes written successfully.
+ *
+ * If we were unable to write the entire iov array then set WREQ
+ * to wait for more socket buffer space.
+ *
+ * If the FIFO space was insufficient to fully drain all messages
+ * set WWORK to cause the core to call us again for the next batch.
*/
hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
if (nact != nmax)
iocom->flags |= HAMMER2_IOCOMF_WREQ;
+ else if (nmax != omax)
+ iocom->flags |= HAMMER2_IOCOMF_WWORK;
/*
* Clean out the transmit queue based on what we successfully
- * sent.
+ * sent. ioq->hbytes/abytes represents the portion of the first
+ * message previously sent.
*/
while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
if ((size_t)nact < hbytes - ioq->hbytes) {
ioq->hbytes += nact;
+ /* nact = 0; */
break;
}
nact -= hbytes - ioq->hbytes;
ioq->hbytes = hbytes;
if ((size_t)nact < abytes - ioq->abytes) {
ioq->abytes += nact;
+ /* nact = 0; */
break;
}
nact -= abytes - ioq->abytes;
hammer2_state_cleanuptx(iocom, msg);
}
+ assert(nact == 0);
if (ioq->error) {
hammer2_iocom_drain(iocom);
}
hammer2_msg_t *msg;
iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
+ ioq->hbytes = 0;
+ ioq->abytes = 0;
while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
TAILQ_REMOVE(&ioq->msgq, msg, qentry);
static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
static void hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg);
-static void hammer2_relay_scan(h2span_node_t *node);
+static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
static void hammer2_relay_delete(h2span_relay_t *relay);
/*
TAILQ_INSERT_TAIL(&connq, conn, entry);
hammer2_msg_result(state->iocom, msg, 0);
+
+ /*
+ * Span-synchronize all nodes with the new connection
+ */
+ hammer2_relay_scan(conn, NULL);
}
/*
state->any.link = slink;
RB_INSERT(h2span_link_tree, &node->tree, slink);
- hammer2_relay_scan(node);
+ hammer2_relay_scan(NULL, node);
}
/*
* removed and there's nothing left to do.
*/
if (node)
- hammer2_relay_scan(node);
+ hammer2_relay_scan(NULL, node);
}
pthread_mutex_unlock(&cluster_mtx);
*
* Called with cluster_mtx held.
*/
-static void hammer2_relay_scan_conn(h2span_node_t *node,
- h2span_connect_t *conn);
+static void hammer2_relay_scan_specific(h2span_node_t *node,
+ h2span_connect_t *conn);
static void
-hammer2_relay_scan(h2span_node_t *node)
+hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node)
{
h2span_cluster_t *cls;
- h2span_connect_t *conn;
if (node) {
/*
* Iterate specific node
*/
TAILQ_FOREACH(conn, &connq, entry)
- hammer2_relay_scan_conn(node, conn);
+ hammer2_relay_scan_specific(node, conn);
} else {
/*
- * Full iteration (not currently implemented)
+ * Full iteration.
*
- * Iterate cluster ids
+ * Iterate cluster ids, nodes, and either a specific connection
+ * or all connections.
*/
- assert(0);
RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
/*
* Iterate node ids
* Synchronize the node's link (received SPANs)
* with each connection's relays.
*/
- TAILQ_FOREACH(conn, &connq, entry)
- hammer2_relay_scan_conn(node, conn);
+ if (conn) {
+ hammer2_relay_scan_specific(node, conn);
+ } else {
+ TAILQ_FOREACH(conn, &connq, entry) {
+ hammer2_relay_scan_specific(node,
+ conn);
+ }
+ assert(conn == NULL);
+ }
}
}
}
}
static void
-hammer2_relay_scan_conn(h2span_node_t *node, h2span_connect_t *conn)
+hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
{
struct relay_scan_info info;
h2span_relay_t *relay;
hammer2_msg_t *msg;
assert(relay == NULL ||
- slink->dist <= relay->link->dist);
+ relay->link->dist <= slink->dist);
relay = hammer2_alloc(sizeof(*relay));
relay->conn = conn;
relay->link = slink;
typedef struct hammer2_handshake hammer2_handshake_t;
+/*
+ * NOTE: HAMMER2_MSG_ALIGN (64) must be a multiple of HAMMER2_AES_KEY_SIZE.
+ */
#define HAMMER2_AES_KEY_SIZE 32
#define HAMMER2_AES_KEY_MASK (HAMMER2_AES_KEY_SIZE - 1)
#define HAMMER2_AES_TYPE aes_256_cbc
{
free(ptr);
}
+
+/*
+ * Open a TCP/IPv4 connection to port HAMMER2_LISTEN_PORT on hostname.
+ *
+ * hostname is first looked up with gethostbyname2(); if that fails it is
+ * parsed as a numeric address with inet_pton().  When hostname is NULL
+ * no lookup is done and the destination address is left at 0.
+ *
+ * Returns the connected descriptor, or -1 after printing a diagnostic to
+ * stderr.  The socket is closed on every failure path.
+ */
+int
+hammer2_connect(const char *hostname)
+{
+	struct sockaddr_in lsin;
+	struct hostent *hen;
+	int saved_errno;
+	int fd;
+
+	/*
+	 * Acquire socket
+	 */
+	if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+		fprintf(stderr, "cmd_debug: socket(): %s\n",
+			strerror(errno));
+		return -1;
+	}
+
+	/*
+	 * Connect to the target
+	 */
+	bzero(&lsin, sizeof(lsin));
+	lsin.sin_family = AF_INET;
+	lsin.sin_addr.s_addr = 0;
+	lsin.sin_port = htons(HAMMER2_LISTEN_PORT);
+
+	if (hostname) {
+		hen = gethostbyname2(hostname, AF_INET);
+		if (hen == NULL) {
+			if (inet_pton(AF_INET, hostname, &lsin.sin_addr) != 1) {
+				fprintf(stderr,
+					"Cannot resolve %s\n", hostname);
+				/* do not leak the socket acquired above */
+				close(fd);
+				return -1;
+			}
+		} else {
+			bcopy(hen->h_addr, &lsin.sin_addr, hen->h_length);
+		}
+	}
+	if (connect(fd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
+		/* close() may overwrite errno; keep connect()'s value */
+		saved_errno = errno;
+		close(fd);
+		fprintf(stderr, "debug: connect failed: %s\n",
+			strerror(saved_errno));
+		return -1;
+	}
+	return (fd);
+}