Merge branch 'vendor/GCC44'
[dragonfly.git] / sys / vfs / hammer / hammer_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/hammer_mirror.c,v 1.17 2008/07/31 22:30:33 dillon Exp $
35  */
36 /*
37  * HAMMER mirroring ioctls - serialize and deserialize modifications made
38  *                           to a filesystem.
39  */
40
41 #include "hammer.h"
42
43 static int hammer_mirror_check(hammer_cursor_t cursor,
44                                 struct hammer_ioc_mrecord_rec *mrec);
45 static int hammer_mirror_update(hammer_cursor_t cursor,
46                                 struct hammer_ioc_mrecord_rec *mrec);
47 static int hammer_mirror_write(hammer_cursor_t cursor,
48                                 struct hammer_ioc_mrecord_rec *mrec,
49                                 char *udata);
50 static int hammer_ioc_mirror_write_rec(hammer_cursor_t cursor,
51                                 struct hammer_ioc_mrecord_rec *mrec,
52                                 struct hammer_ioc_mirror_rw *mirror,
53                                 u_int32_t localization,
54                                 char *uptr);
55 static int hammer_ioc_mirror_write_pass(hammer_cursor_t cursor,
56                                 struct hammer_ioc_mrecord_rec *mrec,
57                                 struct hammer_ioc_mirror_rw *mirror,
58                                 u_int32_t localization);
59 static int hammer_ioc_mirror_write_skip(hammer_cursor_t cursor,
60                                 struct hammer_ioc_mrecord_skip *mrec,
61                                 struct hammer_ioc_mirror_rw *mirror,
62                                 u_int32_t localization);
63 static int hammer_mirror_delete_to(hammer_cursor_t cursor,
64                                 struct hammer_ioc_mirror_rw *mirror);
65 static int hammer_mirror_localize_data(hammer_data_ondisk_t data,
66                                 hammer_btree_leaf_elm_t leaf);
67
68 /*
69  * All B-Tree records within the specified key range which also conform
70  * to the transaction id range are returned.  Mirroring code keeps track
71  * of the last transaction id fully scanned and can efficiently pick up
72  * where it left off if interrupted.
73  *
74  * The PFS is identified in the mirror structure.  The passed ip is just
75  * some directory in the overall HAMMER filesystem and has nothing to
76  * do with the PFS.
77  */
78 int
79 hammer_ioc_mirror_read(hammer_transaction_t trans, hammer_inode_t ip,
80                        struct hammer_ioc_mirror_rw *mirror)
81 {
82         struct hammer_cmirror cmirror;
83         struct hammer_cursor cursor;
84         union hammer_ioc_mrecord_any mrec;
85         hammer_btree_leaf_elm_t elm;
86         const int crc_start = HAMMER_MREC_CRCOFF;
87         char *uptr;
88         int error;
89         int data_len;
90         int bytes;
91         int eatdisk;
92         int mrec_flags;
93         u_int32_t localization;
94         u_int32_t rec_crc;
95
96         localization = (u_int32_t)mirror->pfs_id << 16;
97
98         if ((mirror->key_beg.localization | mirror->key_end.localization) &
99             HAMMER_LOCALIZE_PSEUDOFS_MASK) {
100                 return(EINVAL);
101         }
102         if (hammer_btree_cmp(&mirror->key_beg, &mirror->key_end) > 0)
103                 return(EINVAL);
104
105         mirror->key_cur = mirror->key_beg;
106         mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
107         mirror->key_cur.localization += localization;
108         bzero(&mrec, sizeof(mrec));
109         bzero(&cmirror, sizeof(cmirror));
110
111         /*
112          * Make CRC errors non-fatal (at least on data), causing an EDOM
113          * error instead of EIO.
114          */
115         trans->flags |= HAMMER_TRANSF_CRCDOM;
116
117 retry:
118         error = hammer_init_cursor(trans, &cursor, NULL, NULL);
119         if (error) {
120                 hammer_done_cursor(&cursor);
121                 goto failed;
122         }
123         cursor.key_beg = mirror->key_cur;
124         cursor.key_end = mirror->key_end;
125         cursor.key_end.localization &= HAMMER_LOCALIZE_MASK;
126         cursor.key_end.localization += localization;
127
128         cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE;
129         cursor.flags |= HAMMER_CURSOR_BACKEND;
130
131         /*
132          * This flag filters the search to only return elements whos create
133          * or delete TID is >= mirror_tid.  The B-Tree uses the mirror_tid
134          * field stored with internal and leaf nodes to shortcut the scan.
135          */
136         cursor.flags |= HAMMER_CURSOR_MIRROR_FILTERED;
137         cursor.cmirror = &cmirror;
138         cmirror.mirror_tid = mirror->tid_beg;
139
140         error = hammer_btree_first(&cursor);
141         while (error == 0) {
142                 /*
143                  * Yield to more important tasks
144                  */
145                 if (error == 0) {
146                         error = hammer_signal_check(trans->hmp);
147                         if (error)
148                                 break;
149                 }
150
151                 /*
152                  * An internal node can be returned in mirror-filtered
153                  * mode and indicates that the scan is returning a skip
154                  * range in the cursor->cmirror structure.
155                  */
156                 uptr = (char *)mirror->ubuf + mirror->count;
157                 if (cursor.node->ondisk->type == HAMMER_BTREE_TYPE_INTERNAL) {
158                         /*
159                          * Check space
160                          */
161                         mirror->key_cur = cmirror.skip_beg;
162                         bytes = sizeof(mrec.skip);
163                         if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
164                             mirror->size) {
165                                 break;
166                         }
167
168                         /*
169                          * Fill mrec
170                          */
171                         mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
172                         mrec.head.type = HAMMER_MREC_TYPE_SKIP;
173                         mrec.head.rec_size = bytes;
174                         mrec.skip.skip_beg = cmirror.skip_beg;
175                         mrec.skip.skip_end = cmirror.skip_end;
176                         mrec.head.rec_crc = crc32(&mrec.head.rec_size,
177                                                  bytes - crc_start);
178                         error = copyout(&mrec, uptr, bytes);
179                         eatdisk = 0;
180                         goto didwrite;
181                 }
182
183                 /*
184                  * Leaf node.  In full-history mode we could filter out
185                  * elements modified outside the user-requested TID range.
186                  *
187                  * However, such elements must be returned so the writer
188                  * can compare them against the target to determine what
189                  * needs to be deleted on the target, particular for
190                  * no-history mirrors.
191                  */
192                 KKASSERT(cursor.node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
193                 elm = &cursor.node->ondisk->elms[cursor.index].leaf;
194                 mirror->key_cur = elm->base;
195
196                 /*
197                  * If the record was created after our end point we just
198                  * ignore it.
199                  */
200                 if (elm->base.create_tid > mirror->tid_end) {
201                         error = 0;
202                         bytes = 0;
203                         eatdisk = 1;
204                         goto didwrite;
205                 }
206
207                 /*
208                  * Determine if we should generate a PASS or a REC.  PASS
209                  * records are records without any data payload.  Such
210                  * records will be generated if the target is already expected
211                  * to have the record, allowing it to delete the gaps.
212                  *
213                  * A PASS record is also used to perform deletions on the
214                  * target.
215                  *
216                  * Such deletions are needed if the master or files on the
217                  * master are no-history, or if the slave is so far behind
218                  * the master has already been pruned.
219                  */
220                 if (elm->base.create_tid < mirror->tid_beg) {
221                         bytes = sizeof(mrec.rec);
222                         if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
223                             mirror->size) {
224                                 break;
225                         }
226
227                         /*
228                          * Fill mrec.
229                          */
230                         mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
231                         mrec.head.type = HAMMER_MREC_TYPE_PASS;
232                         mrec.head.rec_size = bytes;
233                         mrec.rec.leaf = *elm;
234                         mrec.head.rec_crc = crc32(&mrec.head.rec_size,
235                                                  bytes - crc_start);
236                         error = copyout(&mrec, uptr, bytes);
237                         eatdisk = 1;
238                         goto didwrite;
239                         
240                 }
241
242                 /*
243                  * The core code exports the data to userland.
244                  *
245                  * CRC errors on data are reported but passed through,
246                  * but the data must be washed by the user program.
247                  */
248                 mrec_flags = 0;
249                 data_len = (elm->data_offset) ? elm->data_len : 0;
250                 if (data_len) {
251                         error = hammer_btree_extract(&cursor,
252                                                      HAMMER_CURSOR_GET_DATA);
253                         if (error) {
254                                 if (error != EDOM)
255                                         break;
256                                 mrec_flags |= HAMMER_MRECF_CRC_ERROR |
257                                               HAMMER_MRECF_DATA_CRC_BAD;
258                         }
259                 }
260
261                 bytes = sizeof(mrec.rec) + data_len;
262                 if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) > mirror->size)
263                         break;
264
265                 /*
266                  * Construct the record for userland and copyout.
267                  *
268                  * The user is asking for a snapshot, if the record was
269                  * deleted beyond the user-requested ending tid, the record
270                  * is not considered deleted from the point of view of
271                  * userland and delete_tid is cleared.
272                  */
273                 mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
274                 mrec.head.type = HAMMER_MREC_TYPE_REC | mrec_flags;
275                 mrec.head.rec_size = bytes;
276                 mrec.rec.leaf = *elm;
277
278                 if (elm->base.delete_tid > mirror->tid_end)
279                         mrec.rec.leaf.base.delete_tid = 0;
280                 rec_crc = crc32(&mrec.head.rec_size,
281                                 sizeof(mrec.rec) - crc_start);
282                 if (data_len)
283                         rec_crc = crc32_ext(cursor.data, data_len, rec_crc);
284                 mrec.head.rec_crc = rec_crc;
285                 error = copyout(&mrec, uptr, sizeof(mrec.rec));
286                 if (data_len && error == 0) {
287                         error = copyout(cursor.data, uptr + sizeof(mrec.rec),
288                                         data_len);
289                 }
290                 eatdisk = 1;
291
292                 /*
293                  * eatdisk controls whether we skip the current cursor
294                  * position on the next scan or not.  If doing a SKIP
295                  * the cursor is already positioned properly for the next
296                  * scan and eatdisk will be 0.
297                  */
298 didwrite:
299                 if (error == 0) {
300                         mirror->count += HAMMER_HEAD_DOALIGN(bytes);
301                         if (eatdisk)
302                                 cursor.flags |= HAMMER_CURSOR_ATEDISK;
303                         else
304                                 cursor.flags &= ~HAMMER_CURSOR_ATEDISK;
305                         error = hammer_btree_iterate(&cursor);
306                 }
307         }
308         if (error == ENOENT) {
309                 mirror->key_cur = mirror->key_end;
310                 error = 0;
311         }
312         hammer_done_cursor(&cursor);
313         if (error == EDEADLK)
314                 goto retry;
315         if (error == EINTR) {
316                 mirror->head.flags |= HAMMER_IOC_HEAD_INTR;
317                 error = 0;
318         }
319 failed:
320         mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
321         return(error);
322 }
323
324 /*
325  * Copy records from userland to the target mirror.
326  *
327  * The PFS is identified in the mirror structure.  The passed ip is just
328  * some directory in the overall HAMMER filesystem and has nothing to
329  * do with the PFS.  In fact, there might not even be a root directory for
330  * the PFS yet!
331  */
332 int
333 hammer_ioc_mirror_write(hammer_transaction_t trans, hammer_inode_t ip,
334                        struct hammer_ioc_mirror_rw *mirror)
335 {
336         union hammer_ioc_mrecord_any mrec;
337         struct hammer_cursor cursor;
338         u_int32_t localization;
339         int checkspace_count = 0;
340         int error;
341         int bytes;
342         char *uptr;
343         int seq;
344
345         localization = (u_int32_t)mirror->pfs_id << 16;
346         seq = trans->hmp->flusher.act;
347
348         /*
349          * Validate the mirror structure and relocalize the tracking keys.
350          */
351         if (mirror->size < 0 || mirror->size > 0x70000000)
352                 return(EINVAL);
353         mirror->key_beg.localization &= HAMMER_LOCALIZE_MASK;
354         mirror->key_beg.localization += localization;
355         mirror->key_end.localization &= HAMMER_LOCALIZE_MASK;
356         mirror->key_end.localization += localization;
357         mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
358         mirror->key_cur.localization += localization;
359
360         /*
361          * Set up our tracking cursor for the loop.  The tracking cursor
362          * is used to delete records that are no longer present on the
363          * master.  The last handled record at key_cur must be skipped.
364          */
365         error = hammer_init_cursor(trans, &cursor, NULL, NULL);
366
367         cursor.key_beg = mirror->key_cur;
368         cursor.key_end = mirror->key_end;
369         cursor.flags |= HAMMER_CURSOR_BACKEND;
370         error = hammer_btree_first(&cursor);
371         if (error == 0)
372                 cursor.flags |= HAMMER_CURSOR_ATEDISK;
373         if (error == ENOENT)
374                 error = 0;
375
376         /*
377          * Loop until our input buffer has been exhausted.
378          */
379         while (error == 0 &&
380                 mirror->count + sizeof(mrec.head) <= mirror->size) {
381
382                 /*
383                  * Don't blow out the buffer cache.  Leave room for frontend
384                  * cache as well.
385                  */
386                 while (hammer_flusher_meta_halflimit(trans->hmp) ||
387                        hammer_flusher_undo_exhausted(trans, 2)) {
388                         hammer_unlock_cursor(&cursor);
389                         hammer_flusher_wait(trans->hmp, seq);
390                         hammer_lock_cursor(&cursor);
391                         seq = hammer_flusher_async_one(trans->hmp);
392                 }
393
394                 /*
395                  * If there is insufficient free space it may be due to
396                  * reserved bigblocks, which flushing might fix.
397                  */
398                 if (hammer_checkspace(trans->hmp, HAMMER_CHKSPC_MIRROR)) {
399                         if (++checkspace_count == 10) {
400                                 error = ENOSPC;
401                                 break;
402                         }
403                         hammer_unlock_cursor(&cursor);
404                         hammer_flusher_wait(trans->hmp, seq);
405                         hammer_lock_cursor(&cursor);
406                         seq = hammer_flusher_async(trans->hmp, NULL);
407                 }
408
409
410                 /*
411                  * Acquire and validate header
412                  */
413                 if ((bytes = mirror->size - mirror->count) > sizeof(mrec))
414                         bytes = sizeof(mrec);
415                 uptr = (char *)mirror->ubuf + mirror->count;
416                 error = copyin(uptr, &mrec, bytes);
417                 if (error)
418                         break;
419                 if (mrec.head.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
420                         error = EINVAL;
421                         break;
422                 }
423                 if (mrec.head.rec_size < sizeof(mrec.head) ||
424                     mrec.head.rec_size > sizeof(mrec) + HAMMER_XBUFSIZE ||
425                     mirror->count + mrec.head.rec_size > mirror->size) {
426                         error = EINVAL;
427                         break;
428                 }
429
430                 switch(mrec.head.type & HAMMER_MRECF_TYPE_MASK) {
431                 case HAMMER_MREC_TYPE_SKIP:
432                         if (mrec.head.rec_size != sizeof(mrec.skip))
433                                 error = EINVAL;
434                         if (error == 0)
435                                 error = hammer_ioc_mirror_write_skip(&cursor, &mrec.skip, mirror, localization);
436                         break;
437                 case HAMMER_MREC_TYPE_REC:
438                         if (mrec.head.rec_size < sizeof(mrec.rec))
439                                 error = EINVAL;
440                         if (error == 0)
441                                 error = hammer_ioc_mirror_write_rec(&cursor, &mrec.rec, mirror, localization, uptr + sizeof(mrec.rec));
442                         break;
443                 case HAMMER_MREC_TYPE_REC_BADCRC:
444                         /*
445                          * Records with bad data payloads are ignored XXX.
446                          */
447                         if (mrec.head.rec_size < sizeof(mrec.rec))
448                                 error = EINVAL;
449                         break;
450                 case HAMMER_MREC_TYPE_PASS:
451                         if (mrec.head.rec_size != sizeof(mrec.rec))
452                                 error = EINVAL;
453                         if (error == 0)
454                                 error = hammer_ioc_mirror_write_pass(&cursor, &mrec.rec, mirror, localization);
455                         break;
456                 default:
457                         error = EINVAL;
458                         break;
459                 }
460
461                 /*
462                  * Retry the current record on deadlock, otherwise setup
463                  * for the next loop.
464                  */
465                 if (error == EDEADLK) {
466                         while (error == EDEADLK) {
467                                 hammer_recover_cursor(&cursor);
468                                 error = hammer_cursor_upgrade(&cursor);
469                         }
470                 } else {
471                         if (error == EALREADY)
472                                 error = 0;
473                         if (error == 0) {
474                                 mirror->count += 
475                                         HAMMER_HEAD_DOALIGN(mrec.head.rec_size);
476                         }
477                 }
478         }
479         hammer_done_cursor(&cursor);
480
481         /*
482          * cumulative error 
483          */
484         if (error) {
485                 mirror->head.flags |= HAMMER_IOC_HEAD_ERROR;
486                 mirror->head.error = error;
487         }
488
489         /*
490          * ioctls don't update the RW data structure if an error is returned,
491          * always return 0.
492          */
493         return(0);
494 }
495
496 /*
497  * Handle skip records.
498  *
499  * We must iterate from the last resolved record position at mirror->key_cur
500  * to skip_beg non-inclusive and delete any records encountered.
501  *
502  * mirror->key_cur must be carefully set when we succeed in processing
503  * this mrec.
504  */
505 static int
506 hammer_ioc_mirror_write_skip(hammer_cursor_t cursor,
507                              struct hammer_ioc_mrecord_skip *mrec,
508                              struct hammer_ioc_mirror_rw *mirror,
509                              u_int32_t localization)
510 {
511         int error;
512
513         /*
514          * Relocalize the skip range
515          */
516         mrec->skip_beg.localization &= HAMMER_LOCALIZE_MASK;
517         mrec->skip_beg.localization += localization;
518         mrec->skip_end.localization &= HAMMER_LOCALIZE_MASK;
519         mrec->skip_end.localization += localization;
520
521         /*
522          * Iterate from current position to skip_beg, deleting any records
523          * we encounter.  The record at skip_beg is not included (it is
524          * skipped).
525          */
526         cursor->key_end = mrec->skip_beg;
527         cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
528         cursor->flags |= HAMMER_CURSOR_BACKEND;
529         error = hammer_mirror_delete_to(cursor, mirror);
530
531         /*
532          * Now skip past the skip (which is the whole point point of
533          * having a skip record).  The sender has not sent us any records
534          * for the skip area so we wouldn't know what to keep and what
535          * to delete anyway.
536          *
537          * Clear ATEDISK because skip_end is non-inclusive, so we can't
538          * count an exact match if we happened to get one.
539          */
540         if (error == 0) {
541                 mirror->key_cur = mrec->skip_end;
542                 cursor->key_beg = mrec->skip_end;
543                 error = hammer_btree_lookup(cursor);
544                 cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
545                 if (error == ENOENT)
546                         error = 0;
547         }
548         return(error);
549 }
550
551 /*
552  * Handle B-Tree records.
553  *
554  * We must iterate to mrec->base.key (non-inclusively), and then process
555  * the record.  We are allowed to write a new record or delete an existing
556  * record, but cannot replace an existing record.
557  *
558  * mirror->key_cur must be carefully set when we succeed in processing
559  * this mrec.
560  */
561 static int
562 hammer_ioc_mirror_write_rec(hammer_cursor_t cursor,
563                             struct hammer_ioc_mrecord_rec *mrec,
564                             struct hammer_ioc_mirror_rw *mirror,
565                             u_int32_t localization,
566                             char *uptr)
567 {
568         hammer_transaction_t trans;
569         u_int32_t rec_crc;
570         int error;
571
572         trans = cursor->trans;
573         rec_crc = crc32(mrec, sizeof(*mrec));
574
575         if (mrec->leaf.data_len < 0 || 
576             mrec->leaf.data_len > HAMMER_XBUFSIZE ||
577             mrec->leaf.data_len + sizeof(*mrec) > mrec->head.rec_size) {
578                 return(EINVAL);
579         }
580
581         /*
582          * Re-localize for target.  relocalization of data is handled
583          * by hammer_mirror_write().
584          */
585         mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
586         mrec->leaf.base.localization += localization;
587
588         /*
589          * Delete records through until we reach (non-inclusively) the
590          * target record.
591          */
592         cursor->key_end = mrec->leaf.base;
593         cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
594         cursor->flags |= HAMMER_CURSOR_BACKEND;
595         error = hammer_mirror_delete_to(cursor, mirror);
596
597         /*
598          * Locate the record.
599          *
600          * If the record exists only the delete_tid may be updated.
601          *
602          * If the record does not exist we can create it only if the
603          * create_tid is not too old.  If the create_tid is too old
604          * it may have already been destroyed on the slave from pruning.
605          *
606          * Note that mirror operations are effectively as-of operations
607          * and delete_tid can be 0 for mirroring purposes even if it is
608          * not actually 0 at the originator.
609          *
610          * These functions can return EDEADLK
611          */
612         cursor->key_beg = mrec->leaf.base;
613         cursor->flags |= HAMMER_CURSOR_BACKEND;
614         cursor->flags &= ~HAMMER_CURSOR_INSERT;
615         error = hammer_btree_lookup(cursor);
616
617         if (error == 0 && hammer_mirror_check(cursor, mrec)) {
618                 error = hammer_mirror_update(cursor, mrec);
619         } else if (error == ENOENT) {
620                 if (mrec->leaf.base.create_tid >= mirror->tid_beg)
621                         error = hammer_mirror_write(cursor, mrec, uptr);
622                 else
623                         error = 0;
624         }
625         if (error == 0 || error == EALREADY)
626                 mirror->key_cur = mrec->leaf.base;
627         return(error);
628 }
629
630 /*
631  * This works like write_rec but no write or update is necessary,
632  * and no data payload is included so we couldn't do a write even
633  * if we wanted to.
634  *
635  * We must still iterate for deletions, and we can validate the
636  * record header which is a good way to test for corrupted mirror
637  * targets XXX.
638  *
639  * mirror->key_cur must be carefully set when we succeed in processing
640  * this mrec.
641  */
642 static
643 int
644 hammer_ioc_mirror_write_pass(hammer_cursor_t cursor,
645                              struct hammer_ioc_mrecord_rec *mrec,
646                              struct hammer_ioc_mirror_rw *mirror,
647                              u_int32_t localization)
648 {
649         hammer_transaction_t trans;
650         u_int32_t rec_crc;
651         int error;
652
653         trans = cursor->trans;
654         rec_crc = crc32(mrec, sizeof(*mrec));
655
656         /*
657          * Re-localize for target.  Relocalization of data is handled
658          * by hammer_mirror_write().
659          */
660         mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
661         mrec->leaf.base.localization += localization;
662
663         /*
664          * Delete records through until we reach (non-inclusively) the
665          * target record.
666          */
667         cursor->key_end = mrec->leaf.base;
668         cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
669         cursor->flags |= HAMMER_CURSOR_BACKEND;
670         error = hammer_mirror_delete_to(cursor, mirror);
671
672         /*
673          * Locate the record and get past it by setting ATEDISK.  Perform
674          * any necessary deletions.  We have no data payload and cannot
675          * create a new record.
676          */
677         if (error == 0) {
678                 mirror->key_cur = mrec->leaf.base;
679                 cursor->key_beg = mrec->leaf.base;
680                 cursor->flags |= HAMMER_CURSOR_BACKEND;
681                 cursor->flags &= ~HAMMER_CURSOR_INSERT;
682                 error = hammer_btree_lookup(cursor);
683                 if (error == 0) {
684                         if (hammer_mirror_check(cursor, mrec))
685                                 error = hammer_mirror_update(cursor, mrec);
686                         cursor->flags |= HAMMER_CURSOR_ATEDISK;
687                 } else {
688                         cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
689                 }
690                 if (error == ENOENT)
691                         error = 0;
692         }
693         return(error);
694 }
695
696 /*
697  * As part of the mirror write we iterate across swaths of records
698  * on the target which no longer exist on the source, and mark them
699  * deleted.
700  *
701  * The caller has indexed the cursor and set up key_end.  We iterate
702  * through to key_end.
703  *
704  * There is an edge case where the master has deleted a record whos
705  * create_tid exactly matches our end_tid.  We cannot delete this
706  * record on the slave yet because we cannot assign delete_tid == create_tid.
707  * The deletion should be picked up on the next sequence since in order
708  * to have been deleted on the master a transaction must have occured with
709  * a TID greater then the create_tid of the record.
710  *
711  * To support incremental re-mirroring, just for robustness, we do not
712  * touch any records created beyond (or equal to) mirror->tid_end.
713  */
714 static
715 int
716 hammer_mirror_delete_to(hammer_cursor_t cursor,
717                        struct hammer_ioc_mirror_rw *mirror)
718 {
719         hammer_btree_leaf_elm_t elm;
720         int error;
721
722         error = hammer_btree_iterate(cursor);
723         while (error == 0) {
724                 elm = &cursor->node->ondisk->elms[cursor->index].leaf;
725                 KKASSERT(elm->base.btype == HAMMER_BTREE_TYPE_RECORD);
726                 cursor->flags |= HAMMER_CURSOR_ATEDISK;
727
728                 /*
729                  * Note: Must still delete records with create_tid < tid_beg,
730                  *       as record may have been pruned-away on source.
731                  */
732                 if (elm->base.delete_tid == 0 &&
733                     elm->base.create_tid < mirror->tid_end) {
734                         error = hammer_delete_at_cursor(cursor,
735                                                         HAMMER_DELETE_ADJUST,
736                                                         mirror->tid_end,
737                                                         time_second,
738                                                         1, NULL);
739                 }
740                 if (error == 0)
741                         error = hammer_btree_iterate(cursor);
742         }
743         if (error == ENOENT)
744                 error = 0;
745         return(error);
746 }
747
748 /*
749  * Check whether an update is needed in the case where a match already
750  * exists on the target.  The only type of update allowed in this case
751  * is an update of the delete_tid.
752  *
753  * Return non-zero if the update should proceed.
754  */
755 static
756 int
757 hammer_mirror_check(hammer_cursor_t cursor, struct hammer_ioc_mrecord_rec *mrec)
758 {
759         hammer_btree_leaf_elm_t leaf = cursor->leaf;
760
761         if (leaf->base.delete_tid != mrec->leaf.base.delete_tid) {
762                 if (mrec->leaf.base.delete_tid != 0)
763                         return(1);
764         }
765         return(0);
766 }
767
768 /*
769  * Update a record in-place.  Only the delete_tid can change, and
770  * only from zero to non-zero.
771  */
772 static
773 int
774 hammer_mirror_update(hammer_cursor_t cursor,
775                      struct hammer_ioc_mrecord_rec *mrec)
776 {
777         int error;
778
779         /*
780          * This case shouldn't occur.
781          */
782         if (mrec->leaf.base.delete_tid == 0)
783                 return(0);
784
785         /*
786          * Mark the record deleted on the mirror target.
787          */
788         error = hammer_delete_at_cursor(cursor, HAMMER_DELETE_ADJUST,
789                                         mrec->leaf.base.delete_tid,
790                                         mrec->leaf.delete_ts,
791                                         1, NULL);
792         cursor->flags |= HAMMER_CURSOR_ATEDISK;
793         return(error);
794 }
795
796 /*
797  * Write out a new record.
798  */
799 static
800 int
801 hammer_mirror_write(hammer_cursor_t cursor,
802                     struct hammer_ioc_mrecord_rec *mrec,
803                     char *udata)
804 {
805         hammer_transaction_t trans;
806         hammer_buffer_t data_buffer;
807         hammer_off_t ndata_offset;
808         hammer_tid_t high_tid;
809         void *ndata;
810         int error;
811         int doprop;
812
813         trans = cursor->trans;
814         data_buffer = NULL;
815
816         /*
817          * Get the sync lock so the whole mess is atomic
818          */
819         hammer_sync_lock_sh(trans);
820
821         /*
822          * Allocate and adjust data
823          */
824         if (mrec->leaf.data_len && mrec->leaf.data_offset) {
825                 ndata = hammer_alloc_data(trans, mrec->leaf.data_len,
826                                           mrec->leaf.base.rec_type,
827                                           &ndata_offset, &data_buffer,
828                                           0, &error);
829                 if (ndata == NULL)
830                         return(error);
831                 mrec->leaf.data_offset = ndata_offset;
832                 hammer_modify_buffer(trans, data_buffer, NULL, 0);
833                 error = copyin(udata, ndata, mrec->leaf.data_len);
834                 if (error == 0) {
835                         if (hammer_crc_test_leaf(ndata, &mrec->leaf) == 0) {
836                                 kprintf("data crc mismatch on pipe\n");
837                                 error = EINVAL;
838                         } else {
839                                 error = hammer_mirror_localize_data(
840                                                         ndata, &mrec->leaf);
841                         }
842                 }
843                 hammer_modify_buffer_done(data_buffer);
844         } else {
845                 mrec->leaf.data_offset = 0;
846                 error = 0;
847                 ndata = NULL;
848         }
849         if (error)
850                 goto failed;
851
852         /*
853          * Do the insertion.  This can fail with a EDEADLK or EALREADY
854          */
855         cursor->flags |= HAMMER_CURSOR_INSERT;
856         error = hammer_btree_lookup(cursor);
857         if (error != ENOENT) {
858                 if (error == 0)
859                         error = EALREADY;
860                 goto failed;
861         }
862
863         error = hammer_btree_insert(cursor, &mrec->leaf, &doprop);
864
865         /*
866          * Cursor is left on the current element, we want to skip it now.
867          */
868         cursor->flags |= HAMMER_CURSOR_ATEDISK;
869         cursor->flags &= ~HAMMER_CURSOR_INSERT;
870
871         /*
872          * Track a count of active inodes.
873          */
874         if (error == 0 &&
875             mrec->leaf.base.rec_type == HAMMER_RECTYPE_INODE &&
876             mrec->leaf.base.delete_tid == 0) {
877                 hammer_modify_volume_field(trans,
878                                            trans->rootvol,
879                                            vol0_stat_inodes);
880                 ++trans->hmp->rootvol->ondisk->vol0_stat_inodes;
881                 hammer_modify_volume_done(trans->rootvol);
882         }
883
884         /*
885          * vol0_next_tid must track the highest TID stored in the filesystem.
886          * We do not need to generate undo for this update.
887          */
888         high_tid = mrec->leaf.base.create_tid;
889         if (high_tid < mrec->leaf.base.delete_tid)
890                 high_tid = mrec->leaf.base.delete_tid;
891         if (trans->rootvol->ondisk->vol0_next_tid < high_tid) {
892                 hammer_modify_volume(trans, trans->rootvol, NULL, 0);
893                 trans->rootvol->ondisk->vol0_next_tid = high_tid;
894                 hammer_modify_volume_done(trans->rootvol);
895         }
896
897         if (error == 0 && doprop)
898                 hammer_btree_do_propagation(cursor, NULL, &mrec->leaf);
899
900 failed:
901         /*
902          * Cleanup
903          */
904         if (error && mrec->leaf.data_offset) {
905                 hammer_blockmap_free(cursor->trans,
906                                      mrec->leaf.data_offset,
907                                      mrec->leaf.data_len);
908         }
909         hammer_sync_unlock(trans);
910         if (data_buffer)
911                 hammer_rel_buffer(data_buffer, 0);
912         return(error);
913 }
914
915 /*
916  * Localize the data payload.  Directory entries may need their
917  * localization adjusted.
918  *
919  * PFS directory entries must be skipped entirely (return EALREADY).
920  */
921 static
922 int
923 hammer_mirror_localize_data(hammer_data_ondisk_t data,
924                             hammer_btree_leaf_elm_t leaf)
925 {
926         u_int32_t localization;
927
928         if (leaf->base.rec_type == HAMMER_RECTYPE_DIRENTRY) {
929                 if (data->entry.obj_id == HAMMER_OBJID_ROOT)
930                         return(EALREADY);
931                 localization = leaf->base.localization &
932                                HAMMER_LOCALIZE_PSEUDOFS_MASK;
933                 if (data->entry.localization != localization) {
934                         data->entry.localization = localization;
935                         hammer_crc_set_leaf(data, leaf);
936                 }
937         }
938         return(0);
939 }
940