zfs: merge OpenZFS master-9305ff2ed
[freebsd.git] / sys / contrib / openzfs / cmd / zstream / zstream_redup.c
1 /*
2  * CDDL HEADER START
3  *
4  * This file and its contents are supplied under the terms of the
5  * Common Development and Distribution License ("CDDL"), version 1.0.
6  * You may only use this file in accordance with the terms of version
7  * 1.0 of the CDDL.
8  *
9  * A full copy of the text of the CDDL should have accompanied this
10  * source.  A copy of the CDDL is also available via the Internet at
11  * http://www.illumos.org/license/CDDL.
12  *
13  * CDDL HEADER END
14  */
15
16 /*
17  * Copyright (c) 2020 by Delphix. All rights reserved.
18  */
19
20 #include <assert.h>
21 #include <cityhash.h>
22 #include <ctype.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <libzfs_impl.h>
26 #include <libzfs.h>
27 #include <libzutil.h>
28 #include <stddef.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <strings.h>
32 #include <umem.h>
33 #include <unistd.h>
34 #include <sys/debug.h>
35 #include <sys/stat.h>
36 #include <sys/zfs_ioctl.h>
37 #include <sys/zio_checksum.h>
38 #include "zfs_fletcher.h"
39 #include "zstream.h"
40
41
/*
 * Sizing knobs for the redup hash table.  The table's memory budget is
 * MAX_RDT_PHYSMEM_PERCENT percent of physical memory, but never less than
 * SMALLEST_POSSIBLE_MAX_RDT_MB megabytes (the value is shifted left by 20
 * where it is used).
 */
#define	MAX_RDT_PHYSMEM_PERCENT		20
#define	SMALLEST_POSSIBLE_MAX_RDT_MB	128
44
/*
 * One hash-chain node of the redup table.  An entry is keyed by
 * (rde_guid, rde_object, rde_offset) and records the position in the
 * input stream that was supplied when the entry was inserted.
 */
struct redup_entry {
	struct redup_entry *rde_next;	/* next entry in the same bucket */
	uint64_t rde_guid;		/* key: guid */
	uint64_t rde_object;		/* key: object number */
	uint64_t rde_offset;		/* key: offset within the object */
	uint64_t rde_stream_offset;	/* value: offset in the input stream */
};

typedef struct redup_entry redup_entry_t;
52
53 typedef struct redup_table {
54         redup_entry_t   **redup_hash_array;
55         umem_cache_t    *ddecache;
56         uint64_t        ddt_count;
57         int             numhashbits;
58 } redup_table_t;
59
/*
 * Return the 1-based position of the most significant set bit of i
 * (so 1 -> 1, 8 -> 4), or 0 when no bit is set.
 */
int
highbit64(uint64_t i)
{
	int leading_zeros;

	/* __builtin_clzll() is undefined for 0, so handle that case here. */
	if (i == 0)
		return (0);

	leading_zeros = __builtin_clzll(i);
	return (NBBY * sizeof (uint64_t) - leading_zeros);
}
68
/*
 * Allocate n zero-filled bytes.  Never returns NULL: if the allocation
 * fails, an error is printed to stderr and the process exits with status 1.
 */
static void *
safe_calloc(size_t n)
{
	void *rv = calloc(1, n);
	if (rv == NULL) {
		/*
		 * Print the size with %zu: the request is a size_t, and
		 * squeezing it through an int would misreport large sizes.
		 */
		(void) fprintf(stderr,
		    "Error: could not allocate %zu bytes of memory\n", n);
		exit(1);
	}
	return (rv);
}
81
/*
 * fread() wrapper that reads one item of `size` bytes from fp into buf.
 * Returns the fread() item count (1 when the whole item was read, 0
 * otherwise).  If nothing was read and the stream's error indicator is
 * set, a message is printed and the process exits with status 1.
 */
static int
sfread(void *buf, size_t size, FILE *fp)
{
	int nitems = fread(buf, size, 1, fp);

	if (nitems == 0) {
		if (ferror(fp)) {
			(void) fprintf(stderr,
			    "Error while reading file: %s\n",
			    strerror(errno));
			exit(1);
		}
	}
	return (nitems);
}
96
/*
 * pread() wrapper: read exactly `count` bytes from fd at `offset` into
 * buf.  On a pread() error or a short read, a message is printed and the
 * process exits with status 1.
 */
