Gitweb:        http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=d145234077b65725c0e2166e66d55b41f54edca1
Commit:        d145234077b65725c0e2166e66d55b41f54edca1
Parent:        413f459962c9bb5a9f90fd2e09b56253d5120cc6
Author:        Lon Hohberger <lhh@redhat.com>
AuthorDate:    Wed Mar 21 16:29:12 2012 -0400
Committer:     Fabio M. Di Nitto <fdinitto@redhat.com>
CommitterDate: Tue Apr 24 17:04:09 2012 +0200
rgmanager: Add simple locking over CPG
Signed-off-by: Lon Hohberger <lhh@redhat.com>
---
 rgmanager/include/cpglock-internal.h | 38 ++
 rgmanager/include/cpglock.h | 40 ++
 rgmanager/include/list.h | 1 +
 rgmanager/include/lock.h | 15 +-
 rgmanager/include/sock.h | 18 +
 rgmanager/src/clulib/Makefile | 2 +-
 rgmanager/src/clulib/cpg_lock.c | 104 +++
 rgmanager/src/clulib/dlm_lock.c | 301 +++++++++
 rgmanager/src/clulib/libcpglock.c | 286 +++++++++
 rgmanager/src/clulib/lock.c | 294 +--------
 rgmanager/src/clulib/sock.c | 354 +++++++++++
 rgmanager/src/daemons/Makefile | 12 +-
 rgmanager/src/daemons/cpglockd.c | 1147 ++++++++++++++++++++++++++++++++++
 rgmanager/src/daemons/main.c | 24 +-
 14 files changed, 2326 insertions(+), 310 deletions(-)
