MFC: Fix comments.
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.9.2.2 2008/07/19 18:49:08 dillon Exp $
35  */
36
37 #include "hammer.h"
38
39 #define SERIALBUF_SIZE  (512 * 1024)
40
41 static int read_mrecords(int fd, char *buf, u_int size,
42                          hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44                          hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46                          hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static void validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
51 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
52 static void mirror_usage(int code);
53
54 /*
55  * Generate a mirroring data stream from the specific source over the
56  * entire key range, but restricted to the specified transaction range.
57  *
58  * The HAMMER VFS does most of the work, we add a few new mrecord
59  * types to negotiate the TID ranges and verify that the entire
60  * stream made it to the destination.
61  */
62 void
63 hammer_cmd_mirror_read(char **av, int ac)
64 {
65         struct hammer_ioc_mirror_rw mirror;
66         struct hammer_ioc_pseudofs_rw pfs;
67         union hammer_ioc_mrecord_any mrec_tmp;
68         hammer_ioc_mrecord_any_t mrec;
69         hammer_tid_t sync_tid;
70         const char *filesystem;
71         char *buf = malloc(SERIALBUF_SIZE);
72         int interrupted = 0;
73         int error;
74         int fd;
75         int n;
76         time_t base_t = time(NULL);
77
78         if (ac > 2)
79                 mirror_usage(1);
80         filesystem = av[0];
81
82         bzero(&mirror, sizeof(mirror));
83         hammer_key_beg_init(&mirror.key_beg);
84         hammer_key_end_init(&mirror.key_end);
85
86         fd = getpfs(&pfs, filesystem);
87
88         /*
89          * In 2-way mode the target will send us a PFS info packet
90          * first.  Use the target's current snapshot TID as our default
91          * begin TID.
92          */
93         mirror.tid_beg = 0;
94         if (TwoWayPipeOpt)
95                 validate_mrec_header(fd, 0, 0, pfs.pfs_id,
96                                      NULL, &mirror.tid_beg);
97
98         /*
99          * Write out the PFS header, tid_beg will be updated if our PFS
100          * has a larger begin sync.  tid_end is set to the latest source
101          * TID whos flush cycle has completed.
102          */
103         generate_mrec_header(fd, 1, pfs.pfs_id,
104                              &mirror.tid_beg, &mirror.tid_end);
105
106         /*
107          * A cycle file overrides the beginning TID
108          */
109         hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
110
111         if (ac == 2)
112                 mirror.tid_beg = strtoull(av[1], NULL, 0);
113
114         fprintf(stderr, "Mirror-read: Mirror from %016llx to %016llx\n",
115                 mirror.tid_beg, mirror.tid_end);
116         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
117                 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
118                         mirror.key_beg.obj_id);
119         }
120
121         /*
122          * Nothing to do if begin equals end.
123          */
124         if (mirror.tid_beg == mirror.tid_end) {
125                 fprintf(stderr, "Mirror-read: No work to do, stopping\n");
126                 write_mrecord(1, HAMMER_MREC_TYPE_TERM,
127                               &mrec_tmp, sizeof(mrec_tmp.sync));
128                 goto done;
129         }
130
131         /*
132          * Write out bulk records
133          */
134         mirror.ubuf = buf;
135         mirror.size = SERIALBUF_SIZE;
136
137         do {
138                 mirror.count = 0;
139                 mirror.pfs_id = pfs.pfs_id;
140                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
141                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
142                         fprintf(stderr, "Mirror-read %s failed: %s\n",
143                                 filesystem, strerror(errno));
144                         exit(1);
145                 }
146                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
147                         fprintf(stderr,
148                                 "Mirror-read %s fatal error %d\n",
149                                 filesystem, mirror.head.error);
150                         exit(1);
151                 }
152                 if (mirror.count) {
153                         n = write(1, mirror.ubuf, mirror.count);
154                         if (n != mirror.count) {
155                                 fprintf(stderr, "Mirror-read %s failed: "
156                                                 "short write\n",
157                                 filesystem);
158                                 exit(1);
159                         }
160                 }
161                 mirror.key_beg = mirror.key_cur;
162                 if (TimeoutOpt &&
163                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
164                         fprintf(stderr,
165                                 "Mirror-read %s interrupted by timer at"
166                                 " %016llx\n",
167                                 filesystem,
168                                 mirror.key_cur.obj_id);
169                         interrupted = 1;
170                         break;
171                 }
172         } while (mirror.count != 0);
173
174         /*
175          * Write out the termination sync record
176          */
177         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
178                       &mrec_tmp, sizeof(mrec_tmp.sync));
179
180         /*
181          * If the -2 option was given (automatic when doing mirror-copy),
182          * a two-way pipe is assumed and we expect a response mrec from
183          * the target.
184          */
185         if (TwoWayPipeOpt) {
186                 mrec = read_mrecord(0, &error, NULL);
187                 if (mrec == NULL || 
188                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
189                     mrec->head.rec_size != sizeof(mrec->update)) {
190                         fprintf(stderr, "mirror_read: Did not get final "
191                                         "acknowledgement packet from target\n");
192                         exit(1);
193                 }
194                 if (interrupted) {
195                         if (CyclePath) {
196                                 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
197                                 fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath);
198                         }
199                 } else {
200                         sync_tid = mrec->update.tid;
201                         if (CyclePath) {
202                                 hammer_key_beg_init(&mirror.key_beg);
203                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
204                                 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
205                                         CyclePath, sync_tid);
206                         }
207                 }
208         } else if (CyclePath) {
209                 /* NOTE! mirror.tid_beg cannot be updated */
210                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
211                                 "fully updated unless you use mirror-copy\n");
212                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
213         }
214 done:
215         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
216 }
217
218 /*
219  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
220  * some additional packet types to negotiate TID ranges and to verify
221  * completion.  The HAMMER VFS does most of the work.
222  *
223  * It is important to note that the mirror.key_{beg,end} range must
224  * match the ranged used by the original.  For now both sides use
225  * range the entire key space.
226  *
227  * It is even more important that the records in the stream conform
228  * to the TID range also supplied in the stream.  The HAMMER VFS will
229  * use the REC, PASS, and SKIP record types to track the portions of
230  * the B-Tree being scanned in order to be able to proactively delete
231  * records on the target within those active areas that are not mentioned
232  * by the source.
233  *
234  * The mirror.key_cur field is used by the VFS to do this tracking.  It
235  * must be initialized to key_beg but then is persistently updated by
236  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
237  * field you will blow up the mirror target, possibly to the point of
238  * deleting everything.  As a safety measure the HAMMER VFS simply marks
239  * the records that the source has destroyed as deleted on the target,
240  * and normal pruning operations will deal with their final disposition
241  * at some later time.
242  */
243 void
244 hammer_cmd_mirror_write(char **av, int ac)
245 {
246         struct hammer_ioc_mirror_rw mirror;
247         const char *filesystem;
248         char *buf = malloc(SERIALBUF_SIZE);
249         struct hammer_ioc_pseudofs_rw pfs;
250         struct hammer_ioc_mrecord_head pickup;
251         struct hammer_ioc_synctid synctid;
252         union hammer_ioc_mrecord_any mrec_tmp;
253         hammer_ioc_mrecord_any_t mrec;
254         int error;
255         int fd;
256
257         if (ac > 2)
258                 mirror_usage(1);
259         filesystem = av[0];
260
261         bzero(&mirror, sizeof(mirror));
262         hammer_key_beg_init(&mirror.key_beg);
263         hammer_key_end_init(&mirror.key_end);
264         mirror.key_end = mirror.key_beg;
265
266         fd = getpfs(&pfs, filesystem);
267
268         /*
269          * In two-way mode the target writes out a PFS packet first.
270          * The source uses our tid_end as its tid_beg by default,
271          * picking up where it left off.
272          */
273         mirror.tid_beg = 0;
274         if (TwoWayPipeOpt) {
275                 generate_mrec_header(fd, 1, pfs.pfs_id,
276                                      &mirror.tid_beg, &mirror.tid_end);
277         }
278
279         /*
280          * Read and process the PFS header.  The source informs us of
281          * the TID range the stream represents.
282          */
283         validate_mrec_header(fd, 0, 1, pfs.pfs_id,
284                              &mirror.tid_beg, &mirror.tid_end);
285
286         mirror.ubuf = buf;
287         mirror.size = SERIALBUF_SIZE;
288
289         pickup.signature = 0;
290         pickup.type = 0;
291
292         /*
293          * Read and process bulk records (REC, PASS, and SKIP types).
294          *
295          * On your life, do NOT mess with mirror.key_cur or your mirror
296          * target may become history.
297          */
298         for (;;) {
299                 mirror.count = 0;
300                 mirror.pfs_id = pfs.pfs_id;
301                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
302                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
303                 if (mirror.size <= 0)
304                         break;
305                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
306                         fprintf(stderr, "Mirror-write %s failed: %s\n",
307                                 filesystem, strerror(errno));
308                         exit(1);
309                 }
310                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
311                         fprintf(stderr,
312                                 "Mirror-write %s fatal error %d\n",
313                                 filesystem, mirror.head.error);
314                         exit(1);
315                 }
316 #if 0
317                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
318                         fprintf(stderr,
319                                 "Mirror-write %s interrupted by timer at"
320                                 " %016llx\n",
321                                 filesystem,
322                                 mirror.key_cur.obj_id);
323                         exit(0);
324                 }
325 #endif
326         }
327
328         /*
329          * Read and process the termination sync record.
330          */
331         mrec = read_mrecord(0, &error, &pickup);
332
333         if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
334                 fprintf(stderr, "Mirror-write: No work to do, stopping\n");
335                 return;
336         }
337
338         if (mrec == NULL || 
339             mrec->head.type != HAMMER_MREC_TYPE_SYNC ||
340             mrec->head.rec_size != sizeof(mrec->sync)) {
341                 fprintf(stderr, "Mirror-write %s: Did not get termination "
342                                 "sync record, or rec_size is wrong rt=%d\n",
343                                 filesystem, mrec->head.type);
344                 exit(1);
345         }
346         free(mrec);
347         mrec = NULL;
348
349         /*
350          * Update the PFS info on the target so the user has visibility
351          * into the new snapshot.
352          */
353         update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
354
355         /*
356          * Sync the target filesystem
357          */
358         bzero(&synctid, sizeof(synctid));
359         synctid.op = HAMMER_SYNCTID_SYNC2;
360         ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
361
362         fprintf(stderr, "Mirror-write %s: succeeded\n", filesystem);
363
364         /*
365          * Report back to the originator.
366          */
367         if (TwoWayPipeOpt) {
368                 mrec_tmp.update.tid = mirror.tid_end;
369                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
370                               &mrec_tmp, sizeof(mrec_tmp.update));
371         } else {
372                 printf("Source can update synctid to 0x%016llx\n",
373                        mirror.tid_end);
374         }
375 }
376
377 void
378 hammer_cmd_mirror_dump(void)
379 {
380         char *buf = malloc(SERIALBUF_SIZE);
381         struct hammer_ioc_mrecord_head pickup;
382         hammer_ioc_mrecord_any_t mrec;
383         int error;
384         int size;
385         int offset;
386         int bytes;
387
388         /*
389          * Read and process the PFS header 
390          */
391         pickup.signature = 0;
392         pickup.type = 0;
393
394         mrec = read_mrecord(0, &error, &pickup);
395
396         /*
397          * Read and process bulk records
398          */
399         for (;;) {
400                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
401                 if (size <= 0)
402                         break;
403                 offset = 0;
404                 while (offset < size) {
405                         mrec = (void *)((char *)buf + offset);
406                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
407                         if (offset + bytes > size) {
408                                 fprintf(stderr, "Misaligned record\n");
409                                 exit(1);
410                         }
411
412                         switch(mrec->head.type) {
413                         case HAMMER_MREC_TYPE_REC:
414                                 printf("Record obj=%016llx key=%016llx "
415                                        "rt=%02x ot=%02x\n",
416                                         mrec->rec.leaf.base.obj_id,
417                                         mrec->rec.leaf.base.key,
418                                         mrec->rec.leaf.base.rec_type,
419                                         mrec->rec.leaf.base.obj_type);
420                                 printf("       tids %016llx:%016llx data=%d\n",
421                                         mrec->rec.leaf.base.create_tid,
422                                         mrec->rec.leaf.base.delete_tid,
423                                         mrec->rec.leaf.data_len);
424                                 break;
425                         case HAMMER_MREC_TYPE_PASS:
426                                 printf("Pass   obj=%016llx key=%016llx "
427                                        "rt=%02x ot=%02x\n",
428                                         mrec->rec.leaf.base.obj_id,
429                                         mrec->rec.leaf.base.key,
430                                         mrec->rec.leaf.base.rec_type,
431                                         mrec->rec.leaf.base.obj_type);
432                                 printf("       tids %016llx:%016llx data=%d\n",
433                                         mrec->rec.leaf.base.create_tid,
434                                         mrec->rec.leaf.base.delete_tid,
435                                         mrec->rec.leaf.data_len);
436                                 break;
437                         case HAMMER_MREC_TYPE_SKIP:
438                                 printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
439                                        "       obj=%016llx key=%016llx rt=%02x\n",
440                                        mrec->skip.skip_beg.obj_id,
441                                        mrec->skip.skip_beg.key,
442                                        mrec->skip.skip_beg.rec_type,
443                                        mrec->skip.skip_end.obj_id,
444                                        mrec->skip.skip_end.key,
445                                        mrec->skip.skip_end.rec_type);
446                         default:
447                                 break;
448                         }
449                         offset += bytes;
450                 }
451         }
452
453         /*
454          * Read and process the termination sync record.
455          */
456         mrec = read_mrecord(0, &error, &pickup);
457         if (mrec == NULL || mrec->head.type != HAMMER_MREC_TYPE_SYNC) {
458                 fprintf(stderr, "Mirror-dump: Did not get termination "
459                                 "sync record\n");
460         }
461 }
462
463 void
464 hammer_cmd_mirror_copy(char **av, int ac)
465 {
466         pid_t pid1;
467         pid_t pid2;
468         int fds[2];
469         const char *xav[16];
470         char tbuf[16];
471         char *ptr;
472         int xac;
473
474         if (ac != 2)
475                 mirror_usage(1);
476
477         if (pipe(fds) < 0) {
478                 perror("pipe");
479                 exit(1);
480         }
481
482         TwoWayPipeOpt = 1;
483
484         /*
485          * Source
486          */
487         if ((pid1 = fork()) == 0) {
488                 dup2(fds[0], 0);
489                 dup2(fds[0], 1);
490                 close(fds[0]);
491                 close(fds[1]);
492                 if ((ptr = strchr(av[0], ':')) != NULL) {
493                         *ptr++ = 0;
494                         xac = 0;
495                         xav[xac++] = "ssh";
496                         xav[xac++] = av[0];
497                         xav[xac++] = "hammer";
498                         if (VerboseOpt)
499                                 xav[xac++] = "-v";
500                         xav[xac++] = "-2";
501                         if (TimeoutOpt) {
502                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
503                                 xav[xac++] = "-t";
504                                 xav[xac++] = tbuf;
505                         }
506                         xav[xac++] = "mirror-read";
507                         xav[xac++] = ptr;
508                         xav[xac++] = NULL;
509                         execv("/usr/bin/ssh", (void *)xav);
510                 } else {
511                         hammer_cmd_mirror_read(av, 1);
512                         fflush(stdout);
513                         fflush(stderr);
514                 }
515                 _exit(1);
516         }
517
518         /*
519          * Target
520          */
521         if ((pid2 = fork()) == 0) {
522                 dup2(fds[1], 0);
523                 dup2(fds[1], 1);
524                 close(fds[0]);
525                 close(fds[1]);
526                 if ((ptr = strchr(av[1], ':')) != NULL) {
527                         *ptr++ = 0;
528                         xac = 0;
529                         xav[xac++] = "ssh";
530                         xav[xac++] = av[1];
531                         xav[xac++] = "hammer";
532                         if (VerboseOpt)
533                                 xav[xac++] = "-v";
534                         xav[xac++] = "-2";
535                         xav[xac++] = "mirror-write";
536                         xav[xac++] = ptr;
537                         xav[xac++] = NULL;
538                         execv("/usr/bin/ssh", (void *)xav);
539                 } else {
540                         hammer_cmd_mirror_write(av + 1, 1);
541                         fflush(stdout);
542                         fflush(stderr);
543                 }
544                 _exit(1);
545         }
546         close(fds[0]);
547         close(fds[1]);
548
549         while (waitpid(pid1, NULL, 0) <= 0)
550                 ;
551         while (waitpid(pid2, NULL, 0) <= 0)
552                 ;
553 }
554
555 /*
556  * Read and return multiple mrecords
557  */
558 static int
559 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
560 {
561         hammer_ioc_mrecord_any_t mrec;
562         u_int count;
563         size_t n;
564         size_t i;
565         size_t bytes;
566
567         count = 0;
568         while (size - count >= HAMMER_MREC_HEADSIZE) {
569                 /*
570                  * Cached the record header in case we run out of buffer
571                  * space.
572                  */
573                 fflush(stdout);
574                 if (pickup->signature == 0) {
575                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
576                                 i = read(fd, (char *)pickup + n,
577                                          HAMMER_MREC_HEADSIZE - n);
578                                 if (i <= 0)
579                                         break;
580                         }
581                         if (n == 0)
582                                 break;
583                         if (n != HAMMER_MREC_HEADSIZE) {
584                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
585                                 exit(1);
586                         }
587
588                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
589                                 fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n");
590                                 exit(1);
591                         }
592                 }
593                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
594                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
595                         fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n");
596                         exit(1);
597                 }
598
599                 /*
600                  * Stop if we have insufficient space for the record and data.
601                  */
602                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
603                 if (size - count < bytes)
604                         break;
605
606                 /*
607                  * Stop if the record type is not a REC or a SKIP (the only
608                  * two types the ioctl supports.  Other types are used only
609                  * by the userland protocol).
610                  */
611                 if (pickup->type != HAMMER_MREC_TYPE_REC &&
612                     pickup->type != HAMMER_MREC_TYPE_SKIP &&
613                     pickup->type != HAMMER_MREC_TYPE_PASS) {
614                         break;
615                 }
616
617                 /*
618                  * Read the remainder and clear the pickup signature.
619                  */
620                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
621                         i = read(fd, buf + count + n, bytes - n);
622                         if (i <= 0)
623                                 break;
624                 }
625                 if (n != bytes) {
626                         fprintf(stderr, "read_mrecords: short read on pipe\n");
627                         exit(1);
628                 }
629
630                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
631                 pickup->signature = 0;
632                 pickup->type = 0;
633                 mrec = (void *)(buf + count);
634
635                 /*
636                  * Validate the completed record
637                  */
638                 if (mrec->head.rec_crc !=
639                     crc32((char *)mrec + HAMMER_MREC_CRCOFF,
640                           mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
641                         fprintf(stderr, "read_mrecords: malformed record "
642                                         "on pipe, bad crc\n");
643                         exit(1);
644                 }
645
646                 /*
647                  * If its a B-Tree record validate the data crc
648                  */
649                 if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
650                         if (mrec->head.rec_size <
651                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
652                                 fprintf(stderr, 
653                                         "read_mrecords: malformed record on "
654                                         "pipe, illegal element data_len\n");
655                                 exit(1);
656                         }
657                         if (mrec->rec.leaf.data_len &&
658                             mrec->rec.leaf.data_offset &&
659                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
660                                 fprintf(stderr,
661                                         "read_mrecords: data_crc did not "
662                                         "match data! obj=%016llx key=%016llx\n",
663                                         mrec->rec.leaf.base.obj_id,
664                                         mrec->rec.leaf.base.key);
665                                 fprintf(stderr,
666                                         "continuing, but there are problems\n");
667                         }
668                 }
669                 count += bytes;
670         }
671         return(count);
672 }
673
674 /*
675  * Read and return a single mrecord.
676  */
677 static
678 hammer_ioc_mrecord_any_t
679 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
680 {
681         hammer_ioc_mrecord_any_t mrec;
682         struct hammer_ioc_mrecord_head mrechd;
683         size_t bytes;
684         size_t n;
685         size_t i;
686
687         if (pickup && pickup->type != 0) {
688                 mrechd = *pickup;
689                 pickup->signature = 0;
690                 pickup->type = 0;
691                 n = HAMMER_MREC_HEADSIZE;
692         } else {
693                 /*
694                  * Read in the PFSD header from the sender.
695                  */
696                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
697                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
698                         if (i <= 0)
699                                 break;
700                 }
701                 if (n == 0) {
702                         *errorp = 0;    /* EOF */
703                         return(NULL);
704                 }
705                 if (n != HAMMER_MREC_HEADSIZE) {
706                         fprintf(stderr, "short read of mrecord header\n");
707                         *errorp = EPIPE;
708                         return(NULL);
709                 }
710         }
711         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
712                 fprintf(stderr, "read_mrecord: bad signature\n");
713                 *errorp = EINVAL;
714                 return(NULL);
715         }
716         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
717         assert(bytes >= sizeof(mrechd));
718         mrec = malloc(bytes);
719         mrec->head = mrechd;
720
721         while (n < bytes) {
722                 i = read(fdin, (char *)mrec + n, bytes - n);
723                 if (i <= 0)
724                         break;
725                 n += i;
726         }
727         if (n != bytes) {
728                 fprintf(stderr, "read_mrecord: short read on payload\n");
729                 *errorp = EPIPE;
730                 return(NULL);
731         }
732         if (mrec->head.rec_crc != 
733             crc32((char *)mrec + HAMMER_MREC_CRCOFF,
734                   mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
735                 fprintf(stderr, "read_mrecord: bad CRC\n");
736                 *errorp = EINVAL;
737                 return(NULL);
738         }
739         *errorp = 0;
740         return(mrec);
741 }
742
743 static
744 void
745 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
746               int bytes)
747 {
748         char zbuf[HAMMER_HEAD_ALIGN];
749         int pad;
750
751         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
752
753         assert(bytes >= (int)sizeof(mrec->head));
754         bzero(&mrec->head, sizeof(mrec->head));
755         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
756         mrec->head.type = type;
757         mrec->head.rec_size = bytes;
758         mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
759                                    bytes - HAMMER_MREC_CRCOFF);
760         if (write(fdout, mrec, bytes) != bytes) {
761                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
762                         errno, strerror(errno));
763                 exit(1);
764         }
765         if (pad) {
766                 bzero(zbuf, pad);
767                 if (write(fdout, zbuf, pad) != pad) {
768                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
769                                 errno, strerror(errno));
770                         exit(1);
771                 }
772         }
773 }
774
775 /*
776  * Generate a mirroring header with the pfs information of the
777  * originating filesytem.
778  */
779 static void
780 generate_mrec_header(int fd, int fdout, int pfs_id,
781                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
782 {
783         struct hammer_ioc_pseudofs_rw pfs;
784         union hammer_ioc_mrecord_any mrec_tmp;
785
786         bzero(&pfs, sizeof(pfs));
787         bzero(&mrec_tmp, sizeof(mrec_tmp));
788         pfs.pfs_id = pfs_id;
789         pfs.ondisk = &mrec_tmp.pfs.pfsd;
790         pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
791         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
792                 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
793                 exit(1);
794         }
795         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
796                 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n");
797                 exit(1);
798         }
799
800         /*
801          * sync_beg_tid - lowest TID on source after which a full history
802          *                is available.
803          *
804          * sync_end_tid - highest fully synchronized TID from source.
805          */
806         if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
807                 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
808         if (tid_endp)
809                 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
810         mrec_tmp.pfs.version = pfs.version;
811         write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
812                       &mrec_tmp, sizeof(mrec_tmp.pfs));
813 }
814
815 /*
816  * Validate the pfs information from the originating filesystem
817  * against the target filesystem.  shared_uuid must match.
818  */
819 static void
820 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
821                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
822 {
823         struct hammer_ioc_pseudofs_rw pfs;
824         struct hammer_pseudofs_data pfsd;
825         hammer_ioc_mrecord_any_t mrec;
826         int error;
827
828         /*
829          * Get the PFSD info from the target filesystem.
830          */
831         bzero(&pfs, sizeof(pfs));
832         bzero(&pfsd, sizeof(pfsd));
833         pfs.pfs_id = pfs_id;
834         pfs.ondisk = &pfsd;
835         pfs.bytes = sizeof(pfsd);
836         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
837                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
838                 exit(1);
839         }
840         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
841                 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
842                 exit(1);
843         }
844
845         mrec = read_mrecord(fdin, &error, NULL);
846         if (mrec == NULL) {
847                 if (error == 0)
848                         fprintf(stderr, "validate_mrec_header: short read\n");
849                 exit(1);
850         }
851         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
852                 fprintf(stderr, "validate_mrec_header: did not get expected "
853                                 "PFSD record type\n");
854                 exit(1);
855         }
856         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
857                 fprintf(stderr, "validate_mrec_header: unexpected payload "
858                                 "size\n");
859                 exit(1);
860         }
861         if (mrec->pfs.version != pfs.version) {
862                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
863                 exit(1);
864         }
865
866         /*
867          * Whew.  Ok, is the read PFS info compatible with the target?
868          */
869         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
870                  sizeof(pfsd.shared_uuid)) != 0) {
871                 fprintf(stderr, 
872                         "mirror-write: source and target have "
873                         "different shared-uuid's!\n");
874                 exit(1);
875         }
876         if (is_target &&
877             (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
878                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
879                 exit(1);
880         }
881         if (tid_begp)
882                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
883         if (tid_endp)
884                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
885         free(mrec);
886 }
887
888 static void
889 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
890 {
891         struct hammer_ioc_pseudofs_rw pfs;
892         struct hammer_pseudofs_data pfsd;
893
894         bzero(&pfs, sizeof(pfs));
895         bzero(&pfsd, sizeof(pfsd));
896         pfs.pfs_id = pfs_id;
897         pfs.ondisk = &pfsd;
898         pfs.bytes = sizeof(pfsd);
899         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
900                 perror("update_pfs_snapshot (read)");
901                 exit(1);
902         }
903         if (pfsd.sync_end_tid != snapshot_tid) {
904                 pfsd.sync_end_tid = snapshot_tid;
905                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
906                         perror("update_pfs_snapshot (rewrite)");
907                         exit(1);
908                 }
909                 fprintf(stderr,
910                         "Mirror-write: Completed, updated snapshot "
911                         "to %016llx\n",
912                         snapshot_tid);
913         }
914 }
915
916
917 static void
918 mirror_usage(int code)
919 {
920         fprintf(stderr, 
921                 "hammer mirror-read <filesystem>\n"
922                 "hammer mirror-write <filesystem>\n"
923                 "hammer mirror-dump\n"
924                 "hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n"
925         );
926         exit(code);
927 }