static void
spread(int fd, void *buf, size_t count, off_t offset)
{
	ssize_t nread = pread(fd, buf, count, offset);

	if (nread == -1) {
		(void) fprintf(stderr,
		    "Error while reading file: %s\n",
		    strerror(errno));
		exit(1);
	}

	/* Anything other than a full read is treated as fatal. */
	if (nread != count) {
		(void) fprintf(stderr,
		    "Error while reading file: short read\n");
		exit(1);
	}
}
115
116 static int
117 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
118     zio_cksum_t *zc, int outfd)
119 {
120         assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
121             == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
122         fletcher_4_incremental_native(drr,
123             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
124         if (drr->drr_type != DRR_BEGIN) {
125                 assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
126                     drr_checksum.drr_checksum));
127                 drr->drr_u.drr_checksum.drr_checksum = *zc;
128         }
129         fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
130             sizeof (zio_cksum_t), zc);
131         if (write(outfd, drr, sizeof (*drr)) == -1)
132                 return (errno);
133         if (payload_len != 0) {
134                 fletcher_4_incremental_native(payload, payload_len, zc);
135                 if (write(outfd, payload, payload_len) == -1)
136                         return (errno);
137         }
138         return (0);
139 }
140
141 static void
142 rdt_insert(redup_table_t *rdt,
143     uint64_t guid, uint64_t object, uint64_t offset, uint64_t stream_offset)
144 {
145         uint64_t ch = cityhash4(guid, object, offset, 0);
146         uint64_t hashcode = BF64_GET(ch, 0, rdt->numhashbits);
147         redup_entry_t **rdepp;
148
149         rdepp = &(rdt->redup_hash_array[hashcode]);
150         redup_entry_t *rde = umem_cache_alloc(rdt->ddecache, UMEM_NOFAIL);
151         rde->rde_next = *rdepp;
152         rde->rde_guid = guid;
153         rde->rde_object = object;
154         rde->rde_offset = offset;
155         rde->rde_stream_offset = stream_offset;
156         *rdepp = rde;
157         rdt->ddt_count++;
158 }
159
160 static void
161 rdt_lookup(redup_table_t *rdt,
162     uint64_t guid, uint64_t object, uint64_t offset,
163     uint64_t *stream_offsetp)
164 {
165         uint64_t ch = cityhash4(guid, object, offset, 0);
166         uint64_t hashcode = BF64_GET(ch, 0, rdt->numhashbits);
167
168         for (redup_entry_t *rde = rdt->redup_hash_array[hashcode];
169             rde != NULL; rde = rde->rde_next) {
170                 if (rde->rde_guid == guid &&
171                     rde->rde_object == object &&
172                     rde->rde_offset == offset) {
173                         *stream_offsetp = rde->rde_stream_offset;
174                         return;
175                 }
176         }
177         assert(!"could not find expected redup table entry");
178 }
179
180 /*
181  * Convert a dedup stream (generated by "zfs send -D") to a
182  * non-deduplicated stream.  The entire infd will be converted, including
183  * any substreams in a stream package (generated by "zfs send -RD"). The
184  * infd must be seekable.
185  */
186 static void
187 zfs_redup_stream(int infd, int outfd, boolean_t verbose)
188 {
189         int bufsz = SPA_MAXBLOCKSIZE;
190         dmu_replay_record_t thedrr = { 0 };
191         dmu_replay_record_t *drr = &thedrr;
192         redup_table_t rdt;
193         zio_cksum_t stream_cksum;
194         uint64_t numbuckets;
195         uint64_t num_records = 0;
196         uint64_t num_write_byref_records = 0;
197
198 #ifdef _ILP32
199         uint64_t max_rde_size = SMALLEST_POSSIBLE_MAX_RDT_MB << 20;
200 #else
201         uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);
202         uint64_t max_rde_size =
203             MAX((physmem * MAX_RDT_PHYSMEM_PERCENT) / 100,
204             SMALLEST_POSSIBLE_MAX_RDT_MB << 20);
205 #endif
206
207         numbuckets = max_rde_size / (sizeof (redup_entry_t));
208
209         /*
210          * numbuckets must be a power of 2.  Increase number to
211          * a power of 2 if necessary.
212          */
213         if (!ISP2(numbuckets))
214                 numbuckets = 1ULL << highbit64(numbuckets);
215
216         rdt.redup_hash_array =
217             safe_calloc(numbuckets * sizeof (redup_entry_t *));
218         rdt.ddecache = umem_cache_create("rde", sizeof (redup_entry_t), 0,
219             NULL, NULL, NULL, NULL, NULL, 0);
220         rdt.numhashbits = highbit64(numbuckets) - 1;
221         rdt.ddt_count = 0;
222
223         char *buf = safe_calloc(bufsz);
224         FILE *ofp = fdopen(infd, "r");
225         long offset = ftell(ofp);
226         while (sfread(drr, sizeof (*drr), ofp) != 0) {
227                 num_records++;
228
229                 /*
230                  * We need to regenerate the checksum.
231                  */
232                 if (drr->drr_type != DRR_BEGIN) {
233                         bzero(&drr->drr_u.drr_checksum.drr_checksum,
234                             sizeof (drr->drr_u.drr_checksum.drr_checksum));
235                 }
236
237                 uint64_t payload_size = 0;
238                 switch (drr->drr_type) {
239                 case DRR_BEGIN:
240                 {
241                         struct drr_begin *drrb = &drr->drr_u.drr_begin;
242                         int fflags;
243                         ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
244
245                         assert(drrb->drr_magic == DMU_BACKUP_MAGIC);
246
247                         /* clear the DEDUP feature flag for this stream */
248                         fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
249                         fflags &= ~(DMU_BACKUP_FEATURE_DEDUP |
250                             DMU_BACKUP_FEATURE_DEDUPPROPS);
251                         /* cppcheck-suppress syntaxError */
252                         DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags);
253
254                         int sz = drr->drr_payloadlen;
255                         if (sz != 0) {
256                                 if (sz > bufsz) {
257                                         free(buf);
258                                         buf = safe_calloc(sz);
259                                         bufsz = sz;
260                                 }
261                                 (void) sfread(buf, sz, ofp);
262                         }
263                         payload_size = sz;
264                         break;
265                 }
266
267                 case DRR_END:
268                 {
269                         struct drr_end *drre = &drr->drr_u.drr_end;
270                         /*
271                          * Use the recalculated checksum, unless this is
272                          * the END record of a stream package, which has
273                          * no checksum.
274                          */
275                         if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
276                                 drre->drr_checksum = stream_cksum;
277                         break;
278                 }
279
280                 case DRR_OBJECT:
281                 {
282                         struct drr_object *drro = &drr->drr_u.drr_object;
283
284                         if (drro->drr_bonuslen > 0) {
285                                 payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
286                                 (void) sfread(buf, payload_size, ofp);
287                         }
288                         break;
289                 }
290
291                 case DRR_SPILL:
292                 {
293                         struct drr_spill *drrs = &drr->drr_u.drr_spill;
294                         payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
295                         (void) sfread(buf, payload_size, ofp);
296                         break;
297                 }
298
299                 case DRR_WRITE_BYREF:
300                 {
301                         struct drr_write_byref drrwb =
302                             drr->drr_u.drr_write_byref;
303
304                         num_write_byref_records++;
305
306                         /*
307                          * Look up in hash table by drrwb->drr_refguid,
308                          * drr_refobject, drr_refoffset.  Replace this
309                          * record with the found WRITE record, but with
310                          * drr_object,drr_offset,drr_toguid replaced with ours.
311                          */
312                         uint64_t stream_offset = 0;
313                         rdt_lookup(&rdt, drrwb.drr_refguid,
314                             drrwb.drr_refobject, drrwb.drr_refoffset,
315                             &stream_offset);
316
317                         spread(infd, drr, sizeof (*drr), stream_offset);
318
319                         assert(drr->drr_type == DRR_WRITE);
320                         struct drr_write *drrw = &drr->drr_u.drr_write;
321                         assert(drrw->drr_toguid == drrwb.drr_refguid);
322                         assert(drrw->drr_object == drrwb.drr_refobject);
323                         assert(drrw->drr_offset == drrwb.drr_refoffset);
324
325                         payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
326                         spread(infd, buf, payload_size,
327                             stream_offset + sizeof (*drr));
328
329                         drrw->drr_toguid = drrwb.drr_toguid;
330                         drrw->drr_object = drrwb.drr_object;
331                         drrw->drr_offset = drrwb.drr_offset;
332                         break;
333                 }
334
335                 case DRR_WRITE:
336                 {
337                         struct drr_write *drrw = &drr->drr_u.drr_write;
338                         payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
339                         (void) sfread(buf, payload_size, ofp);
340
341                         rdt_insert(&rdt, drrw->drr_toguid,
342                             drrw->drr_object, drrw->drr_offset, offset);
343                         break;
344                 }
345
346                 case DRR_WRITE_EMBEDDED:
347                 {
348                         struct drr_write_embedded *drrwe =
349                             &drr->drr_u.drr_write_embedded;
350                         payload_size =
351                             P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
352                         (void) sfread(buf, payload_size, ofp);
353                         break;
354                 }
355
356                 case DRR_FREEOBJECTS:
357                 case DRR_FREE:
358                 case DRR_OBJECT_RANGE:
359                         break;
360
361                 default:
362                         (void) fprintf(stderr, "INVALID record type 0x%x\n",
363                             drr->drr_type);
364                         /* should never happen, so assert */
365                         assert(B_FALSE);
366                 }
367
368                 if (feof(ofp)) {
369                         fprintf(stderr, "Error: unexpected end-of-file\n");
370                         exit(1);
371                 }
372                 if (ferror(ofp)) {
373                         fprintf(stderr, "Error while reading file: %s\n",
374                             strerror(errno));
375                         exit(1);
376                 }
377
378                 /*
379                  * We need to recalculate the checksum, and it needs to be
380                  * initially zero to do that.  BEGIN records don't have
381                  * a checksum.
382                  */
383                 if (drr->drr_type != DRR_BEGIN) {
384                         bzero(&drr->drr_u.drr_checksum.drr_checksum,
385                             sizeof (drr->drr_u.drr_checksum.drr_checksum));
386                 }
387                 if (dump_record(drr, buf, payload_size,
388                     &stream_cksum, outfd) != 0)
389                         break;
390                 if (drr->drr_type == DRR_END) {
391                         /*
392                          * Typically the END record is either the last
393                          * thing in the stream, or it is followed
394                          * by a BEGIN record (which also zeros the checksum).
395                          * However, a stream package ends with two END
396                          * records.  The last END record's checksum starts
397                          * from zero.
398                          */
399                         ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
400                 }
401                 offset = ftell(ofp);
402         }
403
404         if (verbose) {
405                 char mem_str[16];
406                 zfs_nicenum(rdt.ddt_count * sizeof (redup_entry_t),
407                     mem_str, sizeof (mem_str));
408                 fprintf(stderr, "converted stream with %llu total records, "
409                     "including %llu dedup records, using %sB memory.\n",
410                     (long long)num_records,
411                     (long long)num_write_byref_records,
412                     mem_str);
413         }
414
415         umem_cache_destroy(rdt.ddecache);
416         free(rdt.redup_hash_array);
417         free(buf);
418         (void) fclose(ofp);
419 }
420
421 int
422 zstream_do_redup(int argc, char *argv[])
423 {
424         boolean_t verbose = B_FALSE;
425         int c;
426
427         while ((c = getopt(argc, argv, "v")) != -1) {
428                 switch (c) {
429                 case 'v':
430                         verbose = B_TRUE;
431                         break;
432                 case '?':
433                         (void) fprintf(stderr, "invalid option '%c'\n",
434                             optopt);
435                         zstream_usage();
436                         break;
437                 }
438         }
439
440         argc -= optind;
441         argv += optind;
442
443         if (argc != 1)
444                 zstream_usage();
445
446         const char *filename = argv[0];
447
448         if (isatty(STDOUT_FILENO)) {
449                 (void) fprintf(stderr,
450                     "Error: Stream can not be written to a terminal.\n"
451                     "You must redirect standard output.\n");
452                 return (1);
453         }
454
455         int fd = open(filename, O_RDONLY);
456         if (fd == -1) {
457                 (void) fprintf(stderr,
458                     "Error while opening file '%s': %s\n",
459                     filename, strerror(errno));
460                 exit(1);
461         }
462
463         fletcher_4_init();
464         zfs_redup_stream(fd, STDOUT_FILENO, verbose);
465         fletcher_4_fini();
466
467         close(fd);
468
469         return (0);
470 }