Initial import of binutils 2.22 on the new vendor branch
[dragonfly.git] / contrib / lvm2 / dist / daemons / cmirrord / functions.c
1 /*      $NetBSD: functions.c,v 1.1.1.1 2009/12/02 00:27:10 haad Exp $   */
2
3 /*
4  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
5  *
6  * This copyrighted material is made available to anyone wishing to use,
7  * modify, copy, or redistribute it subject to the terms and conditions
8  * of the GNU Lesser General Public License v.2.1.
9  *
10  * You should have received a copy of the GNU Lesser General Public License
11  * along with this program; if not, write to the Free Software Foundation,
12  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
13  */
14 #define _GNU_SOURCE
15 #define _FILE_OFFSET_BITS 64
16
17 #include <stdint.h>
18 #include <errno.h>
19 #include <string.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <dirent.h>
23 #include <unistd.h>
24 #include <signal.h>
25 #include <linux/kdev_t.h>
26 //#define __USE_GNU /* for O_DIRECT */
27 #include <fcntl.h>
28 #include <time.h>
29 #include "libdevmapper.h"
30 #include "dm-log-userspace.h"
31 #include "functions.h"
32 #include "common.h"
33 #include "cluster.h"
34 #include "logging.h"
35
36 #define BYTE_SHIFT 3
37
38 /*
39  * Magic for persistent mirrors: "MiRr"
40  * Following on-disk header information is stolen from
41  * drivers/md/dm-log.c
42  */
43 #define MIRROR_MAGIC 0x4D695272
44 #define MIRROR_DISK_VERSION 2
45 #define LOG_OFFSET 2
46
47 #define RESYNC_HISTORY 50
48 //static char resync_history[RESYNC_HISTORY][128];
49 //static int idx = 0;
50 #define LOG_SPRINT(_lc, f, arg...) do {                                 \
51                 lc->idx++;                                              \
52                 lc->idx = lc->idx % RESYNC_HISTORY;                     \
53                 sprintf(lc->resync_history[lc->idx], f, ## arg);        \
54         } while (0)
55
56 struct log_header {
57         uint32_t magic;
58         uint32_t version;
59         uint64_t nr_regions;
60 };
61
62 struct log_c {
63         struct dm_list list;
64
65         char uuid[DM_UUID_LEN];
66         uint64_t luid;
67
68         time_t delay; /* limits how fast a resume can happen after suspend */
69         int touched;
70         uint32_t region_size;
71         uint32_t region_count;
72         uint64_t sync_count;
73
74         dm_bitset_t clean_bits;
75         dm_bitset_t sync_bits;
76         uint32_t recoverer;
77         uint64_t recovering_region; /* -1 means not recovering */
78         uint64_t skip_bit_warning; /* used to warn if region skipped */
79         int sync_search;
80
81         int resume_override;
82
83         uint32_t block_on_error;
84         enum sync {
85                 DEFAULTSYNC,    /* Synchronize if necessary */
86                 NOSYNC,         /* Devices known to be already in sync */
87                 FORCESYNC,      /* Force a sync to happen */
88         } sync;
89
90         uint32_t state;         /* current operational state of the log */
91
92         struct dm_list mark_list;
93
94         uint32_t recovery_halted;
95         struct recovery_request *recovery_request_list;
96
97         int disk_fd;            /* -1 means no disk log */
98         int log_dev_failed;
99         uint64_t disk_nr_regions;
100         size_t disk_size;       /* size of disk_buffer in bytes */
101         void *disk_buffer;      /* aligned memory for O_DIRECT */
102         int idx;
103         char resync_history[RESYNC_HISTORY][128];
104 };
105
106 struct mark_entry {
107         struct dm_list list;
108         uint32_t nodeid;
109         uint64_t region;
110 };
111
112 struct recovery_request {
113         uint64_t region;
114         struct recovery_request *next;
115 };
116
117 static DM_LIST_INIT(log_list);
118 static DM_LIST_INIT(log_pending_list);
119
120 static int log_test_bit(dm_bitset_t bs, int bit)
121 {
122         return dm_bit(bs, bit);
123 }
124
125 static void log_set_bit(struct log_c *lc, dm_bitset_t bs, int bit)
126 {
127         dm_bit_set(bs, bit);
128         lc->touched = 1;
129 }
130
131 static void log_clear_bit(struct log_c *lc, dm_bitset_t bs, int bit)
132 {
133         dm_bit_clear(bs, bit);
134         lc->touched = 1;
135 }
136
137 static int find_next_zero_bit(dm_bitset_t bs, int start)
138 {
139         while (dm_bit(bs, start++))
140                 if (start >= (int)bs[0])
141                         return -1;
142
143         return start - 1;
144 }
145
146 static uint64_t count_bits32(dm_bitset_t bs)
147 {
148         int i, size = ((int)bs[0]/DM_BITS_PER_INT + 1);
149         unsigned count = 0;
150
151         for (i = 1; i <= size; i++)
152                 count += hweight32(bs[i]);
153
154         return (uint64_t)count;
155 }
156
157 /*
158  * get_log
159  *
160  * Returns: log if found, NULL otherwise
161  */
162 static struct log_c *get_log(const char *uuid, uint64_t luid)
163 {
164         struct log_c *lc;
165
166         dm_list_iterate_items(lc, &log_list)
167                 if (!strcmp(lc->uuid, uuid) &&
168                     (!luid || (luid == lc->luid)))
169                         return lc;
170
171         return NULL;
172 }
173
174 /*
175  * get_pending_log
176  *
177  * Pending logs are logs that have been 'clog_ctr'ed, but
178  * have not joined the CPG (via clog_resume).
179  *
180  * Returns: log if found, NULL otherwise
181  */
182 static struct log_c *get_pending_log(const char *uuid, uint64_t luid)
183 {
184         struct log_c *lc;
185
186         dm_list_iterate_items(lc, &log_pending_list)
187                 if (!strcmp(lc->uuid, uuid) &&
188                     (!luid || (luid == lc->luid)))
189                         return lc;
190
191         return NULL;
192 }
193
194 static void header_to_disk(struct log_header *mem, struct log_header *disk)
195 {
196         memcpy(disk, mem, sizeof(struct log_header));
197 }
198
199 static void header_from_disk(struct log_header *mem, struct log_header *disk)
200 {
201         memcpy(mem, disk, sizeof(struct log_header));
202 }
203
204 static int rw_log(struct log_c *lc, int do_write)
205 {
206         int r;
207
208         r = lseek(lc->disk_fd, 0, SEEK_SET);
209         if (r < 0) {
210                 LOG_ERROR("[%s] rw_log:  lseek failure: %s",
211                           SHORT_UUID(lc->uuid), strerror(errno));
212                 return -errno;
213         }
214
215         if (do_write) {
216                 r = write(lc->disk_fd, lc->disk_buffer, lc->disk_size);
217                 if (r < 0) {
218                         LOG_ERROR("[%s] rw_log:  write failure: %s",
219                                   SHORT_UUID(lc->uuid), strerror(errno));
220                         return -EIO; /* Failed disk write */
221                 }
222                 return 0;
223         }
224
225         /* Read */
226         r = read(lc->disk_fd, lc->disk_buffer, lc->disk_size);
227         if (r < 0)
228                 LOG_ERROR("[%s] rw_log:  read failure: %s",
229                           SHORT_UUID(lc->uuid), strerror(errno));
230         if (r != lc->disk_size)
231                 return -EIO; /* Failed disk read */
232         return 0;
233 }
234
235 /*
236  * read_log
237  * @lc
238  *
239  * Valid return codes:
240  *   -EINVAL:  Invalid header, bits not copied
241  *   -EIO:     Unable to read disk log
242  *    0:       Valid header, disk bit -> lc->clean_bits
243  *
244  * Returns: 0 on success, -EXXX on failure
245  */
246 static int read_log(struct log_c *lc)
247 {
248         struct log_header lh;
249         size_t bitset_size;
250
251         memset(&lh, 0, sizeof(struct log_header));
252
253         if (rw_log(lc, 0))
254                 return -EIO; /* Failed disk read */
255
256         header_from_disk(&lh, lc->disk_buffer);
257         if (lh.magic != MIRROR_MAGIC)
258                 return -EINVAL;
259
260         lc->disk_nr_regions = lh.nr_regions;
261
262         /* Read disk bits into sync_bits */
263         bitset_size = lc->region_count / 8;
264         bitset_size += (lc->region_count % 8) ? 1 : 0;
265         memcpy(lc->clean_bits, lc->disk_buffer + 1024, bitset_size);
266
267         return 0;
268 }
269
270 /*
271  * write_log
272  * @lc
273  *
274  * Returns: 0 on success, -EIO on failure
275  */
276 static int write_log(struct log_c *lc)
277 {
278         struct log_header lh;
279         size_t bitset_size;
280
281         lh.magic = MIRROR_MAGIC;
282         lh.version = MIRROR_DISK_VERSION;
283         lh.nr_regions = lc->region_count;
284
285         header_to_disk(&lh, lc->disk_buffer);
286
287         /* Write disk bits from clean_bits */
288         bitset_size = lc->region_count / 8;
289         bitset_size += (lc->region_count % 8) ? 1 : 0;
290         memcpy(lc->disk_buffer + 1024, lc->clean_bits, bitset_size);
291
292         if (rw_log(lc, 1)) {
293                 lc->log_dev_failed = 1;
294                 return -EIO; /* Failed disk write */
295         }
296         return 0;
297 }
298
299 static int find_disk_path(char *major_minor_str, char *path_rtn, int *unlink_path)
300 {
301         int r;
302         DIR *dp;
303         struct dirent *dep;
304         struct stat statbuf;
305         int major, minor;
306
307         if (!strstr(major_minor_str, ":")) {
308                 r = stat(major_minor_str, &statbuf);
309                 if (r)
310                         return -errno;
311                 if (!S_ISBLK(statbuf.st_mode))
312                         return -EINVAL;
313                 sprintf(path_rtn, "%s", major_minor_str);
314                 return 0;
315         }
316
317         r = sscanf(major_minor_str, "%d:%d", &major, &minor);
318         if (r != 2)
319                 return -EINVAL;
320
321         LOG_DBG("Checking /dev/mapper for device %d:%d", major, minor);
322         /* Check /dev/mapper dir */
323         dp = opendir("/dev/mapper");
324         if (!dp)
325                 return -ENOENT;
326
327         while ((dep = readdir(dp)) != NULL) {
328                 /*
329                  * FIXME: This is racy.  By the time the path is used,
330                  * it may point to something else.  'fstat' will be
331                  * required upon opening to ensure we got what we
332                  * wanted.
333                  */
334
335                 sprintf(path_rtn, "/dev/mapper/%s", dep->d_name);
336                 stat(path_rtn, &statbuf);
337                 if (S_ISBLK(statbuf.st_mode) &&
338                     (major(statbuf.st_rdev) == major) &&
339                     (minor(statbuf.st_rdev) == minor)) {
340                         LOG_DBG("  %s: YES", dep->d_name);
341                         closedir(dp);
342                         return 0;
343                 } else {
344                         LOG_DBG("  %s: NO", dep->d_name);
345                 }
346         }
347
348         closedir(dp);
349
350         LOG_DBG("Path not found for %d/%d", major, minor);
351         LOG_DBG("Creating /dev/mapper/%d-%d", major, minor);
352         sprintf(path_rtn, "/dev/mapper/%d-%d", major, minor);
353         r = mknod(path_rtn, S_IFBLK | S_IRUSR | S_IWUSR, MKDEV(major, minor));
354
355         /*
356          * If we have to make the path, we unlink it after we open it
357          */
358         *unlink_path = 1;
359
360         return r ? -errno : 0;
361 }
362
363 static int _clog_ctr(char *uuid, uint64_t luid,
364                      int argc, char **argv, uint64_t device_size)
365 {
366         int i;
367         int r = 0;
368         char *p;
369         uint64_t region_size;
370         uint64_t region_count;
371         struct log_c *lc = NULL;
372         struct log_c *duplicate;
373         enum sync sync = DEFAULTSYNC;
374         uint32_t block_on_error = 0;
375
376         int disk_log = 0;
377         char disk_path[128];
378         int unlink_path = 0;
379         size_t page_size;
380         int pages;
381
382         /* If core log request, then argv[0] will be region_size */
383         if (!strtoll(argv[0], &p, 0) || *p) {
384                 disk_log = 1;
385
386                 if ((argc < 2) || (argc > 4)) {
387                         LOG_ERROR("Too %s arguments to clustered_disk log type",
388                                   (argc < 3) ? "few" : "many");
389                         r = -EINVAL;
390                         goto fail;
391                 }
392
393                 r = find_disk_path(argv[0], disk_path, &unlink_path);
394                 if (r) {
395                         LOG_ERROR("Unable to find path to device %s", argv[0]);
396                         goto fail;
397                 }
398                 LOG_DBG("Clustered log disk is %s", disk_path);
399         } else {
400                 disk_log = 0;
401
402                 if ((argc < 1) || (argc > 3)) {
403                         LOG_ERROR("Too %s arguments to clustered_core log type",
404                                   (argc < 2) ? "few" : "many");
405                         r = -EINVAL;
406                         goto fail;
407                 }
408         }
409
410         if (!(region_size = strtoll(argv[disk_log], &p, 0)) || *p) {
411                 LOG_ERROR("Invalid region_size argument to clustered_%s log type",
412                           (disk_log) ? "disk" : "core");
413                 r = -EINVAL;
414                 goto fail;
415         }
416
417         region_count = device_size / region_size;
418         if (device_size % region_size) {
419                 /*
420                  * I can't remember if device_size must be a multiple
421                  * of region_size, so check it anyway.
422                  */
423                 region_count++;
424         }
425
426         for (i = 0; i < argc; i++) {
427                 if (!strcmp(argv[i], "sync"))
428                         sync = FORCESYNC;
429                 else if (!strcmp(argv[i], "nosync"))
430                         sync = NOSYNC;
431                 else if (!strcmp(argv[i], "block_on_error"))
432                         block_on_error = 1;
433         }
434
435         lc = malloc(sizeof(*lc));
436         if (!lc) {
437                 LOG_ERROR("Unable to allocate cluster log context");
438                 r = -ENOMEM;
439                 goto fail;
440         }
441         memset(lc, 0, sizeof(*lc));
442
443         lc->region_size = region_size;
444         lc->region_count = region_count;
445         lc->sync = sync;
446         lc->block_on_error = block_on_error;
447         lc->sync_search = 0;
448         lc->recovering_region = (uint64_t)-1;
449         lc->skip_bit_warning = region_count;
450         lc->disk_fd = -1;
451         lc->log_dev_failed = 0;
452         strncpy(lc->uuid, uuid, DM_UUID_LEN);
453         lc->luid = luid;
454
455         if ((duplicate = get_log(lc->uuid, lc->luid)) ||
456             (duplicate = get_pending_log(lc->uuid, lc->luid))) {
457                 LOG_ERROR("[%s/%llu] Log already exists, unable to create.",
458                           SHORT_UUID(lc->uuid), lc->luid);
459                 free(lc);
460                 return -EINVAL;
461         }
462
463         dm_list_init(&lc->mark_list);
464
465         lc->clean_bits = dm_bitset_create(NULL, region_count);
466         if (!lc->clean_bits) {
467                 LOG_ERROR("Unable to allocate clean bitset");
468                 r = -ENOMEM;
469                 goto fail;
470         }
471
472         lc->sync_bits = dm_bitset_create(NULL, region_count);
473         if (!lc->sync_bits) {
474                 LOG_ERROR("Unable to allocate sync bitset");
475                 r = -ENOMEM;
476                 goto fail;
477         }
478         if (sync == NOSYNC)
479                 dm_bit_set_all(lc->sync_bits);
480
481         lc->sync_count = (sync == NOSYNC) ? region_count : 0;
482         if (disk_log) {
483                 page_size = sysconf(_SC_PAGESIZE);
484                 pages = ((int)lc->clean_bits[0])/page_size;
485                 pages += ((int)lc->clean_bits[0])%page_size ? 1 : 0;
486                 pages += 1; /* for header */
487
488                 r = open(disk_path, O_RDWR | O_DIRECT);
489                 if (r < 0) {
490                         LOG_ERROR("Unable to open log device, %s: %s",
491                                   disk_path, strerror(errno));
492                         r = errno;
493                         goto fail;
494                 }
495                 if (unlink_path)
496                         unlink(disk_path);
497
498                 lc->disk_fd = r;
499                 lc->disk_size = pages * page_size;
500
501                 r = posix_memalign(&(lc->disk_buffer), page_size,
502                                    lc->disk_size);
503                 if (r) {
504                         LOG_ERROR("Unable to allocate memory for disk_buffer");
505                         goto fail;
506                 }
507                 memset(lc->disk_buffer, 0, lc->disk_size);
508                 LOG_DBG("Disk log ready");
509         }
510
511         dm_list_add(&log_pending_list, &lc->list);
512
513         return 0;
514 fail:
515         if (lc) {
516                 if (lc->clean_bits)
517                         free(lc->clean_bits);
518                 if (lc->sync_bits)
519                         free(lc->sync_bits);
520                 if (lc->disk_buffer)
521                         free(lc->disk_buffer);
522                 if (lc->disk_fd >= 0)
523                         close(lc->disk_fd);
524                 free(lc);
525         }
526         return r;
527 }
528
529 /*
530  * clog_ctr
531  * @rq
532  *
533  * rq->data should contain constructor string as follows:
534  *      <log_type> [disk] <region_size> [[no]sync] <device_len>
535  * The kernel is responsible for adding the <dev_len> argument
536  * to the end; otherwise, we cannot compute the region_count.
537  *
538  * FIXME: Currently relies on caller to fill in rq->error
539  */
540 static int clog_dtr(struct dm_ulog_request *rq);
541 static int clog_ctr(struct dm_ulog_request *rq)
542 {
543         int argc, i, r = 0;
544         char *p, **argv = NULL;
545         char *dev_size_str;
546         uint64_t device_size;
547
548         /* Sanity checks */
549         if (!rq->data_size) {
550                 LOG_ERROR("Received constructor request with no data");
551                 return -EINVAL;
552         }
553
554         if (strlen(rq->data) > rq->data_size) {
555                 LOG_ERROR("Received constructor request with bad data");
556                 LOG_ERROR("strlen(rq->data)[%d] != rq->data_size[%llu]",
557                           (int)strlen(rq->data),
558                           (unsigned long long)rq->data_size);
559                 LOG_ERROR("rq->data = '%s' [%d]",
560                           rq->data, (int)strlen(rq->data));
561                 return -EINVAL;
562         }
563
564         /* Split up args */
565         for (argc = 0, p = rq->data; (p = strstr(p, " ")); p++, argc++)
566                 *p = '\0';
567
568         argv = malloc(argc * sizeof(char *));
569         if (!argv)
570                 return -ENOMEM;
571
572         p = dev_size_str = rq->data;
573         p += strlen(p) + 1;
574         for (i = 0; i < argc; i++, p = p + strlen(p) + 1)
575                 argv[i] = p;
576
577         if (strcmp(argv[0], "clustered_disk") &&
578             strcmp(argv[0], "clustered_core")) {
579                 LOG_ERROR("Unsupported userspace log type, \"%s\"", argv[0]);
580                 free(argv);
581                 return -EINVAL;
582         }
583
584         if (!(device_size = strtoll(dev_size_str, &p, 0)) || *p) {
585                 LOG_ERROR("Invalid device size argument: %s", dev_size_str);
586                 free(argv);
587                 return -EINVAL;
588         }
589
590         r = _clog_ctr(rq->uuid, rq->luid, argc - 1, argv + 1, device_size);
591
592         /* We join the CPG when we resume */
593
594         /* No returning data */
595         rq->data_size = 0;
596
597         if (r) {
598                 LOG_ERROR("Failed to create cluster log (%s)", rq->uuid);
599                 for (i = 0; i < argc; i++)
600                         LOG_ERROR("argv[%d] = %s", i, argv[i]);
601         }
602         else
603                 LOG_DBG("[%s] Cluster log created",
604                         SHORT_UUID(rq->uuid));
605
606         free(argv);
607         return r;
608 }
609
610 /*
611  * clog_dtr
612  * @rq
613  *
614  */
615 static int clog_dtr(struct dm_ulog_request *rq)
616 {
617         struct log_c *lc = get_log(rq->uuid, rq->luid);
618
619         if (lc) {
620                 /*
621                  * The log should not be on the official list.  There
622                  * should have been a suspend first.
623                  */
624                 LOG_ERROR("[%s] DTR before SUS: leaving CPG",
625                           SHORT_UUID(rq->uuid));
626                 destroy_cluster_cpg(rq->uuid);
627         } else if (!(lc = get_pending_log(rq->uuid, rq->luid))) {
628                 LOG_ERROR("clog_dtr called on log that is not official or pending");
629                 return -EINVAL;
630         }
631
632         LOG_DBG("[%s] Cluster log removed", SHORT_UUID(lc->uuid));
633
634         dm_list_del(&lc->list);
635         if (lc->disk_fd != -1)
636                 close(lc->disk_fd);
637         if (lc->disk_buffer)
638                 free(lc->disk_buffer);
639         free(lc->clean_bits);
640         free(lc->sync_bits);
641         free(lc);
642
643         return 0;
644 }
645
646 /*
647  * clog_presuspend
648  * @rq
649  *
650  */
651 static int clog_presuspend(struct dm_ulog_request *rq)
652 {
653         struct log_c *lc = get_log(rq->uuid, rq->luid);
654
655         if (!lc)
656                 return -EINVAL;
657
658         if (lc->touched)
659                 LOG_DBG("WARNING: log still marked as 'touched' during suspend");
660
661         lc->recovery_halted = 1;
662
663         return 0;
664 }
665
666 /*
667  * clog_postsuspend
668  * @rq
669  *
670  */
671 static int clog_postsuspend(struct dm_ulog_request *rq)
672 {
673         struct log_c *lc = get_log(rq->uuid, rq->luid);
674
675         if (!lc)
676                 return -EINVAL;
677
678         LOG_DBG("[%s] clog_postsuspend: leaving CPG", SHORT_UUID(lc->uuid));
679         destroy_cluster_cpg(rq->uuid);
680
681         lc->state = LOG_SUSPENDED;
682         lc->recovering_region = (uint64_t)-1;
683         lc->recoverer = (uint32_t)-1;
684         lc->delay = time(NULL);
685
686         return 0;
687 }
688
689 /*
690  * cluster_postsuspend
691  * @rq
692  *
693  */
694 int cluster_postsuspend(char *uuid, uint64_t luid)
695 {
696         struct log_c *lc = get_log(uuid, luid);
697
698         if (!lc)
699                 return -EINVAL;
700
701         LOG_DBG("[%s] clog_postsuspend: finalizing", SHORT_UUID(lc->uuid));
702         lc->resume_override = 0;
703
704         /* move log to pending list */
705         dm_list_del(&lc->list);
706         dm_list_add(&log_pending_list, &lc->list);
707
708         return 0;
709 }
710
711 /*
712  * clog_resume
713  * @rq
714  *
715  * Does the main work of resuming.
716  */
717 static int clog_resume(struct dm_ulog_request *rq)
718 {
719         uint32_t i;
720         int commit_log = 0;
721         struct log_c *lc = get_log(rq->uuid, rq->luid);
722
723         if (!lc)
724                 return -EINVAL;
725
726         switch (lc->resume_override) {
727         case 1000:
728                 LOG_ERROR("[%s] Additional resume issued before suspend",
729                           SHORT_UUID(rq->uuid));
730 #ifdef DEBUG
731                 kill(getpid(), SIGUSR1);
732 #endif
733                 return 0;
734         case 0:
735                 lc->resume_override = 1000;
736                 if (lc->disk_fd == -1) {
737                         LOG_DBG("[%s] Master resume.",
738                                 SHORT_UUID(lc->uuid));
739                         goto no_disk;
740                 }
741
742                 LOG_DBG("[%s] Master resume: reading disk log",
743                         SHORT_UUID(lc->uuid));
744                 commit_log = 1;
745                 break;
746         case 1:
747                 LOG_ERROR("Error:: partial bit loading (just sync_bits)");
748                 return -EINVAL;
749         case 2:
750                 LOG_ERROR("Error:: partial bit loading (just clean_bits)");
751                 return -EINVAL;
752         case 3:
753                 LOG_DBG("[%s] Non-master resume: bits pre-loaded",
754                         SHORT_UUID(lc->uuid));
755                 lc->resume_override = 1000;
756                 goto out;
757         default:
758                 LOG_ERROR("Error:: multiple loading of bits (%d)",
759                           lc->resume_override);
760                 return -EINVAL;
761         }
762
763         if (lc->log_dev_failed) {
764                 LOG_ERROR("Log device has failed, unable to read bits");
765                 rq->error = 0;  /* We can handle this so far */
766                 lc->disk_nr_regions = 0;
767         } else
768                 rq->error = read_log(lc);
769
770         switch (rq->error) {
771         case 0:
772                 if (lc->disk_nr_regions < lc->region_count)
773                         LOG_DBG("[%s] Mirror has grown, updating log bits",
774                                 SHORT_UUID(lc->uuid));
775                 else if (lc->disk_nr_regions > lc->region_count)
776                         LOG_DBG("[%s] Mirror has shrunk, updating log bits",
777                                 SHORT_UUID(lc->uuid));
778                 break;          
779         case -EINVAL:
780                 LOG_DBG("[%s] (Re)initializing mirror log - resync issued.",
781                         SHORT_UUID(lc->uuid));
782                 lc->disk_nr_regions = 0;
783                 break;
784         default:
785                 LOG_ERROR("Failed to read disk log");
786                 lc->disk_nr_regions = 0;
787                 break;
788         }
789
790 no_disk:
791         /* If mirror has grown, set bits appropriately */
792         if (lc->sync == NOSYNC)
793                 for (i = lc->disk_nr_regions; i < lc->region_count; i++)
794                         log_set_bit(lc, lc->clean_bits, i);
795         else
796                 for (i = lc->disk_nr_regions; i < lc->region_count; i++)
797                         log_clear_bit(lc, lc->clean_bits, i);
798
799         /* Clear any old bits if device has shrunk */
800         for (i = lc->region_count; i % 32; i++)
801                 log_clear_bit(lc, lc->clean_bits, i);
802
803         /* copy clean across to sync */
804         dm_bit_copy(lc->sync_bits, lc->clean_bits);
805
806         if (commit_log && (lc->disk_fd >= 0)) {
807                 rq->error = write_log(lc);
808                 if (rq->error)
809                         LOG_ERROR("Failed initial disk log write");
810                 else
811                         LOG_DBG("Disk log initialized");
812                 lc->touched = 0;
813         }
814 out:
815         /*
816          * Clear any old bits if device has shrunk - necessary
817          * for non-master resume
818          */
819         for (i = lc->region_count; i % 32; i++) {
820                 log_clear_bit(lc, lc->clean_bits, i);
821                 log_clear_bit(lc, lc->sync_bits, i);
822         }
823
824         lc->sync_count = count_bits32(lc->sync_bits);
825
826         LOG_SPRINT(lc, "[%s] Initial sync_count = %llu",
827                    SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count);
828         lc->sync_search = 0;
829         lc->state = LOG_RESUMED;
830         lc->recovery_halted = 0;
831         
832         return rq->error;
833 }
834
835 /*
836  * local_resume
837  * @rq
838  *
839  * If the log is pending, we must first join the cpg and
840  * put the log in the official list.
841  *
842  */
843 int local_resume(struct dm_ulog_request *rq)
844 {
845         int r;
846         time_t t;
847         struct log_c *lc = get_log(rq->uuid, rq->luid);
848
849         if (!lc) {
850                 /* Is the log in the pending list? */
851                 lc = get_pending_log(rq->uuid, rq->luid);
852                 if (!lc) {
853                         LOG_ERROR("clog_resume called on log that is not official or pending");
854                         return -EINVAL;
855                 }
856
857                 t = time(NULL);
858                 t -= lc->delay;
859                 /*
860                  * This should be considered a temporary fix.  It addresses
861                  * a problem that exists when nodes suspend/resume in rapid
862                  * succession.  While the problem is very rare, it has been
863                  * seen to happen in real-world-like testing.
864                  *
865                  * The problem:
866                  * - Node A joins cluster
867                  * - Node B joins cluster
868                  * - Node A prepares checkpoint
869                  * - Node A gets ready to write checkpoint
870                  * - Node B leaves
871                  * - Node B joins
872                  * - Node A finishes write of checkpoint
873                  * - Node B receives checkpoint meant for previous session
874                  * -- Node B can now be non-coherent
875                  *
876                  * This timer will solve the problem for now, but could be
877                  * replaced by a generation number sent with the resume
878                  * command from the kernel.  The generation number would
879                  * be included in the name of the checkpoint to prevent
880                  * reading stale data.
881                  */
882                 if ((t < 3) && (t >= 0))
883                         sleep(3 - t);
884
885                 /* Join the CPG */
886                 r = create_cluster_cpg(rq->uuid, rq->luid);
887                 if (r) {
888                         LOG_ERROR("clog_resume:  Failed to create cluster CPG");
889                         return r;
890                 }
891
892                 /* move log to official list */
893                 dm_list_del(&lc->list);
894                 dm_list_add(&log_list, &lc->list);
895         }
896
897         return 0;
898 }
899
900 /*
901  * clog_get_region_size
902  * @rq
903  *
904  * Since this value doesn't change, the kernel
905  * should not need to talk to server to get this
906  * The function is here for completness
907  *
908  * Returns: 0 on success, -EXXX on failure
909  */
910 static int clog_get_region_size(struct dm_ulog_request *rq)
911 {
912         uint64_t *rtn = (uint64_t *)rq->data;
913         struct log_c *lc = get_log(rq->uuid, rq->luid);
914
915         if (!lc && !(lc = get_pending_log(rq->uuid, rq->luid)))
916                 return -EINVAL;
917
918         *rtn = lc->region_size;
919         rq->data_size = sizeof(*rtn);
920
921         return 0;
922 }
923
924 /*
925  * clog_is_clean
926  * @rq
927  *
928  * Returns: 1 if clean, 0 otherwise
929  */
930 static int clog_is_clean(struct dm_ulog_request *rq)
931 {
932         int64_t *rtn = (int64_t *)rq->data;
933         uint64_t region = *((uint64_t *)(rq->data));
934         struct log_c *lc = get_log(rq->uuid, rq->luid);
935
936         if (!lc)
937                 return -EINVAL;
938
939         *rtn = log_test_bit(lc->clean_bits, region);
940         rq->data_size = sizeof(*rtn);
941
942         return 0;
943 }
944
945 /*
946  * clog_in_sync
947  * @rq
948  *
949  * We ignore any request for non-block.  That
950  * should be handled elsewhere.  (If the request
951  * has come this far, it has already blocked.)
952  *
953  * Returns: 1 if in-sync, 0 otherwise
954  */
955 static int clog_in_sync(struct dm_ulog_request *rq)
956 {
957         int64_t *rtn = (int64_t *)rq->data;
958         uint64_t region = *((uint64_t *)(rq->data));
959         struct log_c *lc = get_log(rq->uuid, rq->luid);
960
961         if (!lc)
962                 return -EINVAL;
963
964         if (region > lc->region_count)
965                 return -EINVAL;
966
967         *rtn = log_test_bit(lc->sync_bits, region);
968         if (*rtn)
969                 LOG_DBG("[%s] Region is in-sync: %llu",
970                         SHORT_UUID(lc->uuid), (unsigned long long)region);
971         else
972                 LOG_DBG("[%s] Region is not in-sync: %llu",
973                         SHORT_UUID(lc->uuid), (unsigned long long)region);
974
975         rq->data_size = sizeof(*rtn);
976
977         return 0;
978 }
979
980 /*
981  * clog_flush
982  * @rq
983  *
984  */
985 static int clog_flush(struct dm_ulog_request *rq, int server)
986 {
987         int r = 0;
988         struct log_c *lc = get_log(rq->uuid, rq->luid);
989         
990         if (!lc)
991                 return -EINVAL;
992
993         if (!lc->touched)
994                 return 0;
995
996         /*
997          * Do the actual flushing of the log only
998          * if we are the server.
999          */
1000         if (server && (lc->disk_fd >= 0)) {
1001                 r = rq->error = write_log(lc);
1002                 if (r)
1003                         LOG_ERROR("[%s] Error writing to disk log",
1004                                   SHORT_UUID(lc->uuid));
1005                 else 
1006                         LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
1007         }
1008
1009         lc->touched = 0;
1010
1011         return r;
1012
1013 }
1014
1015 /*
1016  * mark_region
1017  * @lc
1018  * @region
1019  * @who
1020  *
1021  * Put a mark region request in the tree for tracking.
1022  *
1023  * Returns: 0 on success, -EXXX on error
1024  */
1025 static int mark_region(struct log_c *lc, uint64_t region, uint32_t who)
1026 {
1027         int found = 0;
1028         struct mark_entry *m;
1029
1030         dm_list_iterate_items(m, &lc->mark_list)
1031                 if (m->region == region) {
1032                         found = 1;
1033                         if (m->nodeid == who)
1034                                 return 0;
1035                 }
1036
1037         if (!found)
1038                 log_clear_bit(lc, lc->clean_bits, region);
1039
1040         /*
1041          * Save allocation until here - if there is a failure,
1042          * at least we have cleared the bit.
1043          */
1044         m = malloc(sizeof(*m));
1045         if (!m) {
1046                 LOG_ERROR("Unable to allocate space for mark_entry: %llu/%u",
1047                           (unsigned long long)region, who);
1048                 return -ENOMEM;
1049         }
1050
1051         m->nodeid = who;
1052         m->region = region;
1053         dm_list_add(&lc->mark_list, &m->list);
1054
1055         return 0;
1056 }
1057
1058 /*
1059  * clog_mark_region
1060  * @rq
1061  *
1062  * rq may contain more than one mark request.  We
1063  * can determine the number from the 'data_size' field.
1064  *
1065  * Returns: 0 on success, -EXXX on failure
1066  */
1067 static int clog_mark_region(struct dm_ulog_request *rq, uint32_t originator)
1068 {
1069         int r;
1070         int count;
1071         uint64_t *region;
1072         struct log_c *lc = get_log(rq->uuid, rq->luid);
1073
1074         if (!lc)
1075                 return -EINVAL;
1076
1077         if (rq->data_size % sizeof(uint64_t)) {
1078                 LOG_ERROR("Bad data size given for mark_region request");
1079                 return -EINVAL;
1080         }
1081
1082         count = rq->data_size / sizeof(uint64_t);
1083         region = (uint64_t *)&rq->data;
1084
1085         for (; count > 0; count--, region++) {
1086                 r = mark_region(lc, *region, originator);
1087                 if (r)
1088                         return r;
1089         }
1090
1091         rq->data_size = 0;
1092
1093         return 0;
1094 }
1095
1096 static int clear_region(struct log_c *lc, uint64_t region, uint32_t who)
1097 {
1098         int other_matches = 0;
1099         struct mark_entry *m, *n;
1100
1101         dm_list_iterate_items_safe(m, n, &lc->mark_list)
1102                 if (m->region == region) {
1103                         if (m->nodeid == who) {
1104                                 dm_list_del(&m->list);
1105                                 free(m);
1106                         } else
1107                                 other_matches = 1;
1108                 }
1109
1110         /*
1111          * Clear region if:
1112          *  1) It is in-sync
1113          *  2) There are no other machines that have it marked
1114          */
1115         if (!other_matches && log_test_bit(lc->sync_bits, region))
1116                 log_set_bit(lc, lc->clean_bits, region);
1117
1118         return 0;
1119 }
1120
1121 /*
1122  * clog_clear_region
1123  * @rq
1124  *
1125  * rq may contain more than one clear request.  We
1126  * can determine the number from the 'data_size' field.
1127  *
1128  * Returns: 0 on success, -EXXX on failure
1129  */
1130 static int clog_clear_region(struct dm_ulog_request *rq, uint32_t originator)
1131 {
1132         int r;
1133         int count;
1134         uint64_t *region;
1135         struct log_c *lc = get_log(rq->uuid, rq->luid);
1136
1137         if (!lc)
1138                 return -EINVAL;
1139
1140         if (rq->data_size % sizeof(uint64_t)) {
1141                 LOG_ERROR("Bad data size given for clear_region request");
1142                 return -EINVAL;
1143         }
1144
1145         count = rq->data_size / sizeof(uint64_t);
1146         region = (uint64_t *)&rq->data;
1147
1148         for (; count > 0; count--, region++) {
1149                 r = clear_region(lc, *region, originator);
1150                 if (r)
1151                         return r;
1152         }
1153
1154         rq->data_size = 0;
1155
1156         return 0;
1157 }
1158
1159 /*
1160  * clog_get_resync_work
1161  * @rq
1162  *
1163  */
1164 static int clog_get_resync_work(struct dm_ulog_request *rq, uint32_t originator)
1165 {
1166         struct {
1167                 int64_t i;
1168                 uint64_t r;
1169         } *pkg = (void *)rq->data;
1170         struct log_c *lc = get_log(rq->uuid, rq->luid);
1171
1172         if (!lc)
1173                 return -EINVAL;
1174
1175         rq->data_size = sizeof(*pkg);
1176         pkg->i = 0;
1177
1178         if (lc->sync_search >= lc->region_count) {
1179                 /*
1180                  * FIXME: handle intermittent errors during recovery
1181                  * by resetting sync_search... but not to many times.
1182                  */
1183                 LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1184                            "Recovery finished",
1185                            rq->seq, SHORT_UUID(lc->uuid), originator);
1186                 return 0;
1187         }
1188
1189         if (lc->recovering_region != (uint64_t)-1) {
1190                 if (lc->recoverer == originator) {
1191                         LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1192                                    "Re-requesting work (%llu)",
1193                                    rq->seq, SHORT_UUID(lc->uuid), originator,
1194                                    (unsigned long long)lc->recovering_region);
1195                         pkg->r = lc->recovering_region;
1196                         pkg->i = 1;
1197                         LOG_COND(log_resend_requests, "***** RE-REQUEST *****");
1198                 } else {
1199                         LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1200                                    "Someone already recovering (%llu)",
1201                                    rq->seq, SHORT_UUID(lc->uuid), originator,
1202                                    (unsigned long long)lc->recovering_region);
1203                 }
1204
1205                 return 0;
1206         }
1207
1208         while (lc->recovery_request_list) {
1209                 struct recovery_request *del;
1210
1211                 del = lc->recovery_request_list;
1212                 lc->recovery_request_list = del->next;
1213
1214                 pkg->r = del->region;
1215                 free(del);
1216
1217                 if (!log_test_bit(lc->sync_bits, pkg->r)) {
1218                         LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1219                                    "Assigning priority resync work (%llu)",
1220                                    rq->seq, SHORT_UUID(lc->uuid), originator,
1221                                    (unsigned long long)pkg->r);
1222                         pkg->i = 1;
1223                         lc->recovering_region = pkg->r;
1224                         lc->recoverer = originator;
1225                         return 0;
1226                 }
1227         }
1228
1229         pkg->r = find_next_zero_bit(lc->sync_bits,
1230                                     lc->sync_search);
1231
1232         if (pkg->r >= lc->region_count) {
1233                 LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1234                            "Resync work complete.",
1235                            rq->seq, SHORT_UUID(lc->uuid), originator);
1236                 return 0;
1237         }
1238
1239         lc->sync_search = pkg->r + 1;
1240
1241         LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1242                    "Assigning resync work (%llu)",
1243                    rq->seq, SHORT_UUID(lc->uuid), originator,
1244                    (unsigned long long)pkg->r);
1245         pkg->i = 1;
1246         lc->recovering_region = pkg->r;
1247         lc->recoverer = originator;
1248
1249         return 0;
1250 }
1251
1252 /*
1253  * clog_set_region_sync
1254  * @rq
1255  */
1256 static int clog_set_region_sync(struct dm_ulog_request *rq, uint32_t originator)
1257 {
1258         struct {
1259                 uint64_t region;
1260                 int64_t in_sync;
1261         } *pkg = (void *)rq->data;
1262         struct log_c *lc = get_log(rq->uuid, rq->luid);
1263
1264         if (!lc)
1265                 return -EINVAL;
1266
1267         lc->recovering_region = (uint64_t)-1;
1268
1269         if (pkg->in_sync) {
1270                 if (log_test_bit(lc->sync_bits, pkg->region)) {
1271                         LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1272                                    "Region already set (%llu)",
1273                                    rq->seq, SHORT_UUID(lc->uuid), originator,
1274                                    (unsigned long long)pkg->region);
1275                 } else {
1276                         log_set_bit(lc, lc->sync_bits, pkg->region);
1277                         lc->sync_count++;
1278
1279                         /* The rest of this section is all for debugging */
1280                         LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1281                                    "Setting region (%llu)",
1282                                    rq->seq, SHORT_UUID(lc->uuid), originator,
1283                                    (unsigned long long)pkg->region);
1284                         if (pkg->region == lc->skip_bit_warning)
1285                                 lc->skip_bit_warning = lc->region_count;
1286
1287                         if (pkg->region > (lc->skip_bit_warning + 5)) {
1288                                 LOG_ERROR("*** Region #%llu skipped during recovery ***",
1289                                           (unsigned long long)lc->skip_bit_warning);
1290                                 lc->skip_bit_warning = lc->region_count;
1291 #ifdef DEBUG
1292                                 kill(getpid(), SIGUSR1);
1293 #endif
1294                         }
1295
1296                         if (!log_test_bit(lc->sync_bits,
1297                                           (pkg->region) ? pkg->region - 1 : 0)) {
1298                                 LOG_SPRINT(lc, "*** Previous bit not set ***");
1299                                 lc->skip_bit_warning = (pkg->region) ?
1300                                         pkg->region - 1 : 0;
1301                         }
1302                 }
1303         } else if (log_test_bit(lc->sync_bits, pkg->region)) {
1304                 lc->sync_count--;
1305                 log_clear_bit(lc, lc->sync_bits, pkg->region);
1306                 LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1307                            "Unsetting region (%llu)",
1308                            rq->seq, SHORT_UUID(lc->uuid), originator,
1309                            (unsigned long long)pkg->region);
1310         }
1311
1312         if (lc->sync_count != count_bits32(lc->sync_bits)) {
1313                 unsigned long long reset = count_bits32(lc->sync_bits);
1314
1315                 LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1316                            "sync_count(%llu) != bitmap count(%llu)",
1317                            rq->seq, SHORT_UUID(lc->uuid), originator,
1318                            (unsigned long long)lc->sync_count, reset);
1319 #ifdef DEBUG
1320                 kill(getpid(), SIGUSR1);
1321 #endif
1322                 lc->sync_count = reset;
1323         }
1324
1325         if (lc->sync_count > lc->region_count)
1326                 LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1327                            "(lc->sync_count > lc->region_count) - this is bad",
1328                            rq->seq, SHORT_UUID(lc->uuid), originator);
1329
1330         rq->data_size = 0;
1331         return 0;
1332 }
1333
1334 /*
1335  * clog_get_sync_count
1336  * @rq
1337  */
1338 static int clog_get_sync_count(struct dm_ulog_request *rq, uint32_t originator)
1339 {
1340         uint64_t *sync_count = (uint64_t *)rq->data;
1341         struct log_c *lc = get_log(rq->uuid, rq->luid);
1342
1343         /*
1344          * FIXME: Mirror requires us to be able to ask for
1345          * the sync count while pending... but I don't like
1346          * it because other machines may not be suspended and
1347          * the stored value may not be accurate.
1348          */
1349         if (!lc)
1350                 lc = get_pending_log(rq->uuid, rq->luid);
1351
1352         if (!lc)
1353                 return -EINVAL;
1354
1355         *sync_count = lc->sync_count;
1356
1357         rq->data_size = sizeof(*sync_count);
1358
1359         if (lc->sync_count != count_bits32(lc->sync_bits)) {
1360                 unsigned long long reset = count_bits32(lc->sync_bits);
1361
1362                 LOG_SPRINT(lc, "get_sync_count - SEQ#=%u, UUID=%s, nodeid = %u:: "
1363                            "sync_count(%llu) != bitmap count(%llu)",
1364                            rq->seq, SHORT_UUID(lc->uuid), originator,
1365                            (unsigned long long)lc->sync_count, reset);
1366 #ifdef DEBUG
1367                 kill(getpid(), SIGUSR1);
1368 #endif
1369                 lc->sync_count = reset;
1370         }
1371
1372         return 0;
1373 }
1374
1375 static int core_status_info(struct log_c *lc, struct dm_ulog_request *rq)
1376 {
1377         char *data = (char *)rq->data;
1378
1379         rq->data_size = sprintf(data, "1 clustered_core");
1380
1381         return 0;
1382 }
1383
1384 static int disk_status_info(struct log_c *lc, struct dm_ulog_request *rq)
1385 {
1386         char *data = (char *)rq->data;
1387         struct stat statbuf;
1388
1389         if(fstat(lc->disk_fd, &statbuf)) {
1390                 rq->error = -errno;
1391                 return -errno;
1392         }
1393
1394         rq->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
1395                                 major(statbuf.st_rdev), minor(statbuf.st_rdev),
1396                                 (lc->log_dev_failed) ? 'D' : 'A');
1397
1398         return 0;
1399 }
1400
1401 /*
1402  * clog_status_info
1403  * @rq
1404  *
1405  */
1406 static int clog_status_info(struct dm_ulog_request *rq)
1407 {
1408         int r;
1409         struct log_c *lc = get_log(rq->uuid, rq->luid);
1410
1411         if (!lc)
1412                 lc = get_pending_log(rq->uuid, rq->luid);
1413
1414         if (!lc)
1415                 return -EINVAL;
1416
1417         if (lc->disk_fd == -1)
1418                 r = core_status_info(lc, rq);
1419         else
1420                 r = disk_status_info(lc, rq);
1421
1422         return r;
1423 }
1424
1425 static int core_status_table(struct log_c *lc, struct dm_ulog_request *rq)
1426 {
1427         char *data = (char *)rq->data;
1428
1429         rq->data_size = sprintf(data, "clustered_core %u %s%s ",
1430                                 lc->region_size,
1431                                 (lc->sync == DEFAULTSYNC) ? "" :
1432                                 (lc->sync == NOSYNC) ? "nosync " : "sync ",
1433                                 (lc->block_on_error) ? "block_on_error" : "");
1434         return 0;
1435 }
1436
1437 static int disk_status_table(struct log_c *lc, struct dm_ulog_request *rq)
1438 {
1439         char *data = (char *)rq->data;
1440         struct stat statbuf;
1441
1442         if(fstat(lc->disk_fd, &statbuf)) {
1443                 rq->error = -errno;
1444                 return -errno;
1445         }
1446
1447         rq->data_size = sprintf(data, "clustered_disk %d:%d %u %s%s ",
1448                                 major(statbuf.st_rdev), minor(statbuf.st_rdev),
1449                                 lc->region_size,
1450                                 (lc->sync == DEFAULTSYNC) ? "" :
1451                                 (lc->sync == NOSYNC) ? "nosync " : "sync ",
1452                                 (lc->block_on_error) ? "block_on_error" : "");
1453         return 0;
1454 }
1455
1456 /*
1457  * clog_status_table
1458  * @rq
1459  *
1460  */
1461 static int clog_status_table(struct dm_ulog_request *rq)
1462 {
1463         int r;
1464         struct log_c *lc = get_log(rq->uuid, rq->luid);
1465
1466         if (!lc)
1467                 lc = get_pending_log(rq->uuid, rq->luid);
1468
1469         if (!lc)
1470                 return -EINVAL;
1471
1472         if (lc->disk_fd == -1)
1473                 r = core_status_table(lc, rq);
1474         else
1475                 r = disk_status_table(lc, rq);
1476
1477         return r;
1478 }
1479
1480 /*
1481  * clog_is_remote_recovering
1482  * @rq
1483  *
1484  */
1485 static int clog_is_remote_recovering(struct dm_ulog_request *rq)
1486 {
1487         uint64_t region = *((uint64_t *)(rq->data));
1488         struct {
1489                 int64_t is_recovering;
1490                 uint64_t in_sync_hint;
1491         } *pkg = (void *)rq->data;
1492         struct log_c *lc = get_log(rq->uuid, rq->luid);
1493
1494         if (!lc)
1495                 return -EINVAL;
1496
1497         if (region > lc->region_count)
1498                 return -EINVAL;
1499
1500         if (lc->recovery_halted) {
1501                 LOG_DBG("[%s] Recovery halted... [not remote recovering]: %llu",
1502                         SHORT_UUID(lc->uuid), (unsigned long long)region);
1503                 pkg->is_recovering = 0;
1504                 pkg->in_sync_hint = lc->region_count; /* none are recovering */
1505         } else {
1506                 pkg->is_recovering = !log_test_bit(lc->sync_bits, region);
1507
1508                 /*
1509                  * Remember, 'lc->sync_search' is 1 plus the region
1510                  * currently being recovered.  So, we must take off 1
1511                  * to account for that; but only if 'sync_search > 1'.
1512                  */
1513                 pkg->in_sync_hint = lc->sync_search ? (lc->sync_search - 1) : 0;
1514                 LOG_DBG("[%s] Region is %s: %llu",
1515                         SHORT_UUID(lc->uuid),
1516                         (region == lc->recovering_region) ?
1517                         "currently remote recovering" :
1518                         (pkg->is_recovering) ? "pending remote recovery" :
1519                         "not remote recovering", (unsigned long long)region);
1520         }
1521
1522         if (pkg->is_recovering &&
1523             (region != lc->recovering_region)) {
1524                 struct recovery_request *rr;
1525
1526                 /* Already in the list? */
1527                 for (rr = lc->recovery_request_list; rr; rr = rr->next)
1528                         if (rr->region == region)
1529                                 goto out;
1530
1531                 /* Failure to allocated simply means we can't prioritize it */
1532                 rr = malloc(sizeof(*rr));
1533                 if (!rr)
1534                         goto out;
1535
1536                 LOG_DBG("[%s] Adding region to priority list: %llu",
1537                         SHORT_UUID(lc->uuid), (unsigned long long)region);
1538                 rr->region = region;
1539                 rr->next = lc->recovery_request_list;
1540                 lc->recovery_request_list = rr;
1541         }
1542
1543 out:
1544
1545         rq->data_size = sizeof(*pkg);
1546
1547         return 0;       
1548 }
1549
1550
1551 /*
1552  * do_request
1553  * @rq: the request
1554  * @server: is this request performed by the server
1555  *
1556  * An inability to perform this function will return an error
1557  * from this function.  However, an inability to successfully
1558  * perform the request will fill in the 'rq->error' field.
1559  *
1560  * Returns: 0 on success, -EXXX on error
1561  */
1562 int do_request(struct clog_request *rq, int server)
1563 {
1564         int r;
1565
1566         if (!rq)
1567                 return 0;
1568
1569         if (rq->u_rq.error)
1570                 LOG_DBG("Programmer error: rq struct has error set");
1571
1572         switch (rq->u_rq.request_type) {
1573         case DM_ULOG_CTR:
1574                 r = clog_ctr(&rq->u_rq);
1575                 break;
1576         case DM_ULOG_DTR:
1577                 r = clog_dtr(&rq->u_rq);
1578                 break;
1579         case DM_ULOG_PRESUSPEND:
1580                 r = clog_presuspend(&rq->u_rq);
1581                 break;
1582         case DM_ULOG_POSTSUSPEND:
1583                 r = clog_postsuspend(&rq->u_rq);
1584                 break;
1585         case DM_ULOG_RESUME:
1586                 r = clog_resume(&rq->u_rq);
1587                 break;
1588         case DM_ULOG_GET_REGION_SIZE:
1589                 r = clog_get_region_size(&rq->u_rq);
1590                 break;
1591         case DM_ULOG_IS_CLEAN:
1592                 r = clog_is_clean(&rq->u_rq);
1593                 break;
1594         case DM_ULOG_IN_SYNC:
1595                 r = clog_in_sync(&rq->u_rq);
1596                 break;
1597         case DM_ULOG_FLUSH:
1598                 r = clog_flush(&rq->u_rq, server);
1599                 break;
1600         case DM_ULOG_MARK_REGION:
1601                 r = clog_mark_region(&rq->u_rq, rq->originator);
1602                 break;
1603         case DM_ULOG_CLEAR_REGION:
1604                 r = clog_clear_region(&rq->u_rq, rq->originator);
1605                 break;
1606         case DM_ULOG_GET_RESYNC_WORK:
1607                 r = clog_get_resync_work(&rq->u_rq, rq->originator);
1608                 break;
1609         case DM_ULOG_SET_REGION_SYNC:
1610                 r = clog_set_region_sync(&rq->u_rq, rq->originator);
1611                 break;
1612         case DM_ULOG_GET_SYNC_COUNT:
1613                 r = clog_get_sync_count(&rq->u_rq, rq->originator);
1614                 break;
1615         case DM_ULOG_STATUS_INFO:
1616                 r = clog_status_info(&rq->u_rq);
1617                 break;
1618         case DM_ULOG_STATUS_TABLE:
1619                 r = clog_status_table(&rq->u_rq);
1620                 break;
1621         case DM_ULOG_IS_REMOTE_RECOVERING:
1622                 r = clog_is_remote_recovering(&rq->u_rq);
1623                 break;
1624         default:
1625                 LOG_ERROR("Unknown request");
1626                 r = rq->u_rq.error = -EINVAL;
1627                 break;
1628         }
1629
1630         if (r && !rq->u_rq.error)
1631                 rq->u_rq.error = r;
1632         else if (r != rq->u_rq.error)
1633                 LOG_DBG("Warning:  error from function != rq->u_rq.error");
1634
1635         if (rq->u_rq.error && rq->u_rq.data_size) {
1636                 /* Make sure I'm handling errors correctly above */
1637                 LOG_DBG("Programmer error: rq->u_rq.error && rq->u_rq.data_size");
1638                 rq->u_rq.data_size = 0;
1639         }
1640
1641         return 0;
1642 }
1643
1644 static void print_bits(char *buf, int size, int print)
1645 {
1646         int i;
1647         char outbuf[128];
1648
1649         memset(outbuf, 0, sizeof(outbuf));
1650
1651         for (i = 0; i < size; i++) {
1652                 if (!(i % 16)) {
1653                         if (outbuf[0] != '\0') {
1654                                 if (print)
1655                                         LOG_PRINT("%s", outbuf);
1656                                 else
1657                                         LOG_DBG("%s", outbuf);
1658                         }
1659                         memset(outbuf, 0, sizeof(outbuf));
1660                         sprintf(outbuf, "[%3d - %3d]", i, i+15);
1661                 }
1662                 sprintf(outbuf + strlen(outbuf), " %.2X", (unsigned char)buf[i]);
1663         }
1664         if (outbuf[0] != '\0') {
1665                 if (print)
1666                         LOG_PRINT("%s", outbuf);
1667                 else
1668                         LOG_DBG("%s", outbuf);
1669         }
1670 }
1671
1672 /* int store_bits(const char *uuid, const char *which, char **buf)*/
1673 int push_state(const char *uuid, uint64_t luid,
1674                const char *which, char **buf, uint32_t debug_who)
1675 {
1676         int bitset_size;
1677         struct log_c *lc;
1678
1679         if (*buf)
1680                 LOG_ERROR("store_bits: *buf != NULL");
1681
1682         lc = get_log(uuid, luid);
1683         if (!lc) {
1684                 LOG_ERROR("store_bits: No log found for %s", uuid);
1685                 return -EINVAL;
1686         }
1687
1688         if (!strcmp(which, "recovering_region")) {
1689                 *buf = malloc(64); /* easily handles the 2 written numbers */
1690                 if (!*buf)
1691                         return -ENOMEM;
1692                 sprintf(*buf, "%llu %u", (unsigned long long)lc->recovering_region,
1693                         lc->recoverer);
1694
1695                 LOG_SPRINT(lc, "CKPT SEND - SEQ#=X, UUID=%s, nodeid = %u:: "
1696                            "recovering_region=%llu, recoverer=%u, sync_count=%llu",
1697                            SHORT_UUID(lc->uuid), debug_who,
1698                            (unsigned long long)lc->recovering_region,
1699                            lc->recoverer,
1700                            (unsigned long long)count_bits32(lc->sync_bits));
1701                 return 64;
1702         }
1703
1704         /* Size in 'int's */
1705         bitset_size = ((int)lc->clean_bits[0]/DM_BITS_PER_INT) + 1;
1706
1707         /* Size in bytes */
1708         bitset_size *= 4;
1709
1710         *buf = malloc(bitset_size);
1711
1712         if (!*buf) {
1713                 LOG_ERROR("store_bits: Unable to allocate memory");
1714                 return -ENOMEM;
1715         }
1716
1717         if (!strncmp(which, "sync_bits", 9)) {
1718                 memcpy(*buf, lc->sync_bits + 1, bitset_size);
1719                 LOG_DBG("[%s] storing sync_bits (sync_count = %llu):",
1720                         SHORT_UUID(uuid), (unsigned long long)
1721                         count_bits32(lc->sync_bits));
1722                 print_bits(*buf, bitset_size, 0);
1723         } else if (!strncmp(which, "clean_bits", 9)) {
1724                 memcpy(*buf, lc->clean_bits + 1, bitset_size);
1725                 LOG_DBG("[%s] storing clean_bits:", SHORT_UUID(lc->uuid));
1726                 print_bits(*buf, bitset_size, 0);
1727         }
1728
1729         return bitset_size;
1730 }
1731
1732 /*int load_bits(const char *uuid, const char *which, char *buf, int size)*/
1733 int pull_state(const char *uuid, uint64_t luid,
1734                const char *which, char *buf, int size)
1735 {
1736         int bitset_size;
1737         struct log_c *lc;
1738
1739         if (!buf)
1740                 LOG_ERROR("pull_state: buf == NULL");
1741
1742         lc = get_log(uuid, luid);
1743         if (!lc) {
1744                 LOG_ERROR("pull_state: No log found for %s", uuid);
1745                 return -EINVAL;
1746         }
1747
1748         if (!strncmp(which, "recovering_region", 17)) {
1749                 sscanf(buf, "%llu %u", (unsigned long long *)&lc->recovering_region,
1750                        &lc->recoverer);
1751                 LOG_SPRINT(lc, "CKPT INIT - SEQ#=X, UUID=%s, nodeid = X:: "
1752                            "recovering_region=%llu, recoverer=%u",
1753                            SHORT_UUID(lc->uuid),
1754                            (unsigned long long)lc->recovering_region, lc->recoverer);
1755                 return 0;
1756         }
1757
1758         /* Size in 'int's */
1759         bitset_size = ((int)lc->clean_bits[0]/DM_BITS_PER_INT) + 1;
1760
1761         /* Size in bytes */
1762         bitset_size *= 4;
1763
1764         if (bitset_size != size) {
1765                 LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)",
1766                           which, size, bitset_size);
1767                 return -EINVAL;
1768         }
1769
1770         if (!strncmp(which, "sync_bits", 9)) {
1771                 lc->resume_override += 1;
1772                 memcpy(lc->sync_bits + 1, buf, bitset_size);
1773                 LOG_DBG("[%s] loading sync_bits (sync_count = %llu):",
1774                         SHORT_UUID(lc->uuid),(unsigned long long)
1775                         count_bits32(lc->sync_bits));
1776                 print_bits((char *)lc->sync_bits, bitset_size, 0);
1777         } else if (!strncmp(which, "clean_bits", 9)) {
1778                 lc->resume_override += 2;
1779                 memcpy(lc->clean_bits + 1, buf, bitset_size);
1780                 LOG_DBG("[%s] loading clean_bits:", SHORT_UUID(lc->uuid));
1781                 print_bits((char *)lc->clean_bits, bitset_size, 0);
1782         }
1783
1784         return 0;
1785 }
1786
1787 int log_get_state(struct dm_ulog_request *rq)
1788 {
1789         struct log_c *lc;
1790
1791         lc = get_log(rq->uuid, rq->luid);
1792         if (!lc)
1793                 return -EINVAL;
1794
1795         return lc->state;
1796 }
1797
1798 /*
1799  * log_status
1800  *
1801  * Returns: 1 if logs are still present, 0 otherwise
1802  */
1803 int log_status(void)
1804 {
1805         if (!dm_list_empty(&log_list) || !dm_list_empty(&log_pending_list))
1806                 return 1;
1807
1808         return 0;
1809 }
1810
1811 void log_debug(void)
1812 {
1813         struct log_c *lc;
1814         uint64_t r;
1815         int i;
1816
1817         LOG_ERROR("");
1818         LOG_ERROR("LOG COMPONENT DEBUGGING::");
1819         LOG_ERROR("Official log list:");
1820         LOG_ERROR("Pending log list:");
1821         dm_list_iterate_items(lc, &log_pending_list) {
1822                 LOG_ERROR("%s", lc->uuid);
1823                 LOG_ERROR("sync_bits:");
1824                 print_bits((char *)lc->sync_bits, (int)lc->sync_bits[0], 1);
1825                 LOG_ERROR("clean_bits:");
1826                 print_bits((char *)lc->clean_bits, (int)lc->sync_bits[0], 1);
1827         }
1828
1829         dm_list_iterate_items(lc, &log_list) {
1830                 LOG_ERROR("%s", lc->uuid);
1831                 LOG_ERROR("  recoverer        : %u", lc->recoverer);
1832                 LOG_ERROR("  recovering_region: %llu",
1833                           (unsigned long long)lc->recovering_region);
1834                 LOG_ERROR("  recovery_halted  : %s", (lc->recovery_halted) ?
1835                           "YES" : "NO");
1836                 LOG_ERROR("sync_bits:");
1837                 print_bits((char *)lc->sync_bits, (int)lc->sync_bits[0], 1);
1838                 LOG_ERROR("clean_bits:");
1839                 print_bits((char *)lc->clean_bits, (int)lc->sync_bits[0], 1);
1840
1841                 LOG_ERROR("Validating %s::", SHORT_UUID(lc->uuid));
1842                 r = find_next_zero_bit(lc->sync_bits, 0);
1843                 LOG_ERROR("  lc->region_count = %llu",
1844                           (unsigned long long)lc->region_count);
1845                 LOG_ERROR("  lc->sync_count = %llu",
1846                           (unsigned long long)lc->sync_count);
1847                 LOG_ERROR("  next zero bit  = %llu",
1848                           (unsigned long long)r);
1849                 if ((r > lc->region_count) ||
1850                     ((r == lc->region_count) && (lc->sync_count > lc->region_count))) {
1851                         LOG_ERROR("ADJUSTING SYNC_COUNT");
1852                         lc->sync_count = lc->region_count;
1853                 }
1854
1855                 LOG_ERROR("Resync request history:");
1856                 for (i = 0; i < RESYNC_HISTORY; i++) {
1857                         lc->idx++;
1858                         lc->idx = lc->idx % RESYNC_HISTORY;
1859                         if (lc->resync_history[lc->idx][0] == '\0')
1860                                 continue;
1861                         LOG_ERROR("%d:%d) %s", i, lc->idx,
1862                                   lc->resync_history[lc->idx]);
1863                 }
1864         }
1865 }