4 * This module implements a simple remote control protocol
6 * $DragonFly: src/bin/cpdup/hclink.c,v 1.10 2008/05/24 17:21:36 dillon Exp $
/* Forward declarations for file-local helpers that are defined further down. */
14 static void * hcc_reader_thread(void *arg);
16 static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans);
17 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
/*
 * hcc_connect: establish the link described by hc.
 *
 * Visible steps: reject a NULL hc or a NULL hc->host, create a pipe, fork,
 * exec /usr/bin/ssh in the child, and create a thread running
 * hcc_reader_thread.  Return values and error paths are not visible in
 * this view.
 */
20 hcc_connect(struct HostConf *hc)
/* Nothing to do without a host configuration and a host name. */
26 if (hc == NULL || hc->host == NULL)
31 if (pipe(fdout) < 0) {
/* fork() returns 0 in the child. */
36 if ((hc->pid = fork()) == 0) {
/* execv() only returns on failure; the handling after it is not visible here. */
59 execv("/usr/bin/ssh", (void *)av);
/* A negative pid means fork() failed. */
61 } else if (hc->pid < 0) {
65 * Parent process. Do the initial handshake to make sure we are
66 * actually talking to a cpdup slave.
/* NOTE(review): the return value of pthread_create() is not checked. */
73 pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc);
/*
 * rc_badop: command handler that reports EOPNOTSUPP in head->error.
 * hcc_slave() installs it in every dispatch slot that is still NULL.
 */
80 rc_badop(hctransaction_t trans __unused, struct HCHead *head)
82 head->error = EOPNOTSUPP;
/*
 * hcc_slave: serve commands arriving on a link.
 *
 * Builds a 256-entry dispatch table from descs[0..count-1], then reads
 * commands, starts a reply, calls the handler selected by the command
 * number, and writes the reply to fdout.
 */
87 hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count)
89 struct HostConf hcslave;
92 struct HCTransaction trans;
/* One handler slot per possible value of (cmd & 255). */
93 int (*dispatch[256])(hctransaction_t, struct HCHead *);
98 bzero(&hcslave, sizeof(hcslave));
99 bzero(&trans, sizeof(trans));
/* Install the caller-supplied handlers. */
100 for (i = 0; i < count; ++i) {
101 struct HCDesc *desc = &descs[i];
/* The command number is used directly as the table index. */
102 assert(desc->cmd >= 0 && desc->cmd < 256);
103 dispatch[desc->cmd] = desc->func;
/*
 * Route every unclaimed slot to rc_badop.
 * NOTE(review): dispatch is a local array and no zeroing of it is visible
 * in this view; confirm it is cleared before this NULL test.
 */
105 for (i = 0; i < 256; ++i) {
106 if (dispatch[i] == NULL)
107 dispatch[i] = rc_badop;
110 hcslave.fdout = fdout;
114 pthread_mutex_unlock(&MasterMutex);
117 * Process commands on fdin and write out results on fdout
123 head = hcc_read_command(trans.hc, &trans);
128 * Start the reply and dispatch, then process the return code.
131 hcc_start_reply(&trans, head);
/* Masking with 255 keeps the index inside the dispatch table. */
133 r = dispatch[head->cmd & 255](&trans, head);
137 head->error = EINVAL;
150 * Write out the reply
/* The reply header sits at the start of the transaction's write buffer. */
152 whead = (void *)trans.wbuf;
153 whead->bytes = trans.windex;
154 whead->error = head->error;
/* The length put on the wire is windex rounded by HCC_ALIGN. */
155 aligned_bytes = HCC_ALIGN(trans.windex);
157 hcc_debug_dump(whead);
/* Anything other than a complete write takes the branch below. */
159 if (write(hcslave.fdout, whead, aligned_bytes) != aligned_bytes)
167 * This thread collects responses from the link. It is run without
/*
 * hcc_reader_thread: thread entry point; arg is the struct HostConf.
 *
 * Detaches itself, calls hcc_read_command() until it returns NULL, clears
 * hc->reader_thread, and then marks every transaction still in state
 * HCT_SENT as HCT_REPLIED with error ENOTCONN.
 */
171 hcc_reader_thread(void *arg)
173 struct HostConf *hc = arg;
174 struct HCHead *rhead;
175 hctransaction_t scan;
178 pthread_detach(pthread_self());
/* Passing NULL as the transaction; the loop ends on the first NULL return. */
179 while (hcc_read_command(hc, NULL) != NULL)
/*
 * NOTE(review): reader_thread is the pthread_t handed to pthread_create();
 * assigning NULL assumes pthread_t is a pointer type on this platform.
 */
181 hc->reader_thread = NULL;
184 * Clean up any threads stuck waiting for a reply.
/* Walk every hash bucket, taking its mutex, with MasterMutex held. */
186 pthread_mutex_lock(&MasterMutex);
187 for (i = 0; i < HCTHASH_SIZE; ++i) {
188 pthread_mutex_lock(&hc->hct_mutex[i]);
189 for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
190 if (scan->state == HCT_SENT) {
/* Record ENOTCONN in the transaction's reply buffer header. */
191 scan->state = HCT_REPLIED;
192 rhead = (void *)scan->rbuf;
193 rhead->error = ENOTCONN;
195 pthread_cond_signal(&scan->cond);
198 pthread_mutex_unlock(&hc->hct_mutex[i]);
200 pthread_mutex_unlock(&MasterMutex);
207 * This reads a command from fdin, fixes up the byte ordering, and returns
208 * a pointer to HCHead.
210 * The MasterMutex may or may not be held. When threaded this command
211 * is serialized by a reader thread.
/*
 * hcc_read_command: read one message from hc->fdin.
 *
 * Reads a struct HCHead into a temporary, checks its length and magic,
 * looks up a transaction in state HCT_SENT with the same id, copies the
 * header into that transaction's rbuf, reads the rest of the message up to
 * the aligned length, marks the transaction HCT_REPLIED, signals its
 * condition variable and returns a pointer to rbuf.
 */
215 hcc_read_command(struct HostConf *hc, hctransaction_t trans)
217 hctransaction_t fill;
/* read() may return short counts, so loop until the header is complete. */
224 while (n < (int)sizeof(struct HCHead)) {
225 r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
/*
 * NOTE(review): these checks on data received from the peer are asserts
 * and vanish when the file is built with NDEBUG.
 */
231 assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
232 assert(tmp.magic == HCMAGIC);
/* The bucket is selected by the low bits of the message id. */
238 pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
239 for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK];
243 if (fill->state == HCT_SENT && fill->id == tmp.id)
246 pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
/*
 * NOTE(review): trans->id is read here, but hcc_reader_thread() calls this
 * function with trans == NULL; confirm this path is guarded.
 */
251 "cpdup hlink protocol error with %s (%04x %04x)\n",
252 hc->host, trans->id, tmp.id);
/* Move the header already read into the transaction's receive buffer. */
257 bcopy(&tmp, fill->rbuf, n);
258 aligned_bytes = HCC_ALIGN(tmp.bytes);
/* Read the remainder of the message directly into rbuf. */
260 while (n < aligned_bytes) {
261 r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
267 hcc_debug_dump(head);
/* Publish the reply under the bucket mutex and wake the waiter. */
270 pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
272 fill->state = HCT_REPLIED;
275 pthread_cond_signal(&fill->cond);
276 pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
278 return((void *)fill->rbuf);
/*
 * hcc_get_trans: find or create the transaction belonging to the calling
 * thread.
 *
 * The bucket is chosen from the thread id.  If no entry with a matching
 * tid exists, a new one is allocated, zeroed, given a condition variable
 * and an id, and pushed onto the head of the bucket.
 */
287 hcc_get_trans(struct HostConf *hc)
289 hctransaction_t trans;
290 hctransaction_t scan;
291 pthread_t tid = pthread_self();
/* NOTE(review): casting tid assumes pthread_t converts to an integer. */
294 i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
296 pthread_mutex_lock(&hc->hct_mutex[i]);
/* Look for an existing transaction owned by this thread. */
297 for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
298 if (trans->tid == tid)
/*
 * NOTE(review): the malloc() result is passed straight to bzero() on the
 * next line without a NULL check.
 */
302 trans = malloc(sizeof(*trans));
303 bzero(trans, sizeof(*trans));
306 pthread_cond_init(&trans->cond, NULL);
/* On an id clash within the bucket, step the new id by HCTHASH_SIZE. */
308 for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
309 if (scan->id == trans->id) {
310 trans->id += HCTHASH_SIZE;
314 } while (scan != NULL);
/* Insert at the head of the bucket. */
316 trans->next = hc->hct_hash[i];
317 hc->hct_hash[i] = trans;
319 pthread_mutex_unlock(&hc->hct_mutex[i]);
/*
 * hcc_free_trans: remove the calling thread's transaction from its hash
 * bucket and destroy its condition variable.  Whether the memory is also
 * released is not visible in this view.
 */
324 hcc_free_trans(struct HostConf *hc)
326 hctransaction_t trans;
327 hctransaction_t *transp;
328 pthread_t tid = pthread_self();
/* Same bucket computation as hcc_get_trans(). */
331 i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
333 pthread_mutex_lock(&hc->hct_mutex[i]);
/* transp points at the link to patch when the entry is unlinked. */
334 for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) {
336 if (trans->tid == tid) {
337 *transp = trans->next;
338 pthread_cond_destroy(&trans->cond);
343 pthread_mutex_unlock(&hc->hct_mutex[i]);
/*
 * Second definition of hcc_get_trans in this file.
 * NOTE(review): presumably the alternative selected by a preprocessor
 * conditional that is not visible here; its body is not shown.
 */
350 hcc_get_trans(struct HostConf *hc)
/*
 * Second definition of hcc_free_trans in this file; hc is marked __unused.
 * NOTE(review): presumably the alternative selected by a preprocessor
 * conditional that is not visible here; its body is not shown.
 */
356 hcc_free_trans(struct HostConf *hc __unused)
364 * Initialize for a new command
/*
 * hcc_start_command: obtain the calling thread's transaction and prepare
 * the header at the start of its write buffer.  The lines that use cmd are
 * not visible in this view.
 */
367 hcc_start_command(struct HostConf *hc, int16_t cmd)
369 struct HCHead *whead;
370 hctransaction_t trans;
372 trans = hcc_get_trans(hc);
/* The outgoing header overlays the beginning of wbuf. */
374 whead = (void *)trans->wbuf;
375 whead->magic = HCMAGIC;
378 whead->id = trans->id;
/* Leaf items are appended after the header. */
381 trans->windex = sizeof(*whead);
383 trans->state = HCT_IDLE;
/*
 * hcc_start_reply: initialise the reply header in trans->wbuf from the
 * received header rhead.
 */
389 hcc_start_reply(hctransaction_t trans, struct HCHead *rhead)
391 struct HCHead *whead = (void *)trans->wbuf;
393 whead->magic = HCMAGIC;
/* The reply carries the request's command number with HCF_REPLY set. */
395 whead->cmd = rhead->cmd | HCF_REPLY;
/* The id is echoed back from the request. */
396 whead->id = rhead->id;
/* Reply payload starts right after the header. */
399 trans->windex = sizeof(*whead);
403 * Finish constructing a command, transmit it, and await the reply.
404 * Return the HCHead of the reply.
/*
 * hcc_finish_command: send the command built in trans->wbuf and obtain the
 * reply.
 *
 * Stores the final length in the header, marks the transaction HCT_SENT,
 * writes the aligned message to hc->fdout and then waits for the reply.
 * The reply's error field is copied into errno.
 */
407 hcc_finish_command(hctransaction_t trans)
410 struct HCHead *whead;
411 struct HCHead *rhead;
416 whead = (void *)trans->wbuf;
417 whead->bytes = trans->windex;
418 aligned_bytes = HCC_ALIGN(trans->windex);
/* The state is set to HCT_SENT before the write is issued. */
420 trans->state = HCT_SENT;
/* Anything other than a complete write is treated as a failure. */
422 if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) {
/* NOTE(review): the meaning of the 0x0010 threshold is not visible here. */
428 if (whead->cmd < 0x0010)
430 fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
437 * whead is invalid when we call hcc_read_command() because
438 * we may switch to another thread.
/*
 * MasterMutex is released while waiting and re-acquired afterwards.  The
 * wait itself happens under the transaction's bucket mutex; the outer
 * loop condition is evaluated without it and re-checked under the lock.
 */
441 pthread_mutex_unlock(&MasterMutex);
442 while (trans->state != HCT_REPLIED && hc->reader_thread) {
443 pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK];
444 pthread_mutex_lock(mtxp);
446 if (trans->state != HCT_REPLIED && hc->reader_thread)
447 pthread_cond_wait(&trans->cond, mtxp);
449 pthread_mutex_unlock(mtxp);
451 pthread_mutex_lock(&MasterMutex);
452 rhead = (void *)trans->rbuf;
/*
 * NOTE(review): a second assignment to rhead, reading the reply directly;
 * presumably an alternative build path whose conditional is not visible.
 */
454 rhead = hcc_read_command(hc, trans);
456 if (trans->state != HCT_REPLIED || rhead->id != trans->id) {
464 fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
467 trans->state = HCT_DONE;
/*
 * NOTE(review): two ways of storing the error follow; presumably only one
 * is compiled, depending on a conditional that is not visible.
 */
471 *__error = rhead->error;
473 errno = rhead->error;
/*
 * hcc_leaf_string: append a leaf item holding the string str, including
 * its terminating NUL, to the transaction's write buffer.
 */
480 hcc_leaf_string(hctransaction_t trans, int16_t leafid, const char *str)
/* NOTE(review): strlen() returns size_t; the result is narrowed to int. */
483 int bytes = strlen(str) + 1;
/* The new item starts at the current write offset. */
485 item = (void *)(trans->wbuf + trans->windex);
/* NOTE(review): the size limit is enforced by assert only. */
486 assert(trans->windex + sizeof(*item) + bytes < 65536);
487 item->leafid = leafid;
/* item->bytes counts the leaf header plus the payload, unaligned. */
489 item->bytes = sizeof(*item) + bytes;
/* The payload is stored immediately after the leaf header. */
490 bcopy(str, item + 1, bytes);
/* Advance the write offset to the aligned end of the item. */
491 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_leaf_data: append a leaf item holding bytes bytes copied from ptr to
 * the transaction's write buffer.
 */
495 hcc_leaf_data(hctransaction_t trans, int16_t leafid, const void *ptr, int bytes)
/* The new item starts at the current write offset. */
499 item = (void *)(trans->wbuf + trans->windex);
/*
 * NOTE(review): the size limit is enforced by assert only, and no check
 * for a negative bytes value is visible.
 */
500 assert(trans->windex + sizeof(*item) + bytes < 65536);
501 item->leafid = leafid;
/* item->bytes counts the leaf header plus the payload, unaligned. */
503 item->bytes = sizeof(*item) + bytes;
504 bcopy(ptr, item + 1, bytes);
/* Advance the write offset to the aligned end of the item. */
505 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_leaf_int32: append a leaf item holding a 32-bit integer to the
 * transaction's write buffer.
 */
509 hcc_leaf_int32(hctransaction_t trans, int16_t leafid, int32_t value)
/* The new item starts at the current write offset. */
513 item = (void *)(trans->wbuf + trans->windex);
514 assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
515 item->leafid = leafid;
517 item->bytes = sizeof(*item) + sizeof(value);
/* The value is stored immediately after the leaf header. */
518 *(int32_t *)(item + 1) = value;
/* Advance the write offset to the aligned end of the item. */
519 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_leaf_int64: append a leaf item holding a 64-bit integer to the
 * transaction's write buffer.
 */
523 hcc_leaf_int64(hctransaction_t trans, int16_t leafid, int64_t value)
/* The new item starts at the current write offset. */
527 item = (void *)(trans->wbuf + trans->windex);
528 assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
529 item->leafid = leafid;
531 item->bytes = sizeof(*item) + sizeof(value);
/*
 * NOTE(review): this store assumes item + 1 is suitably aligned for
 * int64_t; confirm against HCC_ALIGN and the layout of struct HCLeaf.
 */
532 *(int64_t *)(item + 1) = value;
/* Advance the write offset to the aligned end of the item. */
533 trans->windex = HCC_ALIGN(trans->windex + item->bytes);
/*
 * hcc_alloc_descriptor: allocate a new HCHostDesc and make it the head of
 * hc->hostdescs.  When the list is not empty the new descriptor number is
 * the current head's number plus one.  The lines that store ptr and type
 * and produce the return value are not visible in this view.
 */
537 hcc_alloc_descriptor(struct HostConf *hc, void *ptr, int type)
539 struct HCHostDesc *hd;
540 struct HCHostDesc *hnew;
/* NOTE(review): no check of the malloc() result is visible here. */
542 hnew = malloc(sizeof(struct HCHostDesc));
546 if ((hd = hc->hostdescs) != NULL) {
547 hnew->desc = hd->desc + 1;
552 hc->hostdescs = hnew;
/*
 * hcc_get_descriptor: search hc->hostdescs for an entry whose desc and
 * type both match.  The returned value is not visible in this view.
 */
557 hcc_get_descriptor(struct HostConf *hc, int desc, int type)
559 struct HCHostDesc *hd;
/* Linear walk of the descriptor list. */
561 for (hd = hc->hostdescs; hd; hd = hd->next) {
562 if (hd->desc == desc && hd->type == type)
/*
 * hcc_set_descriptor: look for an existing entry numbered desc in
 * hc->hostdescs; a new entry is allocated and linked in front of the
 * current list head further down.  What happens to a matching entry is
 * not visible in this view.
 */
569 hcc_set_descriptor(struct HostConf *hc, int desc, void *ptr, int type)
571 struct HCHostDesc *hd;
572 struct HCHostDesc **hdp;
/* hdp tracks the link that points at the entry being examined. */
574 for (hdp = &hc->hostdescs; (hd = *hdp) != NULL; hdp = &hd->next) {
575 if (hd->desc == desc) {
/* NOTE(review): no check of the malloc() result is visible here. */
587 hd = malloc(sizeof(*hd));
591 hd->next = hc->hostdescs;
/*
 * hcc_firstitem: locate the first leaf item of a message; it is placed
 * directly after the header.  The return statements are not visible in
 * this view.
 */
597 hcc_firstitem(struct HCHead *head)
602 offset = sizeof(*head);
/* The message length equals the header size, so nothing follows it. */
603 if (offset == head->bytes)
/* There must be room for at least a leaf header. */
605 assert(head->bytes >= offset + (int)sizeof(*item));
606 item = (void *)(head + 1);
/* The item must end inside the message and have a sane size. */
607 assert(head->bytes >= offset + item->bytes);
608 assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
/*
 * hcc_nextitem: step from item to the leaf item that follows it in the
 * message starting at head.  The return statements are not visible in
 * this view.
 */
613 hcc_nextitem(struct HCHead *head, struct HCLeaf *item)
/* Items are spaced by their aligned size. */
617 item = (void *)((char *)item + HCC_ALIGN(item->bytes));
/* Byte offset of the candidate item from the start of the message. */
618 offset = (char *)item - (char *)head;
/* The candidate starts exactly at the end of the message. */
619 if (offset == head->bytes)
/* Same bounds checks as in hcc_firstitem(). */
621 assert(head->bytes >= offset + (int)sizeof(*item));
622 assert(head->bytes >= offset + item->bytes);
623 assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
/*
 * hcc_debug_dump: print a message to stderr: the command number, the
 * aligned length, the error field for replies, and one line per leaf
 * item.  The case labels of the switch are not visible in this view.
 */
630 hcc_debug_dump(struct HCHead *head)
633 int aligned_bytes = HCC_ALIGN(head->bytes);
/* The cast makes the command print as an unsigned 16-bit value. */
635 fprintf(stderr, "DUMP %04x (%d)", (u_int16_t)head->cmd, aligned_bytes);
/* The error field is only printed when HCF_REPLY is set. */
636 if (head->cmd & HCF_REPLY)
637 fprintf(stderr, " error %d", head->error);
638 fprintf(stderr, "\n");
639 for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
640 fprintf(stderr, " ITEM %04x DATA ", item->leafid);
/* The payload format is selected from the type bits of the leaf id. */
641 switch(item->leafid & LCF_TYPEMASK) {
643 fprintf(stderr, "int32 %d\n", *(int32_t *)(item + 1));
/*
 * NOTE(review): %lld expects long long, but the argument is int64_t;
 * where int64_t is long this is a format mismatch.
 */
646 fprintf(stderr, "int64 %lld\n", *(int64_t *)(item + 1));
649 fprintf(stderr, "\"%s\"\n", (char *)(item + 1));
652 fprintf(stderr, "(binary)\n");