Initial import of binutils 2.22 on the new vendor branch
[dragonfly.git] / contrib / lvm2 / dist / daemons / cmirrord / cluster.c
1 /*      $NetBSD: cluster.c,v 1.1.1.1 2009/12/02 00:27:08 haad Exp $     */
2
3 /*
4  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
5  *
6  * This copyrighted material is made available to anyone wishing to use,
7  * modify, copy, or redistribute it subject to the terms and conditions
8  * of the GNU Lesser General Public License v.2.1.
9  *
10  * You should have received a copy of the GNU Lesser General Public License
11  * along with this program; if not, write to the Free Software Foundation,
12  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
13  */
14 #include <errno.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <unistd.h>
18 #include <stdint.h>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <sys/socket.h> /* These are for OpenAIS CPGs */
22 #include <sys/select.h>
23 #include <sys/un.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/cpg.h>
28 #include <openais/saAis.h>
29 #include <openais/saCkpt.h>
30
31 #include "dm-log-userspace.h"
32 #include "libdevmapper.h"
33 #include "functions.h"
34 #include "local.h"
35 #include "common.h"
36 #include "logging.h"
37 #include "link_mon.h"
38 #include "cluster.h"
39
40 /* Open AIS error codes */
41 #define str_ais_error(x)                                                \
42         ((x) == SA_AIS_OK) ? "SA_AIS_OK" :                              \
43         ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" :            \
44         ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" :            \
45         ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" :                  \
46         ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" :            \
47         ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" :        \
48         ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
49         ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" :        \
50         ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" :      \
51         ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" :                  \
52         ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" :              \
53         ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" :        \
54         ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
55         ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" :                \
56         ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" :          \
57         ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" :        \
58         ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
59         ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" :  \
60         ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
61         ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
62         ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
63         ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
64         ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" :      \
65         ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
66         ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" :        \
67         ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" :            \
68         ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" :    \
69         "ais_error_unknown"
70
71 #define DM_ULOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
72 #define DM_ULOG_CHECKPOINT_READY 21
73 #define DM_ULOG_MEMBER_JOIN      22
74
75 #define _RQ_TYPE(x)                                                     \
76         ((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
77         ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN":           \
78         RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
79
80 static uint32_t my_cluster_id = 0xDEAD;
81 static SaCkptHandleT ckpt_handle = 0;
82 static SaCkptCallbacksT callbacks = { 0, 0 };
83 static SaVersionT version = { 'B', 1, 1 };
84
85 #define DEBUGGING_HISTORY 100
86 //static char debugging[DEBUGGING_HISTORY][128];
87 //static int idx = 0;
88 #define LOG_SPRINT(cc, f, arg...) do {                          \
89                 cc->idx++;                                      \
90                 cc->idx = cc->idx % DEBUGGING_HISTORY;          \
91                 sprintf(cc->debugging[cc->idx], f, ## arg);     \
92         } while (0)
93
94 static int log_resp_rec = 0;
95
96 struct checkpoint_data {
97         uint32_t requester;
98         char uuid[CPG_MAX_NAME_LENGTH];
99
100         int bitmap_size; /* in bytes */
101         char *sync_bits;
102         char *clean_bits;
103         char *recovering_region;
104         struct checkpoint_data *next;
105 };      
106
107 #define INVALID 0
108 #define VALID   1
109 #define LEAVING 2
110
111 #define MAX_CHECKPOINT_REQUESTERS 10
112 struct clog_cpg {
113         struct dm_list list;
114
115         uint32_t lowest_id;
116         cpg_handle_t handle;
117         struct cpg_name name;
118         uint64_t luid;
119
120         /* Are we the first, or have we received checkpoint? */
121         int state;
122         int cpg_state;  /* FIXME: debugging */
123         int free_me;
124         int delay;
125         int resend_requests;
126         struct dm_list startup_list;
127         struct dm_list working_list;
128
129         int checkpoints_needed;
130         uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS];
131         struct checkpoint_data *checkpoint_list;
132         int idx;
133         char debugging[DEBUGGING_HISTORY][128];
134 };
135
136 static struct dm_list clog_cpg_list;
137
138 /*
139  * cluster_send
140  * @rq
141  *
142  * Returns: 0 on success, -Exxx on error
143  */
144 int cluster_send(struct clog_request *rq)
145 {
146         int r;
147         int count=0;
148         int found;
149         struct iovec iov;
150         struct clog_cpg *entry;
151
152         dm_list_iterate_items(entry, &clog_cpg_list)
153                 if (!strncmp(entry->name.value, rq->u_rq.uuid,
154                              CPG_MAX_NAME_LENGTH)) {
155                         found = 1;
156                         break;
157                 }
158
159         if (!found) {
160                 rq->u_rq.error = -ENOENT;
161                 return -ENOENT;
162         }
163
164         /*
165          * Once the request heads for the cluster, the luid looses
166          * all its meaning.
167          */
168         rq->u_rq.luid = 0;
169
170         iov.iov_base = rq;
171         iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
172
173         if (entry->cpg_state != VALID)
174                 return -EINVAL;
175
176         do {
177                 r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
178                 if (r != SA_AIS_ERR_TRY_AGAIN)
179                         break;
180                 count++;
181                 if (count < 10)
182                         LOG_PRINT("[%s]  Retry #%d of cpg_mcast_joined: %s",
183                                   SHORT_UUID(rq->u_rq.uuid), count,
184                                   str_ais_error(r));
185                 else if ((count < 100) && !(count % 10))
186                         LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s",
187                                   SHORT_UUID(rq->u_rq.uuid), count,
188                                   str_ais_error(r));
189                 else if ((count < 1000) && !(count % 100))
190                         LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s",
191                                   SHORT_UUID(rq->u_rq.uuid), count,
192                                   str_ais_error(r));
193                 else if ((count < 10000) && !(count % 1000))
194                         LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s - "
195                                   "OpenAIS not handling the load?",
196                                   SHORT_UUID(rq->u_rq.uuid), count,
197                                   str_ais_error(r));
198                 usleep(1000);
199         } while (1);
200
201         if (r == CPG_OK)
202                 return 0;
203
204         /* error codes found in openais/cpg.h */
205         LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
206
207         rq->u_rq.error = -EBADE;
208         return -EBADE;
209 }
210
211 static struct clog_request *get_matching_rq(struct clog_request *rq,
212                                             struct dm_list *l)
213 {
214         struct clog_request *match, *n;
215
216         dm_list_iterate_items_safe(match, n, l)
217                 if (match->u_rq.seq == rq->u_rq.seq) {
218                         dm_list_del(&match->list);
219                         return match;
220                 }
221
222         return NULL;
223 }
224
225 static char rq_buffer[DM_ULOG_REQUEST_SIZE];
226 static int handle_cluster_request(struct clog_cpg *entry,
227                                   struct clog_request *rq, int server)
228 {
229         int r = 0;
230         struct clog_request *tmp = (struct clog_request *)rq_buffer;
231
232         /*
233          * We need a separate dm_ulog_request struct, one that can carry
234          * a return payload.  Otherwise, the memory address after
235          * rq will be altered - leading to problems
236          */
237         memset(rq_buffer, 0, sizeof(rq_buffer));
238         memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
239
240         /*
241          * With resumes, we only handle our own.
242          * Resume is a special case that requires
243          * local action (to set up CPG), followed by
244          * a cluster action to co-ordinate reading
245          * the disk and checkpointing
246          */
247         if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
248                 if (tmp->originator == my_cluster_id) {
249                         r = do_request(tmp, server);
250
251                         r = kernel_send(&tmp->u_rq);
252                         if (r < 0)
253                                 LOG_ERROR("Failed to send resume response to kernel");
254                 }
255                 return r;
256         }
257
258         r = do_request(tmp, server);
259
260         if (server &&
261             (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
262             (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
263                 tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
264
265                 /*
266                  * Errors from previous functions are in the rq struct.
267                  */
268                 r = cluster_send(tmp);
269                 if (r < 0)
270                         LOG_ERROR("cluster_send failed: %s", strerror(-r));
271         }
272
273         return r;
274 }
275
276 static int handle_cluster_response(struct clog_cpg *entry,
277                                    struct clog_request *rq)
278 {
279         int r = 0;
280         struct clog_request *orig_rq;
281
282         /*
283          * If I didn't send it, then I don't care about the response
284          */
285         if (rq->originator != my_cluster_id)
286                 return 0;
287
288         rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
289         orig_rq = get_matching_rq(rq, &entry->working_list);
290
291         if (!orig_rq) {
292                 /* Unable to find match for response */
293
294                 LOG_ERROR("[%s] No match for cluster response: %s:%u",
295                           SHORT_UUID(rq->u_rq.uuid),
296                           _RQ_TYPE(rq->u_rq.request_type),
297                           rq->u_rq.seq);
298
299                 LOG_ERROR("Current local list:");
300                 if (dm_list_empty(&entry->working_list))
301                         LOG_ERROR("   [none]");
302
303                 dm_list_iterate_items(orig_rq, &entry->working_list)
304                         LOG_ERROR("   [%s]  %s:%u",
305                                   SHORT_UUID(orig_rq->u_rq.uuid),
306                                   _RQ_TYPE(orig_rq->u_rq.request_type),
307                                   orig_rq->u_rq.seq);
308
309                 return -EINVAL;
310         }
311
312         if (log_resp_rec > 0) {
313                 LOG_COND(log_resend_requests,
314                          "[%s] Response received to %s/#%u",
315                          SHORT_UUID(rq->u_rq.uuid),
316                          _RQ_TYPE(rq->u_rq.request_type),
317                          rq->u_rq.seq);
318                 log_resp_rec--;
319         }
320
321         /* FIXME: Ensure memcpy cannot explode */
322         memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
323
324         r = kernel_send(&orig_rq->u_rq);
325         if (r)
326                 LOG_ERROR("Failed to send response to kernel");
327
328         free(orig_rq);
329         return r;
330 }
331
332 static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
333 {
334         struct clog_cpg *match;
335
336         dm_list_iterate_items(match, &clog_cpg_list)
337                 if (match->handle == handle)
338                         return match;
339
340         return NULL;
341 }
342
343 /*
344  * prepare_checkpoint
345  * @entry: clog_cpg describing the log
346  * @cp_requester: nodeid requesting the checkpoint
347  *
348  * Creates and fills in a new checkpoint_data struct.
349  *
350  * Returns: checkpoint_data on success, NULL on error
351  */
352 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
353                                                   uint32_t cp_requester)
354 {
355         int r;
356         struct checkpoint_data *new;
357
358         if (entry->state != VALID) {
359                 /*
360                  * We can't store bitmaps yet, because the log is not
361                  * valid yet.
362                  */
363                 LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
364                           cp_requester);
365                 return NULL;
366         }
367
368         new = malloc(sizeof(*new));
369         if (!new) {
370                 LOG_ERROR("Unable to create checkpoint data for %u",
371                           cp_requester);
372                 return NULL;
373         }
374         memset(new, 0, sizeof(*new));
375         new->requester = cp_requester;
376         strncpy(new->uuid, entry->name.value, entry->name.length);
377
378         new->bitmap_size = push_state(entry->name.value, entry->luid,
379                                       "clean_bits",
380                                       &new->clean_bits, cp_requester);
381         if (new->bitmap_size <= 0) {
382                 LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
383                           new->requester);
384                 free(new);
385                 return NULL;
386         }
387
388         new->bitmap_size = push_state(entry->name.value, entry->luid,
389                                       "sync_bits",
390                                       &new->sync_bits, cp_requester);
391         if (new->bitmap_size <= 0) {
392                 LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
393                           new->requester);
394                 free(new->clean_bits);
395                 free(new);
396                 return NULL;
397         }
398
399         r = push_state(entry->name.value, entry->luid,
400                        "recovering_region",
401                        &new->recovering_region, cp_requester);
402         if (r <= 0) {
403                 LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
404                           new->requester);
405                 free(new->sync_bits);
406                 free(new->clean_bits);
407                 free(new);
408                 return NULL;
409         }
410         LOG_DBG("[%s] Checkpoint prepared for node %u:",
411                 SHORT_UUID(new->uuid), new->requester);
412         LOG_DBG("  bitmap_size = %d", new->bitmap_size);
413
414         return new;
415 }
416
417 /*
418  * free_checkpoint
419  * @cp: the checkpoint_data struct to free
420  *
421  */
422 static void free_checkpoint(struct checkpoint_data *cp)
423 {
424         free(cp->recovering_region);
425         free(cp->sync_bits);
426         free(cp->clean_bits);
427         free(cp);
428 }
429
430 static int export_checkpoint(struct checkpoint_data *cp)
431 {
432         SaCkptCheckpointCreationAttributesT attr;
433         SaCkptCheckpointHandleT h;
434         SaCkptSectionIdT section_id;
435         SaCkptSectionCreationAttributesT section_attr;
436         SaCkptCheckpointOpenFlagsT flags;
437         SaNameT name;
438         SaAisErrorT rv;
439         struct clog_request *rq;
440         int len, r = 0;
441         char buf[32];
442
443         LOG_DBG("Sending checkpointed data to %u", cp->requester);
444
445         len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH,
446                        "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester);
447         name.length = len;
448
449         len = strlen(cp->recovering_region) + 1;
450
451         attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
452         attr.checkpointSize = cp->bitmap_size * 2 + len;
453
454         attr.retentionDuration = SA_TIME_MAX;
455         attr.maxSections = 4;      /* don't know why we need +1 */
456
457         attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len;
458         attr.maxSectionIdSize = 22;
459
460         flags = SA_CKPT_CHECKPOINT_READ |
461                 SA_CKPT_CHECKPOINT_WRITE |
462                 SA_CKPT_CHECKPOINT_CREATE;
463
464 open_retry:
465         rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
466         if (rv == SA_AIS_ERR_TRY_AGAIN) {
467                 LOG_ERROR("export_checkpoint: ckpt open retry");
468                 usleep(1000);
469                 goto open_retry;
470         }
471
472         if (rv == SA_AIS_ERR_EXIST) {
473                 LOG_DBG("export_checkpoint: checkpoint already exists");
474                 return -EEXIST;
475         }
476
477         if (rv != SA_AIS_OK) {
478                 LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
479                           SHORT_UUID(cp->uuid), cp->requester,
480                           str_ais_error(rv));
481                 return -EIO; /* FIXME: better error */
482         }
483
484         /*
485          * Add section for sync_bits
486          */
487         section_id.idLen = snprintf(buf, 32, "sync_bits");
488         section_id.id = (unsigned char *)buf;
489         section_attr.sectionId = &section_id;
490         section_attr.expirationTime = SA_TIME_END;
491
492 sync_create_retry:
493         rv = saCkptSectionCreate(h, &section_attr,
494                                  cp->sync_bits, cp->bitmap_size);
495         if (rv == SA_AIS_ERR_TRY_AGAIN) {
496                 LOG_ERROR("Sync checkpoint section create retry");
497                 usleep(1000);
498                 goto sync_create_retry;
499         }
500
501         if (rv == SA_AIS_ERR_EXIST) {
502                 LOG_DBG("Sync checkpoint section already exists");
503                 saCkptCheckpointClose(h);
504                 return -EEXIST;
505         }
506
507         if (rv != SA_AIS_OK) {
508                 LOG_ERROR("Sync checkpoint section creation failed: %s",
509                           str_ais_error(rv));
510                 saCkptCheckpointClose(h);
511                 return -EIO; /* FIXME: better error */
512         }
513
514         /*
515          * Add section for clean_bits
516          */
517         section_id.idLen = snprintf(buf, 32, "clean_bits");
518         section_id.id = (unsigned char *)buf;
519         section_attr.sectionId = &section_id;
520         section_attr.expirationTime = SA_TIME_END;
521
522 clean_create_retry:
523         rv = saCkptSectionCreate(h, &section_attr, cp->clean_bits, cp->bitmap_size);
524         if (rv == SA_AIS_ERR_TRY_AGAIN) {
525                 LOG_ERROR("Clean checkpoint section create retry");
526                 usleep(1000);
527                 goto clean_create_retry;
528         }
529
530         if (rv == SA_AIS_ERR_EXIST) {
531                 LOG_DBG("Clean checkpoint section already exists");
532                 saCkptCheckpointClose(h);
533                 return -EEXIST;
534         }
535
536         if (rv != SA_AIS_OK) {
537                 LOG_ERROR("Clean checkpoint section creation failed: %s",
538                           str_ais_error(rv));
539                 saCkptCheckpointClose(h);
540                 return -EIO; /* FIXME: better error */
541         }
542
543         /*
544          * Add section for recovering_region
545          */
546         section_id.idLen = snprintf(buf, 32, "recovering_region");
547         section_id.id = (unsigned char *)buf;
548         section_attr.sectionId = &section_id;
549         section_attr.expirationTime = SA_TIME_END;
550
551 rr_create_retry:
552         rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
553                                  strlen(cp->recovering_region) + 1);
554         if (rv == SA_AIS_ERR_TRY_AGAIN) {
555                 LOG_ERROR("RR checkpoint section create retry");
556                 usleep(1000);
557                 goto rr_create_retry;
558         }
559
560         if (rv == SA_AIS_ERR_EXIST) {
561                 LOG_DBG("RR checkpoint section already exists");
562                 saCkptCheckpointClose(h);
563                 return -EEXIST;
564         }
565
566         if (rv != SA_AIS_OK) {
567                 LOG_ERROR("RR checkpoint section creation failed: %s",
568                           str_ais_error(rv));
569                 saCkptCheckpointClose(h);
570                 return -EIO; /* FIXME: better error */
571         }
572
573         LOG_DBG("export_checkpoint: closing checkpoint");
574         saCkptCheckpointClose(h);
575
576         rq = malloc(DM_ULOG_REQUEST_SIZE);
577         if (!rq) {
578                 LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
579                 return -ENOMEM;
580         }
581         memset(rq, 0, sizeof(*rq));
582
583         dm_list_init(&rq->list);
584         rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
585         rq->originator = cp->requester;  /* FIXME: hack to overload meaning of originator */
586         strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
587         rq->u_rq.seq = my_cluster_id;
588
589         r = cluster_send(rq);
590         if (r)
591                 LOG_ERROR("Failed to send checkpoint ready notice: %s",
592                           strerror(-r));
593
594         free(rq);
595         return 0;
596 }
597
598 static int import_checkpoint(struct clog_cpg *entry, int no_read)
599 {
600         int rtn = 0;
601         SaCkptCheckpointHandleT h;
602         SaCkptSectionIterationHandleT itr;
603         SaCkptSectionDescriptorT desc;
604         SaCkptIOVectorElementT iov;
605         SaNameT name;
606         SaAisErrorT rv;
607         char *bitmap = NULL;
608         int len;
609
610         bitmap = malloc(1024*1024);
611         if (!bitmap)
612                 return -ENOMEM;
613
614         len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
615                        SHORT_UUID(entry->name.value), my_cluster_id);
616         name.length = len;
617
618 open_retry:
619         rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
620                                   SA_CKPT_CHECKPOINT_READ, 0, &h);
621         if (rv == SA_AIS_ERR_TRY_AGAIN) {
622                 LOG_ERROR("import_checkpoint: ckpt open retry");
623                 usleep(1000);
624                 goto open_retry;
625         }
626
627         if (rv != SA_AIS_OK) {
628                 LOG_ERROR("[%s] Failed to open checkpoint: %s",
629                           SHORT_UUID(entry->name.value), str_ais_error(rv));
630                 return -EIO; /* FIXME: better error */
631         }
632
633 unlink_retry:
634         rv = saCkptCheckpointUnlink(ckpt_handle, &name);
635         if (rv == SA_AIS_ERR_TRY_AGAIN) {
636                 LOG_ERROR("import_checkpoint: ckpt unlink retry");
637                 usleep(1000);
638                 goto unlink_retry;
639         }
640
641         if (no_read) {
642                 LOG_DBG("Checkpoint for this log already received");
643                 goto no_read;
644         }
645
646 init_retry:
647         rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
648                                               SA_TIME_END, &itr);
649         if (rv == SA_AIS_ERR_TRY_AGAIN) {
650                 LOG_ERROR("import_checkpoint: sync create retry");
651                 usleep(1000);
652                 goto init_retry;
653         }
654
655         if (rv != SA_AIS_OK) {
656                 LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
657                           SHORT_UUID(entry->name.value), str_ais_error(rv));
658                 return -EIO; /* FIXME: better error */
659         }
660
661         len = 0;
662         while (1) {
663                 rv = saCkptSectionIterationNext(itr, &desc);
664                 if (rv == SA_AIS_OK)
665                         len++;
666                 else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len)
667                         break;
668                 else if (rv != SA_AIS_ERR_TRY_AGAIN) {
669                         LOG_ERROR("saCkptSectionIterationNext failure: %d", rv);
670                         break;
671                 }
672         }
673         saCkptSectionIterationFinalize(itr);
674         if (len != 3) {
675                 LOG_ERROR("import_checkpoint: %d checkpoint sections found",
676                           len);
677                 usleep(1000);
678                 goto init_retry;
679         }
680         saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
681                                          SA_TIME_END, &itr);
682
683         while (1) {
684                 rv = saCkptSectionIterationNext(itr, &desc);
685                 if (rv == SA_AIS_ERR_NO_SECTIONS)
686                         break;
687
688                 if (rv == SA_AIS_ERR_TRY_AGAIN) {
689                         LOG_ERROR("import_checkpoint: ckpt iternext retry");
690                         usleep(1000);
691                         continue;
692                 }
693
694                 if (rv != SA_AIS_OK) {
695                         LOG_ERROR("import_checkpoint: clean checkpoint section "
696                                   "creation failed: %s", str_ais_error(rv));
697                         rtn = -EIO; /* FIXME: better error */
698                         goto fail;
699                 }
700
701                 if (!desc.sectionSize) {
702                         LOG_ERROR("Checkpoint section empty");
703                         continue;
704                 }
705
706                 memset(bitmap, 0, sizeof(*bitmap));
707                 iov.sectionId = desc.sectionId;
708                 iov.dataBuffer = bitmap;
709                 iov.dataSize = desc.sectionSize;
710                 iov.dataOffset = 0;
711
712         read_retry:
713                 rv = saCkptCheckpointRead(h, &iov, 1, NULL);
714                 if (rv == SA_AIS_ERR_TRY_AGAIN) {
715                         LOG_ERROR("ckpt read retry");
716                         usleep(1000);
717                         goto read_retry;
718                 }
719
720                 if (rv != SA_AIS_OK) {
721                         LOG_ERROR("import_checkpoint: ckpt read error: %s",
722                                   str_ais_error(rv));
723                         rtn = -EIO; /* FIXME: better error */
724                         goto fail;
725                 }
726
727                 if (iov.readSize) {
728                         if (pull_state(entry->name.value, entry->luid,
729                                        (char *)desc.sectionId.id, bitmap,
730                                        iov.readSize)) {
731                                 LOG_ERROR("Error loading state");
732                                 rtn = -EIO;
733                                 goto fail;
734                         }
735                 } else {
736                         /* Need to request new checkpoint */
737                         rtn = -EAGAIN;
738                         goto fail;
739                 }
740         }
741
742 fail:
743         saCkptSectionIterationFinalize(itr);
744 no_read:
745         saCkptCheckpointClose(h);
746
747         free(bitmap);
748         return rtn;
749 }
750
751 static void do_checkpoints(struct clog_cpg *entry, int leaving)
752 {
753         struct checkpoint_data *cp;
754
755         for (cp = entry->checkpoint_list; cp;) {
756                 /*
757                  * FIXME: Check return code.  Could send failure
758                  * notice in rq in export_checkpoint function
759                  * by setting rq->error
760                  */
761                 switch (export_checkpoint(cp)) {
762                 case -EEXIST:
763                         LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s",
764                                    SHORT_UUID(entry->name.value), cp->requester,
765                                    (leaving) ? "(L)": "");
766                         LOG_COND(log_checkpoint,
767                                  "[%s] Checkpoint for %u already handled%s",
768                                  SHORT_UUID(entry->name.value), cp->requester,
769                                  (leaving) ? "(L)": "");
770                         entry->checkpoint_list = cp->next;
771                         free_checkpoint(cp);
772                         cp = entry->checkpoint_list;
773                         break;
774                 case 0:
775                         LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s",
776                                    SHORT_UUID(entry->name.value), cp->requester,
777                                    (leaving) ? "(L)": "");
778                         LOG_COND(log_checkpoint,
779                                  "[%s] Checkpoint data available for node %u%s",
780                                  SHORT_UUID(entry->name.value), cp->requester,
781                                  (leaving) ? "(L)": "");
782                         entry->checkpoint_list = cp->next;
783                         free_checkpoint(cp);
784                         cp = entry->checkpoint_list;
785                         break;
786                 default:
787                         /* FIXME: Skipping will cause list corruption */
788                         LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
789                                   SHORT_UUID(entry->name.value), cp->requester,
790                                   (leaving) ? "(L)": "");
791                 }
792         }
793 }
794
795 static int resend_requests(struct clog_cpg *entry)
796 {
797         int r = 0;
798         struct clog_request *rq, *n;
799
800         if (!entry->resend_requests || entry->delay)
801                 return 0;
802
803         if (entry->state != VALID)
804                 return 0;
805
806         entry->resend_requests = 0;
807
808         dm_list_iterate_items_safe(rq, n, &entry->working_list) {
809                 dm_list_del(&rq->list);
810
811                 if (strcmp(entry->name.value, rq->u_rq.uuid)) {
812                         LOG_ERROR("[%s]  Stray request from another log (%s)",
813                                   SHORT_UUID(entry->name.value),
814                                   SHORT_UUID(rq->u_rq.uuid));
815                         free(rq);
816                         continue;
817                 }
818
819                 switch (rq->u_rq.request_type) {
820                 case DM_ULOG_SET_REGION_SYNC:
821                         /*
822                          * Some requests simply do not need to be resent.
823                          * If it is a request that just changes log state,
824                          * then it doesn't need to be resent (everyone makes
825                          * updates).
826                          */
827                         LOG_COND(log_resend_requests,
828                                  "[%s] Skipping resend of %s/#%u...",
829                                  SHORT_UUID(entry->name.value),
830                                  _RQ_TYPE(rq->u_rq.request_type),
831                                  rq->u_rq.seq);
832                         LOG_SPRINT(entry, "###  No resend: [%s] %s/%u ###",
833                                    SHORT_UUID(entry->name.value),
834                                    _RQ_TYPE(rq->u_rq.request_type),
835                                    rq->u_rq.seq);
836
837                         rq->u_rq.data_size = 0;
838                         kernel_send(&rq->u_rq);
839                                 
840                         break;
841                         
842                 default:
843                         /*
844                          * If an action or a response is required, then
845                          * the request must be resent.
846                          */
847                         LOG_COND(log_resend_requests,
848                                  "[%s] Resending %s(#%u) due to new server(%u)",
849                                  SHORT_UUID(entry->name.value),
850                                  _RQ_TYPE(rq->u_rq.request_type),
851                                  rq->u_rq.seq, entry->lowest_id);
852                         LOG_SPRINT(entry, "***  Resending: [%s] %s/%u ***",
853                                    SHORT_UUID(entry->name.value),
854                                    _RQ_TYPE(rq->u_rq.request_type),
855                                    rq->u_rq.seq);
856                         r = cluster_send(rq);
857                         if (r < 0)
858                                 LOG_ERROR("Failed resend");
859                 }
860                 free(rq);
861         }
862
863         return r;
864 }
865
866 static int do_cluster_work(void *data)
867 {
868         int r = SA_AIS_OK;
869         struct clog_cpg *entry;
870
871         dm_list_iterate_items(entry, &clog_cpg_list) {
872                 r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
873                 if (r != SA_AIS_OK)
874                         LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
875
876                 if (entry->free_me) {
877                         free(entry);
878                         continue;
879                 }
880                 do_checkpoints(entry, 0);
881
882                 resend_requests(entry);
883         }
884
885         return (r == SA_AIS_OK) ? 0 : -1;  /* FIXME: good error number? */
886 }
887
888 static int flush_startup_list(struct clog_cpg *entry)
889 {
890         int r = 0;
891         int i_was_server;
892         struct clog_request *rq, *n;
893         struct checkpoint_data *new;
894
895         dm_list_iterate_items_safe(rq, n, &entry->startup_list) {
896                 dm_list_del(&rq->list);
897
898                 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
899                         new = prepare_checkpoint(entry, rq->originator);
900                         if (!new) {
901                                 /*
902                                  * FIXME: Need better error handling.  Other nodes
903                                  * will be trying to send the checkpoint too, and we
904                                  * must continue processing the list; so report error
905                                  * but continue.
906                                  */
907                                 LOG_ERROR("Failed to prepare checkpoint for %u!!!",
908                                           rq->originator);
909                                 free(rq);
910                                 continue;
911                         }
912                         LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
913                                    SHORT_UUID(entry->name.value), rq->originator);
914                         LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
915                                  SHORT_UUID(entry->name.value), rq->originator);
916                         new->next = entry->checkpoint_list;
917                         entry->checkpoint_list = new;
918                 } else {
919                         LOG_DBG("[%s] Processing delayed request: %s",
920                                 SHORT_UUID(rq->u_rq.uuid),
921                                 _RQ_TYPE(rq->u_rq.request_type));
922                         i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
923                         r = handle_cluster_request(entry, rq, i_was_server);
924
925                         if (r)
926                                 /*
927                                  * FIXME: If we error out here, we will never get
928                                  * another opportunity to retry these requests
929                                  */
930                                 LOG_ERROR("Error while processing delayed CPG message");
931                 }
932                 free(rq);
933         }
934
935         return 0;
936 }
937
938 static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname,
939                                  uint32_t nodeid, uint32_t pid,
940                                  void *msg, size_t msg_len)
941 {
942         int i;
943         int r = 0;
944         int i_am_server;
945         int response = 0;
946         struct clog_request *rq = msg;
947         struct clog_request *tmp_rq;
948         struct clog_cpg *match;
949
950         match = find_clog_cpg(handle);
951         if (!match) {
952                 LOG_ERROR("Unable to find clog_cpg for cluster message");
953                 return;
954         }
955
956         if ((nodeid == my_cluster_id) &&
957             !(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
958             (rq->u_rq.request_type != DM_ULOG_RESUME) &&
959             (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
960             (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
961                 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
962                 if (!tmp_rq) {
963                         /*
964                          * FIXME: It may be possible to continue... but we
965                          * would not be able to resend any messages that might
966                          * be necessary during membership changes
967                          */
968                         LOG_ERROR("[%s] Unable to record request: -ENOMEM",
969                                   SHORT_UUID(rq->u_rq.uuid));
970                         return;
971                 }
972                 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
973                 dm_list_init(&tmp_rq->list);
974                 dm_list_add( &match->working_list, &tmp_rq->list);
975         }
976
977         if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
978                 /*
979                  * If the server (lowest_id) indicates it is leaving,
980                  * then we must resend any outstanding requests.  However,
981                  * we do not want to resend them if the next server in
982                  * line is in the process of leaving.
983                  */
984                 if (nodeid == my_cluster_id) {
985                         LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
986                                  SHORT_UUID(rq->u_rq.uuid));
987                 } else {
988                         if (nodeid < my_cluster_id) {
989                                 if (nodeid == match->lowest_id) {
990                                         match->resend_requests = 1;
991                                         LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
992                                                  SHORT_UUID(rq->u_rq.uuid), nodeid,
993                                                  (dm_list_empty(&match->working_list)) ? " -- working_list empty": "");
994
995                                         dm_list_iterate_items(tmp_rq, &match->working_list)
996                                                 LOG_COND(log_resend_requests,
997                                                          "[%s]                %s/%u",
998                                                          SHORT_UUID(tmp_rq->u_rq.uuid),
999                                                          _RQ_TYPE(tmp_rq->u_rq.request_type),
1000                                                          tmp_rq->u_rq.seq);
1001                                 }
1002
1003                                 match->delay++;
1004                                 LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
1005                                          SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
1006                         }
1007                         rq->originator = nodeid; /* don't really need this, but nice for debug */
1008                         goto out;
1009                 }
1010         }
1011
1012         /*
1013          * We can receive messages after we do a cpg_leave but before we
1014          * get our config callback.  However, since we can't respond after
1015          * leaving, we simply return.
1016          */
1017         if (match->state == LEAVING)
1018                 return;
1019
1020         i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
1021
1022         if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
1023                 if (my_cluster_id == rq->originator) {
1024                         /* Redundant checkpoints ignored if match->valid */
1025                         LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
1026                                    SHORT_UUID(rq->u_rq.uuid), nodeid);
1027                         if (import_checkpoint(match, (match->state != INVALID))) {
1028                                 LOG_SPRINT(match,
1029                                            "[%s] Failed to import checkpoint from %u",
1030                                            SHORT_UUID(rq->u_rq.uuid), nodeid);
1031                                 LOG_ERROR("[%s] Failed to import checkpoint from %u",
1032                                           SHORT_UUID(rq->u_rq.uuid), nodeid);
1033                                 kill(getpid(), SIGUSR1);
1034                                 /* Could we retry? */
1035                                 goto out;
1036                         } else if (match->state == INVALID) {
1037                                 LOG_SPRINT(match,
1038                                            "[%s] Checkpoint data received from %u.  Log is now valid",
1039                                            SHORT_UUID(match->name.value), nodeid);
1040                                 LOG_COND(log_checkpoint,
1041                                          "[%s] Checkpoint data received from %u.  Log is now valid",
1042                                          SHORT_UUID(match->name.value), nodeid);
1043                                 match->state = VALID;
1044
1045                                 flush_startup_list(match);
1046                         } else {
1047                                 LOG_SPRINT(match,
1048                                            "[%s] Redundant checkpoint from %u ignored.",
1049                                            SHORT_UUID(rq->u_rq.uuid), nodeid);
1050                         }
1051                 }
1052                 goto out;
1053         }
1054
1055         if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
1056                 response = 1;
1057                 r = handle_cluster_response(match, rq);
1058         } else {
1059                 rq->originator = nodeid;
1060
1061                 if (match->state == LEAVING) {
1062                         LOG_ERROR("[%s]  Ignoring %s from %u.  Reason: I'm leaving",
1063                                   SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
1064                                   rq->originator);
1065                         goto out;
1066                 }
1067
1068                 if (match->state == INVALID) {
1069                         LOG_DBG("Log not valid yet, storing request");
1070                         tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
1071                         if (!tmp_rq) {
1072                                 LOG_ERROR("cpg_message_callback:  Unable to"
1073                                           " allocate transfer structs");
1074                                 r = -ENOMEM; /* FIXME: Better error #? */
1075                                 goto out;
1076                         }
1077
1078                         memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
1079                         tmp_rq->pit_server = match->lowest_id;
1080                         dm_list_init(&tmp_rq->list);
1081                         dm_list_add(&match->startup_list, &tmp_rq->list);
1082                         goto out;
1083                 }
1084
1085                 r = handle_cluster_request(match, rq, i_am_server);
1086         }
1087
1088         /*
1089          * If the log is now valid, we can queue the checkpoints
1090          */
1091         for (i = match->checkpoints_needed; i; ) {
1092                 struct checkpoint_data *new;
1093
1094                 if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
1095                         LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
1096                                 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
1097                         break;
1098                 }
1099
1100                 i--;
1101                 new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
1102                 if (!new) {
1103                         /* FIXME: Need better error handling */
1104                         LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
1105                                   SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1106                         break;
1107                 }
1108                 LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
1109                            SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
1110                            (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
1111                 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
1112                          SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1113                 match->checkpoints_needed--;
1114
1115                 new->next = match->checkpoint_list;
1116                 match->checkpoint_list = new;
1117         }
1118
1119 out:
1120         /* nothing happens after this point.  It is just for debugging */
1121         if (r) {
1122                 LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
1123                           SHORT_UUID(rq->u_rq.uuid),
1124                           _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
1125                           strerror(-r));
1126                 LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(rq->u_rq.uuid),
1127                           (response) ? "YES" : "NO");
1128                 LOG_ERROR("[%s]    Originator: %u",
1129                           SHORT_UUID(rq->u_rq.uuid), rq->originator);
1130                 if (response)
1131                         LOG_ERROR("[%s]    Responder : %u",
1132                                   SHORT_UUID(rq->u_rq.uuid), nodeid);
1133
1134                 LOG_ERROR("HISTORY::");
1135                 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1136                         match->idx++;
1137                         match->idx = match->idx % DEBUGGING_HISTORY;
1138                         if (match->debugging[match->idx][0] == '\0')
1139                                 continue;
1140                         LOG_ERROR("%d:%d) %s", i, match->idx,
1141                                   match->debugging[match->idx]);
1142                 }
1143         } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
1144                    (rq->originator == my_cluster_id)) {
1145                 if (!response)
1146                         LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
1147                                    rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1148                                    _RQ_TYPE(rq->u_rq.request_type),
1149                                    rq->originator, (response) ? "YES" : "NO");
1150                 else
1151                         LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
1152                                    rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1153                                    _RQ_TYPE(rq->u_rq.request_type),
1154                                    rq->originator, (response) ? "YES" : "NO",
1155                                    nodeid);
1156         }
1157 }
1158
1159 static void cpg_join_callback(struct clog_cpg *match,
1160                               const struct cpg_address *joined,
1161                               const struct cpg_address *member_list,
1162                               size_t member_list_entries)
1163 {
1164         int i;
1165         int my_pid = getpid();
1166         uint32_t lowest = match->lowest_id;
1167         struct clog_request *rq;
1168         char dbuf[32];
1169
1170         /* Assign my_cluster_id */
1171         if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
1172                 my_cluster_id = joined->nodeid;
1173
1174         /* Am I the very first to join? */
1175         if (member_list_entries == 1) {
1176                 match->lowest_id = joined->nodeid;
1177                 match->state = VALID;
1178         }
1179
1180         /* If I am part of the joining list, I do not send checkpoints */
1181         if (joined->nodeid == my_cluster_id)
1182                 goto out;
1183
1184         memset(dbuf, 0, sizeof(dbuf));
1185         for (i = 0; i < (member_list_entries-1); i++)
1186                 sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
1187         sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
1188         LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]",
1189                  SHORT_UUID(match->name.value), joined->nodeid, dbuf);
1190
1191         /*
1192          * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
1193          * the startup_list interface exclusively
1194          */
1195         if (dm_list_empty(&match->startup_list) && (match->state == VALID) &&
1196             (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) {
1197                 match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
1198                 goto out;
1199         }
1200
1201         rq = malloc(DM_ULOG_REQUEST_SIZE);
1202         if (!rq) {
1203                 LOG_ERROR("cpg_config_callback: "
1204                           "Unable to allocate transfer structs");
1205                 LOG_ERROR("cpg_config_callback: "
1206                           "Unable to perform checkpoint");
1207                 goto out;
1208         }
1209         rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
1210         rq->originator = joined->nodeid;
1211         dm_list_init(&rq->list);
1212         dm_list_add(&match->startup_list, &rq->list);
1213
1214 out:
1215         /* Find the lowest_id, i.e. the server */
1216         match->lowest_id = member_list[0].nodeid;
1217         for (i = 0; i < member_list_entries; i++)
1218                 if (match->lowest_id > member_list[i].nodeid)
1219                         match->lowest_id = member_list[i].nodeid;
1220
1221         if (lowest == 0xDEAD)
1222                 LOG_COND(log_membership_change, "[%s]  Server change <none> -> %u (%u %s)",
1223                          SHORT_UUID(match->name.value), match->lowest_id,
1224                          joined->nodeid, (member_list_entries == 1) ?
1225                          "is first to join" : "joined");
1226         else if (lowest != match->lowest_id)
1227                 LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u joined)",
1228                          SHORT_UUID(match->name.value), lowest,
1229                          match->lowest_id, joined->nodeid);
1230         else
1231                 LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u joined)",
1232                          SHORT_UUID(match->name.value),
1233                          lowest, joined->nodeid);
1234         LOG_SPRINT(match, "+++  UUID=%s  %u join  +++",
1235                    SHORT_UUID(match->name.value), joined->nodeid);
1236 }
1237
1238 static void cpg_leave_callback(struct clog_cpg *match,
1239                                const struct cpg_address *left,
1240                                const struct cpg_address *member_list,
1241                                size_t member_list_entries)
1242 {
1243         int i, j, fd;
1244         uint32_t lowest = match->lowest_id;
1245         struct clog_request *rq, *n;
1246         struct checkpoint_data *p_cp, *c_cp;
1247
1248         LOG_SPRINT(match, "---  UUID=%s  %u left  ---",
1249                    SHORT_UUID(match->name.value), left->nodeid);
1250
1251         /* Am I leaving? */
1252         if (my_cluster_id == left->nodeid) {
1253                 LOG_DBG("Finalizing leave...");
1254                 dm_list_del(&match->list);
1255
1256                 cpg_fd_get(match->handle, &fd);
1257                 links_unregister(fd);
1258
1259                 cluster_postsuspend(match->name.value, match->luid);
1260
1261                 dm_list_iterate_items_safe(rq, n, &match->working_list) {
1262                         dm_list_del(&rq->list);
1263
1264                         if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
1265                                 kernel_send(&rq->u_rq);
1266                         free(rq);
1267                 }
1268
1269                 cpg_finalize(match->handle);
1270
1271                 match->free_me = 1;
1272                 match->lowest_id = 0xDEAD;
1273                 match->state = INVALID;
1274         }                       
1275
1276         /* Remove any pending checkpoints for the leaving node. */
1277         for (p_cp = NULL, c_cp = match->checkpoint_list;
1278              c_cp && (c_cp->requester != left->nodeid);
1279              p_cp = c_cp, c_cp = c_cp->next);
1280         if (c_cp) {
1281                 if (p_cp)
1282                         p_cp->next = c_cp->next;
1283                 else
1284                         match->checkpoint_list = c_cp->next;
1285
1286                 LOG_COND(log_checkpoint,
1287                          "[%s] Removing pending checkpoint (%u is leaving)",
1288                          SHORT_UUID(match->name.value), left->nodeid);
1289                 free_checkpoint(c_cp);
1290         }
1291         dm_list_iterate_items_safe(rq, n, &match->startup_list) {
1292                 if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
1293                     (rq->originator == left->nodeid)) {
1294                         LOG_COND(log_checkpoint,
1295                                  "[%s] Removing pending ckpt from startup list (%u is leaving)",
1296                                  SHORT_UUID(match->name.value), left->nodeid);
1297                         dm_list_del(&rq->list);
1298                         free(rq);
1299                 }
1300         }
1301         for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
1302                 match->checkpoint_requesters[j] = match->checkpoint_requesters[i];
1303                 if (match->checkpoint_requesters[i] == left->nodeid) {
1304                         LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
1305                                   SHORT_UUID(match->name.value), left->nodeid);
1306                         j--;
1307                 }
1308         }
1309         match->checkpoints_needed = j;
1310
1311         if (left->nodeid < my_cluster_id) {
1312                 match->delay = (match->delay > 0) ? match->delay - 1 : 0;
1313                 if (!match->delay && dm_list_empty(&match->working_list))
1314                         match->resend_requests = 0;
1315                 LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
1316                          SHORT_UUID(match->name.value), left->nodeid,
1317                          match->delay, (dm_list_empty(&match->working_list)) ?
1318                          " -- working_list empty": "");
1319         }
1320
1321         /* Find the lowest_id, i.e. the server */
1322         if (!member_list_entries) {
1323                 match->lowest_id = 0xDEAD;
1324                 LOG_COND(log_membership_change, "[%s]  Server change %u -> <none> "
1325                          "(%u is last to leave)",
1326                          SHORT_UUID(match->name.value), left->nodeid,
1327                          left->nodeid);
1328                 return;
1329         }
1330                 
1331         match->lowest_id = member_list[0].nodeid;
1332         for (i = 0; i < member_list_entries; i++)
1333                 if (match->lowest_id > member_list[i].nodeid)
1334                         match->lowest_id = member_list[i].nodeid;
1335
1336         if (lowest != match->lowest_id) {
1337                 LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u left)",
1338                          SHORT_UUID(match->name.value), lowest,
1339                          match->lowest_id, left->nodeid);
1340         } else
1341                 LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u left)",
1342                          SHORT_UUID(match->name.value), lowest, left->nodeid);
1343
1344         if ((match->state == INVALID) && !match->free_me) {
1345                 /*
1346                  * If all CPG members are waiting for checkpoints and they
1347                  * are all present in my startup_list, then I was the first to
1348                  * join and I must assume control.
1349                  *
1350                  * We do not normally end up here, but if there was a quick
1351                  * 'resume -> suspend -> resume' across the cluster, we may
1352                  * have initially thought we were not the first to join because
1353                  * of the presence of out-going (and unable to respond) members.
1354                  */
1355
1356                 i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
1357                 dm_list_iterate_items(rq, &match->startup_list)
1358                         if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
1359                                 i++;
1360
1361                 if (i == member_list_entries) {
1362                         /* 
1363                          * Last node who could have given me a checkpoint just left.
1364                          * Setting log state to VALID and acting as 'first join'.
1365                          */
1366                         match->state = VALID;
1367                         flush_startup_list(match);
1368                 }
1369         }
1370 }
1371
1372 static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname,
1373                                 const struct cpg_address *member_list,
1374                                 size_t member_list_entries,
1375                                 const struct cpg_address *left_list,
1376                                 size_t left_list_entries,
1377                                 const struct cpg_address *joined_list,
1378                                 size_t joined_list_entries)
1379 {
1380         struct clog_cpg *match;
1381         int found = 0;
1382
1383         dm_list_iterate_items(match, &clog_cpg_list)
1384                 if (match->handle == handle) {
1385                         found = 1;
1386                         break;
1387                 }
1388
1389         if (!found) {
1390                 LOG_ERROR("Unable to find match for CPG config callback");
1391                 return;
1392         }
1393
1394         if ((joined_list_entries + left_list_entries) > 1)
1395                 LOG_ERROR("[%s]  More than one node joining/leaving",
1396                           SHORT_UUID(match->name.value));
1397
1398         if (joined_list_entries)
1399                 cpg_join_callback(match, joined_list,
1400                                   member_list, member_list_entries);
1401         else
1402                 cpg_leave_callback(match, left_list,
1403                                    member_list, member_list_entries);
1404 }
1405
1406 cpg_callbacks_t cpg_callbacks = {
1407         .cpg_deliver_fn = cpg_message_callback,
1408         .cpg_confchg_fn = cpg_config_callback,
1409 };
1410
1411 /*
1412  * remove_checkpoint
1413  * @entry
1414  *
1415  * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
1416  */
1417 int remove_checkpoint(struct clog_cpg *entry)
1418 {
1419         int len;
1420         SaNameT name;
1421         SaAisErrorT rv;
1422         SaCkptCheckpointHandleT h;
1423
1424         len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
1425                        SHORT_UUID(entry->name.value), my_cluster_id);
1426         name.length = len;
1427
1428 open_retry:
1429         rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
1430                                   SA_CKPT_CHECKPOINT_READ, 0, &h);
1431         if (rv == SA_AIS_ERR_TRY_AGAIN) {
1432                 LOG_ERROR("abort_startup: ckpt open retry");
1433                 usleep(1000);
1434                 goto open_retry;
1435         }
1436
1437         if (rv != SA_AIS_OK)
1438                 return 0;
1439
1440         LOG_DBG("[%s]  Removing checkpoint", SHORT_UUID(entry->name.value));
1441 unlink_retry:
1442         rv = saCkptCheckpointUnlink(ckpt_handle, &name);
1443         if (rv == SA_AIS_ERR_TRY_AGAIN) {
1444                 LOG_ERROR("abort_startup: ckpt unlink retry");
1445                 usleep(1000);
1446                 goto unlink_retry;
1447         }
1448         
1449         if (rv != SA_AIS_OK) {
1450                 LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
1451                           SHORT_UUID(entry->name.value), str_ais_error(rv));
1452                 return -EIO;
1453         }
1454
1455         saCkptCheckpointClose(h);
1456
1457         return 1;
1458 }
1459
1460 int create_cluster_cpg(char *uuid, uint64_t luid)
1461 {
1462         int r;
1463         int size;
1464         struct clog_cpg *new = NULL;
1465         struct clog_cpg *tmp;
1466
1467         dm_list_iterate_items(tmp, &clog_cpg_list)
1468                 if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
1469                         LOG_ERROR("Log entry already exists: %s", uuid);
1470                         return -EEXIST;
1471                 }
1472
1473         new = malloc(sizeof(*new));
1474         if (!new) {
1475                 LOG_ERROR("Unable to allocate memory for clog_cpg");
1476                 return -ENOMEM;
1477         }
1478         memset(new, 0, sizeof(*new));
1479         dm_list_init(&new->list);
1480         new->lowest_id = 0xDEAD;
1481         dm_list_init(&new->startup_list);
1482         dm_list_init(&new->working_list);
1483
1484         size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ?
1485                 CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1);
1486         strncpy(new->name.value, uuid, size);
1487         new->name.length = size;
1488         new->luid = luid;
1489
1490         /*
1491          * Ensure there are no stale checkpoints around before we join
1492          */
1493         if (remove_checkpoint(new) == 1)
1494                 LOG_COND(log_checkpoint,
1495                          "[%s]  Removing checkpoints left from previous session",
1496                          SHORT_UUID(new->name.value));
1497
1498         r = cpg_initialize(&new->handle, &cpg_callbacks);
1499         if (r != SA_AIS_OK) {
1500                 LOG_ERROR("cpg_initialize failed:  Cannot join cluster");
1501                 free(new);
1502                 return -EPERM;
1503         }
1504
1505         r = cpg_join(new->handle, &new->name);
1506         if (r != SA_AIS_OK) {
1507                 LOG_ERROR("cpg_join failed:  Cannot join cluster");
1508                 free(new);
1509                 return -EPERM;
1510         }
1511
1512         new->cpg_state = VALID;
1513         dm_list_add(&clog_cpg_list, &new->list);
1514         LOG_DBG("New   handle: %llu", (unsigned long long)new->handle);
1515         LOG_DBG("New   name: %s", new->name.value);
1516
1517         /* FIXME: better variable */
1518         cpg_fd_get(new->handle, &r);
1519         links_register(r, "cluster", do_cluster_work, NULL);
1520
1521         return 0;
1522 }
1523
1524 static void abort_startup(struct clog_cpg *del)
1525 {
1526         struct clog_request *rq, *n;
1527
1528         LOG_DBG("[%s]  CPG teardown before checkpoint received",
1529                 SHORT_UUID(del->name.value));
1530
1531         dm_list_iterate_items_safe(rq, n, &del->startup_list) {
1532                 dm_list_del(&rq->list);
1533
1534                 LOG_DBG("[%s]  Ignoring request from %u: %s",
1535                         SHORT_UUID(del->name.value), rq->originator,
1536                         _RQ_TYPE(rq->u_rq.request_type));
1537                 free(rq);
1538         }
1539
1540         remove_checkpoint(del);
1541 }
1542
1543 static int _destroy_cluster_cpg(struct clog_cpg *del)
1544 {
1545         int r;
1546         int state;
1547         
1548         LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
1549                  SHORT_UUID(del->name.value));
1550
1551         /*
1552          * We must send any left over checkpoints before
1553          * leaving.  If we don't, an incoming node could
1554          * be stuck with no checkpoint and stall.
1555          do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
1556
1557          - Incoming node deletes old checkpoints before joining
1558          - A stale checkpoint is issued here by leaving node
1559          - (leaving node leaves)
1560          - Incoming node joins cluster and finds stale checkpoint.
1561          - (leaving node leaves - option 2)
1562         */
1563         do_checkpoints(del, 1);
1564
1565         state = del->state;
1566
1567         del->cpg_state = INVALID;
1568         del->state = LEAVING;
1569
1570         /*
1571          * If the state is VALID, we might be processing the
1572          * startup list.  If so, we certainly don't want to
1573          * clear the startup_list here by calling abort_startup
1574          */
1575         if (!dm_list_empty(&del->startup_list) && (state != VALID))
1576                 abort_startup(del);
1577
1578         r = cpg_leave(del->handle, &del->name);
1579         if (r != CPG_OK)
1580                 LOG_ERROR("Error leaving CPG!");
1581         return 0;
1582 }
1583
1584 int destroy_cluster_cpg(char *uuid)
1585 {
1586         struct clog_cpg *del, *tmp;
1587
1588         dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
1589                 if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
1590                         _destroy_cluster_cpg(del);
1591
1592         return 0;
1593 }
1594
1595 int init_cluster(void)
1596 {
1597         SaAisErrorT rv;
1598
1599         dm_list_init(&clog_cpg_list);
1600         rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
1601
1602         if (rv != SA_AIS_OK)
1603                 return EXIT_CLUSTER_CKPT_INIT;
1604
1605         return 0;
1606 }
1607
1608 void cleanup_cluster(void)
1609 {
1610         SaAisErrorT err;
1611
1612         err = saCkptFinalize(ckpt_handle);
1613         if (err != SA_AIS_OK)
1614                 LOG_ERROR("Failed to finalize checkpoint handle");
1615 }
1616
1617 void cluster_debug(void)
1618 {
1619         struct checkpoint_data *cp;
1620         struct clog_cpg *entry;
1621         struct clog_request *rq;
1622         int i;
1623
1624         LOG_ERROR("");
1625         LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
1626         dm_list_iterate_items(entry, &clog_cpg_list) {
1627                 LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
1628                 LOG_ERROR("  lowest_id         : %u", entry->lowest_id);
1629                 LOG_ERROR("  state             : %s", (entry->state == INVALID) ?
1630                           "INVALID" : (entry->state == VALID) ? "VALID" :
1631                           (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
1632                 LOG_ERROR("  cpg_state         : %d", entry->cpg_state);
1633                 LOG_ERROR("  free_me           : %d", entry->free_me);
1634                 LOG_ERROR("  delay             : %d", entry->delay);
1635                 LOG_ERROR("  resend_requests   : %d", entry->resend_requests);
1636                 LOG_ERROR("  checkpoints_needed: %d", entry->checkpoints_needed);
1637                 for (i = 0, cp = entry->checkpoint_list;
1638                      i < MAX_CHECKPOINT_REQUESTERS; i++)
1639                         if (cp)
1640                                 cp = cp->next;
1641                         else
1642                                 break;
1643                 LOG_ERROR("  CKPTs waiting     : %d", i);
1644                 LOG_ERROR("  Working list:");
1645                 dm_list_iterate_items(rq, &entry->working_list)
1646                         LOG_ERROR("  %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1647                                   rq->u_rq.seq);
1648
1649                 LOG_ERROR("  Startup list:");
1650                 dm_list_iterate_items(rq, &entry->startup_list)
1651                         LOG_ERROR("  %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1652                                   rq->u_rq.seq);
1653
1654                 LOG_ERROR("Command History:");
1655                 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1656                         entry->idx++;
1657                         entry->idx = entry->idx % DEBUGGING_HISTORY;
1658                         if (entry->debugging[entry->idx][0] == '\0')
1659                                 continue;
1660                         LOG_ERROR("%d:%d) %s", i, entry->idx,
1661                                   entry->debugging[entry->idx]);
1662                 }
1663         }
1664 }