MFC: Fix message.
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.9.2.1 2008/07/18 23:25:05 swildner Exp $
35  */
36
37 #include "hammer.h"
38
39 #define SERIALBUF_SIZE  (512 * 1024)
40
41 static int read_mrecords(int fd, char *buf, u_int size,
42                          hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44                          hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46                          hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static void validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
51 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
52 static void mirror_usage(int code);
53
54 /*
55  * Generate a mirroring data stream from the specific source over the
56  * entire key range, but restricted to the specified transaction range.
57  *
58  * The HAMMER VFS does most of the work, we add a few new mrecord
59  * types to negotiate the TID ranges and verify that the entire
60  * stream made it to the destination.
61  */
62 void
63 hammer_cmd_mirror_read(char **av, int ac)
64 {
65         struct hammer_ioc_mirror_rw mirror;
66         struct hammer_ioc_pseudofs_rw pfs;
67         union hammer_ioc_mrecord_any mrec_tmp;
68         hammer_ioc_mrecord_any_t mrec;
69         hammer_tid_t sync_tid;
70         const char *filesystem;
71         char *buf = malloc(SERIALBUF_SIZE);
72         int interrupted = 0;
73         int error;
74         int fd;
75         int n;
76         time_t base_t = time(NULL);
77
78         if (ac > 2)
79                 mirror_usage(1);
80         filesystem = av[0];
81
82         bzero(&mirror, sizeof(mirror));
83         hammer_key_beg_init(&mirror.key_beg);
84         hammer_key_end_init(&mirror.key_end);
85
86         fd = getpfs(&pfs, filesystem);
87
88         /*
89          * In 2-way mode the target will send us a PFS info packet
90          * first.  Use the target's current snapshot TID as our default
91          * begin TID.
92          */
93         mirror.tid_beg = 0;
94         if (TwoWayPipeOpt)
95                 validate_mrec_header(fd, 0, 0, pfs.pfs_id,
96                                      NULL, &mirror.tid_beg);
97
98         /*
99          * Write out the PFS header, tid_beg will be updated if our PFS
100          * has a larger begin sync.  tid_end is set to the latest source
101          * TID whos flush cycle has completed.
102          */
103         generate_mrec_header(fd, 1, pfs.pfs_id,
104                              &mirror.tid_beg, &mirror.tid_end);
105
106         /*
107          * A cycle file overrides the beginning TID
108          */
109         hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
110
111         if (ac == 2)
112                 mirror.tid_beg = strtoull(av[1], NULL, 0);
113
114         fprintf(stderr, "Mirror-read: Mirror from %016llx to %016llx\n",
115                 mirror.tid_beg, mirror.tid_end);
116         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
117                 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
118                         mirror.key_beg.obj_id);
119         }
120
121         /*
122          * Nothing to do if begin equals end.
123          */
124         if (mirror.tid_beg == mirror.tid_end) {
125                 fprintf(stderr, "Mirror-read: No work to do, stopping\n");
126                 write_mrecord(1, HAMMER_MREC_TYPE_TERM,
127                               &mrec_tmp, sizeof(mrec_tmp.sync));
128                 goto done;
129         }
130
131         /*
132          * Write out bulk records
133          */
134         mirror.ubuf = buf;
135         mirror.size = SERIALBUF_SIZE;
136
137         do {
138                 mirror.count = 0;
139                 mirror.pfs_id = pfs.pfs_id;
140                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
141                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
142                         fprintf(stderr, "Mirror-read %s failed: %s\n",
143                                 filesystem, strerror(errno));
144                         exit(1);
145                 }
146                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
147                         fprintf(stderr,
148                                 "Mirror-read %s fatal error %d\n",
149                                 filesystem, mirror.head.error);
150                         exit(1);
151                 }
152                 if (mirror.count) {
153                         n = write(1, mirror.ubuf, mirror.count);
154                         if (n != mirror.count) {
155                                 fprintf(stderr, "Mirror-read %s failed: "
156                                                 "short write\n",
157                                 filesystem);
158                                 exit(1);
159                         }
160                 }
161                 mirror.key_beg = mirror.key_cur;
162                 if (TimeoutOpt &&
163                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
164                         fprintf(stderr,
165                                 "Mirror-read %s interrupted by timer at"
166                                 " %016llx\n",
167                                 filesystem,
168                                 mirror.key_cur.obj_id);
169                         interrupted = 1;
170                         break;
171                 }
172         } while (mirror.count != 0);
173
174         /*
175          * Write out the termination sync record
176          */
177         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
178                       &mrec_tmp, sizeof(mrec_tmp.sync));
179
180         /*
181          * If the -2 option was given (automatic when doing mirror-copy),
182          * a two-way pipe is assumed and we expect a response mrec from
183          * the target.
184          */
185         if (TwoWayPipeOpt) {
186                 mrec = read_mrecord(0, &error, NULL);
187                 if (mrec == NULL || 
188                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
189                     mrec->head.rec_size != sizeof(mrec->update)) {
190                         fprintf(stderr, "mirror_read: Did not get final "
191                                         "acknowledgement packet from target\n");
192                         exit(1);
193                 }
194                 if (interrupted) {
195                         if (CyclePath) {
196                                 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
197                                 fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath);
198                         }
199                 } else {
200                         sync_tid = mrec->update.tid;
201                         if (CyclePath) {
202                                 hammer_key_beg_init(&mirror.key_beg);
203                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
204                                 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
205                                         CyclePath, sync_tid);
206                         }
207                 }
208         } else if (CyclePath) {
209                 /* NOTE! mirror.tid_beg cannot be updated */
210                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
211                                 "fully updated unless you use mirror-copy\n");
212                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
213         }
214 done:
215         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
216 }
217
218 /*
219  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
220  * some additional packet types to negotiate TID ranges and to verify
221  * completion.  The HAMMER VFS does most of the work.
222  *
223  * It is important to note that the mirror.key_{beg,end} range must
224  * match the ranged used by the original.  For now both sides use
225  * range the entire key space.
226  *
227  * It is even more important that the records in the stream conform
228  * to the TID range also supplied in the stream.  The HAMMER VFS will
229  * use the REC, PASS, and SKIP record types to track the portions of
230  * the B-Tree being scanned in order to be able to proactively delete
231  * records on the target within those active areas that are not mentioned
232  * by the source.
233  *
234  * The mirror.key_cur field is used by the VFS to do this tracking.  It
235  * must be initialized to key_beg but then is persistently updated by
236  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
237  * field you will blow up the mirror target, possibly to the point of
238  * deleting everything.  As a safety measure the HAMMER VFS simply marks
239  * the records that the source has destroyed as deleted on the target,
240  * and normal pruning operations will deal with their final disposition
241  * at some later time.
242  */
243 void
244 hammer_cmd_mirror_write(char **av, int ac)
245 {
246         struct hammer_ioc_mirror_rw mirror;
247         const char *filesystem;
248         char *buf = malloc(SERIALBUF_SIZE);
249         struct hammer_ioc_pseudofs_rw pfs;
250         struct hammer_ioc_mrecord_head pickup;
251         struct hammer_ioc_synctid synctid;
252         union hammer_ioc_mrecord_any mrec_tmp;
253         hammer_ioc_mrecord_any_t mrec;
254         int error;
255         int fd;
256
257         if (ac > 2)
258                 mirror_usage(1);
259         filesystem = av[0];
260
261         bzero(&mirror, sizeof(mirror));
262         hammer_key_beg_init(&mirror.key_beg);
263         hammer_key_end_init(&mirror.key_end);
264         mirror.key_end = mirror.key_beg;
265
266         fd = getpfs(&pfs, filesystem);
267
268         /*
269          * In two-way mode the target writes out a PFS packet first.
270          * The source uses our tid_end as its tid_beg by default,
271          * picking up where it left off.
272          */
273         mirror.tid_beg = 0;
274         if (TwoWayPipeOpt) {
275                 generate_mrec_header(fd, 1, pfs.pfs_id,
276                                      &mirror.tid_beg, &mirror.tid_end);
277         }
278
279         /*
280          * Read and process the PFS header.  The source informs us of
281          * the TID range the stream represents.
282          */
283         validate_mrec_header(fd, 0, 1, pfs.pfs_id,
284                              &mirror.tid_beg, &mirror.tid_end);
285
286         mirror.ubuf = buf;
287         mirror.size = SERIALBUF_SIZE;
288
289         pickup.signature = 0;
290         pickup.type = 0;
291
292         /*
293          * Read and process bulk records (REC, PASS, and SKIP types).
294          *
295          * On your life, do NOT mess with mirror.key_cur or your mirror
296          * target may become history.
297          */
298         for (;;) {
299                 mirror.count = 0;
300                 mirror.pfs_id = pfs.pfs_id;
301                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
302                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
303                 if (mirror.size <= 0)
304                         break;
305                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
306                         fprintf(stderr, "Mirror-write %s failed: %s\n",
307                                 filesystem, strerror(errno));
308                         exit(1);
309                 }
310                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
311                         fprintf(stderr,
312                                 "Mirror-write %s fatal error %d\n",
313                                 filesystem, mirror.head.error);
314                         exit(1);
315                 }
316 #if 0
317                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
318                         fprintf(stderr,
319                                 "Mirror-write %s interrupted by timer at"
320                                 " %016llx\n",
321                                 filesystem,
322                                 mirror.key_cur.obj_id);
323                         exit(0);
324                 }
325 #endif
326         }
327
328         /*
329          * Read and process the termination sync record.
330          */
331         mrec = read_mrecord(0, &error, &pickup);
332
333         if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
334                 fprintf(stderr, "Mirror-write: No work to do, stopping\n");
335                 return;
336         }
337
338         if (mrec == NULL || 
339             mrec->head.type != HAMMER_MREC_TYPE_SYNC ||
340             mrec->head.rec_size != sizeof(mrec->sync)) {
341                 fprintf(stderr, "Mirror-write %s: Did not get termination "
342                                 "sync record, or rec_size is wrong rt=%d\n",
343                                 filesystem, mrec->head.type);
344         }
345         free(mrec);
346         mrec = NULL;
347
348         /*
349          * Update the PFS info on the target so the user has visibility
350          * into the new snapshot.
351          */
352         update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
353
354         /*
355          * Sync the target filesystem
356          */
357         bzero(&synctid, sizeof(synctid));
358         synctid.op = HAMMER_SYNCTID_SYNC2;
359         ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
360
361         fprintf(stderr, "Mirror-write %s: succeeded\n", filesystem);
362
363         /*
364          * Report back to the originator.
365          */
366         if (TwoWayPipeOpt) {
367                 mrec_tmp.update.tid = mirror.tid_end;
368                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
369                               &mrec_tmp, sizeof(mrec_tmp.update));
370         } else {
371                 printf("Source can update synctid to 0x%016llx\n",
372                        mirror.tid_end);
373         }
374 }
375
376 void
377 hammer_cmd_mirror_dump(void)
378 {
379         char *buf = malloc(SERIALBUF_SIZE);
380         struct hammer_ioc_mrecord_head pickup;
381         hammer_ioc_mrecord_any_t mrec;
382         int error;
383         int size;
384         int offset;
385         int bytes;
386
387         /*
388          * Read and process the PFS header 
389          */
390         pickup.signature = 0;
391         pickup.type = 0;
392
393         mrec = read_mrecord(0, &error, &pickup);
394
395         /*
396          * Read and process bulk records
397          */
398         for (;;) {
399                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
400                 if (size <= 0)
401                         break;
402                 offset = 0;
403                 while (offset < size) {
404                         mrec = (void *)((char *)buf + offset);
405                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
406                         if (offset + bytes > size) {
407                                 fprintf(stderr, "Misaligned record\n");
408                                 exit(1);
409                         }
410
411                         switch(mrec->head.type) {
412                         case HAMMER_MREC_TYPE_REC:
413                                 printf("Record obj=%016llx key=%016llx "
414                                        "rt=%02x ot=%02x\n",
415                                         mrec->rec.leaf.base.obj_id,
416                                         mrec->rec.leaf.base.key,
417                                         mrec->rec.leaf.base.rec_type,
418                                         mrec->rec.leaf.base.obj_type);
419                                 printf("       tids %016llx:%016llx data=%d\n",
420                                         mrec->rec.leaf.base.create_tid,
421                                         mrec->rec.leaf.base.delete_tid,
422                                         mrec->rec.leaf.data_len);
423                                 break;
424                         case HAMMER_MREC_TYPE_PASS:
425                                 printf("Pass   obj=%016llx key=%016llx "
426                                        "rt=%02x ot=%02x\n",
427                                         mrec->rec.leaf.base.obj_id,
428                                         mrec->rec.leaf.base.key,
429                                         mrec->rec.leaf.base.rec_type,
430                                         mrec->rec.leaf.base.obj_type);
431                                 printf("       tids %016llx:%016llx data=%d\n",
432                                         mrec->rec.leaf.base.create_tid,
433                                         mrec->rec.leaf.base.delete_tid,
434                                         mrec->rec.leaf.data_len);
435                                 break;
436                         case HAMMER_MREC_TYPE_SKIP:
437                                 printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
438                                        "       obj=%016llx key=%016llx rt=%02x\n",
439                                        mrec->skip.skip_beg.obj_id,
440                                        mrec->skip.skip_beg.key,
441                                        mrec->skip.skip_beg.rec_type,
442                                        mrec->skip.skip_end.obj_id,
443                                        mrec->skip.skip_end.key,
444                                        mrec->skip.skip_end.rec_type);
445                         default:
446                                 break;
447                         }
448                         offset += bytes;
449                 }
450         }
451
452         /*
453          * Read and process the termination sync record.
454          */
455         mrec = read_mrecord(0, &error, &pickup);
456         if (mrec == NULL || mrec->head.type != HAMMER_MREC_TYPE_SYNC) {
457                 fprintf(stderr, "Mirror-dump: Did not get termination "
458                                 "sync record\n");
459         }
460 }
461
462 void
463 hammer_cmd_mirror_copy(char **av, int ac)
464 {
465         pid_t pid1;
466         pid_t pid2;
467         int fds[2];
468         const char *xav[16];
469         char tbuf[16];
470         char *ptr;
471         int xac;
472
473         if (ac != 2)
474                 mirror_usage(1);
475
476         if (pipe(fds) < 0) {
477                 perror("pipe");
478                 exit(1);
479         }
480
481         TwoWayPipeOpt = 1;
482
483         /*
484          * Source
485          */
486         if ((pid1 = fork()) == 0) {
487                 dup2(fds[0], 0);
488                 dup2(fds[0], 1);
489                 close(fds[0]);
490                 close(fds[1]);
491                 if ((ptr = strchr(av[0], ':')) != NULL) {
492                         *ptr++ = 0;
493                         xac = 0;
494                         xav[xac++] = "ssh";
495                         xav[xac++] = av[0];
496                         xav[xac++] = "hammer";
497                         if (VerboseOpt)
498                                 xav[xac++] = "-v";
499                         xav[xac++] = "-2";
500                         if (TimeoutOpt) {
501                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
502                                 xav[xac++] = "-t";
503                                 xav[xac++] = tbuf;
504                         }
505                         xav[xac++] = "mirror-read";
506                         xav[xac++] = ptr;
507                         xav[xac++] = NULL;
508                         execv("/usr/bin/ssh", (void *)xav);
509                 } else {
510                         hammer_cmd_mirror_read(av, 1);
511                         fflush(stdout);
512                         fflush(stderr);
513                 }
514                 _exit(1);
515         }
516
517         /*
518          * Target
519          */
520         if ((pid2 = fork()) == 0) {
521                 dup2(fds[1], 0);
522                 dup2(fds[1], 1);
523                 close(fds[0]);
524                 close(fds[1]);
525                 if ((ptr = strchr(av[1], ':')) != NULL) {
526                         *ptr++ = 0;
527                         xac = 0;
528                         xav[xac++] = "ssh";
529                         xav[xac++] = av[1];
530                         xav[xac++] = "hammer";
531                         if (VerboseOpt)
532                                 xav[xac++] = "-v";
533                         xav[xac++] = "-2";
534                         xav[xac++] = "mirror-write";
535                         xav[xac++] = ptr;
536                         xav[xac++] = NULL;
537                         execv("/usr/bin/ssh", (void *)xav);
538                 } else {
539                         hammer_cmd_mirror_write(av + 1, 1);
540                         fflush(stdout);
541                         fflush(stderr);
542                 }
543                 _exit(1);
544         }
545         close(fds[0]);
546         close(fds[1]);
547
548         while (waitpid(pid1, NULL, 0) <= 0)
549                 ;
550         while (waitpid(pid2, NULL, 0) <= 0)
551                 ;
552 }
553
554 /*
555  * Read and return multiple mrecords
556  */
557 static int
558 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
559 {
560         hammer_ioc_mrecord_any_t mrec;
561         u_int count;
562         size_t n;
563         size_t i;
564         size_t bytes;
565
566         count = 0;
567         while (size - count >= HAMMER_MREC_HEADSIZE) {
568                 /*
569                  * Cached the record header in case we run out of buffer
570                  * space.
571                  */
572                 fflush(stdout);
573                 if (pickup->signature == 0) {
574                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
575                                 i = read(fd, (char *)pickup + n,
576                                          HAMMER_MREC_HEADSIZE - n);
577                                 if (i <= 0)
578                                         break;
579                         }
580                         if (n == 0)
581                                 break;
582                         if (n != HAMMER_MREC_HEADSIZE) {
583                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
584                                 exit(1);
585                         }
586
587                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
588                                 fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n");
589                                 exit(1);
590                         }
591                 }
592                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
593                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
594                         fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n");
595                         exit(1);
596                 }
597
598                 /*
599                  * Stop if we have insufficient space for the record and data.
600                  */
601                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
602                 if (size - count < bytes)
603                         break;
604
605                 /*
606                  * Stop if the record type is not a REC or a SKIP (the only
607                  * two types the ioctl supports.  Other types are used only
608                  * by the userland protocol).
609                  */
610                 if (pickup->type != HAMMER_MREC_TYPE_REC &&
611                     pickup->type != HAMMER_MREC_TYPE_SKIP &&
612                     pickup->type != HAMMER_MREC_TYPE_PASS) {
613                         break;
614                 }
615
616                 /*
617                  * Read the remainder and clear the pickup signature.
618                  */
619                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
620                         i = read(fd, buf + count + n, bytes - n);
621                         if (i <= 0)
622                                 break;
623                 }
624                 if (n != bytes) {
625                         fprintf(stderr, "read_mrecords: short read on pipe\n");
626                         exit(1);
627                 }
628
629                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
630                 pickup->signature = 0;
631                 pickup->type = 0;
632                 mrec = (void *)(buf + count);
633
634                 /*
635                  * Validate the completed record
636                  */
637                 if (mrec->head.rec_crc !=
638                     crc32((char *)mrec + HAMMER_MREC_CRCOFF,
639                           mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
640                         fprintf(stderr, "read_mrecords: malformed record "
641                                         "on pipe, bad crc\n");
642                         exit(1);
643                 }
644
645                 /*
646                  * If its a B-Tree record validate the data crc
647                  */
648                 if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
649                         if (mrec->head.rec_size <
650                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
651                                 fprintf(stderr, 
652                                         "read_mrecords: malformed record on "
653                                         "pipe, illegal element data_len\n");
654                                 exit(1);
655                         }
656                         if (mrec->rec.leaf.data_len &&
657                             mrec->rec.leaf.data_offset &&
658                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
659                                 fprintf(stderr,
660                                         "read_mrecords: data_crc did not "
661                                         "match data! obj=%016llx key=%016llx\n",
662                                         mrec->rec.leaf.base.obj_id,
663                                         mrec->rec.leaf.base.key);
664                                 fprintf(stderr,
665                                         "continuing, but there are problems\n");
666                         }
667                 }
668                 count += bytes;
669         }
670         return(count);
671 }
672
673 /*
674  * Read and return a single mrecord.
675  */
676 static
677 hammer_ioc_mrecord_any_t
678 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
679 {
680         hammer_ioc_mrecord_any_t mrec;
681         struct hammer_ioc_mrecord_head mrechd;
682         size_t bytes;
683         size_t n;
684         size_t i;
685
686         if (pickup && pickup->type != 0) {
687                 mrechd = *pickup;
688                 pickup->signature = 0;
689                 pickup->type = 0;
690                 n = HAMMER_MREC_HEADSIZE;
691         } else {
692                 /*
693                  * Read in the PFSD header from the sender.
694                  */
695                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
696                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
697                         if (i <= 0)
698                                 break;
699                 }
700                 if (n == 0) {
701                         *errorp = 0;    /* EOF */
702                         return(NULL);
703                 }
704                 if (n != HAMMER_MREC_HEADSIZE) {
705                         fprintf(stderr, "short read of mrecord header\n");
706                         *errorp = EPIPE;
707                         return(NULL);
708                 }
709         }
710         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
711                 fprintf(stderr, "read_mrecord: bad signature\n");
712                 *errorp = EINVAL;
713                 return(NULL);
714         }
715         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
716         assert(bytes >= sizeof(mrechd));
717         mrec = malloc(bytes);
718         mrec->head = mrechd;
719
720         while (n < bytes) {
721                 i = read(fdin, (char *)mrec + n, bytes - n);
722                 if (i <= 0)
723                         break;
724                 n += i;
725         }
726         if (n != bytes) {
727                 fprintf(stderr, "read_mrecord: short read on payload\n");
728                 *errorp = EPIPE;
729                 return(NULL);
730         }
731         if (mrec->head.rec_crc != 
732             crc32((char *)mrec + HAMMER_MREC_CRCOFF,
733                   mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
734                 fprintf(stderr, "read_mrecord: bad CRC\n");
735                 *errorp = EINVAL;
736                 return(NULL);
737         }
738         *errorp = 0;
739         return(mrec);
740 }
741
742 static
743 void
744 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
745               int bytes)
746 {
747         char zbuf[HAMMER_HEAD_ALIGN];
748         int pad;
749
750         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
751
752         assert(bytes >= (int)sizeof(mrec->head));
753         bzero(&mrec->head, sizeof(mrec->head));
754         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
755         mrec->head.type = type;
756         mrec->head.rec_size = bytes;
757         mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
758                                    bytes - HAMMER_MREC_CRCOFF);
759         if (write(fdout, mrec, bytes) != bytes) {
760                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
761                         errno, strerror(errno));
762                 exit(1);
763         }
764         if (pad) {
765                 bzero(zbuf, pad);
766                 if (write(fdout, zbuf, pad) != pad) {
767                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
768                                 errno, strerror(errno));
769                         exit(1);
770                 }
771         }
772 }
773
774 /*
775  * Generate a mirroring header with the pfs information of the
776  * originating filesytem.
777  */
778 static void
779 generate_mrec_header(int fd, int fdout, int pfs_id,
780                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
781 {
782         struct hammer_ioc_pseudofs_rw pfs;
783         union hammer_ioc_mrecord_any mrec_tmp;
784
785         bzero(&pfs, sizeof(pfs));
786         bzero(&mrec_tmp, sizeof(mrec_tmp));
787         pfs.pfs_id = pfs_id;
788         pfs.ondisk = &mrec_tmp.pfs.pfsd;
789         pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
790         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
791                 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
792                 exit(1);
793         }
794         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
795                 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n");
796                 exit(1);
797         }
798
799         /*
800          * sync_beg_tid - lowest TID on source after which a full history
801          *                is available.
802          *
803          * sync_end_tid - highest fully synchronized TID from source.
804          */
805         if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
806                 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
807         if (tid_endp)
808                 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
809         mrec_tmp.pfs.version = pfs.version;
810         write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
811                       &mrec_tmp, sizeof(mrec_tmp.pfs));
812 }
813
814 /*
815  * Validate the pfs information from the originating filesystem
816  * against the target filesystem.  shared_uuid must match.
817  */
818 static void
819 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
820                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
821 {
822         struct hammer_ioc_pseudofs_rw pfs;
823         struct hammer_pseudofs_data pfsd;
824         hammer_ioc_mrecord_any_t mrec;
825         int error;
826
827         /*
828          * Get the PFSD info from the target filesystem.
829          */
830         bzero(&pfs, sizeof(pfs));
831         bzero(&pfsd, sizeof(pfsd));
832         pfs.pfs_id = pfs_id;
833         pfs.ondisk = &pfsd;
834         pfs.bytes = sizeof(pfsd);
835         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
836                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
837                 exit(1);
838         }
839         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
840                 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
841                 exit(1);
842         }
843
844         mrec = read_mrecord(fdin, &error, NULL);
845         if (mrec == NULL) {
846                 if (error == 0)
847                         fprintf(stderr, "validate_mrec_header: short read\n");
848                 exit(1);
849         }
850         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
851                 fprintf(stderr, "validate_mrec_header: did not get expected "
852                                 "PFSD record type\n");
853                 exit(1);
854         }
855         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
856                 fprintf(stderr, "validate_mrec_header: unexpected payload "
857                                 "size\n");
858                 exit(1);
859         }
860         if (mrec->pfs.version != pfs.version) {
861                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
862                 exit(1);
863         }
864
865         /*
866          * Whew.  Ok, is the read PFS info compatible with the target?
867          */
868         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
869                  sizeof(pfsd.shared_uuid)) != 0) {
870                 fprintf(stderr, 
871                         "mirror-write: source and target have "
872                         "different shared-uuid's!\n");
873                 exit(1);
874         }
875         if (is_target &&
876             (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
877                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
878                 exit(1);
879         }
880         if (tid_begp)
881                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
882         if (tid_endp)
883                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
884         free(mrec);
885 }
886
887 static void
888 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
889 {
890         struct hammer_ioc_pseudofs_rw pfs;
891         struct hammer_pseudofs_data pfsd;
892
893         bzero(&pfs, sizeof(pfs));
894         bzero(&pfsd, sizeof(pfsd));
895         pfs.pfs_id = pfs_id;
896         pfs.ondisk = &pfsd;
897         pfs.bytes = sizeof(pfsd);
898         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
899                 perror("update_pfs_snapshot (read)");
900                 exit(1);
901         }
902         if (pfsd.sync_end_tid != snapshot_tid) {
903                 pfsd.sync_end_tid = snapshot_tid;
904                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
905                         perror("update_pfs_snapshot (rewrite)");
906                         exit(1);
907                 }
908                 fprintf(stderr,
909                         "Mirror-write: Completed, updated snapshot "
910                         "to %016llx\n",
911                         snapshot_tid);
912         }
913 }
914
915
916 static void
917 mirror_usage(int code)
918 {
919         fprintf(stderr, 
920                 "hammer mirror-read <filesystem>\n"
921                 "hammer mirror-write <filesystem>\n"
922                 "hammer mirror-dump\n"
923                 "hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n"
924         );
925         exit(code);
926 }