From: David Teigland When a node is removed from a lockspace, recovery is required for that lockspace on all the remaining lockspace members. Recovery involves: a full rebuild of the distributed resource directory, selecting a new master node for locks/resources previously mastered on the removed node, and rebuilding master-copy locks on newly selected masters. Signed-off-by: Dave Teigland Signed-off-by: Patrick Caulfield Signed-off-by: Andrew Morton --- drivers/dlm/member.c | 356 +++++++++++++++++++++ drivers/dlm/member.h | 29 + drivers/dlm/rcom.c | 466 ++++++++++++++++++++++++++++ drivers/dlm/rcom.h | 24 + drivers/dlm/recover.c | 729 +++++++++++++++++++++++++++++++++++++++++++++ drivers/dlm/recover.h | 32 + drivers/dlm/recoverd.c | 714 ++++++++++++++++++++++++++++++++++++++++++++ drivers/dlm/recoverd.h | 24 + drivers/dlm/requestqueue.c | 144 ++++++++ drivers/dlm/requestqueue.h | 22 + 10 files changed, 2540 insertions(+) diff -puN /dev/null drivers/dlm/member.c --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/member.c 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,356 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#include "dlm_internal.h" +#include "member_sysfs.h" +#include "lockspace.h" +#include "member.h" +#include "recoverd.h" +#include "recover.h" +#include "lowcomms.h" +#include "rcom.h" + +/* + * Following called by dlm_recoverd thread + */ + +static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) +{ + struct dlm_member *memb = NULL; + struct list_head *tmp; + struct list_head *newlist = &new->list; + struct list_head *head = &ls->ls_nodes; + + list_for_each(tmp, head) { + memb = list_entry(tmp, struct dlm_member, list); + if (new->nodeid < memb->nodeid) + break; + } + + if (!memb) + list_add_tail(newlist, head); + else { + /* FIXME: can use list macro here */ + newlist->prev = tmp->prev; + newlist->next = tmp; + tmp->prev->next = newlist; + tmp->prev = newlist; + } +} + +int dlm_add_member(struct dlm_ls *ls, int nodeid) +{ + struct dlm_member *memb; + + memb = kmalloc(sizeof(struct dlm_member), GFP_KERNEL); + if (!memb) + return -ENOMEM; + + memb->nodeid = nodeid; + add_ordered_member(ls, memb); + ls->ls_num_nodes++; + return 0; +} + +void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) +{ + list_move(&memb->list, &ls->ls_nodes_gone); + ls->ls_num_nodes--; +} + +int dlm_is_member(struct dlm_ls *ls, int nodeid) +{ + struct dlm_member *memb; + + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->nodeid == nodeid) + return TRUE; + } + return FALSE; +} + +int dlm_is_removed(struct dlm_ls *ls, int nodeid) +{ + struct dlm_member *memb; + + list_for_each_entry(memb, &ls->ls_nodes_gone, list) { + if (memb->nodeid == nodeid) + return TRUE; + } + return FALSE; +} + +static void clear_memb_list(struct list_head *head) +{ + struct dlm_member *memb; + + while (!list_empty(head)) { + memb = list_entry(head->next, struct dlm_member, list); + list_del(&memb->list); + kfree(memb); + } +} + +void dlm_clear_members(struct dlm_ls *ls) +{ + clear_memb_list(&ls->ls_nodes); + ls->ls_num_nodes = 0; +} + +void dlm_clear_members_gone(struct dlm_ls *ls) +{ + clear_memb_list(&ls->ls_nodes_gone); +} + +void dlm_clear_members_finish(struct dlm_ls *ls, int finish_event) +{ + struct dlm_member *memb, *safe; + + list_for_each_entry_safe(memb, safe, &ls->ls_nodes_gone, list) { + if (memb->gone_event <= finish_event) { + list_del(&memb->list); + kfree(memb); + } + } +} + +static void make_member_array(struct dlm_ls *ls) +{ + struct dlm_member *memb; + int i = 0, *array; + + if (ls->ls_node_array) { + kfree(ls->ls_node_array); + ls->ls_node_array = NULL; + } + + array = kmalloc(sizeof(int) * ls->ls_num_nodes, GFP_KERNEL); + if (!array) + return; + + list_for_each_entry(memb, &ls->ls_nodes, list) + array[i++] = memb->nodeid; + + ls->ls_node_array = array; +} + +/* send a status request to all members just to establish comms connections */ + +static void ping_members(struct dlm_ls *ls) +{ + struct dlm_member *memb; + list_for_each_entry(memb, &ls->ls_nodes, list) + dlm_rcom_status(ls, memb->nodeid); +} + +int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) +{ + struct dlm_member *memb, *safe; + int i, error, found, pos = 0, neg = 0, low = -1; + + /* move departed members from ls_nodes to ls_nodes_gone */ + + list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { + found = FALSE; + for (i = 0; i < rv->node_count; i++) { + if (memb->nodeid == rv->nodeids[i]) { + found = TRUE; + break; + } + } + + if (!found) { + neg++; + memb->gone_event = rv->event_id; + dlm_remove_member(ls, memb); + log_debug(ls, "remove member %d", memb->nodeid); + } + } + + /* add new members to ls_nodes */ + + for (i = 0; i < rv->node_count; i++) { + if (dlm_is_member(ls, rv->nodeids[i])) + continue; + dlm_add_member(ls, rv->nodeids[i]); + pos++; + log_debug(ls, "add member %d", rv->nodeids[i]); + } + + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (low == -1 || memb->nodeid < low) + low = memb->nodeid; + } + ls->ls_low_nodeid = low; + + make_member_array(ls); + set_bit(LSFL_NODES_VALID, &ls->ls_flags); + *neg_out = neg; + + ping_members(ls); + + error = dlm_recover_members_wait(ls); + log_debug(ls, "total members %d", ls->ls_num_nodes); + return error; +} + +int dlm_recover_members_first(struct dlm_ls *ls, struct dlm_recover *rv) +{ + int i, error, nodeid, low = -1; + + dlm_clear_members(ls); + + log_debug(ls, "add members"); + + for (i = 0; i < rv->node_count; i++) { + nodeid = rv->nodeids[i]; + dlm_add_member(ls, nodeid); + + if (low == -1 || nodeid < low) + low = nodeid; + } + ls->ls_low_nodeid = low; + + make_member_array(ls); + set_bit(LSFL_NODES_VALID, &ls->ls_flags); + + ping_members(ls); + + error = dlm_recover_members_wait(ls); + log_debug(ls, "total members %d", ls->ls_num_nodes); + return error; +} + +/* + * Following called from member_sysfs.c + */ + +int dlm_ls_terminate(struct dlm_ls *ls) +{ + spin_lock(&ls->ls_recover_lock); + set_bit(LSFL_LS_TERMINATE, &ls->ls_flags); + set_bit(LSFL_JOIN_DONE, &ls->ls_flags); + set_bit(LSFL_LEAVE_DONE, &ls->ls_flags); + spin_unlock(&ls->ls_recover_lock); + wake_up(&ls->ls_wait_member); + log_error(ls, "dlm_ls_terminate"); + return 0; +} + +int dlm_ls_stop(struct dlm_ls *ls) +{ + int new; + + spin_lock(&ls->ls_recover_lock); + ls->ls_last_stop = ls->ls_last_start; + set_bit(LSFL_LS_STOP, &ls->ls_flags); + new = test_and_clear_bit(LSFL_LS_RUN, &ls->ls_flags); + spin_unlock(&ls->ls_recover_lock); + + /* + * This in_recovery lock does two things: + * + * 1) Keeps this function from returning until all threads are out + * of locking routines and locking is truely stopped. + * 2) Keeps any new requests from being processed until it's unlocked + * when recovery is complete. + */ + + if (new) + down_write(&ls->ls_in_recovery); + + /* + * The recoverd suspend/resume makes sure that dlm_recoverd (if + * running) has noticed the clearing of LS_RUN above and quit + * processing the previous recovery. This will be true for all nodes + * before any nodes get the start. + */ + + dlm_recoverd_suspend(ls); + clear_bit(LSFL_DIR_VALID, &ls->ls_flags); + clear_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags); + clear_bit(LSFL_NODES_VALID, &ls->ls_flags); + clear_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags); + dlm_recoverd_resume(ls); + dlm_recoverd_kick(ls); + return 0; +} + +int dlm_ls_start(struct dlm_ls *ls, int event_nr) +{ + struct dlm_recover *rv; + int error = 0; + + rv = kmalloc(sizeof(struct dlm_recover), GFP_KERNEL); + if (!rv) + return -ENOMEM; + memset(rv, 0, sizeof(struct dlm_recover)); + + spin_lock(&ls->ls_recover_lock); + + if (test_bit(LSFL_LS_RUN, &ls->ls_flags)) { + spin_unlock(&ls->ls_recover_lock); + log_error(ls, "start ignored: lockspace running"); + kfree(rv); + error = -EINVAL; + goto out; + } + + if (!ls->ls_nodeids_next) { + spin_unlock(&ls->ls_recover_lock); + log_error(ls, "start ignored: existing nodeids_next"); + kfree(rv); + error = -EINVAL; + goto out; + } + + if (event_nr <= ls->ls_last_start) { + spin_unlock(&ls->ls_recover_lock); + log_error(ls, "start event_nr %d not greater than last %d", + event_nr, ls->ls_last_start); + kfree(rv); + error = -EINVAL; + goto out; + } + + rv->nodeids = ls->ls_nodeids_next; + ls->ls_nodeids_next = NULL; + rv->node_count = ls->ls_nodeids_next_count; + rv->event_id = event_nr; + ls->ls_last_start = event_nr; + list_add_tail(&rv->list, &ls->ls_recover); + set_bit(LSFL_LS_START, &ls->ls_flags); + spin_unlock(&ls->ls_recover_lock); + + set_bit(LSFL_JOIN_DONE, &ls->ls_flags); + wake_up(&ls->ls_wait_member); + dlm_recoverd_kick(ls); + out: + return error; +} + +int dlm_ls_finish(struct dlm_ls *ls, int event_nr) +{ + spin_lock(&ls->ls_recover_lock); + if (event_nr != ls->ls_last_start) { + spin_unlock(&ls->ls_recover_lock); + log_error(ls, "finish event_nr %d doesn't match start %d", + event_nr, ls->ls_last_start); + return -EINVAL; + } + ls->ls_last_finish = event_nr; + set_bit(LSFL_LS_FINISH, &ls->ls_flags); + spin_unlock(&ls->ls_recover_lock); + dlm_recoverd_kick(ls); + return 0; +} + diff -puN /dev/null drivers/dlm/member.h --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/member.h 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,29 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#ifndef __MEMBER_DOT_H__ +#define __MEMBER_DOT_H__ + +int dlm_ls_terminate(struct dlm_ls *ls); +int dlm_ls_stop(struct dlm_ls *ls); +int dlm_ls_start(struct dlm_ls *ls, int event_nr); +int dlm_ls_finish(struct dlm_ls *ls, int event_nr); + +void dlm_clear_members(struct dlm_ls *ls); +void dlm_clear_members_gone(struct dlm_ls *ls); +void dlm_clear_members_finish(struct dlm_ls *ls, int finish_event); +int dlm_recover_members_first(struct dlm_ls *ls, struct dlm_recover *rv); +int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); +int dlm_is_removed(struct dlm_ls *ls, int nodeid); + +#endif /* __MEMBER_DOT_H__ */ + diff -puN /dev/null drivers/dlm/rcom.c --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/rcom.c 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,466 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#include "dlm_internal.h" +#include "lockspace.h" +#include "member.h" +#include "lowcomms.h" +#include "midcomms.h" +#include "rcom.h" +#include "recover.h" +#include "dir.h" +#include "config.h" +#include "memory.h" +#include "lock.h" +#include "util.h" + + +static int rcom_response(struct dlm_ls *ls) +{ + return test_bit(LSFL_RCOM_READY, &ls->ls_flags); +} + +static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, + struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + char *mb; + int mb_len = sizeof(struct dlm_rcom) + len; + + mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb); + if (!mh) { + log_print("create_rcom to %d type %d len %d ENOBUFS", + to_nodeid, type, len); + return -ENOBUFS; + } + memset(mb, 0, mb_len); + + rc = (struct dlm_rcom *) mb; + + rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); + rc->rc_header.h_lockspace = ls->ls_global_id; + rc->rc_header.h_nodeid = dlm_our_nodeid(); + rc->rc_header.h_length = mb_len; + rc->rc_header.h_cmd = DLM_RCOM; + + rc->rc_type = type; + + *mh_ret = mh; + *rc_ret = rc; + return 0; +} + +static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh, + struct dlm_rcom *rc) +{ + dlm_rcom_out(rc); + dlm_lowcomms_commit_buffer(mh); +} + +/* When replying to a status request, a node also sends back its + configuration values. The requesting node then checks that the remote + node is configured the same way as itself. */ + +static void make_config(struct dlm_ls *ls, struct rcom_config *rf) +{ + rf->rf_lvblen = ls->ls_lvblen; +} + +static int check_config(struct dlm_ls *ls, struct rcom_config *rf) +{ + if (rf->rf_lvblen != ls->ls_lvblen) + return -EINVAL; + return 0; +} + +static int make_status(struct dlm_ls *ls) +{ + int status = 0; + + if (test_bit(LSFL_NODES_VALID, &ls->ls_flags)) + status |= NODES_VALID; + + if (test_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags)) + status |= NODES_ALL_VALID; + + if (test_bit(LSFL_DIR_VALID, &ls->ls_flags)) + status |= DIR_VALID; + + if (test_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags)) + status |= DIR_ALL_VALID; + + if (test_bit(LSFL_LOCKS_VALID, &ls->ls_flags)) + status |= LOCKS_VALID; + + if (test_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags)) + status |= LOCKS_ALL_VALID; + + return status; +} + +int dlm_rcom_status(struct dlm_ls *ls, int nodeid) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + int error = 0; + + memset(ls->ls_recover_buf, 0, dlm_config.buffer_size); + + if (nodeid == dlm_our_nodeid()) { + rc = (struct dlm_rcom *) ls->ls_recover_buf; + rc->rc_result = make_status(ls); + goto out; + } + + error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); + if (error) + goto out; + + send_rcom(ls, mh, rc); + + error = dlm_wait_function(ls, &rcom_response); + clear_bit(LSFL_RCOM_READY, &ls->ls_flags); + if (error) + goto out; + + rc = (struct dlm_rcom *) ls->ls_recover_buf; + error = check_config(ls, (struct rcom_config *) rc->rc_buf); + out: + return error; +} + +static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + int error, nodeid = rc_in->rc_header.h_nodeid; + + error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY, + sizeof(struct rcom_config), &rc, &mh); + if (error) + return; + rc->rc_result = make_status(ls); + make_config(ls, (struct rcom_config *) rc->rc_buf); + + send_rcom(ls, mh, rc); +} + +static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); + set_bit(LSFL_RCOM_READY, &ls->ls_flags); + wake_up(&ls->ls_wait_general); +} + +int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + int error = 0, len = sizeof(struct dlm_rcom); + + memset(ls->ls_recover_buf, 0, dlm_config.buffer_size); + + if (nodeid == dlm_our_nodeid()) { + dlm_copy_master_names(ls, last_name, last_len, + ls->ls_recover_buf + len, + dlm_config.buffer_size - len, nodeid); + goto out; + } + + error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); + if (error) + goto out; + memcpy(rc->rc_buf, last_name, last_len); + + send_rcom(ls, mh, rc); + + error = dlm_wait_function(ls, &rcom_response); + clear_bit(LSFL_RCOM_READY, &ls->ls_flags); + out: + return error; +} + +static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + int error, inlen, outlen; + int nodeid = rc_in->rc_header.h_nodeid; + + /* + * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while + * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes). + * It could only happen in rare cases where we get a late NAMES + * message from a previous instance of recovery. + */ + + if (!test_bit(LSFL_NODES_VALID, &ls->ls_flags)) { + log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid); + return; + } + + nodeid = rc_in->rc_header.h_nodeid; + inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); + outlen = dlm_config.buffer_size - sizeof(struct dlm_rcom); + + error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh); + if (error) + return; + + dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, + nodeid); + send_rcom(ls, mh, rc); +} + +static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); + set_bit(LSFL_RCOM_READY, &ls->ls_flags); + wake_up(&ls->ls_wait_general); +} + +int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + struct dlm_ls *ls = r->res_ls; + int error; + + error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, + &rc, &mh); + if (error) + goto out; + memcpy(rc->rc_buf, r->res_name, r->res_length); + rc->rc_id = (unsigned long) r; + + send_rcom(ls, mh, rc); + out: + return error; +} + +static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid; + int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); + + error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); + if (error) + return; + + error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); + if (error) + ret_nodeid = error; + rc->rc_result = ret_nodeid; + rc->rc_id = rc_in->rc_id; + + send_rcom(ls, mh, rc); +} + +static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + dlm_recover_master_reply(ls, rc_in); +} + +static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, + struct rcom_lock *rl) +{ + memset(rl, 0, sizeof(*rl)); + + rl->rl_ownpid = lkb->lkb_ownpid; + rl->rl_lkid = lkb->lkb_id; + rl->rl_exflags = lkb->lkb_exflags; + rl->rl_flags = lkb->lkb_flags; + rl->rl_lvbseq = lkb->lkb_lvbseq; + rl->rl_rqmode = lkb->lkb_rqmode; + rl->rl_grmode = lkb->lkb_grmode; + rl->rl_status = lkb->lkb_status; + rl->rl_wait_type = lkb->lkb_wait_type; + + if (lkb->lkb_bastaddr) + rl->rl_asts |= AST_BAST; + if (lkb->lkb_astaddr) + rl->rl_asts |= AST_COMP; + + if (lkb->lkb_range) + memcpy(rl->rl_range, lkb->lkb_range, 4*sizeof(uint64_t)); + + rl->rl_namelen = r->res_length; + memcpy(rl->rl_name, r->res_name, r->res_length); + + /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ? + If so, receive_rcom_lock_args() won't take this copy. */ + + if (lkb->lkb_lvbptr) + memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); +} + +int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) +{ + struct dlm_ls *ls = r->res_ls; + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + struct rcom_lock *rl; + int error, len = sizeof(struct rcom_lock); + + if (lkb->lkb_lvbptr) + len += ls->ls_lvblen; + + error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); + if (error) + goto out; + + rl = (struct rcom_lock *) rc->rc_buf; + pack_rcom_lock(r, lkb, rl); + rc->rc_id = (unsigned long) r; + + send_rcom(ls, mh, rc); + out: + return error; +} + +static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + int error, nodeid = rc_in->rc_header.h_nodeid; + + dlm_recover_master_copy(ls, rc_in); + + error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, + sizeof(struct rcom_lock), &rc, &mh); + if (error) + return; + + /* We send back the same rcom_lock struct we received, but + dlm_recover_master_copy() has filled in rl_remid and rl_result */ + + memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); + rc->rc_id = rc_in->rc_id; + + send_rcom(ls, mh, rc); +} + +static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + if (!test_bit(LSFL_DIR_VALID, &ls->ls_flags)) { + log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u", + rc_in->rc_header.h_nodeid); + return; + } + + dlm_recover_process_copy(ls, rc_in); +} + +static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + char *mb; + int mb_len = sizeof(struct dlm_rcom); + + mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_KERNEL, &mb); + if (!mh) + return -ENOBUFS; + memset(mb, 0, mb_len); + + rc = (struct dlm_rcom *) mb; + + rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); + rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace; + rc->rc_header.h_nodeid = dlm_our_nodeid(); + rc->rc_header.h_length = mb_len; + rc->rc_header.h_cmd = DLM_RCOM; + + rc->rc_type = DLM_RCOM_STATUS_REPLY; + rc->rc_result = 0; + + dlm_rcom_out(rc); + dlm_lowcomms_commit_buffer(mh); + + return 0; +} + +/* Called by dlm_recvd; corresponds to dlm_receive_message() but special + recovery-only comms are sent through here. */ + +void dlm_receive_rcom(struct dlm_header *hd, int nodeid) +{ + struct dlm_rcom *rc = (struct dlm_rcom *) hd; + struct dlm_ls *ls; + + dlm_rcom_in(rc); + + /* If the lockspace doesn't exist then still send a status message + back; it's possible that it just doesn't have its global_id yet. */ + + ls = dlm_find_lockspace_global(hd->h_lockspace); + if (!ls) { + send_ls_not_ready(nodeid, rc); + return; + } + + if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { + log_error(ls, "ignoring recovery message %x from %d", + rc->rc_type, nodeid); + goto out; + } + + if (nodeid != rc->rc_header.h_nodeid) { + log_error(ls, "bad rcom nodeid %d from %d", + rc->rc_header.h_nodeid, nodeid); + goto out; + } + + switch (rc->rc_type) { + case DLM_RCOM_STATUS: + receive_rcom_status(ls, rc); + break; + + case DLM_RCOM_NAMES: + receive_rcom_names(ls, rc); + break; + + case DLM_RCOM_LOOKUP: + receive_rcom_lookup(ls, rc); + break; + + case DLM_RCOM_LOCK: + receive_rcom_lock(ls, rc); + break; + + case DLM_RCOM_STATUS_REPLY: + receive_rcom_status_reply(ls, rc); + break; + + case DLM_RCOM_NAMES_REPLY: + receive_rcom_names_reply(ls, rc); + break; + + case DLM_RCOM_LOOKUP_REPLY: + receive_rcom_lookup_reply(ls, rc); + break; + + case DLM_RCOM_LOCK_REPLY: + receive_rcom_lock_reply(ls, rc); + break; + + default: + DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type);); + } + out: + dlm_put_lockspace(ls); +} + diff -puN /dev/null drivers/dlm/rcom.h --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/rcom.h 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,24 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#ifndef __RCOM_DOT_H__ +#define __RCOM_DOT_H__ + +int dlm_rcom_status(struct dlm_ls *ls, int nodeid); +int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); +int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); +int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); +void dlm_receive_rcom(struct dlm_header *hd, int nodeid); + +#endif + diff -puN /dev/null drivers/dlm/recover.c --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/recover.c 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,729 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#include "dlm_internal.h" +#include "lockspace.h" +#include "dir.h" +#include "config.h" +#include "ast.h" +#include "memory.h" +#include "rcom.h" +#include "lock.h" +#include "lowcomms.h" +#include "member.h" + +static struct timer_list dlm_timer; + + +/* + * Recovery waiting routines: these functions wait for a particular reply from + * a remote node, or for the remote node to report a certain status. They need + * to abort if the lockspace is stopped indicating a node has failed (perhaps + * the one being waited for). + */ + +int dlm_recovery_stopped(struct dlm_ls *ls) +{ + return test_bit(LSFL_LS_STOP, &ls->ls_flags); +} + +/* + * Wait until given function returns non-zero or lockspace is stopped (LS_STOP + * set due to failure of a node in ls_nodes). When another function thinks it + * could have completed the waited-on task, they should wake up ls_wait_general + * to get an immediate response rather than waiting for the timer to detect the + * result. A timer wakes us up periodically while waiting to see if we should + * abort due to a node failure. This should only be called by the dlm_recoverd + * thread. + */ + +static void dlm_wait_timer_fn(unsigned long data) +{ + struct dlm_ls *ls = (struct dlm_ls *) data; + mod_timer(&dlm_timer, jiffies + (dlm_config.recover_timer * HZ)); + wake_up(&ls->ls_wait_general); +} + +int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)) +{ + int error = 0; + + init_timer(&dlm_timer); + dlm_timer.function = dlm_wait_timer_fn; + dlm_timer.data = (long) ls; + dlm_timer.expires = jiffies + (dlm_config.recover_timer * HZ); + add_timer(&dlm_timer); + + wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls)); + del_timer_sync(&dlm_timer); + + if (dlm_recovery_stopped(ls)) + error = -EINTR; + return error; +} + +/* + * An efficient way for all nodes to wait for all others to have a certain + * status. The node with the lowest nodeid polls all the others for their + * status (dlm_wait_status_all) and all the others poll the node with the low + * id for its accumulated result (dlm_wait_status_low). + */ + +static int dlm_wait_status_all(struct dlm_ls *ls, unsigned int wait_status) +{ + struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf; + struct dlm_member *memb; + int error = 0, delay; + + list_for_each_entry(memb, &ls->ls_nodes, list) { + delay = 0; + for (;;) { + error = dlm_recovery_stopped(ls); + if (error) + goto out; + + error = dlm_rcom_status(ls, memb->nodeid); + if (error) + goto out; + + if (rc->rc_result & wait_status) + break; + if (delay < 1000) + delay += 20; + msleep(delay); + } + } + out: + return error; +} + +static int dlm_wait_status_low(struct dlm_ls *ls, unsigned int wait_status) +{ + struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf; + int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; + + for (;;) { + error = dlm_recovery_stopped(ls); + if (error) + goto out; + + error = dlm_rcom_status(ls, nodeid); + if (error) + break; + + if (rc->rc_result & wait_status) + break; + if (delay < 1000) + delay += 20; + msleep(delay); + } + out: + return error; +} + +int dlm_recover_members_wait(struct dlm_ls *ls) +{ + int error; + + if (ls->ls_low_nodeid == dlm_our_nodeid()) { + error = dlm_wait_status_all(ls, NODES_VALID); + if (!error) + set_bit(LSFL_ALL_NODES_VALID, &ls->ls_flags); + } else + error = dlm_wait_status_low(ls, NODES_ALL_VALID); + + return error; +} + +int dlm_recover_directory_wait(struct dlm_ls *ls) +{ + int error; + + if (ls->ls_low_nodeid == dlm_our_nodeid()) { + error = dlm_wait_status_all(ls, DIR_VALID); + if (!error) + set_bit(LSFL_ALL_DIR_VALID, &ls->ls_flags); + } else + error = dlm_wait_status_low(ls, DIR_ALL_VALID); + + return error; +} + +int dlm_recover_locks_wait(struct dlm_ls *ls) +{ + int error; + + if (ls->ls_low_nodeid == dlm_our_nodeid()) { + error = dlm_wait_status_all(ls, LOCKS_VALID); + if (!error) + set_bit(LSFL_ALL_LOCKS_VALID, &ls->ls_flags); + } else + error = dlm_wait_status_low(ls, LOCKS_ALL_VALID); + + return error; +} + +/* + * The recover_list contains all the rsb's for which we've requested the new + * master nodeid. As replies are returned from the resource directories the + * rsb's are removed from the list. When the list is empty we're done. + * + * The recover_list is later similarly used for all rsb's for which we've sent + * new lkb's and need to receive new corresponding lkid's. + * + * We use the address of the rsb struct as a simple local identifier for the + * rsb so we can match an rcom reply with the rsb it was sent for. + */ + +static int recover_list_empty(struct dlm_ls *ls) +{ + int empty; + + spin_lock(&ls->ls_recover_list_lock); + empty = list_empty(&ls->ls_recover_list); + spin_unlock(&ls->ls_recover_list_lock); + + return empty; +} + +static void recover_list_add(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + + spin_lock(&ls->ls_recover_list_lock); + if (list_empty(&r->res_recover_list)) { + list_add_tail(&r->res_recover_list, &ls->ls_recover_list); + ls->ls_recover_list_count++; + dlm_hold_rsb(r); + } + spin_unlock(&ls->ls_recover_list_lock); +} + +static void recover_list_del(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + + spin_lock(&ls->ls_recover_list_lock); + list_del_init(&r->res_recover_list); + ls->ls_recover_list_count--; + spin_unlock(&ls->ls_recover_list_lock); + + dlm_put_rsb(r); +} + +static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id) +{ + struct dlm_rsb *r = NULL; + + spin_lock(&ls->ls_recover_list_lock); + + list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) { + if (id == (unsigned long) r) + goto out; + } + r = NULL; + out: + spin_unlock(&ls->ls_recover_list_lock); + return r; +} + +void recover_list_clear(struct dlm_ls *ls) +{ + struct dlm_rsb *r, *s; + + spin_lock(&ls->ls_recover_list_lock); + list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) { + list_del_init(&r->res_recover_list); + dlm_put_rsb(r); + ls->ls_recover_list_count--; + } + + if (ls->ls_recover_list_count != 0) { + log_error(ls, "warning: recover_list_count %d", + ls->ls_recover_list_count); + ls->ls_recover_list_count = 0; + } + spin_unlock(&ls->ls_recover_list_lock); +} + + +/* Master recovery: find new master node for rsb's that were + mastered on nodes that have been removed. + + dlm_recover_masters + recover_master + dlm_send_rcom_lookup -> receive_rcom_lookup + dlm_dir_lookup + receive_rcom_lookup_reply <- + dlm_recover_master_reply + set_new_master + set_master_lkbs + set_lock_master +*/ + +/* + * Set the lock master for all LKBs in a lock queue + * If we are the new master of the rsb, we may have received new + * MSTCPY locks from other nodes already which we need to ignore + * when setting the new nodeid. + */ + +static void set_lock_master(struct list_head *queue, int nodeid) +{ + struct dlm_lkb *lkb; + + list_for_each_entry(lkb, queue, lkb_statequeue) + if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) + lkb->lkb_nodeid = nodeid; +} + +static void set_master_lkbs(struct dlm_rsb *r) +{ + set_lock_master(&r->res_grantqueue, r->res_nodeid); + set_lock_master(&r->res_convertqueue, r->res_nodeid); + set_lock_master(&r->res_waitqueue, r->res_nodeid); +} + +/* + * Propogate the new master nodeid to locks + * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. + * The NEW_MASTER2 flag tells recover_lvb() which rsb's to consider. + */ + +static void set_new_master(struct dlm_rsb *r, int nodeid) +{ + lock_rsb(r); + + if (nodeid == dlm_our_nodeid()) + r->res_nodeid = 0; + else + r->res_nodeid = nodeid; + + set_master_lkbs(r); + + set_bit(RESFL_NEW_MASTER, &r->res_flags); + set_bit(RESFL_NEW_MASTER2, &r->res_flags); + unlock_rsb(r); +} + +/* + * We do async lookups on rsb's that need new masters. The rsb's + * waiting for a lookup reply are kept on the recover_list. + */ + +static int recover_master(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + + dir_nodeid = dlm_dir_nodeid(r); + + if (dir_nodeid == our_nodeid) { + error = dlm_dir_lookup(ls, our_nodeid, r->res_name, + r->res_length, &ret_nodeid); + if (error) + log_error(ls, "recover dir lookup error %d", error); + + set_new_master(r, ret_nodeid); + } else { + recover_list_add(r); + error = dlm_send_rcom_lookup(r, dir_nodeid); + } + + return error; +} + +/* + * Go through local root resources and for each rsb which has a master which + * has departed, get the new master nodeid from the directory. The dir will + * assign mastery to the first node to look up the new master. That means + * we'll discover in this lookup if we're the new master of any rsb's. + * + * We fire off all the dir lookup requests individually and asynchronously to + * the correct dir node. + */ + +int dlm_recover_masters(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + int error, count = 0; + + log_debug(ls, "dlm_recover_masters"); + + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + if (is_master(r)) + continue; + + error = dlm_recovery_stopped(ls); + if (error) { + up_read(&ls->ls_root_sem); + goto out; + } + + if (dlm_is_removed(ls, r->res_nodeid)) { + recover_master(r); + count++; + } + + schedule(); + } + up_read(&ls->ls_root_sem); + + log_debug(ls, "dlm_recover_masters %d resources", count); + + error = dlm_wait_function(ls, &recover_list_empty); + out: + if (error) + recover_list_clear(ls); + return error; +} + +int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) +{ + struct dlm_rsb *r; + + r = recover_list_find(ls, rc->rc_id); + if (!r) { + log_error(ls, "dlm_recover_master_reply no id %"PRIx64"", + rc->rc_id); + goto out; + } + + set_new_master(r, rc->rc_result); + recover_list_del(r); + + if (recover_list_empty(ls)) + wake_up(&ls->ls_wait_general); + out: + return 0; +} + + +/* Lock recovery: rebuild the process-copy locks we hold on a + remastered rsb on the new rsb master. + + dlm_recover_locks + recover_locks + recover_locks_queue + dlm_send_rcom_lock -> receive_rcom_lock + dlm_recover_master_copy + receive_rcom_lock_reply <- + dlm_recover_process_copy +*/ + + +/* + * keep a count of the number of lkb's we send to the new master; when we get + * an equal number of replies then recovery for the rsb is done + */ + +static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) +{ + struct dlm_lkb *lkb; + int error = 0; + + list_for_each_entry(lkb, head, lkb_statequeue) { + error = dlm_send_rcom_lock(r, lkb); + if (error) + break; + r->res_recover_locks_count++; + } + + return error; +} + +static int all_queues_empty(struct dlm_rsb *r) +{ + if (!list_empty(&r->res_grantqueue) || + !list_empty(&r->res_convertqueue) || + !list_empty(&r->res_waitqueue)) + return FALSE; + return TRUE; +} + +static int recover_locks(struct dlm_rsb *r) +{ + int error = 0; + + lock_rsb(r); + if (all_queues_empty(r)) + goto out; + + DLM_ASSERT(!r->res_recover_locks_count, dlm_print_rsb(r);); + + error = recover_locks_queue(r, &r->res_grantqueue); + if (error) + goto out; + error = recover_locks_queue(r, &r->res_convertqueue); + if (error) + goto out; + error = recover_locks_queue(r, &r->res_waitqueue); + if (error) + goto out; + + if (r->res_recover_locks_count) + recover_list_add(r); + out: + unlock_rsb(r); + return error; +} + +int dlm_recover_locks(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + int error, count = 0; + + log_debug(ls, "dlm_recover_locks"); + + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + if (is_master(r)) { + clear_bit(RESFL_NEW_MASTER, &r->res_flags); + continue; + } + + if (!test_bit(RESFL_NEW_MASTER, &r->res_flags)) + continue; + + error = dlm_recovery_stopped(ls); + if (error) { + up_read(&ls->ls_root_sem); + goto out; + } + + error = recover_locks(r); + if (error) { + up_read(&ls->ls_root_sem); + goto out; + } + + count += r->res_recover_locks_count; + } + up_read(&ls->ls_root_sem); + + log_debug(ls, "dlm_recover_locks %d locks", count); + + error = dlm_wait_function(ls, &recover_list_empty); + out: + if (error) + recover_list_clear(ls); + else + set_bit(LSFL_LOCKS_VALID, &ls->ls_flags); + return error; +} + +void dlm_recovered_lock(struct dlm_rsb *r) +{ + r->res_recover_locks_count--; + if (!r->res_recover_locks_count) { + clear_bit(RESFL_NEW_MASTER, &r->res_flags); + recover_list_del(r); + } + + if (recover_list_empty(r->res_ls)) + wake_up(&r->res_ls->ls_wait_general); +} + +/* + * The lvb needs to be recovered on all master rsb's. This includes setting + * the VALNOTVALID flag if necessary, and determining the correct lvb contents + * based on the lvb's of the locks held on the rsb. + * + * RESFL_VALNOTVALID is set if there are only NL/CR locks on the rsb. If it + * was already set prior to recovery, it's not cleared, regardless of locks. + * + * The LVB contents are only considered for changing when this is a new master + * of the rsb (NEW_MASTER2). Then, the rsb's lvb is taken from any lkb with + * mode > CR. If no lkb's exist with mode above CR, the lvb contents are taken + * from the lkb with the largest lvb sequence number. + */ + +static void recover_lvb(struct dlm_rsb *r) +{ + struct dlm_lkb *lkb, *high_lkb = NULL; + uint32_t high_seq = 0; + int lock_lvb_exists = FALSE; + int big_lock_exists = FALSE; + int lvblen = r->res_ls->ls_lvblen; + + list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { + if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) + continue; + + lock_lvb_exists = TRUE; + + if (lkb->lkb_grmode > DLM_LOCK_CR) { + big_lock_exists = TRUE; + goto setflag; + } + + if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) { + high_lkb = lkb; + high_seq = lkb->lkb_lvbseq; + } + } + + list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { + if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) + continue; + + lock_lvb_exists = TRUE; + + if (lkb->lkb_grmode > DLM_LOCK_CR) { + big_lock_exists = TRUE; + goto setflag; + } + + if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) { + high_lkb = lkb; + high_seq = lkb->lkb_lvbseq; + } + } + + setflag: + if (!lock_lvb_exists) + goto out; + + if (!big_lock_exists) + set_bit(RESFL_VALNOTVALID, &r->res_flags); + + /* don't mess with the lvb unless we're the new master */ + if (!test_bit(RESFL_NEW_MASTER2, &r->res_flags)) + goto out; + + if (!r->res_lvbptr) { + r->res_lvbptr = allocate_lvb(r->res_ls); + if (!r->res_lvbptr) + goto out; + } + + if (big_lock_exists) { + r->res_lvbseq = lkb->lkb_lvbseq; + memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen); + } else if (high_lkb) { + r->res_lvbseq = high_lkb->lkb_lvbseq; + memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen); + } else { + r->res_lvbseq = 0; + memset(r->res_lvbptr, 0, lvblen); + } + out: + return; +} + +/* All master rsb's flagged RECOVER_CONVERT need to be looked at. The locks + converting PR->CW or CW->PR need to have their lkb_grmode set. */ + +static void recover_conversion(struct dlm_rsb *r) +{ + struct dlm_lkb *lkb; + int grmode = -1; + + list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { + if (lkb->lkb_grmode == DLM_LOCK_PR || + lkb->lkb_grmode == DLM_LOCK_CW) { + grmode = lkb->lkb_grmode; + break; + } + } + + list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { + if (lkb->lkb_grmode != DLM_LOCK_IV) + continue; + if (grmode == -1) + lkb->lkb_grmode = lkb->lkb_rqmode; + else + lkb->lkb_grmode = grmode; + } +} + +void dlm_recover_rsbs(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + int count = 0; + + log_debug(ls, "dlm_recover_rsbs"); + + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + lock_rsb(r); + if (is_master(r)) { + if (test_bit(RESFL_RECOVER_CONVERT, &r->res_flags)) + recover_conversion(r); + recover_lvb(r); + count++; + } + clear_bit(RESFL_RECOVER_CONVERT, &r->res_flags); + unlock_rsb(r); + } + up_read(&ls->ls_root_sem); + + log_debug(ls, "dlm_recover_rsbs %d rsbs", count); +} + +/* Create a single list of all root rsb's to be used during recovery */ + +int dlm_create_root_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + int i, error = 0; + + down_write(&ls->ls_root_sem); + if (!list_empty(&ls->ls_root_list)) { + log_error(ls, "root list not empty"); + error = -EINVAL; + goto out; + } + + for (i = 0; i < ls->ls_rsbtbl_size; i++) { + read_lock(&ls->ls_rsbtbl[i].lock); + list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) { + list_add(&r->res_root_list, &ls->ls_root_list); + dlm_hold_rsb(r); + } + read_unlock(&ls->ls_rsbtbl[i].lock); + } + out: + up_write(&ls->ls_root_sem); + return error; +} + +void dlm_release_root_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r, *safe; + + down_write(&ls->ls_root_sem); + list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) { + list_del_init(&r->res_root_list); + dlm_put_rsb(r); + } + up_write(&ls->ls_root_sem); +} + +void dlm_clear_toss_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r, *safe; + int i; + + for (i = 0; i < ls->ls_rsbtbl_size; i++) { + write_lock(&ls->ls_rsbtbl[i].lock); + list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss, + res_hashchain) { + list_del(&r->res_hashchain); + free_rsb(r); + } + write_unlock(&ls->ls_rsbtbl[i].lock); + } +} + diff -puN /dev/null drivers/dlm/recoverd.c --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/recoverd.c 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,714 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#include "dlm_internal.h" +#include "lockspace.h" +#include "member.h" +#include "dir.h" +#include "ast.h" +#include "recover.h" +#include "lowcomms.h" +#include "lock.h" +#include "requestqueue.h" + +/* + * next_move actions + */ + +#define DO_STOP (1) +#define DO_START (2) +#define DO_FINISH (3) +#define DO_FINISH_STOP (4) +#define DO_FINISH_START (5) + +static void set_start_done(struct dlm_ls *ls, int event_id) +{ + spin_lock(&ls->ls_recover_lock); + ls->ls_startdone = event_id; + spin_unlock(&ls->ls_recover_lock); + + kobject_uevent(&ls->ls_kobj, KOBJ_CHANGE, NULL); +} + +static int enable_locking(struct dlm_ls *ls, int event_id) +{ + int error = 0; + + spin_lock(&ls->ls_recover_lock); + if (ls->ls_last_stop < event_id) { + set_bit(LSFL_LS_RUN, &ls->ls_flags); + up_write(&ls->ls_in_recovery); + } else { + error = -EINTR; + log_debug(ls, "enable_locking: abort %d", event_id); + } + spin_unlock(&ls->ls_recover_lock); + return error; +} + +static int ls_first_start(struct dlm_ls *ls, struct dlm_recover *rv) +{ + int error; + + log_debug(ls, "recover event %u (first)", rv->event_id); + + down(&ls->ls_recoverd_active); + + error = dlm_recover_members_first(ls, rv); + if (error) { + log_error(ls, "recover_members first failed %d", error); + goto out; + } + + error = dlm_recover_directory(ls); + if (error) { + log_error(ls, "recover_directory failed %d", error); + goto out; + } + + error = dlm_recover_directory_wait(ls); + if (error) { + log_error(ls, "recover_directory_wait failed %d", error); + goto out; + } + + log_debug(ls, "recover event %u done", rv->event_id); + set_start_done(ls, rv->event_id); + + out: + up(&ls->ls_recoverd_active); + return error; +} + +/* + * We are given here a new group of nodes which are in the lockspace. We first + * figure out the differences in ls membership from when we were last running. + * If nodes from before are gone, then there will be some lock recovery to do. + * If there are only nodes which have joined, then there's no lock recovery. + * + * Lockspace recovery for failed nodes must be completed before any nodes are + * allowed to join or leave the lockspace. + */ + +static int ls_reconfig(struct dlm_ls *ls, struct dlm_recover *rv) +{ + unsigned long start; + int error, neg = 0; + + log_debug(ls, "recover event %u", rv->event_id); + + down(&ls->ls_recoverd_active); + + /* + * Suspending and resuming dlm_astd ensures that no lkb's from this ls + * will be processed by dlm_astd during recovery. + */ + + dlm_astd_suspend(); + dlm_astd_resume(); + + /* + * This list of root rsb's will be the basis of most of the recovery + * routines. + */ + + dlm_create_root_list(ls); + + /* + * Free all the tossed rsb's so we don't have to recover them. + */ + + dlm_clear_toss_list(ls); + + /* + * Add or remove nodes from the lockspace's ls_nodes list. + * Also waits for all nodes to complete dlm_recover_members. + */ + + error = dlm_recover_members(ls, rv, &neg); + if (error) { + log_error(ls, "recover_members failed %d", error); + goto fail; + } + start = jiffies; + + /* + * Rebuild our own share of the directory by collecting from all other + * nodes their master rsb names that hash to us. + */ + + error = dlm_recover_directory(ls); + if (error) { + log_error(ls, "recover_directory failed %d", error); + goto fail; + } + + /* + * Purge directory-related requests that are saved in requestqueue. + * All dir requests from before recovery are invalid now due to the dir + * rebuild and will be resent by the requesting nodes. + */ + + dlm_purge_requestqueue(ls); + + /* + * Wait for all nodes to complete directory rebuild. + */ + + error = dlm_recover_directory_wait(ls); + if (error) { + log_error(ls, "recover_directory_wait failed %d", error); + goto fail; + } + + /* + * We may have outstanding operations that are waiting for a reply from + * a failed node. Mark these to be resent after recovery. Unlock and + * cancel ops can just be completed. + */ + + dlm_recover_waiters_pre(ls); + + error = dlm_recovery_stopped(ls); + if (error) + goto fail; + + if (neg) { + /* + * Clear lkb's for departed nodes. + */ + + dlm_purge_locks(ls); + + /* + * Get new master nodeid's for rsb's that were mastered on + * departed nodes. + */ + + error = dlm_recover_masters(ls); + if (error) { + log_error(ls, "recover_masters failed %d", error); + goto fail; + } + + /* + * Send our locks on remastered rsb's to the new masters. + */ + + error = dlm_recover_locks(ls); + if (error) { + log_error(ls, "recover_locks failed %d", error); + goto fail; + } + + error = dlm_recover_locks_wait(ls); + if (error) { + log_error(ls, "recover_locks_wait failed %d", error); + goto fail; + } + + /* + * Finalize state in master rsb's now that all locks can be + * checked. This includes conversion resolution and lvb + * settings. + */ + + dlm_recover_rsbs(ls); + } + + dlm_release_root_list(ls); + + log_debug(ls, "recover event %u done: %u ms", rv->event_id, + jiffies_to_msecs(jiffies - start)); + set_start_done(ls, rv->event_id); + up(&ls->ls_recoverd_active); + + return 0; + + fail: + log_debug(ls, "recover event %d error %d", rv->event_id, error); + up(&ls->ls_recoverd_active); + return error; +} + +/* + * Between calls to this routine for a ls, there can be multiple stop/start + * events where every start but the latest is cancelled by stops. There can + * only be a single finish because every finish requires us to call start_done. + * A single finish event could be followed by multiple stop/start events. This + * routine takes any combination of events and boils them down to one course of + * action. + */ + +static int next_move(struct dlm_ls *ls, struct dlm_recover **rv_out, + int *finish_out) +{ + LIST_HEAD(events); + unsigned int cmd = 0, stop, start, finish; + unsigned int last_stop, last_start, last_finish; + struct dlm_recover *rv = NULL, *start_rv = NULL; + + spin_lock(&ls->ls_recover_lock); + + stop = test_and_clear_bit(LSFL_LS_STOP, &ls->ls_flags) ? 1 : 0; + start = test_and_clear_bit(LSFL_LS_START, &ls->ls_flags) ? 1 : 0; + finish = test_and_clear_bit(LSFL_LS_FINISH, &ls->ls_flags) ? 1 : 0; + + last_stop = ls->ls_last_stop; + last_start = ls->ls_last_start; + last_finish = ls->ls_last_finish; + + while (!list_empty(&ls->ls_recover)) { + rv = list_entry(ls->ls_recover.next, struct dlm_recover, list); + list_del(&rv->list); + list_add_tail(&rv->list, &events); + } + + /* + * There are two cases where we need to adjust these event values: + * 1. - we get a first start + * - we get a stop + * - we process the start + stop here and notice this special case + * + * 2. - we get a first start + * - we process the start + * - we get a stop + * - we process the stop here and notice this special case + * + * In both cases, the first start we received was aborted by a + * stop before we received a finish. last_finish being zero is the + * indication that this is the "first" start, i.e. we've not yet + * finished a start; if we had, last_finish would be non-zero. + * Part of the problem arises from the fact that when we initially + * get start/stop/start, we may get the same event id for both starts + * (since the first was cancelled). + * + * In both cases, last_start and last_stop will be equal. + * In both cases, finish=0. + * In the first case start=1 && stop=1. + * In the second case start=0 && stop=1. + * + * In both cases, we need to make adjustments to values so: + * - we process the current event (now) as a normal stop + * - the next start we receive will be processed normally + * (taking into account the assertions below) + * + * In the first case, dlm_ls_start() will have printed the + * "repeated start" warning. + * + * In the first case we need to get rid of the recover event struct. + * + * - set stop=1, start=0, finish=0 for case 4 below + * - last_stop and last_start must be set equal per the case 4 assert + * - ls_last_stop = 0 so the next start will be larger + * - ls_last_start = 0 not really necessary (avoids dlm_ls_start print) + */ + + if (!last_finish && (last_start == last_stop)) { + log_debug(ls, "move reset %u,%u,%u ids %u,%u,%u", stop, + start, finish, last_stop, last_start, last_finish); + stop = 1; + start = 0; + finish = 0; + last_stop = 0; + last_start = 0; + ls->ls_last_stop = 0; + ls->ls_last_start = 0; + + while (!list_empty(&events)) { + rv = list_entry(events.next, struct dlm_recover, list); + list_del(&rv->list); + kfree(rv->nodeids); + kfree(rv); + } + } + spin_unlock(&ls->ls_recover_lock); + + log_debug(ls, "move flags %u,%u,%u ids %u,%u,%u", stop, start, finish, + last_stop, last_start, last_finish); + + /* + * Toss start events which have since been cancelled. + */ + + while (!list_empty(&events)) { + DLM_ASSERT(start,); + rv = list_entry(events.next, struct dlm_recover, list); + list_del(&rv->list); + + if (rv->event_id <= last_stop) { + log_debug(ls, "move skip event %u", rv->event_id); + kfree(rv->nodeids); + kfree(rv); + rv = NULL; + } else { + log_debug(ls, "move use event %u", rv->event_id); + DLM_ASSERT(!start_rv,); + start_rv = rv; + } + } + + /* + * Eight possible combinations of events. + */ + + /* 0 */ + if (!stop && !start && !finish) { + DLM_ASSERT(!start_rv,); + cmd = 0; + goto out; + } + + /* 1 */ + if (!stop && !start && finish) { + DLM_ASSERT(!start_rv,); + DLM_ASSERT(last_start > last_stop,); + DLM_ASSERT(last_finish == last_start,); + cmd = DO_FINISH; + *finish_out = last_finish; + goto out; + } + + /* 2 */ + if (!stop && start && !finish) { + DLM_ASSERT(start_rv,); + DLM_ASSERT(last_start > last_stop,); + cmd = DO_START; + *rv_out = start_rv; + goto out; + } + + /* 3 */ + if (!stop && start && finish) { + DLM_ASSERT(0, printk("finish and start with no stop\n");); + } + + /* 4 */ + if (stop && !start && !finish) { + DLM_ASSERT(!start_rv,); + DLM_ASSERT(last_start == last_stop,); + cmd = DO_STOP; + goto out; + } + + /* 5 */ + if (stop && !start && finish) { + DLM_ASSERT(!start_rv,); + DLM_ASSERT(last_finish == last_start,); + DLM_ASSERT(last_stop == last_start,); + cmd = DO_FINISH_STOP; + *finish_out = last_finish; + goto out; + } + + /* 6 */ + if (stop && start && !finish) { + if (start_rv) { + DLM_ASSERT(last_start > last_stop,); + cmd = DO_START; + *rv_out = start_rv; + } else { + DLM_ASSERT(last_stop == last_start,); + cmd = DO_STOP; + } + goto out; + } + + /* 7 */ + if (stop && start && finish) { + if (start_rv) { + DLM_ASSERT(last_start > last_stop,); + DLM_ASSERT(last_start > last_finish,); + cmd = DO_FINISH_START; + *finish_out = last_finish; + *rv_out = start_rv; + } else { + DLM_ASSERT(last_start == last_stop,); + DLM_ASSERT(last_start > last_finish,); + cmd = DO_FINISH_STOP; + *finish_out = last_finish; + } + goto out; + } + + out: + return cmd; +} + +/* + * This function decides what to do given every combination of current + * lockspace state and next lockspace state. + */ + +static void do_ls_recovery(struct dlm_ls *ls) +{ + struct dlm_recover *rv = NULL; + int error, cur_state, next_state = 0, do_now, finish_event = 0; + + do_now = next_move(ls, &rv, &finish_event); + if (!do_now) + goto out; + + cur_state = ls->ls_state; + next_state = 0; + + DLM_ASSERT(!test_bit(LSFL_LS_RUN, &ls->ls_flags), + log_error(ls, "curstate=%d donow=%d", cur_state, do_now);); + + /* + * LSST_CLEAR - we're not in any recovery state. We can get a stop or + * a stop and start which equates with a START. + */ + + if (cur_state == LSST_CLEAR) { + switch (do_now) { + case DO_STOP: + next_state = LSST_WAIT_START; + break; + + case DO_START: + error = ls_reconfig(ls, rv); + if (error) + next_state = LSST_WAIT_START; + else + next_state = LSST_RECONFIG_DONE; + break; + + case DO_FINISH: /* invalid */ + case DO_FINISH_STOP: /* invalid */ + case DO_FINISH_START: /* invalid */ + default: + DLM_ASSERT(0,); + } + goto out; + } + + /* + * LSST_WAIT_START - we're not running because of getting a stop or + * failing a start. We wait in this state for another stop/start or + * just the next start to begin another reconfig attempt. + */ + + if (cur_state == LSST_WAIT_START) { + switch (do_now) { + case DO_STOP: + break; + + case DO_START: + error = ls_reconfig(ls, rv); + if (error) + next_state = LSST_WAIT_START; + else + next_state = LSST_RECONFIG_DONE; + break; + + case DO_FINISH: /* invalid */ + case DO_FINISH_STOP: /* invalid */ + case DO_FINISH_START: /* invalid */ + default: + DLM_ASSERT(0,); + } + goto out; + } + + /* + * LSST_RECONFIG_DONE - we entered this state after successfully + * completing ls_reconfig and calling set_start_done. We expect to get + * a finish if everything goes ok. A finish could be followed by stop + * or stop/start before we get here to check it. Or a finish may never + * happen, only stop or stop/start. + */ + + if (cur_state == LSST_RECONFIG_DONE) { + switch (do_now) { + case DO_FINISH: + dlm_clear_members_finish(ls, finish_event); + next_state = LSST_CLEAR; + + error = enable_locking(ls, finish_event); + if (error) + break; + + error = dlm_process_requestqueue(ls); + if (error) + break; + + error = dlm_recover_waiters_post(ls); + if (error) + break; + + dlm_grant_after_purge(ls); + + dlm_astd_wake(); + + log_debug(ls, "recover event %u finished", finish_event); + break; + + case DO_STOP: + next_state = LSST_WAIT_START; + break; + + case DO_FINISH_STOP: + dlm_clear_members_finish(ls, finish_event); + next_state = LSST_WAIT_START; + break; + + case DO_FINISH_START: + dlm_clear_members_finish(ls, finish_event); + /* fall into DO_START */ + + case DO_START: + error = ls_reconfig(ls, rv); + if (error) + next_state = LSST_WAIT_START; + else + next_state = LSST_RECONFIG_DONE; + break; + + default: + DLM_ASSERT(0,); + } + goto out; + } + + /* + * LSST_INIT - state after ls is created and before it has been + * started. A start operation will cause the ls to be started for the + * first time. A failed start will cause to just wait in INIT for + * another stop/start. + */ + + if (cur_state == LSST_INIT) { + switch (do_now) { + case DO_START: + error = ls_first_start(ls, rv); + if (!error) + next_state = LSST_INIT_DONE; + break; + + case DO_STOP: + break; + + case DO_FINISH: /* invalid */ + case DO_FINISH_STOP: /* invalid */ + case DO_FINISH_START: /* invalid */ + default: + DLM_ASSERT(0,); + } + goto out; + } + + /* + * LSST_INIT_DONE - after the first start operation is completed + * successfully and set_start_done() called. If there are no errors, a + * finish will arrive next and we'll move to LSST_CLEAR. + */ + + if (cur_state == LSST_INIT_DONE) { + switch (do_now) { + case DO_STOP: + case DO_FINISH_STOP: + next_state = LSST_WAIT_START; + break; + + case DO_START: + case DO_FINISH_START: + error = ls_reconfig(ls, rv); + if (error) + next_state = LSST_WAIT_START; + else + next_state = LSST_RECONFIG_DONE; + break; + + case DO_FINISH: + next_state = LSST_CLEAR; + + enable_locking(ls, finish_event); + + dlm_process_requestqueue(ls); + + dlm_astd_wake(); + + log_debug(ls, "recover event %u finished", finish_event); + break; + + default: + DLM_ASSERT(0,); + } + goto out; + } + + out: + if (next_state) + ls->ls_state = next_state; + + if (rv) { + kfree(rv->nodeids); + kfree(rv); + } +} + +int dlm_recoverd(void *arg) +{ + struct dlm_ls *ls; + + ls = dlm_find_lockspace_local(arg); + + while (!kthread_should_stop()) { + set_current_state(TASK_INTERRUPTIBLE); + if (!test_bit(LSFL_WORK, &ls->ls_flags)) + schedule(); + set_current_state(TASK_RUNNING); + + if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags)) + do_ls_recovery(ls); + } + + dlm_put_lockspace(ls); + return 0; +} + +void dlm_recoverd_kick(struct dlm_ls *ls) +{ + set_bit(LSFL_WORK, &ls->ls_flags); + wake_up_process(ls->ls_recoverd_task); +} + +int dlm_recoverd_start(struct dlm_ls *ls) +{ + struct task_struct *p; + int error = 0; + + p = kthread_run(dlm_recoverd, ls, "dlm_recoverd"); + if (IS_ERR(p)) + error = PTR_ERR(p); + else + ls->ls_recoverd_task = p; + return error; +} + +void dlm_recoverd_stop(struct dlm_ls *ls) +{ + kthread_stop(ls->ls_recoverd_task); +} + +void dlm_recoverd_suspend(struct dlm_ls *ls) +{ + down(&ls->ls_recoverd_active); +} + +void dlm_recoverd_resume(struct dlm_ls *ls) +{ + up(&ls->ls_recoverd_active); +} + diff -puN /dev/null drivers/dlm/recoverd.h --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/recoverd.h 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,24 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#ifndef __RECOVERD_DOT_H__ +#define __RECOVERD_DOT_H__ + +void dlm_recoverd_kick(struct dlm_ls *ls); +void dlm_recoverd_stop(struct dlm_ls *ls); +int dlm_recoverd_start(struct dlm_ls *ls); +void dlm_recoverd_suspend(struct dlm_ls *ls); +void dlm_recoverd_resume(struct dlm_ls *ls); + +#endif /* __RECOVERD_DOT_H__ */ + diff -puN /dev/null drivers/dlm/recover.h --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/recover.h 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,32 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#ifndef __RECOVER_DOT_H__ +#define __RECOVER_DOT_H__ + +int dlm_recovery_stopped(struct dlm_ls *ls); +int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)); +int dlm_recover_masters(struct dlm_ls *ls); +int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc); +int dlm_recover_locks(struct dlm_ls *ls); +void dlm_recovered_lock(struct dlm_rsb *r); +int dlm_create_root_list(struct dlm_ls *ls); +void dlm_release_root_list(struct dlm_ls *ls); +void dlm_clear_toss_list(struct dlm_ls *ls); +void dlm_recover_rsbs(struct dlm_ls *ls); +int dlm_recover_members_wait(struct dlm_ls *ls); +int dlm_recover_directory_wait(struct dlm_ls *ls); +int dlm_recover_locks_wait(struct dlm_ls *ls); + +#endif /* __RECOVER_DOT_H__ */ + diff -puN /dev/null drivers/dlm/requestqueue.c --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/requestqueue.c 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,144 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#include "dlm_internal.h" +#include "member.h" +#include "lock.h" + +struct rq_entry { + struct list_head list; + int nodeid; + char request[1]; +}; + +/* + * Requests received while the lockspace is in recovery get added to the + * request queue and processed when recovery is complete. This happens when + * the lockspace is suspended on some nodes before it is on others, or the + * lockspace is enabled on some while still suspended on others. + */ + +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) +{ + struct rq_entry *e; + int length = hd->h_length; + + if (dlm_is_removed(ls, nodeid)) + return; + + e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); + if (!e) { + log_print("dlm_add_requestqueue: out of memory\n"); + return; + } + + e->nodeid = nodeid; + memcpy(e->request, hd, length); + + down(&ls->ls_requestqueue_lock); + list_add_tail(&e->list, &ls->ls_requestqueue); + up(&ls->ls_requestqueue_lock); +} + +int dlm_process_requestqueue(struct dlm_ls *ls) +{ + struct rq_entry *e; + struct dlm_header *hd; + int error = 0; + + down(&ls->ls_requestqueue_lock); + + for (;;) { + if (list_empty(&ls->ls_requestqueue)) { + up(&ls->ls_requestqueue_lock); + error = 0; + break; + } + e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); + up(&ls->ls_requestqueue_lock); + + hd = (struct dlm_header *) e->request; + error = dlm_receive_message(hd, e->nodeid, TRUE); + + if (error == -EINTR) { + /* entry is left on requestqueue */ + log_debug(ls, "process_requestqueue abort eintr"); + break; + } + + down(&ls->ls_requestqueue_lock); + list_del(&e->list); + kfree(e); + + if (!test_bit(LSFL_LS_RUN, &ls->ls_flags)) { + log_debug(ls, "process_requestqueue abort ls_run"); + up(&ls->ls_requestqueue_lock); + error = -EINTR; + break; + } + schedule(); + } + + return error; +} + +/* + * After recovery is done, locking is resumed and dlm_recoverd takes all the + * saved requests and processes them as they would have been by dlm_recvd. At + * the same time, dlm_recvd will start receiving new requests from remote + * nodes. We want to delay dlm_recvd processing new requests until + * dlm_recoverd has finished processing the old saved requests. + */ + +void dlm_wait_requestqueue(struct dlm_ls *ls) +{ + for (;;) { + down(&ls->ls_requestqueue_lock); + if (list_empty(&ls->ls_requestqueue)) + break; + if (!test_bit(LSFL_LS_RUN, &ls->ls_flags)) + break; + up(&ls->ls_requestqueue_lock); + schedule(); + } + up(&ls->ls_requestqueue_lock); +} + +/* + * Dir lookups and lookup replies send before recovery are invalid because the + * directory is rebuilt during recovery, so don't save any requests of this + * type. Don't save any requests from a node that's being removed either. + */ + +void dlm_purge_requestqueue(struct dlm_ls *ls) +{ + struct dlm_message *ms; + struct rq_entry *e, *safe; + uint32_t mstype; + + down(&ls->ls_requestqueue_lock); + list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) { + + ms = (struct dlm_message *) e->request; + mstype = ms->m_type; + + if (dlm_is_removed(ls, e->nodeid) || + mstype == DLM_MSG_REMOVE || + mstype == DLM_MSG_LOOKUP || + mstype == DLM_MSG_LOOKUP_REPLY) { + list_del(&e->list); + kfree(e); + } + } + up(&ls->ls_requestqueue_lock); +} + diff -puN /dev/null drivers/dlm/requestqueue.h --- /dev/null 2003-09-15 06:40:47.000000000 -0700 +++ 25-akpm/drivers/dlm/requestqueue.h 2005-05-17 00:39:20.000000000 -0700 @@ -0,0 +1,22 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +#ifndef __REQUESTQUEUE_DOT_H__ +#define __REQUESTQUEUE_DOT_H__ + +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd); +int dlm_process_requestqueue(struct dlm_ls *ls); +void dlm_wait_requestqueue(struct dlm_ls *ls); +void dlm_purge_requestqueue(struct dlm_ls *ls); + +#endif + _