diff --git a/rgmanager/include/cpglock-internal.h b/rgmanager/include/cpglock-internal.h new file mode 100644 index 0000000..1c322ae --- /dev/null +++ b/rgmanager/include/cpglock-internal.h @@ -0,0 +1,38 @@ +#ifndef _CPGLOCK_INT_H +#define _CPGLOCK_INT_H + +#ifndef CPG_LOCKD_SOCK +#define CPG_LOCKD_SOCK "/var/run/cpglockd.sk" +#endif + +#include <stdint.h> + +typedef enum { + MSG_LOCK = 1, + MSG_NAK = 2, + MSG_GRANT = 3, + MSG_UNLOCK = 4, + MSG_PURGE = 5, + MSG_CONFCHG= 6, + MSG_JOIN = 7, + MSG_DUMP = 998, + MSG_HALT = 999 +} cpg_lock_req_t; + +/* Mixed architecture not supported yet */ +struct __attribute__((packed)) cpg_lock_msg { + char resource[96]; + int32_t request; + uint32_t owner_nodeid; + uint32_t owner_pid; + uint32_t flags; + uint32_t lockid; + uint32_t owner_tid; + char pad[8]; +}; /* 128 */ + +pid_t _gettid (void); + +#define CPG_LOCKD_NAME "cpglockd" + +#endif diff --git a/rgmanager/include/cpglock.h b/rgmanager/include/cpglock.h new file mode 100644 index 0000000..e451d2f --- /dev/null +++ b/rgmanager/include/cpglock.h @@ -0,0 +1,40 @@ +#ifndef _CPGLOCK_H +#define _CPGLOCK_H + +typedef enum { + LOCK_FREE = 0, + LOCK_PENDING = 1, + LOCK_HELD = 2 +} lock_state_t; + +typedef enum { + FL_TRY = 0x1 +} lock_flag_t; + +struct cpg_lock { + char resource[96]; + int local_fd; + int local_id; + lock_state_t state; + int owner_nodeid; + int owner_pid; + int owner_tid; +}; + +typedef void * cpg_lock_handle_t; + +int cpg_lock_init(cpg_lock_handle_t *h); + +/* 0 if successful, -1 if error */ +int cpg_lock(cpg_lock_handle_t h, const char *resource, + lock_flag_t flags, struct cpg_lock *lock); + +int cpg_unlock(cpg_lock_handle_t h, + struct cpg_lock *lock); + +/* Warning: drops all locks with this client */ +int cpg_lock_fin(cpg_lock_handle_t h); + +int cpg_lock_dump(FILE *fp); + +#endif diff --git a/rgmanager/include/list.h b/rgmanager/include/list.h index 0dac6e8..0d1c1d5 100644 --- a/rgmanager/include/list.h +++ b/rgmanager/include/list.h @@ -36,6 +36,7 @@ do { \ 
*list = newnode; \ } while (0)
+#define list_append list_insert
#define list_remove(list, oldnode) \ do { \ diff --git a/rgmanager/include/lock.h b/rgmanager/include/lock.h index 7941304..82a1faf 100644 --- a/rgmanager/include/lock.h +++ b/rgmanager/include/lock.h @@ -6,17 +6,12 @@ #include <stdlib.h> #include <libdlm.h>
-int clu_ls_lock(dlm_lshandle_t ls, int mode, struct dlm_lksb *lksb, - int options, const char *resource); -dlm_lshandle_t clu_open_lockspace(const char *lsname); -int clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb); -int clu_close_lockspace(dlm_lshandle_t ls, const char *lsname); - /* Default lockspace wrappers */ int clu_lock_init(const char *default_lsname); -int clu_lock(int mode, struct dlm_lksb *lksb, - int options, const char *resource); -int clu_unlock(struct dlm_lksb *lksb); -void clu_lock_finished(const char *default_lsname); +int cpg_lock_initialize(void); + +extern int (*clu_lock)(int, struct dlm_lksb *, int, const char *); +extern int (*clu_unlock)(struct dlm_lksb *lksb); +extern void (*clu_lock_finished)(const char *);
#endif diff --git a/rgmanager/include/sock.h b/rgmanager/include/sock.h new file mode 100644 index 0000000..6cd2331 --- /dev/null +++ b/rgmanager/include/sock.h @@ -0,0 +1,18 @@ +#ifndef _SOCK_H +#define _SOCK_H + +ssize_t read_retry(int sockfd, void *buf, size_t count, + struct timeval * timeout); +int select_retry(int fdmax, fd_set * rfds, fd_set * wfds, fd_set * xfds, + struct timeval *timeout); +ssize_t write_retry(int fd, void *buf, size_t count, + struct timeval *timeout); + +int sock_listen(const char *sockpath); +int sock_connect(const char *sockpath, int tout); +int sock_accept(int fd); +void hexdump(const void *buf, size_t len); + +void *do_alloc(size_t); + +#endif diff --git a/rgmanager/src/clulib/Makefile b/rgmanager/src/clulib/Makefile index 34f58fb..adb5578 100644 --- a/rgmanager/src/clulib/Makefile +++ b/rgmanager/src/clulib/Makefile @@ -12,7 +12,7 @@ include $(OBJDIR)/make/uninstall.mk OBJS1= logging.o daemon_init.o signals.o msgsimple.o \ gettid.o rg_strings.o message.o members.o fdops.o \ lock.o cman.o vft.o msg_cluster.o msg_socket.o \ - wrap_lock.o sets.o + wrap_lock.o sets.o sock.o libcpglock.o dlm_lock.o cpg_lock.o
OBJS2= msgtest.o
diff --git a/rgmanager/src/clulib/cpg_lock.c b/rgmanager/src/clulib/cpg_lock.c new file mode 100644 index 0000000..a747419 --- /dev/null +++ b/rgmanager/src/clulib/cpg_lock.c @@ -0,0 +1,104 @@ +/** @file + * Locking. + */ +#include <errno.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <assert.h> +#include <sys/ioctl.h> +#include <lock.h> +#include <sys/types.h> +#include <sys/select.h> +#include <pthread.h> + +#include <cpglock.h> + +/* Default lockspace stuff */ +static cpg_lock_handle_t _cpgh = NULL; +static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER; + +static void +dlm2cpg(struct dlm_lksb *dlm, struct cpg_lock *cpg) +{ + memset(cpg, 0, sizeof(*cpg)); + cpg->local_id = dlm->sb_lkid; + cpg->state = dlm->sb_status; +} + +static void +cpg2dlm(struct cpg_lock *cpg, struct dlm_lksb *dlm) +{ + memset(dlm, 0, sizeof(*dlm)); + dlm->sb_status = cpg->state; + dlm->sb_lkid = cpg->local_id; +} + + +static int +_cpg_lock(int mode, + struct dlm_lksb *lksb, + int options, + const char *resource) +{ + int ret = 0; + + struct cpg_lock l; + + if (options == LKF_NOQUEUE) + ret = cpg_lock(_cpgh, resource, 1, &l); + else + ret = cpg_lock(_cpgh, resource, 0, &l); + + if (ret == 0) { + cpg2dlm(&l, lksb); + } + + return ret; +} + + +static int +_cpg_unlock(struct dlm_lksb *lksb) +{ + struct cpg_lock l; + + dlm2cpg(lksb, &l); + return cpg_unlock(_cpgh, &l); +} + + +static void +_cpg_lock_finished(const char *name) +{ + pthread_mutex_lock(&_default_lock); + cpg_lock_fin(_cpgh); + pthread_mutex_unlock(&_default_lock); +} + + +int +cpg_lock_initialize(void) +{ + int ret, err; + + pthread_mutex_lock(&_default_lock); + if (_cpgh) { + pthread_mutex_unlock(&_default_lock); + return 0; + } + + cpg_lock_init(&_cpgh); + ret = (_cpgh == NULL); + err = errno; + pthread_mutex_unlock(&_default_lock); + + /* Set up function pointers */ + clu_lock = _cpg_lock; + clu_unlock = _cpg_unlock; + clu_lock_finished = _cpg_lock_finished; + + errno 
= err; + return ret; +} diff --git a/rgmanager/src/clulib/dlm_lock.c b/rgmanager/src/clulib/dlm_lock.c new file mode 100644 index 0000000..d3f4250 --- /dev/null +++ b/rgmanager/src/clulib/dlm_lock.c @@ -0,0 +1,301 @@ +/** @file + * Locking. + */ +#include <errno.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <assert.h> +#include <sys/ioctl.h> +#include <lock.h> +#include <sys/types.h> +#include <sys/select.h> +#include <pthread.h> + +/* Default lockspace stuff */ +static dlm_lshandle_t _default_ls = NULL; +static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER; + + +static void +ast_function(void * __attribute__ ((unused)) arg) +{ +} + + +static int +wait_for_dlm_event(dlm_lshandle_t ls) +{ + fd_set rfds; + int fd = dlm_ls_get_fd(ls); + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + + if (select(fd + 1, &rfds, NULL, NULL, NULL) == 1) + return dlm_dispatch(fd); + + return -1; +} + + +static int +clu_ls_lock(dlm_lshandle_t ls, + int mode, + struct dlm_lksb *lksb, + int options, + const char *resource) +{ + int ret; + + if (!ls || !lksb || !resource || !strlen(resource)) { + printf("%p %p %p %d\n", ls, lksb, resource, + (int)strlen(resource)); + printf("INVAL...\n"); + errno = EINVAL; + return -1; + } + + ret = dlm_ls_lock(ls, mode, lksb, options, resource, + strlen(resource), 0, ast_function, lksb, + NULL, NULL); + + if (ret < 0) { + if (errno == ENOENT) + assert(0); + + return -1; + } + + if ((ret = (wait_for_dlm_event(ls) < 0))) { + fprintf(stderr, "wait_for_dlm_event: %d / %d\n", + ret, errno); + return -1; + } + + if (lksb->sb_status == 0) + return 0; + + errno = lksb->sb_status; + return -1; +} + + +static dlm_lshandle_t +clu_open_lockspace(const char *lsname) +{ + dlm_lshandle_t ls = NULL; + + //printf("opening lockspace %s\n", lsname); + + while (!ls) { + ls = dlm_open_lockspace(lsname); + if (ls) + break; + + /* + printf("Failed to open: %s; trying create.\n", + strerror(errno)); + */ + + ls = 
dlm_create_lockspace(lsname, 0644); + if (ls) + break; + + /* Work around race: Someone was closing lockspace as + we were trying to open it. Retry. */ + if (errno == ENOENT) + continue; + + fprintf(stderr, "failed acquiring lockspace: %s\n", + strerror(errno)); + + return NULL; + } + + return ls; +} + + +static int +clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb) +{ + int ret; + + if (!ls || !lksb) { + errno = EINVAL; + return -1; + } + + ret = dlm_ls_unlock(ls, lksb->sb_lkid, 0, lksb, NULL); + + if (ret != 0) + return ret; + + /* lksb->sb_status should be EINPROG at this point */ + + if (wait_for_dlm_event(ls) < 0) { + errno = lksb->sb_status; + return -1; + } + + return 0; +} + + +static int +clu_close_lockspace(dlm_lshandle_t ls, const char *name) +{ + return dlm_release_lockspace(name, ls, 1); +} + + +static int +_clu_lock(int mode, + struct dlm_lksb *lksb, + int options, + const char *resource) +{ + int ret = 0, block = 0, conv = 0, err; + + block = !(options & LKF_NOQUEUE); + + errno = EINVAL; + if (!lksb) + return -1; + + memset(lksb, 0, sizeof(struct dlm_lksb)); + + /* + Try to use a conversion lock mechanism when possible + If the caller calls explicitly with a NULL lock, then + assume the caller knows what it is doing. + + Only take the NULL lock if: + (a) the user isn't specifying CONVERT; if they are, they + know what they are doing. + + ...and one of... + + (b) This is a blocking call, or + (c) The user requested a NULL lock explicitly. In this case, + short-out early; there's no reason to convert a NULL lock + to a NULL lock. + */ + if (!(options & LKF_CONVERT) && + (block || (mode == LKM_NLMODE))) { + /* Acquire NULL lock */ + pthread_mutex_lock(&_default_lock); + ret = clu_ls_lock(_default_ls, LKM_NLMODE, lksb, + (options & ~LKF_NOQUEUE), + resource); + err = errno; + pthread_mutex_unlock(&_default_lock); + if (ret == 0) { + if (mode == LKM_NLMODE) { + /* User only wanted a NULL lock... 
*/ + return 0; + } + /* + Ok, NULL lock was taken, rest of blocking + call should be done using lock conversions. + */ + options |= LKF_CONVERT; + conv = 1; + } else { + switch(err) { + case EINVAL: + /* Oops, null locks don't work on this + plugin; use normal spam mode */ + break; + default: + errno = err; + return -1; + } + } + } + + while (1) { + pthread_mutex_lock(&_default_lock); + ret = clu_ls_lock(_default_ls, mode, lksb, + (options | LKF_NOQUEUE), + resource); + err = errno; + pthread_mutex_unlock(&_default_lock); + + if ((ret != 0) && (err == EAGAIN) && block) { + usleep(random()&32767); + continue; + } + + break; + } + + if (ret != 0 && conv) { + /* If we get some other error, release the NL lock we + took so we don't leak locks*/ + pthread_mutex_lock(&_default_lock); + clu_ls_unlock(_default_ls, lksb); + pthread_mutex_unlock(&_default_lock); + errno = err; + } + + return ret; +} + + +static int +_clu_unlock(struct dlm_lksb *lksb) +{ + int ret, err; + pthread_mutex_lock(&_default_lock); + ret = clu_ls_unlock(_default_ls, lksb); + err = errno; + pthread_mutex_unlock(&_default_lock); + + usleep(random()&32767); + errno = err; + return ret; +} + + +static void +_clu_lock_finished(const char *name) +{ + pthread_mutex_lock(&_default_lock); + if (_default_ls) + clu_close_lockspace(_default_ls, name); + pthread_mutex_unlock(&_default_lock); +} + + +int +clu_lock_init(const char *dflt_lsname) +{ + int ret, err; + + pthread_mutex_lock(&_default_lock); + if (_default_ls) { + pthread_mutex_unlock(&_default_lock); + return 0; + } + + if (!dflt_lsname || !strlen(dflt_lsname)) { + pthread_mutex_unlock(&_default_lock); + errno = EINVAL; + return -1; + } + + _default_ls = clu_open_lockspace(dflt_lsname); + ret = (_default_ls == NULL); + err = errno; + pthread_mutex_unlock(&_default_lock); + + clu_lock = _clu_lock; + clu_unlock = _clu_unlock; + clu_lock_finished = _clu_lock_finished; + + errno = err; + return ret; +} diff --git a/rgmanager/src/clulib/libcpglock.c 
b/rgmanager/src/clulib/libcpglock.c new file mode 100644 index 0000000..8e5c7ff --- /dev/null +++ b/rgmanager/src/clulib/libcpglock.c @@ -0,0 +1,286 @@ +#include <stdlib.h> +#include <sys/socket.h> +#include <errno.h> +#include <stdio.h> +#include <string.h> +#include <pthread.h> +#include <unistd.h> + +#include "cpglock.h" +#include "cpglock-internal.h" +#include "list.h" +#include "sock.h" + + +struct pending_node { + list_head(); + struct cpg_lock_msg m; +}; + + +struct cpg_lock_handle { + pthread_mutex_t mutex; + struct pending_node *pending; + int fd; + int pid; + int seq; +}; + + +static int +check_pending(struct pending_node **l, + struct cpg_lock_msg *exp, + struct cpg_lock_msg *ret) +{ + struct pending_node *p; + int x; + + list_for(l, p, x) { + if (!strcmp(exp->resource, p->m.resource) && + exp->owner_tid == p->m.owner_tid) { + list_remove(l, p); + memcpy(ret, &p->m, sizeof(*ret)); + free(p); + return 0; + } + } + + return 1; +} + + +static void +add_pending(struct pending_node **l, + struct cpg_lock_msg *m) +{ + struct pending_node *p = do_alloc(sizeof(*p)); + + memcpy(&p->m, m, sizeof(p->m)); + list_insert(l, p); +} + + +/* Not thread safe */ +int +cpg_lock_init(void **handle) +{ + struct cpg_lock_handle *h; + int esv; + + h = do_alloc(sizeof (*h)); + if (!h) + return -1; + + h->fd = sock_connect(CPG_LOCKD_SOCK, 3); + if (h->fd < 0) { + esv = errno; + free(h); + errno = esv; + return -1; + } + + h->pid = getpid(); + pthread_mutex_init(&h->mutex, NULL); + + *handle = (void *)h; + return 0; +} + + +int +cpg_lock(void *handle, const char *resource, lock_flag_t flags, struct cpg_lock *lock) +{ + struct cpg_lock_handle *h = handle; + struct cpg_lock_msg l, r; + struct timeval tv; + int ret = -1; + + if (!h) { + errno = EINVAL; + goto out; + } + + pthread_mutex_lock(&h->mutex); + + if (h->pid != (int)getpid()) { + errno = EBADF; + goto out; + } + + if (strlen(resource) > sizeof(l.resource)-1) { + errno = ENAMETOOLONG; + goto out; + } + + memset(&l, 0, 
sizeof(l)); + memset(&r, 0, sizeof(r)); + strncpy(l.resource, resource, sizeof(l.resource)); + strncpy(lock->resource, resource, sizeof(lock->resource)); + l.owner_pid = h->pid; + ++h->seq; + l.owner_tid = h->seq; + l.request = MSG_LOCK; + l.flags = (uint32_t)flags; + + if (write_retry(h->fd, &l, sizeof(l), 0) < 0) + goto out; + + /* Thread concurrency: in case multiple threads wake up + from select, peek at the message to see if it's ours */ + do { + if (check_pending(&h->pending, &l, &r) == 0) + break; + + tv.tv_sec = 0; + tv.tv_usec = random() & 16383; + + if (read_retry(h->fd, &r, sizeof(r), &tv) < 0) { + if (errno == ETIMEDOUT) { + pthread_mutex_unlock(&h->mutex); + usleep(random() & 16383); + pthread_mutex_lock(&h->mutex); + continue; + } + goto out; + } + + if (strcmp(r.resource, l.resource)) { + printf("NOTE: msg for wrong lock want: %s got: %s\n", l.resource, r.resource); + add_pending(&h->pending, &r); + pthread_mutex_unlock(&h->mutex); + usleep(random() & 16383); + pthread_mutex_lock(&h->mutex); + continue; + } + if (r.owner_tid != l.owner_tid) { + printf("NOTE: msg for wrong seq want: %d got: %d\n", l.owner_tid, r.owner_tid ); + add_pending(&h->pending, &r); + pthread_mutex_unlock(&h->mutex); + usleep(random() & 16383); + pthread_mutex_lock(&h->mutex); + continue; + } + break; + } while (1); + /* locked */ + + if (r.owner_nodeid == 0) + goto out; + + if (r.request == MSG_NAK) { + errno = EAGAIN; + return -1; + } + + if (r.request != MSG_GRANT) { + //ret = -1; + goto out; + } + + lock->state = LOCK_HELD; + lock->owner_nodeid = r.owner_nodeid; + lock->owner_pid = h->pid; /* XXX */ + lock->local_id = r.lockid; + + ret = 0; + +out: + pthread_mutex_unlock(&h->mutex); + return ret; +} + + +int +cpg_unlock(void *handle, struct cpg_lock *lock) +{ + struct cpg_lock_handle *h = handle; + struct cpg_lock_msg l; + int ret = -1; + + if (!h) { + errno = EINVAL; + goto out; + } + + /* Only block on lock requests, not unlock */ + if (h->pid != (int)getpid()) { + errno 
= EBADF; + goto out; + } + + if (lock->state != LOCK_HELD) { + errno = EINVAL; + goto out; + } + + if (!lock->local_id) { + errno = EINVAL; + goto out; + } + + strncpy(l.resource, lock->resource, sizeof(l.resource)); + l.request = MSG_UNLOCK; + l.owner_nodeid = lock->owner_nodeid; + l.owner_pid = lock->owner_pid; + l.lockid = lock->local_id; + //l.owner_tid = _gettid(); + l.owner_tid = 0; + lock->state = LOCK_FREE; + + ret = write_retry(h->fd, &l, sizeof(l), 0); +out: + return ret; +} + + +int +cpg_lock_dump(FILE *fp) +{ + struct cpg_lock_msg l; + int fd; + char c; + + fd = sock_connect(CPG_LOCKD_SOCK, 3); + if (fd < 0) + return -1; + + memset(&l, 0, sizeof(l)); + l.request = MSG_DUMP; + + if (write_retry(fd, &l, sizeof(l), 0) < 0) + return -1; + + while (read_retry(fd, &c, 1, 0) == 1) + fprintf(fp, "%c", c); + + close(fd); + return 0; +} + + + +/* Not thread safe */ +int +cpg_lock_fin(void *handle) +{ + struct cpg_lock_handle *h = handle; + + if (!h) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&h->mutex); + + if (h->pid != (int)getpid()) { + errno = EBADF; + return -1; + } + + close(h->fd); + + pthread_mutex_destroy(&h->mutex); + free(h); + return 0; +} + diff --git a/rgmanager/src/clulib/lock.c b/rgmanager/src/clulib/lock.c index 078d525..74cd8b1 100644 --- a/rgmanager/src/clulib/lock.c +++ b/rgmanager/src/clulib/lock.c @@ -1,294 +1,6 @@ -/** @file - * Locking. - */ -#include <errno.h> -#include <stdlib.h> -#include <unistd.h> -#include <string.h> -#include <stdio.h> -#include <assert.h> -#include <sys/ioctl.h> #include <lock.h> -#include <sys/types.h> -#include <sys/select.h> -#include <pthread.h>
-/* Default lockspace stuff */ -static dlm_lshandle_t _default_ls = NULL; -static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER; +int (*clu_lock)(int, struct dlm_lksb *, int, const char *) = NULL; +int (*clu_unlock)(struct dlm_lksb *lksb) = NULL; +void (*clu_lock_finished)(const char *) = NULL;
- -static void -ast_function(void * __attribute__ ((unused)) arg) -{ -} - - -static int -wait_for_dlm_event(dlm_lshandle_t ls) -{ - fd_set rfds; - int fd = dlm_ls_get_fd(ls); - - FD_ZERO(&rfds); - FD_SET(fd, &rfds); - - if (select(fd + 1, &rfds, NULL, NULL, NULL) == 1) - return dlm_dispatch(fd); - - return -1; -} - - -int -clu_ls_lock(dlm_lshandle_t ls, - int mode, - struct dlm_lksb *lksb, - int options, - const char *resource) -{ - int ret; - - if (!ls || !lksb || !resource || !strlen(resource)) { - errno = EINVAL; - return -1; - } - - ret = dlm_ls_lock(ls, mode, lksb, options, resource, - strlen(resource), 0, ast_function, lksb, - NULL, NULL); - - if (ret < 0) { - if (errno == ENOENT) - assert(0); - - return -1; - } - - if ((ret = (wait_for_dlm_event(ls) < 0))) { - fprintf(stderr, "wait_for_dlm_event: %d / %d\n", - ret, errno); - return -1; - } - - if (lksb->sb_status == 0) - return 0; - - errno = lksb->sb_status; - return -1; -} - - - -dlm_lshandle_t -clu_open_lockspace(const char *lsname) -{ - dlm_lshandle_t ls = NULL; - - //printf("opening lockspace %s\n", lsname); - - while (!ls) { - ls = dlm_open_lockspace(lsname); - if (ls) - break; - - /* - printf("Failed to open: %s; trying create.\n", - strerror(errno)); - */ - - ls = dlm_create_lockspace(lsname, 0644); - if (ls) - break; - - /* Work around race: Someone was closing lockspace as - we were trying to open it. Retry. 
*/ - if (errno == ENOENT) - continue; - - fprintf(stderr, "failed acquiring lockspace: %s\n", - strerror(errno)); - - return NULL; - } - - return ls; -} - - -int -clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb) -{ - int ret; - - if (!ls || !lksb) { - errno = EINVAL; - return -1; - } - - ret = dlm_ls_unlock(ls, lksb->sb_lkid, 0, lksb, NULL); - - if (ret != 0) - return ret; - - /* lksb->sb_status should be EINPROG at this point */ - - if (wait_for_dlm_event(ls) < 0) { - errno = lksb->sb_status; - return -1; - } - - return 0; -} - - -int -clu_close_lockspace(dlm_lshandle_t ls, const char *name) -{ - return dlm_release_lockspace(name, ls, 1); -} - - -int -clu_lock(int mode, - struct dlm_lksb *lksb, - int options, - const char *resource) -{ - int ret = 0, block = 0, conv = 0, err; - - block = !(options & LKF_NOQUEUE); - - errno = EINVAL; - if (!lksb) - return -1; - - memset(lksb, 0, sizeof(struct dlm_lksb)); - - /* - Try to use a conversion lock mechanism when possible - If the caller calls explicitly with a NULL lock, then - assume the caller knows what it is doing. - - Only take the NULL lock if: - (a) the user isn't specifying CONVERT; if they are, they - know what they are doing. - - ...and one of... - - (b) This is a blocking call, or - (c) The user requested a NULL lock explicitly. In this case, - short-out early; there's no reason to convert a NULL lock - to a NULL lock. - */ - if (!(options & LKF_CONVERT) && - (block || (mode == LKM_NLMODE))) { - /* Acquire NULL lock */ - pthread_mutex_lock(&_default_lock); - ret = clu_ls_lock(_default_ls, LKM_NLMODE, lksb, - (options & ~LKF_NOQUEUE), - resource); - err = errno; - pthread_mutex_unlock(&_default_lock); - if (ret == 0) { - if (mode == LKM_NLMODE) { - /* User only wanted a NULL lock... */ - return 0; - } - /* - Ok, NULL lock was taken, rest of blocking - call should be done using lock conversions. 
- */ - options |= LKF_CONVERT; - conv = 1; - } else { - switch(err) { - case EINVAL: - /* Oops, null locks don't work on this - plugin; use normal spam mode */ - break; - default: - errno = err; - return -1; - } - } - } - - while (1) { - pthread_mutex_lock(&_default_lock); - ret = clu_ls_lock(_default_ls, mode, lksb, - (options | LKF_NOQUEUE), - resource); - err = errno; - pthread_mutex_unlock(&_default_lock); - - if ((ret != 0) && (err == EAGAIN) && block) { - usleep(random()&32767); - continue; - } - - break; - } - - if (ret != 0 && conv) { - /* If we get some other error, release the NL lock we - took so we don't leak locks*/ - pthread_mutex_lock(&_default_lock); - clu_ls_unlock(_default_ls, lksb); - pthread_mutex_unlock(&_default_lock); - errno = err; - } - - return ret; -} - - -int -clu_unlock(struct dlm_lksb *lksb) -{ - int ret, err; - pthread_mutex_lock(&_default_lock); - ret = clu_ls_unlock(_default_ls, lksb); - err = errno; - pthread_mutex_unlock(&_default_lock); - - usleep(random()&32767); - errno = err; - return ret; -} - - -int -clu_lock_init(const char *dflt_lsname) -{ - int ret, err; - - pthread_mutex_lock(&_default_lock); - if (_default_ls) { - pthread_mutex_unlock(&_default_lock); - return 0; - } - - if (!dflt_lsname || !strlen(dflt_lsname)) { - pthread_mutex_unlock(&_default_lock); - errno = EINVAL; - return -1; - } - - _default_ls = clu_open_lockspace(dflt_lsname); - ret = (_default_ls == NULL); - err = errno; - pthread_mutex_unlock(&_default_lock); - - errno = err; - return ret; -} - -void -clu_lock_finished(const char *name) -{ - pthread_mutex_lock(&_default_lock); - if (_default_ls) - clu_close_lockspace(_default_ls, name); - pthread_mutex_unlock(&_default_lock); -} diff --git a/rgmanager/src/clulib/sock.c b/rgmanager/src/clulib/sock.c new file mode 100644 index 0000000..3965a38 --- /dev/null +++ b/rgmanager/src/clulib/sock.c @@ -0,0 +1,354 @@ +/* + Copyright Red Hat, Inc. 
2002-2003, 2012 + Copyright Mission Critical Linux, 2000 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/errno.h> +#include <time.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <fcntl.h> + +#include "sock.h" + +void * +do_alloc(size_t n) +{ + void *p; + + do { + p = malloc(n); + if (!p) { + usleep(10000); + } + } while(!p); + + memset(p, 0, n); + return p; +} + + +/** + * This is a wrapper around select which will retry in the case we receive + * EINTR. This is necessary for read_retry, since it wouldn't make sense + * to have read_retry terminate if and only if two EINTRs were received + * in a row - one during the read() call, one during the select call... + * + * See select(2) for description of parameters. + */ +int +select_retry(int fdmax, fd_set * rfds, fd_set * wfds, fd_set * xfds, + struct timeval *timeout) +{ + int rv; + + while (1) { + rv = select(fdmax, rfds, wfds, xfds, timeout); + if ((rv == -1) && (errno == EINTR)) + /* return on EBADF/EINVAL/ENOMEM; continue on EINTR */ + continue; + return rv; + } +} + +/** + * Retries a write in the event of a non-blocked interrupt signal. + * + * @param fd File descriptor to which we are writing. + * @param buf Data buffer to send. 
+ * @param count Number of bytes in buf to send. + * @param timeout (struct timeval) telling us how long we should retry. + * @return The number of bytes written to the file descriptor, + * or -1 on error (with errno set appropriately). + */ +ssize_t +write_retry(int fd, void *buf, size_t count, struct timeval * timeout) +{ + int n, rv = 0; + ssize_t total = 0, remain = count; + fd_set wfds, xfds; + + while (total < count) { + + /* Create the write FD set of 1... */ + FD_ZERO(&wfds); + FD_SET(fd, &wfds); + FD_ZERO(&xfds); + FD_SET(fd, &xfds); + + /* wait for the fd to be available for writing */ + rv = select_retry(fd + 1, NULL, &wfds, &xfds, timeout); + if (rv == -1) + return -1; + else if (rv == 0) { + errno = ETIMEDOUT; + return -1; + } + + if (FD_ISSET(fd, &xfds)) { + errno = EPIPE; + return -1; + } + + /* + * Attempt to write to fd + */ + n = write(fd, (char *)buf + (off_t) total, remain); + + /* + * When we know our fd was select()ed and we receive 0 bytes + * when we write, the fd was closed. + */ + if ((n == 0) && (rv == 1)) { + errno = EPIPE; + return -1; + } + + if (n == -1) { + if ((errno == EAGAIN) || (errno == EINTR)) { + /* + * Not ready? + */ + continue; + } + + /* Other errors: EIO, EINVAL, etc */ + return -1; + } + + total += n; + remain -= n; + } + + return total; +} + +/** + * Retry reads until we (a) time out or (b) get our data. Of course, if + * timeout is NULL, it'll wait forever. + * + * @param sockfd File descriptor we want to read from. + * @param buf Preallocated buffer into which we will read data. + * @param count Number of bytes to read. + * @param timeout (struct timeval) describing how long we should retry. + * @return The number of bytes read on success, or -1 on failure. + Note that we will always return (count) or (-1). 
+ */ +ssize_t +read_retry(int sockfd, void *buf, size_t count, struct timeval * timeout) +{ + int n, rv = 0; + ssize_t total = 0, remain = count; + fd_set rfds, xfds; + + memset(buf, 0, count); + + while (total < count) { + FD_ZERO(&rfds); + FD_SET(sockfd, &rfds); + FD_ZERO(&xfds); + FD_SET(sockfd, &xfds); + + /* + * Select on the socket, in case it closes while we're not + * looking... + */ + rv = select_retry(sockfd + 1, &rfds, NULL, &xfds, timeout); + if (rv == -1) + return -1; + else if (rv == 0) { + errno = ETIMEDOUT; + return -1; + } + + if (FD_ISSET(sockfd, &xfds)) { + errno = EPIPE; + return -1; + } + + /* + * Attempt to read off the socket + */ + n = read(sockfd, (char *)buf + (off_t) total, remain); + + /* + * When we know our socket was select()ed and we receive 0 bytes + * when we read, the socket was closed. + */ + if ((n == 0) && (rv == 1)) { + errno = EPIPE; + return -1; + } + + if (n == -1) { + if ((errno == EAGAIN) || (errno == EINTR)) { + /* + * Not ready? Wait for data to become available + */ + continue; + } + + /* Other errors: EPIPE, EINVAL, etc */ + return -1; + } + + total += n; + remain -= n; + + //printf("read-retry %d/%d remain %d \n", total, count, remain); + } + + return total; +} + + +int +sock_listen(const char *sockpath) +{ + int sock = -1; + struct sockaddr_un su; + mode_t om; + + sock = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock < 0) + goto fail; + + su.sun_family = PF_LOCAL; + snprintf(su.sun_path, (sizeof(su.sun_path)), "%s", sockpath); + + unlink(su.sun_path); + om = umask(077); + + if (bind(sock, (struct sockaddr *)&su, sizeof(su)) < 0) { + umask(om); + goto fail; + } + umask(om); + + if (listen(sock, SOMAXCONN) < 0) + goto fail; + return sock; +fail: + if (sock >= 0) + close(sock); + return -1; +} + + +int +sock_connect(const char *sockpath, int tout) +{ + struct timeval timeout = {tout, 0}; + int sock, flags, error, ret; + socklen_t len; + struct sockaddr_un sun; + fd_set rfds, wfds; + + sock = socket(PF_LOCAL, SOCK_STREAM, 
0); + if (sock < 0) + return -1; + + sun.sun_family = PF_LOCAL; + snprintf(sun.sun_path, (sizeof(sun.sun_path)), "%s", sockpath); + + flags = fcntl(sock, F_GETFL, 0); + fcntl(sock, F_SETFL, flags | O_NONBLOCK); + + ret = connect(sock, (struct sockaddr *) &sun, sizeof(sun)); + + if (ret < 0 && (errno != EINPROGRESS)) { + close(sock); + return -1; + } + + if (ret == 0) + goto done; + + FD_ZERO(&rfds); + FD_SET(sock, &rfds); + wfds = rfds; + + ret = select(sock + 1, &rfds, &wfds, NULL, &timeout); + if (ret == 0) { + close(sock); + errno = ETIMEDOUT; + return -1; + } + + if (FD_ISSET(sock, &rfds) || FD_ISSET(sock, &wfds)) { + len = sizeof (error); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) { + close(sock); + return -1; + } + } else { + close(sock); + return -1; + } + +done: + + return sock; +} + + +int +sock_accept(int sockfd) +{ + int acceptfd; + + if (sockfd < 0) { + errno = EBADF; + return -1; + } + + while ((acceptfd = + accept(sockfd, (struct sockaddr *) NULL, NULL)) < 0) { + if (errno == EINTR) { + continue; + } + + return -1; + } + + return acceptfd; +} + + +void +hexdump(const void *buf, size_t len) +{ + size_t x; + + printf("%d bytes @ %p \n", (int)len, buf); + + for (x = 0; x < len; x++) { + printf(" %02x", (((char *)buf)[x])&0xff); + if (((x+1) % 16) == 0) + printf("\n"); + } + + printf("\n"); +} + + + diff --git a/rgmanager/src/daemons/Makefile b/rgmanager/src/daemons/Makefile index 7218c53..dbd75ea 100644 --- a/rgmanager/src/daemons/Makefile +++ b/rgmanager/src/daemons/Makefile @@ -1,11 +1,12 @@ TARGET1= rgmanager TARGET2= rg_test TARGET3= clurgmgrd +TARGET4= cpglockd
-SBINDIRT=$(TARGET1) $(TARGET2) +SBINDIRT=$(TARGET1) $(TARGET2) $(TARGET4) SBINSYMT=$(TARGET3)
-all: depends ${TARGET1} ${TARGET2} ${TARGET3} +all: depends ${TARGET1} ${TARGET2} ${TARGET3} $(TARGET4)
include ../../../make/defines.mk include $(OBJDIR)/make/cobj.mk @@ -41,6 +42,8 @@ OBJS2= test-noccs.o \ rg_locks-noccs.o \ event_config-noccs.o
+OBJS4= cpglockd.o + CFLAGS += -DSHAREDIR="${sharedir}" -D_GNU_SOURCE CFLAGS += -fPIC CFLAGS += -I${ccsincdir} -I${cmanincdir} -I${dlmincdir} -I${logtincdir} @@ -57,6 +60,7 @@ CCS_LDFLAGS += -L${ccslibdir} -lccs CMAN_LDFLAGS += -L${cmanlibdir} -lcman LOGSYS_LDFLAGS += -L${logtlibdir} -llogthread DLM_LDFLAGS += -L${dlmlibdir} -ldlm +CPG_LDFLAGS += -lcpg XML2_LDFLAGS += `xml2-config --libs` SLANG_LDFLAGS += -L${slanglibdir} -lslang EXTRA_LDFLAGS += -lpthread @@ -96,6 +100,10 @@ ${TARGET2}: ${OBJS2} ${LDDEPS} ${TARGET3}: ${TARGET1} ln -sf ${TARGET1} ${TARGET3}
+${TARGET4}: ${OBJS4} ${LDDEPS} + $(CC) -o $@ $^ $(CPG_LDFLAGS) $(EXTRA_LDFLAGS) $(LDFLAGS) + + check: rg_test cd tests && ./runtests.sh
diff --git a/rgmanager/src/daemons/cpglockd.c b/rgmanager/src/daemons/cpglockd.c
new file mode 100644
index 0000000..55d832a
--- /dev/null
+++ b/rgmanager/src/daemons/cpglockd.c
@@ -0,0 +1,1147 @@
+/*
+ * cpglockd - simple lock daemon layered on a corosync CPG group.
+ *
+ * Local clients connect over the CPG_LOCKD_SOCK socket and send
+ * struct cpg_lock_msg requests.  Requests are multicast to the CPG
+ * group and every daemon applies the delivered messages, in delivery
+ * order, to its own copy of the lock and request lists.
+ *
+ * The node at the head of group_members is treated as the "oldest"
+ * node; it grants locks nobody owns yet and recovers locks held by
+ * nodes that left the group.
+ *
+ * NOTE(review): do_alloc() and the list_*() macros are provided
+ * outside this file.  The code presumably relies on do_alloc()
+ * returning zero-filled memory - confirm against its definition.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <unistd.h>
+#include <malloc.h>
+#include <string.h>
+#include <time.h>
+#include <sys/uio.h>
+#include <corosync/cpg.h>
+
+#include "sock.h"
+#include "cpglock.h"
+#include "cpglock-internal.h"
+#include "list.h"
+
+/* Number of delivered messages kept for the state dump */
+#define MSG_HISTORY_MAX 10
+
+/*
+ * Copy a resource name between two resource[] ARRAYS (not pointers:
+ * sizeof(dst) must be the array size).  The result is always
+ * NUL-terminated and strncpy() zero-pads the remainder, so no stale
+ * bytes follow the name when the enclosing message is transmitted.
+ */
+#define copy_resource(dst, src) \
+do { \
+	strncpy((dst), (src), sizeof(dst) - 1); \
+	(dst)[sizeof(dst) - 1] = '\0'; \
+} while (0)
+
+
+/* A queued lock request (state is LOCK_PENDING until a grant is sent) */
+struct request_node {
+	list_head();
+	struct cpg_lock l;
+};
+
+/* A known lock and its current state / owner */
+struct lock_node {
+	list_head();
+	struct cpg_lock l;
+};
+
+/* A local client connection; pid stays 0 until its first MSG_LOCK */
+struct client_node {
+	list_head();
+	int fd;
+	int pid;
+};
+
+/* A member of the CPG group */
+struct member_node {
+	list_head();
+	int nodeid;
+};
+
+/* One entry of the delivered-message history */
+struct msg_node {
+	list_head();
+	struct cpg_lock_msg m;
+};
+
+/* Local vars */
+static cpg_handle_t cpg;
+static uint32_t my_node_id = 0;
+static struct request_node *requests = NULL;
+static struct lock_node *locks = NULL;
+static struct client_node *clients = NULL;
+static struct member_node *group_members = NULL;
+static struct msg_node *messages = NULL;
+static int total_members = 0;
+static int local_lockid = 0;
+static int message_count = 0;
+static int joined = 0;
+
+
+/* Printable name of a lock_state_t value */
+static const char *
+ls2str(int x)
+{
+	switch(x){
+	case LOCK_FREE: return "FREE";
+	case LOCK_HELD: return "HELD";
+	case LOCK_PENDING: return "PENDING";
+	}
+	return "unknown";
+}
+
+
+/* Printable name of a cpg_lock_req_t value */
+static const char *
+rq2str(int x)
+{
+	switch(x){
+	case MSG_LOCK: return "LOCK";
+	case MSG_UNLOCK: return "UNLOCK";
+	case MSG_GRANT: return "GRANT";
+	case MSG_NAK: return "NAK";
+	case MSG_PURGE: return "PURGE";
+	case MSG_CONFCHG: return "CONFCHG";
+	case MSG_JOIN: return "JOIN";
+	case MSG_DUMP: return "DUMP";
+	case MSG_HALT: return "HALT";
+	}
+	return "unknown";
+}
+
+
+/*
+ * Write a human-readable snapshot of members, clients, locks, queued
+ * requests and the recent message history to fp.
+ */
+static void
+dump_state(FILE *fp)
+{
+	struct request_node *r = NULL;
+	struct lock_node *l = NULL;
+	struct client_node *c = NULL;
+	struct member_node *m = NULL;
+	struct msg_node *s = NULL;
+	int x;
+
+	fprintf(fp, "cpglockd state\n");
+	fprintf(fp, "======== =====\n");
+
+	fprintf(fp, "Node ID: %u\n", my_node_id);
+
+	if (group_members) {
+		fprintf(fp, "Participants:");
+		list_for(&group_members, m, x) {
+			fprintf(fp, " %d", m->nodeid);
+		}
+		fprintf(fp, "\n");
+	}
+	if (clients) {
+		fprintf(fp, "Clients:");
+		list_for(&clients, c, x) {
+			fprintf(fp, " %d.%d", c->pid, c->fd );
+		}
+		fprintf(fp, "\n");
+	}
+	fprintf(fp, "\n");
+
+	if (locks) {
+		fprintf(fp, "Locks\n");
+		fprintf(fp, "=====\n");
+		list_for(&locks, l, x) {
+			fprintf(fp, " %s: %s", l->l.resource, ls2str(l->l.state));
+			if (l->l.owner_nodeid) {
+				fprintf(fp, ", owner %d:%d:%d", l->l.owner_nodeid,
+					l->l.owner_pid, l->l.owner_tid);
+				if (l->l.owner_nodeid == my_node_id &&
+				    l->l.state == LOCK_HELD)
+					fprintf(fp, ", Local ID %d", l->l.local_id);
+			}
+			fprintf(fp, "\n");
+		}
+		fprintf(fp, "\n");
+	}
+	if (requests) {
+		fprintf(fp, "Requests\n");
+		fprintf(fp, "========\n");
+		list_for(&requests, r, x) {
+			/* request nodes carry a lock state, not a message type */
+			fprintf(fp, " %s: %s", r->l.resource, ls2str(r->l.state));
+			if (r->l.owner_nodeid) {
+				fprintf(fp, ", from %d:%d:%d", r->l.owner_nodeid,
+					r->l.owner_pid, r->l.owner_tid);
+			}
+			fprintf(fp, "\n");
+		}
+		fprintf(fp, "\n");
+	}
+	if (messages) {
+		fprintf(fp, "Message History\n");
+		fprintf(fp, "======= =======\n");
+		list_for(&messages, s, x) {
+			switch(s->m.request) {
+			case MSG_CONFCHG:
+				fprintf(fp, " CONFIG CHANGE\n");
+				break;
+			case MSG_PURGE:
+				fprintf(fp, " PURGE for %u:%u\n",
+					s->m.owner_nodeid, s->m.owner_pid);
+				break;
+			case MSG_JOIN:
+				fprintf(fp, " JOIN %u\n", s->m.owner_nodeid);
+				break;
+			default:
+				fprintf(fp, " %s: %s %u:%u:%u\n",
+					rq2str(s->m.request), s->m.resource,
+					s->m.owner_nodeid, s->m.owner_pid,
+					s->m.owner_tid);
+				break;
+			}
+		}
+		fprintf(fp, "\n");
+	}
+}
+
+
+/*
+ * Record a copy of m in the message history, dropping the entry at
+ * the head of the list once MSG_HISTORY_MAX entries are stored.
+ */
+static void
+old_msg(struct cpg_lock_msg *m)
+{
+	struct msg_node *n;
+
+	n = do_alloc(sizeof(*n));
+	memcpy(&n->m, m, sizeof(n->m));
+	list_append(&messages, n);
+	if (message_count < MSG_HISTORY_MAX) {
+		++message_count;
+	} else {
+		n = messages;
+		list_remove(&messages, n);
+		free(n);
+	}
+}
+
+
+/* Track a newly accepted client connection */
+static void
+insert_client(int fd)
+{
+	struct client_node *n = NULL;
+
+	n = do_alloc(sizeof(*n));
+	n->fd = fd;
+	/* del_client() only runs recovery for clients with a known pid */
+	n->pid = 0;
+	list_append(&clients, n);
+}
+
+
+/*
+ * Multicast m to the CPG group.
+ * Returns 0 on success, -1 if corosync did not accept the message.
+ */
+static int
+send_lock_msg(struct cpg_lock_msg *m)
+{
+	struct iovec iov;
+
+	iov.iov_base = m;
+	iov.iov_len = sizeof (*m);
+
+	/* success is CPG_OK, which is not negative; map it for callers */
+	if (cpg_mcast_joined(cpg, CPG_TYPE_AGREED, &iov, 1) != CPG_OK)
+		return -1;
+
+	return 0;
+}
+
+
+/* forward request from client, stamped with our node ID */
+static int
+send_lock(struct cpg_lock_msg *m)
+{
+	m->owner_nodeid = my_node_id;
+
+	return send_lock_msg(m);
+}
+
+
+/* Multicast a GRANT of n's resource to the requester recorded in n */
+static int
+send_grant(struct request_node *n)
+{
+	struct cpg_lock_msg m;
+
+	printf("-> sending grant %s to %d:%d:%d\n",
+	       n->l.resource, n->l.owner_nodeid, n->l.owner_pid, n->l.owner_tid);
+
+	memset(&m, 0, sizeof(m));
+	copy_resource(m.resource, n->l.resource);
+	m.request = MSG_GRANT;
+	m.owner_nodeid = n->l.owner_nodeid;
+	m.owner_pid = n->l.owner_pid;
+	m.owner_tid = n->l.owner_tid;
+
+	return send_lock_msg(&m);
+}
+
+
+/* Turn m into a NAK and multicast it */
+static int
+send_nak(struct cpg_lock_msg *m)
+{
+	m->request = MSG_NAK;
+
+	return send_lock_msg(m);
+}
+
+
+/* Announce this node to the group */
+static int
+send_join(void)
+{
+	struct cpg_lock_msg m;
+
+	/* the whole struct goes on the wire; don't leak stack contents */
+	memset(&m, 0, sizeof(m));
+	m.request = MSG_JOIN;
+	m.owner_nodeid = my_node_id;
+	return send_lock_msg(&m);
+}
+
+
+
+/* Turn m into an UNLOCK and multicast it */
+static int
+send_unlock(struct cpg_lock_msg *m)
+{
+	m->request = MSG_UNLOCK;
+	return send_lock_msg(m);
+}
+
+
+/*
+ * Grant the lock named in m to the first queued request for that
+ * resource.
+ *
+ * Returns 1 if a request for the resource is queued (a grant is sent
+ * only if that request is still LOCK_PENDING), 0 if none is queued.
+ */
+static int
+grant_next(struct cpg_lock_msg *m)
+{
+	struct request_node *r;
+	int x;
+
+	list_for(&requests, r, x) {
+		if (strcmp(m->resource, r->l.resource))
+			continue;
+
+		/* Send grant */
+		if (r->l.state == LOCK_PENDING) {
+			printf("LOCK %s: grant to %d:%d:%d\n", m->resource,
+			       r->l.owner_nodeid, r->l.owner_pid, r->l.owner_tid);
+			/* don't send dup grants */
+			r->l.state = LOCK_HELD;
+			send_grant(r);
+		}
+		return 1;
+	}
+
+	return 0;
+}
+
+
+/*
+ * Drop every queued request from nodeid.  If pid is nonzero only
+ * requests from that pid are dropped.
+ */
+static void
+purge_requests(uint32_t nodeid, uint32_t pid)
+{
+	struct request_node *r;
+	int found = 0, count = 0, x = 0;
+
+	/* restart the scan after each removal; the list changed under us */
+	do {
+		found = 0;
+		list_for(&requests, r, x) {
+			if (r->l.owner_nodeid != nodeid ||
+			    (pid &&
+			     r->l.owner_pid != pid))
+				continue;
+
+			list_remove(&requests, r);
+			free(r);
+			found = 1;
+			++count;
+			break;
+		}
+	} while (found);
+
+	if (count) {
+		if (pid) {
+			printf("RECOVERY: purged %d requests from %u:%u\n",
+			       count, nodeid, pid);
+		} else {
+			printf("RECOVERY: purged %d requests from node %u\n",
+			       count, nodeid);
+		}
+	}
+}
+
+
+
+/*
+ * Close and forget the client on fd.  If the client had identified
+ * itself (pid known), purge its requests group-wide and release the
+ * locks it held, handing each to the next requester if there is one.
+ */
+static void
+del_client(int fd)
+{
+	struct cpg_lock_msg m;
+	struct client_node *n;
+	struct lock_node *l;
+	int x, pid = 0, recovered = 0;
+
+	list_for(&clients, n, x) {
+		if (n->fd == fd) {
+			list_remove(&clients, n);
+			close(n->fd);
+			pid = n->pid;
+			free(n);
+			break;
+		}
+	}
+
+	if (!pid)
+		return;
+
+	printf("RECOVERY: Looking for locks held by PID %d\n", pid);
+
+	/* This may not be needed */
+	purge_requests(my_node_id, pid);
+
+	memset(&m, 0, sizeof(m));
+	m.request = MSG_PURGE;
+	m.owner_nodeid = my_node_id;
+	m.owner_pid = pid;
+
+	send_lock_msg(&m);
+
+	list_for(&locks, l, x) {
+		if (l->l.owner_nodeid != my_node_id ||
+		    l->l.owner_pid != pid ||
+		    l->l.state != LOCK_HELD)
+			continue;
+
+		printf("RECOVERY: Releasing %s \n", l->l.resource);
+
+		l->l.state = LOCK_FREE;
+		copy_resource(m.resource, l->l.resource);
+		if (grant_next(&m) == 0)
+			send_unlock(&m);
+		++recovered;
+	}
+
+	if (recovered) {
+		printf("RECOVERY: %d locks from local PID %d\n", recovered, pid);
+	}
+	printf("RECOVERY: Complete\n");
+}
+
+
+/*
+ * Recover after nodeid left the group.  Only the node at the head of
+ * group_members acts: it purges the dead node's requests, releases
+ * the locks it held and tries to hand out locks that are free.
+ */
+static void
+del_node(uint32_t nodeid)
+{
+	struct cpg_lock_msg m;
+	struct lock_node *l;
+	int x, recovered = 0, granted = 0;
+
+	/* the list can be empty if our own JOIN was not delivered yet */
+	if (!group_members || group_members->nodeid != my_node_id)
+		return;
+
+	printf("RECOVERY: I am oldest node in the group, recovering locks\n");
+
+	/* pass 1: purge outstanding requests from this node. */
+
+	/* This may not be needed */
+	purge_requests(nodeid, 0);
+
+	memset(&m, 0, sizeof(m));
+	m.request = MSG_PURGE;
+	m.owner_nodeid = nodeid;
+	m.owner_pid = 0;
+
+	send_lock_msg(&m);
+
+	list_for(&locks, l, x) {
+		if (l->l.owner_nodeid == nodeid &&
+		    l->l.state == LOCK_HELD) {
+			printf("RECOVERY: Releasing %s held by dead node %u\n",
+			       l->l.resource, nodeid);
+
+			l->l.state = LOCK_FREE;
+			copy_resource(m.resource, l->l.resource);
+			if (grant_next(&m) == 0)
+				send_unlock(&m);
+			++recovered;
+		} else if (l->l.state == LOCK_FREE) {
+			/* name THIS lock; m may still hold another resource */
+			copy_resource(m.resource, l->l.resource);
+			if (grant_next(&m) == 0)
+				send_unlock(&m);
+			++granted;
+		}
+	}
+
+	if (recovered) {
+		printf("RECOVERY: %d locks from node %u\n", recovered, nodeid);
+	}
+	if (granted) {
+		printf("RECOVERY: %d pending locks granted\n", granted);
+	}
+
+	printf("RECOVERY: Complete\n");
+}
+
+
+/*
+ * Reset set and add every client fd to it.
+ * Returns the highest client fd, or 0 if there are no clients.
+ */
+static int
+client_fdset(fd_set *set)
+{
+	int max = -1, x = 0;
+	struct client_node *n;
+
+	FD_ZERO(set);
+
+	list_for(&clients, n, x) {
+		FD_SET(n->fd, set);
+		if (n->fd > max)
+			max = n->fd;
+	}
+
+	if (!x)
+		return 0;
+
+	return max;
+}
+
+
+/* Return the client registered with pid, or NULL */
+static struct client_node *
+find_client(int pid)
+{
+	int x;
+	struct client_node *n;
+
+	list_for(&clients, n, x) {
+		if (n->pid == pid)
+			return n;
+	}
+
+	return NULL;
+}
+
+
+#if 0
+static void
+send_fault(const char *resource)
+{
+	struct cpg_lock_msg m;
+
+	memset(&m, 0, sizeof(m));
+	strncpy(m.resource, resource, sizeof(m.resource) - 1);
+	m.request = MSG_HALT;
+	m.owner_pid = 0;
+	m.owner_nodeid = my_node_id;
+
+	send_lock_msg(&m);
+}
+#endif
+
+
+/*
+ * Tell the local client that owns l that its lock was granted and
+ * assign the lock a fresh local ID.
+ *
+ * Returns 0 if the client was found (a failed write is left for the
+ * main loop to notice), 1 if no client has that pid, -1 on a bad fd.
+ */
+static int
+grant_client(struct lock_node *l)
+{
+	struct client_node *c;
+	struct cpg_lock_msg m;
+
+	memset(&m, 0, sizeof(m));
+	copy_resource(m.resource, l->l.resource);
+	m.request = MSG_GRANT;
+	m.owner_pid = l->l.owner_pid;
+	m.owner_tid = l->l.owner_tid;
+	l->l.local_id = ++local_lockid;
+	m.lockid = l->l.local_id;
+	m.owner_nodeid = my_node_id;
+
+	c = find_client(l->l.owner_pid);
+	if (!c) {
+		printf("can't find client for pid %d\n", l->l.owner_pid);
+		return 1;
+	}
+
+	if (c->fd < 0) {
+		printf(" Client has bad fd\n");
+		return -1;
+	}
+
+	if (write_retry(c->fd, &m, sizeof(m), NULL) < 0) {
+		/* no client anymore; drop and send to next guy XXX */
+		/* This should be handled by our main loop */
+	}
+
+	return 0;
+}
+
+
+/*
+ * Tell the local client that issued request l that it was refused.
+ * Return values are the same as for grant_client().
+ */
+static int
+nak_client(struct request_node *l)
+{
+	struct client_node *c;
+	struct cpg_lock_msg m;
+
+	memset(&m, 0, sizeof(m));
+	copy_resource(m.resource, l->l.resource);
+	m.request = MSG_NAK;
+	m.owner_pid = l->l.owner_pid;
+	m.owner_tid = l->l.owner_tid;
+	m.owner_nodeid = my_node_id;
+
+	c = find_client(l->l.owner_pid);
+	if (!c) {
+		printf("can't find client for pid %d\n", l->l.owner_pid);
+		return 1;
+	}
+
+	if (c->fd < 0) {
+		printf(" Client has bad fd\n");
+		return -1;
+	}
+
+	if (write_retry(c->fd, &m, sizeof(m), NULL) < 0) {
+		/* no client anymore; drop and send to next guy XXX */
+		/* This should be handled by our main loop */
+	}
+
+	return 0;
+}
+
+
+/* Queue the request described by m as LOCK_PENDING.  Returns 0. */
+static int
+queue_request(struct cpg_lock_msg *m)
+{
+	struct request_node *r;
+
+	r = do_alloc(sizeof(*r));
+	copy_resource(r->l.resource, m->resource);
+	r->l.owner_nodeid = m->owner_nodeid;
+	r->l.owner_pid = m->owner_pid;
+	r->l.owner_tid = m->owner_tid;
+	r->l.state = LOCK_PENDING;
+
+	list_insert(&requests, r);
+	return 0;
+}
+
+
+/*
+ * Handle a delivered MSG_LOCK: queue the request, then grant it if
+ * this node is responsible for the lock (we own it and it is free,
+ * or the lock is new and we are at the head of group_members).
+ * Ignored until our own JOIN has been delivered.  Returns 0.
+ */
+static int
+process_lock(struct cpg_lock_msg *m)
+{
+	struct lock_node *l;
+	int x;
+
+	if (!joined)
+		return 0;
+
+	printf("LOCK %s: queue for %u:%u:%u\n", m->resource,
+	       m->owner_nodeid, m->owner_pid, m->owner_tid);
+	queue_request(m);
+
+	list_for(&locks, l, x) {
+		if (strcmp(m->resource, l->l.resource))
+			continue;
+
+		/* if it's owned locally, we need send a
+		   GRANT to the first node on the request queue */
+		if (l->l.owner_nodeid == my_node_id) {
+			if (l->l.state == LOCK_FREE) {
+				/* Set local state to PENDING to avoid double-grants */
+				l->l.state = LOCK_PENDING;
+				grant_next(m);
+			} else {
+				/* state is PENDING or HELD */
+				if (m->flags & FL_TRY) {
+					/* nack to client if needed */
+					send_nak(m);
+				}
+			}
+		}
+
+		return 0;
+	}
+
+	/* First time we see this resource: record it as free */
+	l = do_alloc(sizeof(*l));
+	copy_resource(l->l.resource, m->resource);
+	l->l.state = LOCK_FREE;
+	list_insert(&locks, l);
+
+	if (group_members && group_members->nodeid == my_node_id) {
+		/* Allocate a lock structure and immediately grant */
+		l->l.state = LOCK_PENDING;
+		if (grant_next(m) == 0)
+			l->l.state = LOCK_FREE;
+	}
+
+	return 0;
+}
+
+
+/* Return 1 if nodeid is in group_members, 0 otherwise */
+static int
+is_member(uint32_t nodeid)
+{
+	struct member_node *n;
+	int x;
+
+	list_for(&group_members, n, x) {
+		if (n->nodeid == nodeid)
+			return 1;
+	}
+
+	return 0;
+}
+
+
+/*
+ * Handle a delivered MSG_GRANT: record the new owner, drop the
+ * matching queued request and, if the owner is one of our clients,
+ * notify it.  If the grant cannot be completed the lock is passed
+ * to the next requester or unlocked.  Returns 0.
+ */
+static int
+process_grant(struct cpg_lock_msg *m, uint32_t nodeid)
+{
+	struct lock_node *l;
+	struct request_node *r;
+	int x, y;
+
+	if (!joined)
+		return 0;
+
+	list_for(&locks, l, x) {
+		if (strcmp(m->resource, l->l.resource))
+			continue;
+
+		if (l->l.state == LOCK_HELD) {
+			if (m->owner_pid == 0 ||
+			    m->owner_nodeid == 0) {
+				printf("GRANT averted\n");
+				return 0;
+			}
+		} else {
+			l->l.state = LOCK_HELD;
+		}
+
+		printf("GRANT %s: to %u:%u:%u\n",
+		       m->resource, m->owner_nodeid,
+		       m->owner_pid, m->owner_tid);
+
+		l->l.owner_nodeid = m->owner_nodeid;
+		l->l.owner_pid = m->owner_pid;
+		l->l.owner_tid = m->owner_tid;
+
+		/* the request this grant answers is no longer queued */
+		list_for(&requests, r, y) {
+			if (strcmp(r->l.resource, m->resource))
+				continue;
+
+			if (r->l.owner_nodeid == m->owner_nodeid &&
+			    r->l.owner_pid == m->owner_pid &&
+			    r->l.owner_tid == m->owner_tid) {
+				list_remove(&requests, r);
+				free(r);
+				break;
+			}
+		}
+
+		/* granted lock */
+		if (l->l.owner_nodeid == my_node_id) {
+			if (grant_client(l) != 0) {
+				/* Grant to a nonexistent PID can
+				   happen because we may have a pending
+				   request after a fd was closed.
+				   since we process on delivery, we
+				   now simply make an unlock request
+				   and move on */
+				purge_requests(my_node_id, l->l.owner_pid);
+				if (grant_next(m) == 0)
+					send_unlock(m);
+				return 0;
+			}
+		}
+
+		/* What if node has died with a GRANT in flight? */
+		if (group_members &&
+		    group_members->nodeid == my_node_id &&
+		    !is_member(l->l.owner_nodeid)) {
+
+			printf("GRANT to non-member %d; giving to next requestor\n",
+			       l->l.owner_nodeid);
+
+			l->l.state = LOCK_FREE;
+			if (grant_next(m) == 0)
+				send_unlock(m);
+			return 0;
+		}
+		return 0;
+	}
+
+	/* Record lock state since we now know it */
+	/* Allocate a lock structure (a lock_node, not a message) */
+	l = do_alloc(sizeof(*l));
+	copy_resource(l->l.resource, m->resource);
+	l->l.state = LOCK_HELD;
+	l->l.owner_nodeid = m->owner_nodeid;
+	l->l.owner_pid = m->owner_pid;
+	l->l.owner_tid = m->owner_tid;
+	list_insert(&locks, l);
+
+	return 0;
+}
+
+
+/*
+ * Handle a delivered MSG_NAK: drop the matching queued request and,
+ * if it came from one of our clients, pass the refusal on.
+ * Returns 0.
+ */
+static int
+process_nak(struct cpg_lock_msg *m, uint32_t nodeid)
+{
+	struct request_node *r = NULL;
+	int y;
+
+	if (!joined)
+		return 0;
+
+	list_for(&requests, r, y) {
+		if (strcmp(r->l.resource, m->resource))
+			continue;
+
+		if (r->l.owner_nodeid == m->owner_nodeid &&
+		    r->l.owner_pid == m->owner_pid &&
+		    r->l.owner_tid == m->owner_tid) {
+			list_remove(&requests, r);
+			if (r->l.owner_nodeid == my_node_id) {
+				if (nak_client(r) != 0) {
+					purge_requests(my_node_id, r->l.owner_pid);
+				}
+			}
+			free(r);
+			break;
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+ * Handle a delivered MSG_UNLOCK: free the held lock if the sender
+ * owns it and offer it to the next requester.  Returns 0.
+ */
+static int
+process_unlock(struct cpg_lock_msg *m, uint32_t nodeid)
+{
+	struct lock_node *l;
+	int x;
+
+	if (!joined)
+		return 0;
+
+	list_for(&locks, l, x) {
+		if (l->l.state != LOCK_HELD)
+			continue;
+		if (strcmp(m->resource, l->l.resource))
+			continue;
+
+		/* Held lock... if it's local, we need send a
+		   GRANT to the first node on the request queue */
+		if (l->l.owner_nodeid == m->owner_nodeid &&
+		    l->l.owner_pid == m->owner_pid) {
+			printf("UNLOCK %s: %u:%u:%u\n", m->resource,
+			       m->owner_nodeid, m->owner_pid, m->owner_tid);
+			l->l.state = LOCK_FREE;
+			if (grant_next(m) != 0)
+				l->l.state = LOCK_PENDING;
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+ * Resolve a client unlock that names its lock by local ID: when
+ * m->resource is empty, look up m->lockid among the locks owned by
+ * this node and fill in the resource and owner fields.
+ *
+ * Returns 0 if m names a resource (already, or after the lookup),
+ * 1 if no lock matches.
+ */
+static int
+find_lock(struct cpg_lock_msg *m)
+{
+	struct lock_node *l;
+	int x;
+
+	if (m->resource[0] != 0)
+		return 0;
+
+	/* IDs handed out by grant_client() start at 1 */
+	if (m->lockid == 0)
+		return 1;
+
+	list_for(&locks, l, x) {
+		/* local IDs only mean something for locks we own */
+		if (l->l.owner_nodeid != my_node_id)
+			continue;
+		if (m->lockid == l->l.local_id) {
+			copy_resource(m->resource, l->l.resource);
+			printf("LOCK %u -> %s\n", m->lockid, m->resource);
+			m->owner_nodeid = l->l.owner_nodeid;
+			m->owner_pid = l->l.owner_pid;
+			m->owner_tid = l->l.owner_tid;
+			m->lockid = 0;
+			return 0;
+		}
+	}
+
+	return 1;
+}
+
+
+/*
+ * Handle a delivered MSG_JOIN from nodeid: add it to group_members,
+ * or move it to the back of the list if it is already there.  Our
+ * own JOIN marks this daemon as joined.  Returns 0.
+ */
+static int
+process_join(struct cpg_lock_msg *m, uint32_t nodeid)
+{
+	struct member_node *n;
+	int x;
+
+	list_for(&group_members, n, x) {
+		if (n->nodeid == nodeid) {
+			list_remove(&group_members, n);
+			list_append(&group_members, n);
+			printf("JOIN: moving %u to back\n", nodeid);
+			return 0;
+		}
+	}
+
+	n = do_alloc(sizeof(*n));
+	n->nodeid = nodeid;
+	printf("JOIN: node %d", n->nodeid);
+	if (nodeid == my_node_id) {
+		printf(" (self)");
+		joined = 1;
+	}
+	total_members++;
+	printf("\n");
+	list_insert(&group_members, n);
+
+	return 0;
+}
+
+
+/*
+ * Dispatch one delivered message by type.  MSG_HALT stops all
+ * processing for good.  Every other message is recorded in the
+ * history first.  Returns 0.
+ */
+static int
+process_request(struct cpg_lock_msg *m, uint32_t nodeid)
+{
+	if (m->request == MSG_HALT) {
+		printf("FAULT: Halting operations; see node %u\n", m->owner_nodeid);
+		while (1)
+			sleep(30);
+	}
+
+	old_msg(m);
+
+	switch (m->request) {
+	case MSG_LOCK:
+		process_lock(m);
+		break;
+	case MSG_NAK:
+		process_nak(m, nodeid);
+		break;
+	case MSG_GRANT:
+		process_grant(m, nodeid);
+		break;
+	case MSG_UNLOCK:
+		process_unlock(m, nodeid);
+		break;
+	case MSG_PURGE:
+		purge_requests(m->owner_nodeid, m->owner_pid);
+		break;
+	case MSG_JOIN:
+		process_join(m, nodeid);
+		break;
+	default:
+		break;
+	}
+
+	return 0;
+}
+
+
+/*
+ * CPG delivery callback.  Messages of the wrong size are dropped;
+ * valid ones are copied so the handlers may modify them and so the
+ * resource name can be NUL-terminated before it reaches strcmp().
+ */
+static void
+cpg_deliver_func(cpg_handle_t h,
+		 const struct cpg_name *group_name,
+		 uint32_t nodeid,
+		 uint32_t pid,
+		 void *msg,
+		 size_t msglen)
+{
+	struct cpg_lock_msg m;
+
+	if (msglen != sizeof(m)) {
+		printf("Invalid message size %d\n", (int)msglen);
+		return;
+	}
+
+	memcpy(&m, msg, sizeof(m));
+	m.resource[sizeof(m.resource) - 1] = '\0';
+
+	process_request(&m, nodeid);
+}
+
+
+/*
+ * CPG configuration-change callback.
+ *
+ * On the first change, seed group_members with the nodes that were
+ * already in the group (members that are neither us nor in the join
+ * list).  Later joiners are added when their JOIN message is
+ * delivered, not here: since ordered delivery is agreed, this
+ * prevents >1 member from believing it is the oldest host.
+ *
+ * Nodes that left are removed and their locks recovered.
+ */
+static void
+cpg_config_change(cpg_handle_t h,
+		  const struct cpg_name *group_name,
+		  const struct cpg_address *members, size_t memberlen,
+		  const struct cpg_address *left, size_t leftlen,
+		  const struct cpg_address *join, size_t joinlen)
+{
+	struct member_node *n;
+	size_t x, y;
+	struct cpg_lock_msg m;
+
+
+	memset(&m, 0, sizeof(m));
+	copy_resource(m.resource, "(none)");
+	m.request = MSG_CONFCHG;
+
+	old_msg(&m);
+
+	if (total_members == 0) {
+
+		printf("JOIN: Setting up initial node list\n");
+		for (x = 0; x < memberlen; x++) {
+			int joining = 0;
+
+			if (members[x].nodeid == my_node_id)
+				continue;
+
+			for (y = 0; y < joinlen; y++) {
+				if (join[y].nodeid == members[x].nodeid) {
+					joining = 1;
+					break;
+				}
+			}
+			if (joining)
+				continue;
+
+			/* insert each pre-existing member exactly once */
+			n = do_alloc(sizeof(*n));
+			n->nodeid = members[x].nodeid;
+			printf("JOIN: node %d\n", n->nodeid);
+			list_insert(&group_members, n);
+		}
+		printf("JOIN: Done\n");
+
+		total_members = memberlen;
+	}
+
+	for (x = 0; x < leftlen; x++) {
+
+		list_for(&group_members, n, y) {
+			if (n->nodeid == left[x].nodeid) {
+				list_remove(&group_members, n);
+				printf("DELETE: node %d\n", n->nodeid);
+				del_node(n->nodeid);
+				free(n);
+				break;
+			}
+		}
+	}
+
+	/* account for the departed nodes once, not once per node */
+	total_members -= (int)leftlen;
+	if (total_members < 0)
+		total_members = 0;
+
+	return;
+}
+
+
+static cpg_callbacks_t my_callbacks = {
+	.cpg_deliver_fn = cpg_deliver_func,
+	.cpg_confchg_fn = cpg_config_change
+};
+
+
+/*
+ * Connect to CPG, join the CPG_LOCKD_NAME group and look up our
+ * node ID.  Returns 0 on success, -1 on failure.
+ */
+static int
+cpg_init(void)
+{
+	struct cpg_name gname;
+
+	errno = EINVAL;
+
+	gname.length = snprintf(gname.value,
+				sizeof(gname.value),
+				CPG_LOCKD_NAME);
+	if (gname.length >= sizeof(gname.value)) {
+		errno = ENAMETOOLONG;
+		return -1;
+	}
+
+	if (gname.length <= 0)
+		return -1;
+
+	memset(&cpg, 0, sizeof(cpg));
+	if (cpg_initialize(&cpg, &my_callbacks) != CPG_OK) {
+		perror("cpg_initialize");
+		return -1;
+	}
+
+	if (cpg_join(cpg, &gname) != CPG_OK) {
+		perror("cpg_join");
+		/* don't leave a half-set-up handle behind */
+		cpg_finalize(cpg);
+		return -1;
+	}
+
+	cpg_local_get(cpg, &my_node_id);
+
+	return 0;
+}
+
+
+/* Leave the group and release the CPG handle.  Returns 0 or -1. */
+static int
+cpg_fin(void)
+{
+	struct cpg_name gname;
+
+	errno = EINVAL;
+
+	gname.length = snprintf(gname.value,
+				sizeof(gname.value),
+				CPG_LOCKD_NAME);
+	if (gname.length >= sizeof(gname.value)) {
+		errno = ENAMETOOLONG;
+		return -1;
+	}
+
+	if (gname.length <= 0)
+		return -1;
+
+	cpg_leave(cpg, &gname);
+	cpg_finalize(cpg);
+
+	return 0;
+}
+
+
+/*
+ * Daemon entry point: listen for local clients, join the CPG group,
+ * then loop forever servicing new connections, CPG traffic and
+ * client requests.
+ */
+int
+main(int argc, char **argv)
+{
+	fd_set rfds;
+	int fd;
+	int cpgfd;
+	int afd = -1;
+	int n,x;
+	int handled;
+
+	struct cpg_lock_msg m;
+	struct client_node *client;
+
+	signal(SIGPIPE, SIG_IGN);
+
+	fd = sock_listen(CPG_LOCKD_SOCK);
+	if (fd < 0) {
+		perror("sock_listen");
+		return -1;
+	}
+
+	if (cpg_init() < 0)
+		return -1;
+	cpg_local_get(cpg, &my_node_id);
+	if (cpg_fd_get(cpg, &cpgfd) != CPG_OK) {
+		printf("cpg_fd_get failed\n");
+		return -1;
+	}
+	if (send_join() < 0)
+		return -1;
+
+	while (1) {
+		FD_ZERO(&rfds);
+		x = client_fdset(&rfds);
+		FD_SET(fd, &rfds);
+		if (fd > x)
+			x = fd;
+		FD_SET(cpgfd, &rfds);
+		if (cpgfd > x)
+			x = cpgfd;
+
+		n = select_retry(x+1, &rfds, NULL, NULL, NULL);
+		if (n < 0) {
+			perror("select");
+			return -1;
+		}
+
+		if (FD_ISSET(fd, &rfds)) {
+			afd = accept(fd, NULL, NULL);
+			if (afd >= 0)
+				insert_client(afd);
+			else
+				perror("accept");
+			--n;
+		}
+
+		if (FD_ISSET(cpgfd, &rfds)) {
+			cpg_dispatch(cpg, CPG_DISPATCH_ONE);
+			--n;
+		}
+
+		if (n <= 0)
+			continue;
+
+		/*
+		 * Removing a client ends the current scan (the list
+		 * changed), so rescan until every ready fd was serviced.
+		 * Stop if a pass services nothing so we cannot spin.
+		 */
+		do {
+			handled = 0;
+			list_for(&clients, client, x) {
+				if (!FD_ISSET(client->fd, &rfds))
+					continue;
+
+				/* a rescan must not read this fd again;
+				   there is no more data and we would block */
+				FD_CLR(client->fd, &rfds);
+				--n;
+				++handled;
+
+				if (read_retry(client->fd, &m, sizeof(m), NULL) < 0) {
+					printf("Closing client fd %d pid %d: %d\n",
+					       client->fd, client->pid, errno);
+
+					del_client(client->fd);
+					break;
+				}
+
+				/* clients are not trusted to terminate it */
+				m.resource[sizeof(m.resource) - 1] = '\0';
+
+				/* send lock request */
+				/* XXX check for dup connection */
+				if (m.request == MSG_LOCK) {
+					client->pid = m.owner_pid;
+					send_lock(&m);
+				} else if (m.request == MSG_UNLOCK) {
+					if (find_lock(&m) != 0) {
+						printf("UNLOCK: no lock with ID %u\n",
+						       m.lockid);
+						continue;
+					}
+					if (grant_next(&m) == 0)
+						send_unlock(&m);
+				} else if (m.request == MSG_DUMP) {
+					FILE *fp = fdopen(client->fd, "w");
+
+					list_remove(&clients, client);
+					if (fp) {
+						dump_state(fp);
+						/* fclose() also closes the fd */
+						fclose(fp);
+					} else {
+						close(client->fd);
+					}
+					free(client);
+					break;
+				}
+			}
+		} while (n > 0 && handled);
+	}
+
+	cpg_fin();
+
+	return 0;
+}
diff --git a/rgmanager/src/daemons/main.c b/rgmanager/src/daemons/main.c
index 931d95e..40717e6 100644
--- a/rgmanager/src/daemons/main.c
+++ b/rgmanager/src/daemons/main.c
@@ -987,14 +987,14 @@ int
 main(int argc, char **argv)
 {
 	int rv, do_init = 1;
-	char foreground = 0, wd = 1;
+	char foreground = 0, wd = 1, cpg_locks = 0;
 	cman_node_t me;
 	msgctx_t *cluster_ctx;
 	msgctx_t *local_ctx;
 	pthread_t th;
 	cman_handle_t clu = NULL;
- while ((rv = getopt(argc, argv, "wfdNq")) != EOF) { + while ((rv = getopt(argc, argv, "wfdNqC")) != EOF) { switch (rv) { case 'w': wd = 0; @@ -1011,6 +1011,9 @@ main(int argc, char **argv) case 'q': rgm_dbus_notify = 0; break; + case 'C': + cpg_locks = 1; + break; default: return 1; break; @@ -1057,10 +1060,19 @@ main(int argc, char **argv) return -1; }
- if (clu_lock_init(rgmanager_lsname) != 0) { - printf("Locks not working!\n"); - cman_finish(clu); - return -1; + if (!cpg_locks) { + if (clu_lock_init(rgmanager_lsname) != 0) { + printf("Locks not working!\n"); + cman_finish(clu); + return -1; + } + } else { + if (cpg_lock_initialize() != 0) { + printf("Locks not working!\n"); + cman_finish(clu); + return -1; + } + logt_print(LOG_INFO, "Using CPG for locking (EXPERIMENTAL)\n"); }
memset(&me, 0, sizeof(me));
cluster-commits@lists.stg.fedorahosted.org