HAMMER Utilities: Sync with 60J
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.7 2008/07/10 04:44:58 dillon Exp $
35  */
36
37 #include "hammer.h"
38
39 #define SERIALBUF_SIZE  (512 * 1024)
40
41 static int read_mrecords(int fd, char *buf, u_int size,
42                          hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44                          hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46                          hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static void validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
51 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
52 static void mirror_usage(int code);
53
54 /*
55  * Generate a mirroring data stream from the specific source over the
56  * entire key range, but restricted to the specified transaction range.
57  *
58  * The HAMMER VFS does most of the work, we add a few new mrecord
59  * types to negotiate the TID ranges and verify that the entire
60  * stream made it to the destination.
61  */
62 void
63 hammer_cmd_mirror_read(char **av, int ac)
64 {
65         struct hammer_ioc_mirror_rw mirror;
66         struct hammer_ioc_pseudofs_rw pfs;
67         union hammer_ioc_mrecord_any mrec_tmp;
68         hammer_ioc_mrecord_any_t mrec;
69         hammer_tid_t sync_tid;
70         const char *filesystem;
71         char *buf = malloc(SERIALBUF_SIZE);
72         int interrupted = 0;
73         int error;
74         int fd;
75         int n;
76         time_t base_t = time(NULL);
77
78         if (ac > 2)
79                 mirror_usage(1);
80         filesystem = av[0];
81
82         bzero(&mirror, sizeof(mirror));
83         hammer_key_beg_init(&mirror.key_beg);
84         hammer_key_end_init(&mirror.key_end);
85
86         fd = getpfs(&pfs, filesystem);
87
88         /*
89          * In 2-way mode the target will send us a PFS info packet
90          * first.  Use the target's current snapshot TID as our default
91          * begin TID.
92          */
93         mirror.tid_beg = 0;
94         if (TwoWayPipeOpt)
95                 validate_mrec_header(fd, 0, 0, pfs.pfs_id,
96                                      NULL, &mirror.tid_beg);
97
98         /*
99          * Write out the PFS header, tid_beg will be updated if our PFS
100          * has a larger begin sync.  tid_end is set to the latest source
101          * TID whos flush cycle has completed.
102          */
103         generate_mrec_header(fd, 1, pfs.pfs_id,
104                              &mirror.tid_beg, &mirror.tid_end);
105
106         /*
107          * A cycle file overrides the beginning TID
108          */
109         hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
110
111         if (ac == 2)
112                 mirror.tid_beg = strtoull(av[1], NULL, 0);
113
114         fprintf(stderr, "mirror-read: Mirror from %016llx to %016llx\n",
115                 mirror.tid_beg, mirror.tid_end);
116         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
117                 fprintf(stderr, "mirror-read: Resuming at object %016llx\n",
118                         mirror.key_beg.obj_id);
119         }
120
121         /*
122          * Write out bulk records
123          */
124         mirror.ubuf = buf;
125         mirror.size = SERIALBUF_SIZE;
126
127         do {
128                 mirror.count = 0;
129                 mirror.pfs_id = pfs.pfs_id;
130                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
131                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
132                         fprintf(stderr, "Mirror-read %s failed: %s\n",
133                                 filesystem, strerror(errno));
134                         exit(1);
135                 }
136                 if (mirror.count) {
137                         n = write(1, mirror.ubuf, mirror.count);
138                         if (n != mirror.count) {
139                                 fprintf(stderr, "Mirror-read %s failed: "
140                                                 "short write\n",
141                                 filesystem);
142                                 exit(1);
143                         }
144                 }
145                 mirror.key_beg = mirror.key_cur;
146                 if (TimeoutOpt &&
147                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
148                         fprintf(stderr,
149                                 "Mirror-read %s interrupted by timer at"
150                                 " %016llx\n",
151                                 filesystem,
152                                 mirror.key_cur.obj_id);
153                         interrupted = 1;
154                         break;
155                 }
156         } while (mirror.count != 0);
157
158         /*
159          * Write out the termination sync record
160          */
161         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
162                       &mrec_tmp, sizeof(mrec_tmp.sync));
163
164         /*
165          * If the -2 option was given (automatic when doing mirror-copy),
166          * a two-way pipe is assumed and we expect a response mrec from
167          * the target.
168          */
169         if (TwoWayPipeOpt) {
170                 mrec = read_mrecord(0, &error, NULL);
171                 if (mrec == NULL || 
172                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
173                     mrec->head.rec_size != sizeof(mrec->update)) {
174                         fprintf(stderr, "mirror_read: Did not get final "
175                                         "acknowledgement packet from target\n");
176                         exit(1);
177                 }
178                 if (interrupted) {
179                         if (CyclePath) {
180                                 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
181                                 fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath);
182                         }
183                 } else {
184                         sync_tid = mrec->update.tid;
185                         if (CyclePath) {
186                                 hammer_key_beg_init(&mirror.key_beg);
187                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
188                                 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
189                                         CyclePath, sync_tid);
190                         } else {
191                                 fprintf(stderr, "Source can update synctid "
192                                                 "to 0x%016llx\n",
193                                         sync_tid);
194                         }
195                 }
196         } else if (CyclePath) {
197                 /* NOTE! mirror.tid_beg cannot be updated */
198                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
199                                 "fully updated unless you use mirror-copy\n");
200                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
201         }
202         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
203 }
204
205 /*
206  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
207  * some additional packet types to negotiate TID ranges and to verify
208  * completion.  The HAMMER VFS does most of the work.
209  *
210  * It is important to note that the mirror.key_{beg,end} range must
211  * match the ranged used by the original.  For now both sides use
212  * range the entire key space.
213  *
214  * It is even more important that the records in the stream conform
215  * to the TID range also supplied in the stream.  The HAMMER VFS will
216  * use the REC, PASS, and SKIP record types to track the portions of
217  * the B-Tree being scanned in order to be able to proactively delete
218  * records on the target within those active areas that are not mentioned
219  * by the source.
220  *
221  * The mirror.key_cur field is used by the VFS to do this tracking.  It
222  * must be initialized to key_beg but then is persistently updated by
223  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
224  * field you will blow up the mirror target, possibly to the point of
225  * deleting everything.  As a safety measure the HAMMER VFS simply marks
226  * the records that the source has destroyed as deleted on the target,
227  * and normal pruning operations will deal with their final disposition
228  * at some later time.
229  */
230 void
231 hammer_cmd_mirror_write(char **av, int ac)
232 {
233         struct hammer_ioc_mirror_rw mirror;
234         const char *filesystem;
235         char *buf = malloc(SERIALBUF_SIZE);
236         struct hammer_ioc_pseudofs_rw pfs;
237         struct hammer_ioc_mrecord_head pickup;
238         struct hammer_ioc_synctid synctid;
239         union hammer_ioc_mrecord_any mrec_tmp;
240         hammer_ioc_mrecord_any_t mrec;
241         int error;
242         int fd;
243
244         if (ac > 2)
245                 mirror_usage(1);
246         filesystem = av[0];
247
248         bzero(&mirror, sizeof(mirror));
249         hammer_key_beg_init(&mirror.key_beg);
250         hammer_key_end_init(&mirror.key_end);
251         mirror.key_end = mirror.key_beg;
252
253         fd = getpfs(&pfs, filesystem);
254
255         /*
256          * In two-way mode the target writes out a PFS packet first.
257          * The source uses our tid_end as its tid_beg by default,
258          * picking up where it left off.
259          */
260         mirror.tid_beg = 0;
261         if (TwoWayPipeOpt) {
262                 generate_mrec_header(fd, 1, pfs.pfs_id,
263                                      &mirror.tid_beg, &mirror.tid_end);
264         }
265
266         /*
267          * Read and process the PFS header.  The source informs us of
268          * the TID range the stream represents.
269          */
270         validate_mrec_header(fd, 0, 1, pfs.pfs_id,
271                              &mirror.tid_beg, &mirror.tid_end);
272
273         mirror.ubuf = buf;
274         mirror.size = SERIALBUF_SIZE;
275
276         pickup.signature = 0;
277         pickup.type = 0;
278
279         /*
280          * Read and process bulk records (REC, PASS, and SKIP types).
281          *
282          * On your life, do NOT mess with mirror.key_cur or your mirror
283          * target may become history.
284          */
285         for (;;) {
286                 mirror.count = 0;
287                 mirror.pfs_id = pfs.pfs_id;
288                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
289                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
290                 if (mirror.size <= 0)
291                         break;
292                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
293                         fprintf(stderr, "Mirror-write %s failed: %s\n",
294                                 filesystem, strerror(errno));
295                         exit(1);
296                 }
297                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
298                         fprintf(stderr,
299                                 "Mirror-write %s fatal error %d\n",
300                                 filesystem, mirror.head.error);
301                         exit(1);
302                 }
303 #if 0
304                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
305                         fprintf(stderr,
306                                 "Mirror-write %s interrupted by timer at"
307                                 " %016llx\n",
308                                 filesystem,
309                                 mirror.key_cur.obj_id);
310                         exit(0);
311                 }
312 #endif
313         }
314
315         /*
316          * Read and process the termination sync record.
317          */
318         mrec = read_mrecord(0, &error, &pickup);
319         if (mrec == NULL || 
320             mrec->head.type != HAMMER_MREC_TYPE_SYNC ||
321             mrec->head.rec_size != sizeof(mrec->sync)) {
322                 fprintf(stderr, "Mirror-write %s: Did not get termination "
323                                 "sync record, or rec_size is wrong rt=%d\n",
324                                 filesystem, mrec->head.type);
325         }
326         free(mrec);
327         mrec = NULL;
328
329         /*
330          * Update the PFS info on the target so the user has visibility
331          * into the new snapshot.
332          */
333         update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
334
335         /*
336          * Sync the target filesystem
337          */
338         bzero(&synctid, sizeof(synctid));
339         synctid.op = HAMMER_SYNCTID_SYNC2;
340         ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
341
342         fprintf(stderr, "Mirror-write %s: succeeded\n", filesystem);
343
344         /*
345          * Report back to the originator.
346          */
347         if (TwoWayPipeOpt) {
348                 mrec_tmp.update.tid = mirror.tid_end;
349                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
350                               &mrec_tmp, sizeof(mrec_tmp.update));
351         } else {
352                 printf("Source can update synctid to 0x%016llx\n",
353                        mirror.tid_end);
354         }
355 }
356
357 void
358 hammer_cmd_mirror_dump(void)
359 {
360         char *buf = malloc(SERIALBUF_SIZE);
361         struct hammer_ioc_mrecord_head pickup;
362         hammer_ioc_mrecord_any_t mrec;
363         int error;
364         int size;
365         int offset;
366         int bytes;
367
368         /*
369          * Read and process the PFS header 
370          */
371         pickup.signature = 0;
372         pickup.type = 0;
373
374         mrec = read_mrecord(0, &error, &pickup);
375
376         /*
377          * Read and process bulk records
378          */
379         for (;;) {
380                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
381                 if (size <= 0)
382                         break;
383                 offset = 0;
384                 while (offset < size) {
385                         mrec = (void *)((char *)buf + offset);
386                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
387                         if (offset + bytes > size) {
388                                 fprintf(stderr, "Misaligned record\n");
389                                 exit(1);
390                         }
391
392                         switch(mrec->head.type) {
393                         case HAMMER_MREC_TYPE_REC:
394                                 printf("Record obj=%016llx key=%016llx "
395                                        "rt=%02x ot=%02x\n",
396                                         mrec->rec.leaf.base.obj_id,
397                                         mrec->rec.leaf.base.key,
398                                         mrec->rec.leaf.base.rec_type,
399                                         mrec->rec.leaf.base.obj_type);
400                                 printf("       tids %016llx:%016llx data=%d\n",
401                                         mrec->rec.leaf.base.create_tid,
402                                         mrec->rec.leaf.base.delete_tid,
403                                         mrec->rec.leaf.data_len);
404                                 break;
405                         case HAMMER_MREC_TYPE_PASS:
406                                 printf("Pass   obj=%016llx key=%016llx "
407                                        "rt=%02x ot=%02x\n",
408                                         mrec->rec.leaf.base.obj_id,
409                                         mrec->rec.leaf.base.key,
410                                         mrec->rec.leaf.base.rec_type,
411                                         mrec->rec.leaf.base.obj_type);
412                                 printf("       tids %016llx:%016llx data=%d\n",
413                                         mrec->rec.leaf.base.create_tid,
414                                         mrec->rec.leaf.base.delete_tid,
415                                         mrec->rec.leaf.data_len);
416                                 break;
417                         case HAMMER_MREC_TYPE_SKIP:
418                                 printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
419                                        "       obj=%016llx key=%016llx rt=%02x\n",
420                                        mrec->skip.skip_beg.obj_id,
421                                        mrec->skip.skip_beg.key,
422                                        mrec->skip.skip_beg.rec_type,
423                                        mrec->skip.skip_end.obj_id,
424                                        mrec->skip.skip_end.key,
425                                        mrec->skip.skip_end.rec_type);
426                         default:
427                                 break;
428                         }
429                         offset += bytes;
430                 }
431         }
432
433         /*
434          * Read and process the termination sync record.
435          */
436         mrec = read_mrecord(0, &error, &pickup);
437         if (mrec == NULL || mrec->head.type != HAMMER_MREC_TYPE_SYNC) {
438                 fprintf(stderr, "Mirror-dump: Did not get termination "
439                                 "sync record\n");
440         }
441 }
442
443 void
444 hammer_cmd_mirror_copy(char **av, int ac)
445 {
446         pid_t pid1;
447         pid_t pid2;
448         int fds[2];
449         const char *xav[16];
450         char tbuf[16];
451         char *ptr;
452         int xac;
453
454         if (ac != 2)
455                 mirror_usage(1);
456
457         if (pipe(fds) < 0) {
458                 perror("pipe");
459                 exit(1);
460         }
461
462         TwoWayPipeOpt = 1;
463
464         /*
465          * Source
466          */
467         if ((pid1 = fork()) == 0) {
468                 dup2(fds[0], 0);
469                 dup2(fds[0], 1);
470                 close(fds[0]);
471                 close(fds[1]);
472                 if ((ptr = strchr(av[0], ':')) != NULL) {
473                         *ptr++ = 0;
474                         xac = 0;
475                         xav[xac++] = "ssh";
476                         xav[xac++] = av[0];
477                         xav[xac++] = "hammer";
478                         if (VerboseOpt)
479                                 xav[xac++] = "-v";
480                         xav[xac++] = "-2";
481                         if (TimeoutOpt) {
482                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
483                                 xav[xac++] = "-t";
484                                 xav[xac++] = tbuf;
485                         }
486                         xav[xac++] = "mirror-read";
487                         xav[xac++] = ptr;
488                         xav[xac++] = NULL;
489                         execv("/usr/bin/ssh", (void *)xav);
490                 } else {
491                         hammer_cmd_mirror_read(av, 1);
492                         fflush(stdout);
493                         fflush(stderr);
494                 }
495                 _exit(1);
496         }
497
498         /*
499          * Target
500          */
501         if ((pid2 = fork()) == 0) {
502                 dup2(fds[1], 0);
503                 dup2(fds[1], 1);
504                 close(fds[0]);
505                 close(fds[1]);
506                 if ((ptr = strchr(av[1], ':')) != NULL) {
507                         *ptr++ = 0;
508                         xac = 0;
509                         xav[xac++] = "ssh";
510                         xav[xac++] = av[1];
511                         xav[xac++] = "hammer";
512                         if (VerboseOpt)
513                                 xav[xac++] = "-v";
514                         xav[xac++] = "-2";
515                         xav[xac++] = "mirror-write";
516                         xav[xac++] = ptr;
517                         xav[xac++] = NULL;
518                         execv("/usr/bin/ssh", (void *)xav);
519                 } else {
520                         hammer_cmd_mirror_write(av + 1, 1);
521                         fflush(stdout);
522                         fflush(stderr);
523                 }
524                 _exit(1);
525         }
526         close(fds[0]);
527         close(fds[1]);
528
529         while (waitpid(pid1, NULL, 0) <= 0)
530                 ;
531         while (waitpid(pid2, NULL, 0) <= 0)
532                 ;
533 }
534
535 /*
536  * Read and return multiple mrecords
537  */
538 static int
539 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
540 {
541         hammer_ioc_mrecord_any_t mrec;
542         u_int count;
543         size_t n;
544         size_t i;
545         size_t bytes;
546
547         count = 0;
548         while (size - count >= HAMMER_MREC_HEADSIZE) {
549                 /*
550                  * Cached the record header in case we run out of buffer
551                  * space.
552                  */
553                 fflush(stdout);
554                 if (pickup->signature == 0) {
555                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
556                                 i = read(fd, (char *)pickup + n,
557                                          HAMMER_MREC_HEADSIZE - n);
558                                 if (i <= 0)
559                                         break;
560                         }
561                         if (n == 0)
562                                 break;
563                         if (n != HAMMER_MREC_HEADSIZE) {
564                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
565                                 exit(1);
566                         }
567
568                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
569                                 fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n");
570                                 exit(1);
571                         }
572                 }
573                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
574                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
575                         fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n");
576                         exit(1);
577                 }
578
579                 /*
580                  * Stop if we have insufficient space for the record and data.
581                  */
582                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
583                 if (size - count < bytes)
584                         break;
585
586                 /*
587                  * Stop if the record type is not a REC or a SKIP (the only
588                  * two types the ioctl supports.  Other types are used only
589                  * by the userland protocol).
590                  */
591                 if (pickup->type != HAMMER_MREC_TYPE_REC &&
592                     pickup->type != HAMMER_MREC_TYPE_SKIP &&
593                     pickup->type != HAMMER_MREC_TYPE_PASS) {
594                         break;
595                 }
596
597                 /*
598                  * Read the remainder and clear the pickup signature.
599                  */
600                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
601                         i = read(fd, buf + count + n, bytes - n);
602                         if (i <= 0)
603                                 break;
604                 }
605                 if (n != bytes) {
606                         fprintf(stderr, "read_mrecords: short read on pipe\n");
607                         exit(1);
608                 }
609
610                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
611                 pickup->signature = 0;
612                 pickup->type = 0;
613                 mrec = (void *)(buf + count);
614
615                 /*
616                  * Validate the completed record
617                  */
618                 if (mrec->head.rec_crc !=
619                     crc32((char *)mrec + HAMMER_MREC_CRCOFF,
620                           mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
621                         fprintf(stderr, "read_mrecords: malformed record "
622                                         "on pipe, bad crc\n");
623                         exit(1);
624                 }
625
626                 /*
627                  * If its a B-Tree record validate the data crc
628                  */
629                 if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
630                         if (mrec->head.rec_size <
631                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
632                                 fprintf(stderr, 
633                                         "read_mrecords: malformed record on "
634                                         "pipe, illegal element data_len\n");
635                                 exit(1);
636                         }
637                         if (mrec->rec.leaf.data_len &&
638                             mrec->rec.leaf.data_offset &&
639                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
640                                 fprintf(stderr,
641                                         "read_mrecords: data_crc did not "
642                                         "match data! obj=%016llx key=%016llx\n",
643                                         mrec->rec.leaf.base.obj_id,
644                                         mrec->rec.leaf.base.key);
645                                 fprintf(stderr,
646                                         "continuing, but there are problems\n");
647                         }
648                 }
649                 count += bytes;
650         }
651         return(count);
652 }
653
654 /*
655  * Read and return a single mrecord.
656  */
657 static
658 hammer_ioc_mrecord_any_t
659 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
660 {
661         hammer_ioc_mrecord_any_t mrec;
662         struct hammer_ioc_mrecord_head mrechd;
663         size_t bytes;
664         size_t n;
665         size_t i;
666
667         if (pickup && pickup->type != 0) {
668                 mrechd = *pickup;
669                 pickup->signature = 0;
670                 pickup->type = 0;
671                 n = HAMMER_MREC_HEADSIZE;
672         } else {
673                 /*
674                  * Read in the PFSD header from the sender.
675                  */
676                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
677                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
678                         if (i <= 0)
679                                 break;
680                 }
681                 if (n == 0) {
682                         *errorp = 0;    /* EOF */
683                         return(NULL);
684                 }
685                 if (n != HAMMER_MREC_HEADSIZE) {
686                         fprintf(stderr, "short read of mrecord header\n");
687                         *errorp = EPIPE;
688                         return(NULL);
689                 }
690         }
691         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
692                 fprintf(stderr, "read_mrecord: bad signature\n");
693                 *errorp = EINVAL;
694                 return(NULL);
695         }
696         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
697         assert(bytes >= sizeof(mrechd));
698         mrec = malloc(bytes);
699         mrec->head = mrechd;
700
701         while (n < bytes) {
702                 i = read(fdin, (char *)mrec + n, bytes - n);
703                 if (i <= 0)
704                         break;
705                 n += i;
706         }
707         if (n != bytes) {
708                 fprintf(stderr, "read_mrecord: short read on payload\n");
709                 *errorp = EPIPE;
710                 return(NULL);
711         }
712         if (mrec->head.rec_crc != 
713             crc32((char *)mrec + HAMMER_MREC_CRCOFF,
714                   mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
715                 fprintf(stderr, "read_mrecord: bad CRC\n");
716                 *errorp = EINVAL;
717                 return(NULL);
718         }
719         *errorp = 0;
720         return(mrec);
721 }
722
723 static
724 void
725 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
726               int bytes)
727 {
728         char zbuf[HAMMER_HEAD_ALIGN];
729         int pad;
730
731         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
732
733         assert(bytes >= (int)sizeof(mrec->head));
734         bzero(&mrec->head, sizeof(mrec->head));
735         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
736         mrec->head.type = type;
737         mrec->head.rec_size = bytes;
738         mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
739                                    bytes - HAMMER_MREC_CRCOFF);
740         if (write(fdout, mrec, bytes) != bytes) {
741                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
742                         errno, strerror(errno));
743                 exit(1);
744         }
745         if (pad) {
746                 bzero(zbuf, pad);
747                 if (write(fdout, zbuf, pad) != pad) {
748                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
749                                 errno, strerror(errno));
750                         exit(1);
751                 }
752         }
753 }
754
755 /*
756  * Generate a mirroring header with the pfs information of the
757  * originating filesytem.
758  */
759 static void
760 generate_mrec_header(int fd, int fdout, int pfs_id,
761                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
762 {
763         struct hammer_ioc_pseudofs_rw pfs;
764         union hammer_ioc_mrecord_any mrec_tmp;
765
766         bzero(&pfs, sizeof(pfs));
767         bzero(&mrec_tmp, sizeof(mrec_tmp));
768         pfs.pfs_id = pfs_id;
769         pfs.ondisk = &mrec_tmp.pfs.pfsd;
770         pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
771         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
772                 fprintf(stderr, "mirror-read: not a HAMMER fs/pseudofs!\n");
773                 exit(1);
774         }
775         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
776                 fprintf(stderr, "mirror-read: HAMMER pfs version mismatch!\n");
777                 exit(1);
778         }
779
780         /*
781          * sync_beg_tid - lowest TID on source after which a full history
782          *                is available.
783          *
784          * sync_end_tid - highest fully synchronized TID from source.
785          */
786         if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
787                 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
788         if (tid_endp)
789                 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
790         mrec_tmp.pfs.version = pfs.version;
791         write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
792                       &mrec_tmp, sizeof(mrec_tmp.pfs));
793 }
794
795 /*
796  * Validate the pfs information from the originating filesystem
797  * against the target filesystem.  shared_uuid must match.
798  */
799 static void
800 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
801                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
802 {
803         struct hammer_ioc_pseudofs_rw pfs;
804         struct hammer_pseudofs_data pfsd;
805         hammer_ioc_mrecord_any_t mrec;
806         int error;
807
808         /*
809          * Get the PFSD info from the target filesystem.
810          */
811         bzero(&pfs, sizeof(pfs));
812         bzero(&pfsd, sizeof(pfsd));
813         pfs.pfs_id = pfs_id;
814         pfs.ondisk = &pfsd;
815         pfs.bytes = sizeof(pfsd);
816         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
817                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
818                 exit(1);
819         }
820         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
821                 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
822                 exit(1);
823         }
824
825         mrec = read_mrecord(fdin, &error, NULL);
826         if (mrec == NULL) {
827                 if (error == 0)
828                         fprintf(stderr, "validate_mrec_header: short read\n");
829                 exit(1);
830         }
831         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
832                 fprintf(stderr, "validate_mrec_header: did not get expected "
833                                 "PFSD record type\n");
834                 exit(1);
835         }
836         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
837                 fprintf(stderr, "validate_mrec_header: unexpected payload "
838                                 "size\n");
839                 exit(1);
840         }
841         if (mrec->pfs.version != pfs.version) {
842                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
843                 exit(1);
844         }
845
846         /*
847          * Whew.  Ok, is the read PFS info compatible with the target?
848          */
849         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
850                  sizeof(pfsd.shared_uuid)) != 0) {
851                 fprintf(stderr, 
852                         "mirror-write: source and target have "
853                         "different shared_uuid's!\n");
854                 exit(1);
855         }
856         if (is_target &&
857             (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
858                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
859                 exit(1);
860         }
861         if (tid_begp)
862                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
863         if (tid_endp)
864                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
865         free(mrec);
866 }
867
868 static void
869 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
870 {
871         struct hammer_ioc_pseudofs_rw pfs;
872         struct hammer_pseudofs_data pfsd;
873
874         bzero(&pfs, sizeof(pfs));
875         bzero(&pfsd, sizeof(pfsd));
876         pfs.pfs_id = pfs_id;
877         pfs.ondisk = &pfsd;
878         pfs.bytes = sizeof(pfsd);
879         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
880                 perror("update_pfs_snapshot (read)");
881                 exit(1);
882         }
883         pfsd.sync_end_tid = snapshot_tid;
884         if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
885                 perror("update_pfs_snapshot (rewrite)");
886                 exit(1);
887         }
888 }
889
890
891 static void
892 mirror_usage(int code)
893 {
894         fprintf(stderr, 
895                 "hammer mirror-read <filesystem>\n"
896                 "hammer mirror-write <filesystem>\n"
897                 "hammer mirror-dump\n"
898                 "hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n"
899         );
900         exit(code);
901 }