Merge branch 'master' of ssh://crater.dragonflybsd.org/repository/git/dragonfly
[dragonfly.git] / bin / cpdup / hclink.c
1 /*
2  * HCLINK.C
3  *
4  * This module implements a simple remote control protocol
5  *
6  * $DragonFly: src/bin/cpdup/hclink.c,v 1.10 2008/05/24 17:21:36 dillon Exp $
7  */
8
9 #include "cpdup.h"
10 #include "hclink.h"
11 #include "hcproto.h"
12
13 #if USE_PTHREADS
14 static void * hcc_reader_thread(void *arg);
15 #endif
16 static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans);
17 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
18
19 int
20 hcc_connect(struct HostConf *hc)
21 {
22     int fdin[2];
23     int fdout[2];
24     const char *av[32];
25
26     if (hc == NULL || hc->host == NULL)
27         return(0);
28
29     if (pipe(fdin) < 0)
30         return(-1);
31     if (pipe(fdout) < 0) {
32         close(fdin[0]);
33         close(fdin[1]);
34         return(-1);
35     }
36     if ((hc->pid = fork()) == 0) {
37         /*
38          * Child process
39          */
40         int n, m;
41
42         dup2(fdin[1], 1);
43         close(fdin[0]);
44         close(fdin[1]);
45         dup2(fdout[0], 0);
46         close(fdout[0]);
47         close(fdout[1]);
48
49         n = 0;
50         av[n++] = "ssh";
51         if (CompressOpt)
52             av[n++] = "-C";
53         for (m = 0; m < ssh_argc; m++)
54             av[n++] = ssh_argv[m];
55         av[n++] = "-T";
56         av[n++] = hc->host;
57         av[n++] = "cpdup";
58         av[n++] = "-S";
59         av[n++] = NULL;
60
61         execv("/usr/bin/ssh", (void *)av);
62         _exit(1);
63     } else if (hc->pid < 0) {
64         return(-1);
65     } else {
66         /*
67          * Parent process.  Do the initial handshake to make sure we are
68          * actually talking to a cpdup slave.
69          */
70         close(fdin[1]);
71         hc->fdin = fdin[0];
72         close(fdout[0]);
73         hc->fdout = fdout[1];
74 #if USE_PTHREADS
75         pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc);
76 #endif
77         return(0);
78     }
79 }
80
81 static int
82 rc_badop(hctransaction_t trans __unused, struct HCHead *head)
83 {
84     head->error = EOPNOTSUPP;
85     return(0);
86 }
87
88 int
89 hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count)
90 {
91     struct HostConf hcslave;
92     struct HCHead *head;
93     struct HCHead *whead;
94     struct HCTransaction trans;
95     int (*dispatch[256])(hctransaction_t, struct HCHead *);
96     int aligned_bytes;
97     int i;
98     int r;
99
100     bzero(&hcslave, sizeof(hcslave));
101     bzero(&trans, sizeof(trans));
102     for (i = 0; i < count; ++i) {
103         struct HCDesc *desc = &descs[i];
104         assert(desc->cmd >= 0 && desc->cmd < 256);
105         dispatch[desc->cmd] = desc->func;
106     }
107     for (i = 0; i < 256; ++i) {
108         if (dispatch[i] == NULL)
109             dispatch[i] = rc_badop;
110     }
111     hcslave.fdin = fdin;
112     hcslave.fdout = fdout;
113     trans.hc = &hcslave;
114
115 #if USE_PTHREADS
116     pthread_mutex_unlock(&MasterMutex);
117 #endif
118     /*
119      * Process commands on fdin and write out results on fdout
120      */
121     for (;;) {
122         /*
123          * Get the command
124          */
125         head = hcc_read_command(trans.hc, &trans);
126         if (head == NULL)
127             break;
128
129         /*
130          * Start the reply and dispatch, then process the return code.
131          */
132         head->error = 0;
133         hcc_start_reply(&trans, head);
134
135         r = dispatch[head->cmd & 255](&trans, head);
136
137         switch(r) {
138         case -2:
139                 head->error = EINVAL;
140                 break;
141         case -1:
142                 head->error = errno;
143                 break;
144         case 0:
145                 break;
146         default:
147                 assert(0);
148                 break;
149         }
150
151         /*
152          * Write out the reply
153          */
154         whead = (void *)trans.wbuf;
155         whead->bytes = trans.windex;
156         whead->error = head->error;
157         aligned_bytes = HCC_ALIGN(trans.windex);
158 #ifdef DEBUG
159         hcc_debug_dump(whead);
160 #endif
161         if (write(hcslave.fdout, whead, aligned_bytes) != aligned_bytes)
162             break;
163     }
164     return(0);
165 }
166
167 #if USE_PTHREADS
168 /*
169  * This thread collects responses from the link.  It is run without
170  * the MasterMutex.
171  */
172 static void *
173 hcc_reader_thread(void *arg)
174 {
175     struct HostConf *hc = arg;
176     struct HCHead *rhead;
177     hctransaction_t scan;
178     int i;
179
180     pthread_detach(pthread_self());
181     while (hcc_read_command(hc, NULL) != NULL)
182         ;
183     hc->reader_thread = NULL;
184
185     /*
186      * Clean up any threads stuck waiting for a reply.
187      */
188     pthread_mutex_lock(&MasterMutex);
189     for (i = 0; i < HCTHASH_SIZE; ++i) {
190         pthread_mutex_lock(&hc->hct_mutex[i]);
191         for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
192             if (scan->state == HCT_SENT) {
193                 scan->state = HCT_REPLIED;
194                 rhead = (void *)scan->rbuf;
195                 rhead->error = ENOTCONN;
196                 if (scan->waiting)
197                     pthread_cond_signal(&scan->cond);
198             }
199         }
200         pthread_mutex_unlock(&hc->hct_mutex[i]);
201     }
202     pthread_mutex_unlock(&MasterMutex);
203     return(NULL);
204 }
205
206 #endif
207
208 /*
209  * This reads a command from fdin, fixes up the byte ordering, and returns
210  * a pointer to HCHead.
211  *
212  * The MasterMutex may or may not be held.  When threaded this command
213  * is serialized by a reader thread.
214  */
215 static
216 struct HCHead *
217 hcc_read_command(struct HostConf *hc, hctransaction_t trans)
218 {
219     hctransaction_t fill;
220     struct HCHead tmp;
221     int aligned_bytes;
222     int n;
223     int r;
224
225     n = 0;
226     while (n < (int)sizeof(struct HCHead)) {
227         r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
228         if (r <= 0)
229             goto fail;
230         n += r;
231     }
232
233     assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
234     assert(tmp.magic == HCMAGIC);
235
236     if (trans) {
237         fill = trans;
238     } else {
239 #if USE_PTHREADS
240         pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
241         for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK];
242              fill;
243              fill = fill->next)
244         {
245             if (fill->state == HCT_SENT && fill->id == tmp.id)
246                     break;
247         }
248         pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
249         if (fill == NULL)
250 #endif
251         {
252             fprintf(stderr, 
253                     "cpdup hlink protocol error with %s (%04x)\n",
254                     hc->host, tmp.id);
255             exit(1);
256         }
257     }
258
259     bcopy(&tmp, fill->rbuf, n);
260     aligned_bytes = HCC_ALIGN(tmp.bytes);
261
262     while (n < aligned_bytes) {
263         r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
264         if (r <= 0)
265             goto fail;
266         n += r;
267     }
268 #ifdef DEBUG
269     hcc_debug_dump(head);
270 #endif
271 #if USE_PTHREADS
272     pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
273 #endif
274     fill->state = HCT_REPLIED;
275 #if USE_PTHREADS
276     if (fill->waiting)
277         pthread_cond_signal(&fill->cond);
278     pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
279 #endif
280     return((void *)fill->rbuf);
281 fail:
282     return(NULL);
283 }
284
285 #if USE_PTHREADS
286
287 static
288 hctransaction_t
289 hcc_get_trans(struct HostConf *hc)
290 {
291     hctransaction_t trans;
292     hctransaction_t scan;
293     pthread_t tid = pthread_self();
294     int i;
295
296     i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
297
298     pthread_mutex_lock(&hc->hct_mutex[i]);
299     for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
300         if (trans->tid == tid)
301                 break;
302     }
303     if (trans == NULL) {
304         trans = malloc(sizeof(*trans));
305         bzero(trans, sizeof(*trans));
306         trans->tid = tid;
307         trans->id = i;
308         pthread_cond_init(&trans->cond, NULL);
309         do {
310                 for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
311                         if (scan->id == trans->id) {
312                                 trans->id += HCTHASH_SIZE;
313                                 break;
314                         }
315                 }
316         } while (scan != NULL);
317
318         trans->next = hc->hct_hash[i];
319         hc->hct_hash[i] = trans;
320     }
321     pthread_mutex_unlock(&hc->hct_mutex[i]);
322     return(trans);
323 }
324
325 void
326 hcc_free_trans(struct HostConf *hc)
327 {
328     hctransaction_t trans;
329     hctransaction_t *transp;
330     pthread_t tid = pthread_self();
331     int i;
332
333     i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
334
335     pthread_mutex_lock(&hc->hct_mutex[i]);
336     for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) {
337         trans = *transp;
338         if (trans->tid == tid) {
339                 *transp = trans->next;
340                 pthread_cond_destroy(&trans->cond);
341                 free(trans);
342                 break;
343         }
344     }
345     pthread_mutex_unlock(&hc->hct_mutex[i]);
346 }
347
348 #else
349
350 static
351 hctransaction_t
352 hcc_get_trans(struct HostConf *hc)
353 {
354     return(&hc->trans);
355 }
356
357 void
358 hcc_free_trans(struct HostConf *hc __unused)
359 {
360     /* nop */
361 }
362
363 #endif
364
365 /*
366  * Initialize for a new command
367  */
368 hctransaction_t
369 hcc_start_command(struct HostConf *hc, int16_t cmd)
370 {
371     struct HCHead *whead;
372     hctransaction_t trans;
373
374     trans = hcc_get_trans(hc);
375
376     whead = (void *)trans->wbuf;
377     whead->magic = HCMAGIC;
378     whead->bytes = 0;
379     whead->cmd = cmd;
380     whead->id = trans->id;
381     whead->error = 0;
382
383     trans->windex = sizeof(*whead);
384     trans->hc = hc;
385     trans->state = HCT_IDLE;
386
387     return(trans);
388 }
389
390 static void
391 hcc_start_reply(hctransaction_t trans, struct HCHead *rhead)
392 {
393     struct HCHead *whead = (void *)trans->wbuf;
394
395     whead->magic = HCMAGIC;
396     whead->bytes = 0;
397     whead->cmd = rhead->cmd | HCF_REPLY;
398     whead->id = rhead->id;
399     whead->error = 0;
400
401     trans->windex = sizeof(*whead);
402 }
403
404 /*
405  * Finish constructing a command, transmit it, and await the reply.
406  * Return the HCHead of the reply.
407  */
408 struct HCHead *
409 hcc_finish_command(hctransaction_t trans)
410 {
411     struct HostConf *hc;
412     struct HCHead *whead;
413     struct HCHead *rhead;
414     int aligned_bytes;
415     int16_t wcmd;
416
417     hc = trans->hc;
418     whead = (void *)trans->wbuf;
419     whead->bytes = trans->windex;
420     aligned_bytes = HCC_ALIGN(trans->windex);
421
422     trans->state = HCT_SENT;
423
424     if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) {
425 #ifdef __error
426         *__error = EIO;
427 #else
428         errno = EIO;
429 #endif
430         if (whead->cmd < 0x0010)
431                 return(NULL);
432         fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
433         exit(1);
434     }
435
436     wcmd = whead->cmd;
437
438     /*
439      * whead is invalid when we call hcc_read_command() because
440      * we may switch to another thread.
441      */
442 #if USE_PTHREADS
443     pthread_mutex_unlock(&MasterMutex);
444     while (trans->state != HCT_REPLIED && hc->reader_thread) {
445         pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK];
446         pthread_mutex_lock(mtxp);
447         trans->waiting = 1;
448         if (trans->state != HCT_REPLIED && hc->reader_thread)
449                 pthread_cond_wait(&trans->cond, mtxp);
450         trans->waiting = 0;
451         pthread_mutex_unlock(mtxp);
452     }
453     pthread_mutex_lock(&MasterMutex);
454     rhead = (void *)trans->rbuf;
455 #else
456     rhead = hcc_read_command(hc, trans);
457 #endif
458     if (trans->state != HCT_REPLIED || rhead->id != trans->id) {
459 #ifdef __error
460         *__error = EIO;
461 #else
462         errno = EIO;
463 #endif
464         if (wcmd < 0x0010)
465                 return(NULL);
466         fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
467         exit(1);
468     }
469     trans->state = HCT_DONE;
470
471     if (rhead->error) {
472 #ifdef __error
473         *__error = rhead->error;
474 #else
475         errno = rhead->error;
476 #endif
477     }
478     return (rhead);
479 }
480
481 void
482 hcc_leaf_string(hctransaction_t trans, int16_t leafid, const char *str)
483 {
484     struct HCLeaf *item;
485     int bytes = strlen(str) + 1;
486
487     item = (void *)(trans->wbuf + trans->windex);
488     assert(trans->windex + sizeof(*item) + bytes < 65536);
489     item->leafid = leafid;
490     item->reserved = 0;
491     item->bytes = sizeof(*item) + bytes;
492     bcopy(str, item + 1, bytes);
493     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
494 }
495
496 void
497 hcc_leaf_data(hctransaction_t trans, int16_t leafid, const void *ptr, int bytes)
498 {
499     struct HCLeaf *item;
500
501     item = (void *)(trans->wbuf + trans->windex);
502     assert(trans->windex + sizeof(*item) + bytes < 65536);
503     item->leafid = leafid;
504     item->reserved = 0;
505     item->bytes = sizeof(*item) + bytes;
506     bcopy(ptr, item + 1, bytes);
507     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
508 }
509
510 void
511 hcc_leaf_int32(hctransaction_t trans, int16_t leafid, int32_t value)
512 {
513     struct HCLeaf *item;
514
515     item = (void *)(trans->wbuf + trans->windex);
516     assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
517     item->leafid = leafid;
518     item->reserved = 0;
519     item->bytes = sizeof(*item) + sizeof(value);
520     *(int32_t *)(item + 1) = value;
521     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
522 }
523
524 void
525 hcc_leaf_int64(hctransaction_t trans, int16_t leafid, int64_t value)
526 {
527     struct HCLeaf *item;
528
529     item = (void *)(trans->wbuf + trans->windex);
530     assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
531     item->leafid = leafid;
532     item->reserved = 0;
533     item->bytes = sizeof(*item) + sizeof(value);
534     *(int64_t *)(item + 1) = value;
535     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
536 }
537
538 int
539 hcc_alloc_descriptor(struct HostConf *hc, void *ptr, int type)
540 {
541     struct HCHostDesc *hd;
542     struct HCHostDesc *hnew;
543
544     hnew = malloc(sizeof(struct HCHostDesc));
545     hnew->type = type;
546     hnew->data = ptr;
547
548     if ((hd = hc->hostdescs) != NULL) {
549         hnew->desc = hd->desc + 1;
550     } else {
551         hnew->desc = 1;
552     }
553     hnew->next = hd;
554     hc->hostdescs = hnew;
555     return(hnew->desc);
556 }
557
558 void *
559 hcc_get_descriptor(struct HostConf *hc, int desc, int type)
560 {
561     struct HCHostDesc *hd;
562
563     for (hd = hc->hostdescs; hd; hd = hd->next) {
564         if (hd->desc == desc && hd->type == type)
565             return(hd->data);
566     }
567     return(NULL);
568 }
569
570 void
571 hcc_set_descriptor(struct HostConf *hc, int desc, void *ptr, int type)
572 {
573     struct HCHostDesc *hd;
574     struct HCHostDesc **hdp;
575
576     for (hdp = &hc->hostdescs; (hd = *hdp) != NULL; hdp = &hd->next) {
577         if (hd->desc == desc) {
578             if (ptr) {
579                 hd->data = ptr;
580                 hd->type = type;
581             } else {
582                 *hdp = hd->next;
583                 free(hd);
584             }
585             return;
586         }
587     }
588     if (ptr) {
589         hd = malloc(sizeof(*hd));
590         hd->desc = desc;
591         hd->type = type;
592         hd->data = ptr;
593         hd->next = hc->hostdescs;
594         hc->hostdescs = hd;
595     }
596 }
597
598 struct HCLeaf *
599 hcc_firstitem(struct HCHead *head)
600 {
601     struct HCLeaf *item;
602     int offset;
603
604     offset = sizeof(*head);
605     if (offset == head->bytes)
606         return(NULL);
607     assert(head->bytes >= offset + (int)sizeof(*item));
608     item = (void *)(head + 1);
609     assert(head->bytes >= offset + item->bytes);
610     assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
611     return (item);
612 }
613
614 struct HCLeaf *
615 hcc_nextitem(struct HCHead *head, struct HCLeaf *item)
616 {
617     int offset;
618
619     item = (void *)((char *)item + HCC_ALIGN(item->bytes));
620     offset = (char *)item - (char *)head;
621     if (offset == head->bytes)
622         return(NULL);
623     assert(head->bytes >= offset + (int)sizeof(*item));
624     assert(head->bytes >= offset + item->bytes);
625     assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
626     return (item);
627 }
628
629 #ifdef DEBUG
630
631 void
632 hcc_debug_dump(struct HCHead *head)
633 {
634     struct HCLeaf *item;
635     int aligned_bytes = HCC_ALIGN(head->bytes);
636
637     fprintf(stderr, "DUMP %04x (%d)", (u_int16_t)head->cmd, aligned_bytes);
638     if (head->cmd & HCF_REPLY)
639         fprintf(stderr, " error %d", head->error);
640     fprintf(stderr, "\n");
641     for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
642         fprintf(stderr, "    ITEM %04x DATA ", item->leafid);
643         switch(item->leafid & LCF_TYPEMASK) {
644         case LCF_INT32:
645             fprintf(stderr, "int32 %d\n", *(int32_t *)(item + 1));
646             break;
647         case LCF_INT64:
648             fprintf(stderr, "int64 %lld\n", *(int64_t *)(item + 1));
649             break;
650         case LCF_STRING:
651             fprintf(stderr, "\"%s\"\n", (char *)(item + 1));
652             break;
653         case LCF_BINARY:
654             fprintf(stderr, "(binary)\n");
655             break;
656         default:
657             printf("?\n");
658         }
659     }
660 }
661
662 #endif