4 * This module implements a simple remote control protocol
6 * $DragonFly: src/bin/cpdup/hclink.c,v 1.10 2008/05/24 17:21:36 dillon Exp $
/*
 * Forward declarations of file-local (static) helpers that are defined
 * further down in this file.
 */
14 static void * hcc_reader_thread(void *arg);
16 static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans);
17 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
/*
 * hcc_connect: set up the link for `hc`.
 *
 * Visible behavior: rejects a NULL hc or NULL hc->host, creates a pipe,
 * forks, and in the child builds an argument vector that includes every
 * entry of ssh_argv before exec'ing /usr/bin/ssh.  The parent starts
 * hcc_reader_thread with `hc` as its argument.
 *
 * NOTE(review): only part of this function is visible here; the return
 * values and the handshake itself are not shown.
 */
20 hcc_connect(struct HostConf *hc)
26 if (hc == NULL || hc->host == NULL)
31 if (pipe(fdout) < 0) {
/* fork() == 0: this branch runs in the child process. */
36 if ((hc->pid = fork()) == 0) {
/* Append the user-supplied ssh arguments to the child's argv. */
53 for (m = 0; m < ssh_argc; m++)
54 av[n++] = ssh_argv[m];
61 execv("/usr/bin/ssh", (void *)av);
/* A negative pid means fork() itself failed. */
63 } else if (hc->pid < 0) {
67 * Parent process. Do the initial handshake to make sure we are
68 * actually talking to a cpdup slave.
/*
 * NOTE(review): the result of pthread_create() does not appear to be
 * checked on this line -- confirm that a failure to start the reader
 * thread is handled somewhere.
 */
75 pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc);
/*
 * rc_badop: handler installed by hcc_slave() for every dispatch slot that
 * no descriptor claimed.  It ignores the transaction and records
 * EOPNOTSUPP in the command header's error field.
 */
82 rc_badop(hctransaction_t trans __unused, struct HCHead *head)
84 head->error = EOPNOTSUPP;
/*
 * hcc_slave: command loop for the slave side of the link.
 *
 * Builds a 256-entry dispatch table from descs[0..count-1], points every
 * slot that is still NULL at rc_badop, then reads commands, dispatches
 * them and writes each reply to fdout.
 *
 * NOTE(review): the loop termination and return value are not visible
 * in this excerpt.
 */
89 hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count)
91 struct HostConf hcslave;
94 struct HCTransaction trans;
95 int (*dispatch[256])(hctransaction_t, struct HCHead *);
100 bzero(&hcslave, sizeof(hcslave));
101 bzero(&trans, sizeof(trans));
/* Install the caller-supplied handlers; cmd must fit the table. */
102 for (i = 0; i < count; ++i) {
103 struct HCDesc *desc = &descs[i];
104 assert(desc->cmd >= 0 && desc->cmd < 256);
105 dispatch[desc->cmd] = desc->func;
/*
 * Default every unclaimed slot to rc_badop.
 * NOTE(review): this relies on unset slots being NULL; the zeroing of
 * dispatch[] (a stack array) is not visible in this excerpt -- confirm.
 */
107 for (i = 0; i < 256; ++i) {
108 if (dispatch[i] == NULL)
109 dispatch[i] = rc_badop;
112 hcslave.fdout = fdout;
116 pthread_mutex_unlock(&MasterMutex);
119 * Process commands on fdin and write out results on fdout
125 head = hcc_read_command(trans.hc, &trans);
130 * Start the reply and dispatch, then process the return code.
133 hcc_start_reply(&trans, head);
/* The command is masked to 8 bits so it always indexes inside the table. */
135 r = dispatch[head->cmd & 255](&trans, head);
139 head->error = EINVAL;
152 * Write out the reply
/* Finalize the reply header: payload length and the handler's error code. */
154 whead = (void *)trans.wbuf;
155 whead->bytes = trans.windex;
156 whead->error = head->error;
157 aligned_bytes = HCC_ALIGN(trans.windex);
159 hcc_debug_dump(whead);
/* Anything other than a full write of aligned_bytes takes this branch. */
161 if (write(hcslave.fdout, whead, aligned_bytes) != aligned_bytes)
169 * This thread collects responses from the link. It is run without
/*
 * hcc_reader_thread: thread entry point; `arg` is the struct HostConf *.
 *
 * Detaches itself, then calls hcc_read_command(hc, NULL) repeatedly until
 * it returns NULL.  Afterwards it clears hc->reader_thread and walks every
 * hash bucket: a transaction still in HCT_SENT is moved to HCT_REPLIED with
 * its reply header's error set to ENOTCONN, and its condition variable is
 * signalled.
 */
173 hcc_reader_thread(void *arg)
175 struct HostConf *hc = arg;
176 struct HCHead *rhead;
177 hctransaction_t scan;
180 pthread_detach(pthread_self());
181 while (hcc_read_command(hc, NULL) != NULL)
/*
 * NOTE(review): hc->reader_thread is the object handed to pthread_create()
 * as a pthread_t.  POSIX treats pthread_t as opaque, so assigning NULL to
 * it (and truth-testing it elsewhere) is not portable -- confirm the
 * target platforms or consider a separate flag.
 */
183 hc->reader_thread = NULL;
186 * Clean up any threads stuck waiting for a reply.
/* MasterMutex is held for the whole sweep; each bucket has its own mutex. */
188 pthread_mutex_lock(&MasterMutex);
189 for (i = 0; i < HCTHASH_SIZE; ++i) {
190 pthread_mutex_lock(&hc->hct_mutex[i]);
191 for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
192 if (scan->state == HCT_SENT) {
193 scan->state = HCT_REPLIED;
194 rhead = (void *)scan->rbuf;
195 rhead->error = ENOTCONN;
197 pthread_cond_signal(&scan->cond);
200 pthread_mutex_unlock(&hc->hct_mutex[i]);
202 pthread_mutex_unlock(&MasterMutex);
209 * This reads a command from fdin, fixes up the byte ordering, and returns
210 * a pointer to HCHead.
212 * The MasterMutex may or may not be held. When threaded this command
213 * is serialized by a reader thread.
/*
 * hcc_read_command: read one packet from hc->fdin.
 *
 * The fixed-size HCHead is read into a temporary first.  The visible code
 * then searches the hash bucket selected by the packet id for a transaction
 * in HCT_SENT with the same id (`fill`), copies the header into its rbuf and
 * reads the remainder up to HCC_ALIGN(tmp.bytes) directly into that buffer.
 * The transaction is then marked HCT_REPLIED and its condition variable is
 * signalled.  Returns a pointer to the start of fill->rbuf.
 *
 * NOTE(review): how `fill` is chosen when `trans` is non-NULL, and the
 * error/EOF returns, are not visible in this excerpt.
 */
217 hcc_read_command(struct HostConf *hc, hctransaction_t trans)
219 hctransaction_t fill;
/* read() may return short counts, so loop until the whole header is in. */
226 while (n < (int)sizeof(struct HCHead)) {
227 r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
/*
 * NOTE(review): these checks validate data that came from the peer, but
 * assert() compiles away under NDEBUG, leaving tmp.bytes unchecked before
 * it sizes the reads below -- consider an explicit runtime check.
 */
233 assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
234 assert(tmp.magic == HCMAGIC);
240 pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
241 for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK];
245 if (fill->state == HCT_SENT && fill->id == tmp.id)
248 pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
253 "cpdup hlink protocol error with %s (%04x)\n",
259 bcopy(&tmp, fill->rbuf, n);
260 aligned_bytes = HCC_ALIGN(tmp.bytes);
262 while (n < aligned_bytes) {
263 r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
269 hcc_debug_dump(head);
272 pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
274 fill->state = HCT_REPLIED;
277 pthread_cond_signal(&fill->cond);
278 pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
280 return((void *)fill->rbuf);
/*
 * hcc_get_trans: find or create the calling thread's transaction.
 *
 * The bucket is chosen by hashing pthread_self().  Under the bucket mutex
 * the chain is searched for an entry whose tid matches the caller.  The
 * visible code also allocates a zeroed transaction, initializes its
 * condition variable, bumps its id by HCTHASH_SIZE for as long as it
 * collides with an id already in the bucket, and links it at the head.
 *
 * NOTE(review): the return statement is not visible in this excerpt.
 */
289 hcc_get_trans(struct HostConf *hc)
291 hctransaction_t trans;
292 hctransaction_t scan;
293 pthread_t tid = pthread_self();
/*
 * NOTE(review): casting pthread_t to intptr_t and comparing pthread_t
 * values with == assumes an integer/pointer representation; POSIX only
 * guarantees pthread_equal() for comparison.
 */
296 i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
298 pthread_mutex_lock(&hc->hct_mutex[i]);
299 for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
300 if (trans->tid == tid)
/*
 * NOTE(review): the malloc() result is handed straight to bzero() on the
 * next line with no NULL check in between.
 */
304 trans = malloc(sizeof(*trans));
305 bzero(trans, sizeof(*trans));
308 pthread_cond_init(&trans->cond, NULL);
/* Rescan the bucket until the new id no longer collides. */
310 for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
311 if (scan->id == trans->id) {
312 trans->id += HCTHASH_SIZE;
316 } while (scan != NULL);
/* Push the new transaction onto the front of the bucket chain. */
318 trans->next = hc->hct_hash[i];
319 hc->hct_hash[i] = trans;
321 pthread_mutex_unlock(&hc->hct_mutex[i]);
/*
 * hcc_free_trans: remove the calling thread's transaction from its bucket.
 *
 * Uses the same pthread_self() hash as hcc_get_trans().  Under the bucket
 * mutex the chain is walked through a pointer-to-link so the matching entry
 * can be unlinked in place; its condition variable is then destroyed.
 *
 * NOTE(review): the release of the transaction's memory is not visible in
 * this excerpt.
 */
326 hcc_free_trans(struct HostConf *hc)
328 hctransaction_t trans;
329 hctransaction_t *transp;
330 pthread_t tid = pthread_self();
333 i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
335 pthread_mutex_lock(&hc->hct_mutex[i]);
336 for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) {
338 if (trans->tid == tid) {
339 *transp = trans->next;
340 pthread_cond_destroy(&trans->cond);
345 pthread_mutex_unlock(&hc->hct_mutex[i]);
/*
 * Second definition of hcc_get_trans in this file.
 * NOTE(review): presumably the alternative build variant selected by a
 * preprocessor conditional that is not visible here -- confirm.
 */
352 hcc_get_trans(struct HostConf *hc)
/*
 * Second definition of hcc_free_trans in this file; `hc` is marked unused.
 * NOTE(review): presumably the alternative build variant selected by a
 * preprocessor conditional that is not visible here -- confirm.
 */
358 hcc_free_trans(struct HostConf *hc __unused)
366 * Initialize for a new command
/*
 * hcc_start_command: begin building an outgoing command.
 *
 * Obtains the transaction via hcc_get_trans(hc), lays an HCHead at the
 * start of its write buffer (magic and transaction id are set in the
 * visible lines), positions windex just past the header and puts the
 * transaction in HCT_IDLE.
 *
 * NOTE(review): the use of `cmd` and the return value are not visible in
 * this excerpt.
 */
369 hcc_start_command(struct HostConf *hc, int16_t cmd)
371 struct HCHead *whead;
372 hctransaction_t trans;
374 trans = hcc_get_trans(hc);
376 whead = (void *)trans->wbuf;
377 whead->magic = HCMAGIC;
380 whead->id = trans->id;
/* Leaf items are appended starting immediately after the header. */
383 trans->windex = sizeof(*whead);
385 trans->state = HCT_IDLE;
/*
 * hcc_start_reply: initialize the reply header in trans->wbuf from the
 * received command header `rhead`.  The reply carries the same id and the
 * same command code with HCF_REPLY or'ed in; windex is reset to just past
 * the header so reply items can be appended.
 */
391 hcc_start_reply(hctransaction_t trans, struct HCHead *rhead)
393 struct HCHead *whead = (void *)trans->wbuf;
395 whead->magic = HCMAGIC;
397 whead->cmd = rhead->cmd | HCF_REPLY;
398 whead->id = rhead->id;
401 trans->windex = sizeof(*whead);
405 * Finish constructing a command, transmit it, and await the reply.
406 * Return the HCHead of the reply.
/*
 * hcc_finish_command: transmit the command built in trans->wbuf and obtain
 * its reply.
 *
 * Stores the final length in the header, marks the transaction HCT_SENT and
 * writes HCC_ALIGN(windex) bytes to hc->fdout.  Two ways of getting the
 * reply are visible: waiting on trans->cond until the state becomes
 * HCT_REPLIED (or hc->reader_thread is cleared), and calling
 * hcc_read_command(hc, trans) directly.  Afterwards the transaction is set
 * to HCT_DONE and the reply's error code is copied into errno.
 *
 * NOTE(review): the two reply paths are presumably selected by preprocessor
 * conditionals that are not visible here -- confirm.
 */
409 hcc_finish_command(hctransaction_t trans)
412 struct HCHead *whead;
413 struct HCHead *rhead;
418 whead = (void *)trans->wbuf;
419 whead->bytes = trans->windex;
420 aligned_bytes = HCC_ALIGN(trans->windex);
/* The state is set before the write so a reply can be matched to it. */
422 trans->state = HCT_SENT;
424 if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) {
430 if (whead->cmd < 0x0010)
432 fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
439 * whead is invalid when we call hcc_read_command() because
440 * we may switch to another thread.
/*
 * NOTE(review): the outer loop condition reads trans->state and
 * hc->reader_thread with neither MasterMutex nor the bucket mutex held;
 * only the inner re-check runs under mtxp.  Confirm this unlocked read is
 * intentional.
 */
443 pthread_mutex_unlock(&MasterMutex);
444 while (trans->state != HCT_REPLIED && hc->reader_thread) {
445 pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK];
446 pthread_mutex_lock(mtxp);
448 if (trans->state != HCT_REPLIED && hc->reader_thread)
449 pthread_cond_wait(&trans->cond, mtxp);
451 pthread_mutex_unlock(mtxp);
453 pthread_mutex_lock(&MasterMutex);
454 rhead = (void *)trans->rbuf;
456 rhead = hcc_read_command(hc, trans);
/* A reply that never arrived, or one for a different id, is a lost link. */
458 if (trans->state != HCT_REPLIED || rhead->id != trans->id) {
466 fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
469 trans->state = HCT_DONE;
473 *__error = rhead->error;
475 errno = rhead->error;
/*
 * hcc_leaf_string: append a string leaf to the command being built.
 *
 * The item header is placed at wbuf + windex and the string, including its
 * terminating NUL, is copied right after it.  item->bytes is the unaligned
 * size; windex advances to the HCC_ALIGN'ed end.
 *
 * NOTE(review): the only bounds check is an assert() against 65536, which
 * disappears under NDEBUG and tests the unaligned end.  The real size of
 * wbuf is not visible here -- confirm it matches.
 */
482 hcc_leaf_string(hctransaction_t trans, int16_t leafid, const char *str)
485 int bytes = strlen(str) + 1;
487 item = (void *)(trans->wbuf + trans->windex);
488 assert(trans->windex + sizeof(*item) + bytes < 65536);
489 item->leafid = leafid;
491 item->bytes = sizeof(*item) + bytes;
492 bcopy(str, item + 1, bytes);
493 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_leaf_data: append a leaf carrying `bytes` bytes copied from `ptr`.
 *
 * Layout matches the other hcc_leaf_* helpers: item header at
 * wbuf + windex, payload immediately after, windex advanced to the
 * HCC_ALIGN'ed end.
 *
 * NOTE(review): `bytes` is a signed int and is not checked for being
 * negative in the visible lines; the size limit is enforced by assert()
 * only.
 */
497 hcc_leaf_data(hctransaction_t trans, int16_t leafid, const void *ptr, int bytes)
501 item = (void *)(trans->wbuf + trans->windex);
502 assert(trans->windex + sizeof(*item) + bytes < 65536);
503 item->leafid = leafid;
505 item->bytes = sizeof(*item) + bytes;
506 bcopy(ptr, item + 1, bytes);
507 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_leaf_int32: append a leaf whose payload is a single int32_t.
 *
 * The value is stored directly after the item header through a cast
 * pointer; windex advances to the HCC_ALIGN'ed end.
 *
 * NOTE(review): the direct store assumes the payload address is suitably
 * aligned, which depends on HCC_ALIGN and sizeof(*item) -- neither is
 * visible here.
 */
511 hcc_leaf_int32(hctransaction_t trans, int16_t leafid, int32_t value)
515 item = (void *)(trans->wbuf + trans->windex);
516 assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
517 item->leafid = leafid;
519 item->bytes = sizeof(*item) + sizeof(value);
520 *(int32_t *)(item + 1) = value;
521 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_leaf_int64: append a leaf whose payload is a single int64_t.
 *
 * The value is stored directly after the item header through a cast
 * pointer; windex advances to the HCC_ALIGN'ed end.
 *
 * NOTE(review): the direct 8-byte store assumes the payload address is
 * suitably aligned, which depends on HCC_ALIGN and sizeof(*item) -- neither
 * is visible here.
 */
525 hcc_leaf_int64(hctransaction_t trans, int16_t leafid, int64_t value)
529 item = (void *)(trans->wbuf + trans->windex);
530 assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
531 item->leafid = leafid;
533 item->bytes = sizeof(*item) + sizeof(value);
534 *(int64_t *)(item + 1) = value;
535 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_alloc_descriptor: allocate a new HCHostDesc and make it the head of
 * hc->hostdescs.  When the list is non-empty the new entry's desc number is
 * one greater than the current head's.
 *
 * NOTE(review): the handling of `ptr`/`type`, the numbering for an empty
 * list, any malloc() failure check and the return value are not visible in
 * this excerpt.
 */
539 hcc_alloc_descriptor(struct HostConf *hc, void *ptr, int type)
541 struct HCHostDesc *hd;
542 struct HCHostDesc *hnew;
544 hnew = malloc(sizeof(struct HCHostDesc));
548 if ((hd = hc->hostdescs) != NULL) {
549 hnew->desc = hd->desc + 1;
554 hc->hostdescs = hnew;
/*
 * hcc_get_descriptor: walk hc->hostdescs looking for the entry whose desc
 * number AND type both match the arguments.
 *
 * NOTE(review): what is returned on a hit or a miss is not visible in this
 * excerpt.
 */
559 hcc_get_descriptor(struct HostConf *hc, int desc, int type)
561 struct HCHostDesc *hd;
563 for (hd = hc->hostdescs; hd; hd = hd->next) {
564 if (hd->desc == desc && hd->type == type)
/*
 * hcc_set_descriptor: locate the entry numbered `desc` in hc->hostdescs,
 * walking the list through a pointer-to-link.  The visible code also
 * allocates a fresh entry and points its next field at the current list
 * head.
 *
 * NOTE(review): what happens to a matching entry, when the allocation path
 * is taken, and how `ptr`/`type` are stored are not visible in this
 * excerpt.  Unlike hcc_get_descriptor(), the visible match tests only the
 * desc number, not the type.
 */
571 hcc_set_descriptor(struct HostConf *hc, int desc, void *ptr, int type)
573 struct HCHostDesc *hd;
574 struct HCHostDesc **hdp;
576 for (hdp = &hc->hostdescs; (hd = *hdp) != NULL; hdp = &hd->next) {
577 if (hd->desc == desc) {
589 hd = malloc(sizeof(*hd));
593 hd->next = hc->hostdescs;
/*
 * hcc_firstitem: locate the first leaf item of a packet, which sits
 * immediately after the HCHead.  A packet whose byte count equals the
 * header size has no items.
 *
 * NOTE(review): item bounds are validated with assert() only, so the checks
 * vanish under NDEBUG.  The return statements are not visible in this
 * excerpt.
 */
599 hcc_firstitem(struct HCHead *head)
604 offset = sizeof(*head);
/* Header-only packet: there is nothing after the header. */
605 if (offset == head->bytes)
/* There must be room for at least one complete item header. */
607 assert(head->bytes >= offset + (int)sizeof(*item));
608 item = (void *)(head + 1);
/* The item's declared size must fit inside the packet and be sane. */
609 assert(head->bytes >= offset + item->bytes);
610 assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
/*
 * hcc_nextitem: step from `item` to the following leaf item in the same
 * packet by skipping HCC_ALIGN(item->bytes) bytes.  Reaching an offset equal
 * to head->bytes means the previous item was the last one.
 *
 * NOTE(review): item bounds are validated with assert() only, so the checks
 * vanish under NDEBUG.  The return statements are not visible in this
 * excerpt.
 */
615 hcc_nextitem(struct HCHead *head, struct HCLeaf *item)
619 item = (void *)((char *)item + HCC_ALIGN(item->bytes));
/* Offset of the candidate item measured from the start of the packet. */
620 offset = (char *)item - (char *)head;
621 if (offset == head->bytes)
623 assert(head->bytes >= offset + (int)sizeof(*item));
624 assert(head->bytes >= offset + item->bytes);
625 assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
/*
 * hcc_debug_dump: print a human-readable dump of a packet to stderr.
 *
 * Prints the command code and aligned length, the error code when the
 * HCF_REPLY bit is set, then one line per leaf item with its payload
 * formatted according to the type bits of item->leafid.
 *
 * NOTE(review): the case labels of the switch are not visible in this
 * excerpt.
 */
632 hcc_debug_dump(struct HCHead *head)
635 int aligned_bytes = HCC_ALIGN(head->bytes);
637 fprintf(stderr, "DUMP %04x (%d)", (u_int16_t)head->cmd, aligned_bytes);
638 if (head->cmd & HCF_REPLY)
639 fprintf(stderr, " error %d", head->error);
640 fprintf(stderr, "\n");
641 for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
642 fprintf(stderr, " ITEM %04x DATA ", item->leafid);
643 switch(item->leafid & LCF_TYPEMASK) {
645 fprintf(stderr, "int32 %d\n", *(int32_t *)(item + 1));
/*
 * NOTE(review): "%lld" expects long long, but the argument is int64_t,
 * which is plain long on some 64-bit platforms.  Cast to (long long) or use
 * PRId64 from <inttypes.h> to keep the format and argument in agreement.
 */
648 fprintf(stderr, "int64 %lld\n", *(int64_t *)(item + 1));
651 fprintf(stderr, "\"%s\"\n", (char *)(item + 1));
654 fprintf(stderr, "(binary)\n");