Add parallel transaction support for remote source or target specifications.
[dragonfly.git] / bin / cpdup / hclink.c
1 /*
2  * HCLINK.C
3  *
4  * This module implements a simple remote control protocol
5  *
6  * $DragonFly: src/bin/cpdup/hclink.c,v 1.5 2008/04/10 22:09:08 dillon Exp $
7  */
8
9 #include "cpdup.h"
10 #include "hclink.h"
11 #include "hcproto.h"
12
13 static struct HCHead *hcc_read_command(hctransaction_t trans, int anypkt);
14 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
15
16 int
17 hcc_connect(struct HostConf *hc)
18 {
19     int fdin[2];
20     int fdout[2];
21
22     if (hc == NULL || hc->host == NULL)
23         return(0);
24
25     if (pipe(fdin) < 0)
26         return(-1);
27     if (pipe(fdout) < 0) {
28         close(fdin[0]);
29         close(fdin[1]);
30         return(-1);
31     }
32     if ((hc->pid = fork()) == 0) {
33         /*
34          * Child process
35          */
36         dup2(fdin[1], 1);
37         close(fdin[0]);
38         close(fdin[1]);
39         dup2(fdout[0], 0);
40         close(fdout[0]);
41         close(fdout[1]);
42         execl("/usr/bin/ssh", "ssh", "-T", hc->host, "cpdup", "-S", (char *) NULL);
43         _exit(1);
44     } else if (hc->pid < 0) {
45         return(-1);
46     } else {
47         /*
48          * Parent process.  Do the initial handshake to make sure we are
49          * actually talking to a cpdup slave.
50          */
51         close(fdin[1]);
52         hc->fdin = fdin[0];
53         close(fdout[0]);
54         hc->fdout = fdout[1];
55         return(0);
56     }
57 }
58
59 static int
60 rc_badop(hctransaction_t trans __unused, struct HCHead *head)
61 {
62     head->error = EOPNOTSUPP;
63     return(0);
64 }
65
66 int
67 hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count)
68 {
69     struct HostConf hcslave;
70     struct HCHead *head;
71     struct HCHead *whead;
72     struct HCTransaction trans;
73     int (*dispatch[256])(hctransaction_t, struct HCHead *);
74     int aligned_bytes;
75     int i;
76     int r;
77
78     bzero(&hcslave, sizeof(hcslave));
79     bzero(&trans, sizeof(trans));
80     for (i = 0; i < count; ++i) {
81         struct HCDesc *desc = &descs[i];
82         assert(desc->cmd >= 0 && desc->cmd < 256);
83         dispatch[desc->cmd] = desc->func;
84     }
85     for (i = 0; i < 256; ++i) {
86         if (dispatch[i] == NULL)
87             dispatch[i] = rc_badop;
88     }
89     hcslave.fdin = fdin;
90     hcslave.fdout = fdout;
91     trans.hc = &hcslave;
92
93     /*
94      * Process commands on fdin and write out results on fdout
95      */
96     for (;;) {
97         /*
98          * Get the command
99          */
100         head = hcc_read_command(&trans, 1);
101         if (head == NULL)
102             break;
103
104         /*
105          * Start the reply and dispatch, then process the return code.
106          */
107         head->error = 0;
108         hcc_start_reply(&trans, head);
109
110         r = dispatch[head->cmd & 255](&trans, head);
111
112         switch(r) {
113         case -2:
114                 head->error = EINVAL;
115                 break;
116         case -1:
117                 head->error = errno;
118                 break;
119         case 0:
120                 break;
121         default:
122                 assert(0);
123                 break;
124         }
125
126         /*
127          * Write out the reply
128          */
129         whead = (void *)trans.wbuf;
130         whead->bytes = trans.windex;
131         whead->error = head->error;
132         aligned_bytes = HCC_ALIGN(trans.windex);
133 #ifdef DEBUG
134         hcc_debug_dump(whead);
135 #endif
136         if (write(hcslave.fdout, whead, aligned_bytes) != aligned_bytes)
137             break;
138     }
139     return(0);
140 }
141
142 /*
143  * This reads a command from fdin, fixes up the byte ordering, and returns
144  * a pointer to HCHead.
145  *
146  * If id is -1 we do not match response id's
147  */
148 static
149 struct HCHead *
150 hcc_read_command(hctransaction_t trans, int anypkt)
151 {
152     struct HostConf *hc = trans->hc;
153     hctransaction_t fill;
154     struct HCHead tmp;
155     int aligned_bytes;
156     int n;
157     int r;
158
159 #if USE_PTHREADS
160     /*
161      * Shortcut if the reply has already been read in by another thread.
162      */
163     if (trans->state == HCT_REPLIED) {
164         return((void *)trans->rbuf);
165     }
166     pthread_mutex_unlock(&MasterMutex);
167     pthread_mutex_lock(&hc->read_mutex);
168 #endif
169     while (trans->state != HCT_REPLIED) {
170         n = 0;
171         while (n < (int)sizeof(struct HCHead)) {
172             r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
173             if (r <= 0)
174                 goto fail;
175             n += r;
176         }
177
178         assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
179         assert(tmp.magic == HCMAGIC);
180
181         if (anypkt == 0 && tmp.id != trans->id && trans->hc->version > 0) {
182 #if USE_PTHREADS
183             for (fill = trans->hc->hct_hash[tmp.id & HCTHASH_MASK];
184                  fill;
185                  fill = fill->next)
186             {
187                 if (fill->state == HCT_SENT && fill->id == tmp.id)
188                         break;
189             }
190             if (fill == NULL)
191 #endif
192             {
193                 fprintf(stderr, 
194                         "cpdup hlink protocol error with %s (%04x %04x)\n",
195                         trans->hc->host, trans->id, tmp.id);
196                 exit(1);
197             }
198         } else {
199             fill = trans;
200         }
201
202         bcopy(&tmp, fill->rbuf, n);
203         aligned_bytes = HCC_ALIGN(tmp.bytes);
204
205         while (n < aligned_bytes) {
206             r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
207             if (r <= 0)
208                 goto fail;
209             n += r;
210         }
211 #ifdef DEBUG
212         hcc_debug_dump(head);
213 #endif
214         fill->state = HCT_REPLIED;
215     }
216 #if USE_PTHREADS
217     pthread_mutex_lock(&MasterMutex);
218     pthread_mutex_unlock(&hc->read_mutex);
219 #endif
220     return((void *)trans->rbuf);
221 fail:
222 #if USE_PTHREADS
223     pthread_mutex_lock(&MasterMutex);
224     pthread_mutex_unlock(&hc->read_mutex);
225 #endif
226     return(NULL);
227 }
228
229 #if USE_PTHREADS
230
231 static
232 hctransaction_t
233 hcc_get_trans(struct HostConf *hc)
234 {
235     hctransaction_t trans;
236     hctransaction_t scan;
237     pthread_t tid = pthread_self();
238     int i;
239
240     i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
241
242     for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
243         if (trans->tid == tid)
244                 break;
245     }
246     if (trans == NULL) {
247         trans = malloc(sizeof(*trans));
248         bzero(trans, sizeof(*trans));
249         trans->tid = tid;
250         trans->id = i;
251         do {
252                 for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
253                         if (scan->id == trans->id) {
254                                 trans->id += HCTHASH_SIZE;
255                                 break;
256                         }
257                 }
258         } while (scan != NULL);
259
260         trans->next = hc->hct_hash[i];
261         hc->hct_hash[i] = trans;
262     }
263     return(trans);
264 }
265
266 #else
267
268 static
269 hctransaction_t
270 hcc_get_trans(struct HostConf *hc)
271 {
272     return(&hc->trans);
273 }
274
275 #endif
276
277 /*
278  * Initialize for a new command
279  */
280 hctransaction_t
281 hcc_start_command(struct HostConf *hc, int16_t cmd)
282 {
283     struct HCHead *whead;
284     hctransaction_t trans;
285
286     trans = hcc_get_trans(hc);
287
288     whead = (void *)trans->wbuf;
289     whead->magic = HCMAGIC;
290     whead->bytes = 0;
291     whead->cmd = cmd;
292     whead->id = trans->id;
293     whead->error = 0;
294
295     trans->windex = sizeof(*whead);
296     trans->hc = hc;
297     trans->state = HCT_IDLE;
298
299     return(trans);
300 }
301
302 static void
303 hcc_start_reply(hctransaction_t trans, struct HCHead *rhead)
304 {
305     struct HCHead *whead = (void *)trans->wbuf;
306
307     whead->magic = HCMAGIC;
308     whead->bytes = 0;
309     whead->cmd = rhead->cmd | HCF_REPLY;
310     whead->id = rhead->id;
311     whead->error = 0;
312
313     trans->windex = sizeof(*whead);
314 }
315
316 /*
317  * Finish constructing a command, transmit it, and await the reply.
318  * Return the HCHead of the reply.
319  */
320 struct HCHead *
321 hcc_finish_command(hctransaction_t trans)
322 {
323     struct HCHead *whead;
324     struct HCHead *rhead;
325     int aligned_bytes;
326     int16_t wcmd;
327
328     whead = (void *)trans->wbuf;
329     whead->bytes = trans->windex;
330     aligned_bytes = HCC_ALIGN(trans->windex);
331
332     trans->state = HCT_SENT;
333
334     if (write(trans->hc->fdout, whead, aligned_bytes) != aligned_bytes) {
335 #ifdef __error
336         *__error = EIO;
337 #else
338         errno = EIO;
339 #endif
340         if (whead->cmd < 0x0010)
341                 return(NULL);
342         fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host);
343         exit(1);
344     }
345
346     wcmd = whead->cmd;
347
348     /*
349      * whead is invalid when we call hcc_read_command() because
350      * we may switch to another thread.
351      */
352     if ((rhead = hcc_read_command(trans, 0)) == NULL) {
353 #ifdef __error
354         *__error = EIO;
355 #else
356         errno = EIO;
357 #endif
358         if (wcmd < 0x0010)
359                 return(NULL);
360         fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host);
361         exit(1);
362     }
363
364     if (rhead->error) {
365 #ifdef __error
366         *__error = rhead->error;
367 #else
368         errno = rhead->error;
369 #endif
370     }
371     return (rhead);
372 }
373
374 void
375 hcc_leaf_string(hctransaction_t trans, int16_t leafid, const char *str)
376 {
377     struct HCLeaf *item;
378     int bytes = strlen(str) + 1;
379
380     item = (void *)(trans->wbuf + trans->windex);
381     assert(trans->windex + sizeof(*item) + bytes < 65536);
382     item->leafid = leafid;
383     item->reserved = 0;
384     item->bytes = sizeof(*item) + bytes;
385     bcopy(str, item + 1, bytes);
386     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
387 }
388
389 void
390 hcc_leaf_data(hctransaction_t trans, int16_t leafid, const void *ptr, int bytes)
391 {
392     struct HCLeaf *item;
393
394     item = (void *)(trans->wbuf + trans->windex);
395     assert(trans->windex + sizeof(*item) + bytes < 65536);
396     item->leafid = leafid;
397     item->reserved = 0;
398     item->bytes = sizeof(*item) + bytes;
399     bcopy(ptr, item + 1, bytes);
400     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
401 }
402
403 void
404 hcc_leaf_int32(hctransaction_t trans, int16_t leafid, int32_t value)
405 {
406     struct HCLeaf *item;
407
408     item = (void *)(trans->wbuf + trans->windex);
409     assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
410     item->leafid = leafid;
411     item->reserved = 0;
412     item->bytes = sizeof(*item) + sizeof(value);
413     *(int32_t *)(item + 1) = value;
414     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
415 }
416
417 void
418 hcc_leaf_int64(hctransaction_t trans, int16_t leafid, int64_t value)
419 {
420     struct HCLeaf *item;
421
422     item = (void *)(trans->wbuf + trans->windex);
423     assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
424     item->leafid = leafid;
425     item->reserved = 0;
426     item->bytes = sizeof(*item) + sizeof(value);
427     *(int64_t *)(item + 1) = value;
428     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
429 }
430
431 int
432 hcc_alloc_descriptor(struct HostConf *hc, void *ptr, int type)
433 {
434     struct HCHostDesc *hd;
435     struct HCHostDesc *hnew;
436
437     hnew = malloc(sizeof(struct HCHostDesc));
438     hnew->type = type;
439     hnew->data = ptr;
440
441     if ((hd = hc->hostdescs) != NULL) {
442         hnew->desc = hd->desc + 1;
443     } else {
444         hnew->desc = 1;
445     }
446     hnew->next = hd;
447     hc->hostdescs = hnew;
448     return(hnew->desc);
449 }
450
451 void *
452 hcc_get_descriptor(struct HostConf *hc, int desc, int type)
453 {
454     struct HCHostDesc *hd;
455
456     for (hd = hc->hostdescs; hd; hd = hd->next) {
457         if (hd->desc == desc && hd->type == type)
458             return(hd->data);
459     }
460     return(NULL);
461 }
462
463 void
464 hcc_set_descriptor(struct HostConf *hc, int desc, void *ptr, int type)
465 {
466     struct HCHostDesc *hd;
467     struct HCHostDesc **hdp;
468
469     for (hdp = &hc->hostdescs; (hd = *hdp) != NULL; hdp = &hd->next) {
470         if (hd->desc == desc) {
471             if (ptr) {
472                 hd->data = ptr;
473                 hd->type = type;
474             } else {
475                 *hdp = hd->next;
476                 free(hd);
477             }
478             return;
479         }
480     }
481     if (ptr) {
482         hd = malloc(sizeof(*hd));
483         hd->desc = desc;
484         hd->type = type;
485         hd->data = ptr;
486         hd->next = hc->hostdescs;
487         hc->hostdescs = hd;
488     }
489 }
490
491 struct HCLeaf *
492 hcc_firstitem(struct HCHead *head)
493 {
494     struct HCLeaf *item;
495     int offset;
496
497     offset = sizeof(*head);
498     if (offset == head->bytes)
499         return(NULL);
500     assert(head->bytes >= offset + (int)sizeof(*item));
501     item = (void *)(head + 1);
502     assert(head->bytes >= offset + item->bytes);
503     assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
504     return (item);
505 }
506
507 struct HCLeaf *
508 hcc_nextitem(struct HCHead *head, struct HCLeaf *item)
509 {
510     int offset;
511
512     item = (void *)((char *)item + HCC_ALIGN(item->bytes));
513     offset = (char *)item - (char *)head;
514     if (offset == head->bytes)
515         return(NULL);
516     assert(head->bytes >= offset + (int)sizeof(*item));
517     assert(head->bytes >= offset + item->bytes);
518     assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
519     return (item);
520 }
521
522 #ifdef DEBUG
523
524 void
525 hcc_debug_dump(struct HCHead *head)
526 {
527     struct HCLeaf *item;
528     int aligned_bytes = HCC_ALIGN(head->bytes);
529
530     fprintf(stderr, "DUMP %04x (%d)", (u_int16_t)head->cmd, aligned_bytes);
531     if (head->cmd & HCF_REPLY)
532         fprintf(stderr, " error %d", head->error);
533     fprintf(stderr, "\n");
534     for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
535         fprintf(stderr, "    ITEM %04x DATA ", item->leafid);
536         switch(item->leafid & LCF_TYPEMASK) {
537         case LCF_INT32:
538             fprintf(stderr, "int32 %d\n", *(int32_t *)(item + 1));
539             break;
540         case LCF_INT64:
541             fprintf(stderr, "int64 %lld\n", *(int64_t *)(item + 1));
542             break;
543         case LCF_STRING:
544             fprintf(stderr, "\"%s\"\n", (char *)(item + 1));
545             break;
546         case LCF_BINARY:
547             fprintf(stderr, "(binary)\n");
548             break;
549         default:
550             printf("?\n");
551         }
552     }
553 }
554
555 #endif