Linux kernel & device driver programming

Cross-Referenced Linux and Device Driver Code

[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ]
Version: [ 2.6.11.8 ] [ 2.6.25 ] [ 2.6.25.8 ] [ 2.6.31.13 ] Architecture: [ i386 ]
  1 /******************************************************************************
  2 *******************************************************************************
  3 **
  4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
  5 **
  6 **  This copyrighted material is made available to anyone wishing to use,
  7 **  modify, copy, or redistribute it subject to the terms and conditions
  8 **  of the GNU General Public License v.2.
  9 **
 10 *******************************************************************************
 11 ******************************************************************************/
 12 
 13 /* Central locking logic has four stages:
 14 
 15    dlm_lock()
 16    dlm_unlock()
 17 
 18    request_lock(ls, lkb)
 19    convert_lock(ls, lkb)
 20    unlock_lock(ls, lkb)
 21    cancel_lock(ls, lkb)
 22 
 23    _request_lock(r, lkb)
 24    _convert_lock(r, lkb)
 25    _unlock_lock(r, lkb)
 26    _cancel_lock(r, lkb)
 27 
 28    do_request(r, lkb)
 29    do_convert(r, lkb)
 30    do_unlock(r, lkb)
 31    do_cancel(r, lkb)
 32 
 33    Stage 1 (lock, unlock) is mainly about checking input args and
 34    splitting into one of the four main operations:
 35 
 36        dlm_lock          = request_lock
 37        dlm_lock+CONVERT  = convert_lock
 38        dlm_unlock        = unlock_lock
 39        dlm_unlock+CANCEL = cancel_lock
 40 
 41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
 42    provided to the next stage.
 43 
 44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
 45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
 46 
 47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
 48    given rsb and lkb and queues callbacks.
 49 
 50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
 51    function being executed on the remote node.  The connecting send/receive
 52    calls on local (L) and remote (R) nodes:
 53 
 54    L: send_xxxx()              ->  R: receive_xxxx()
 55                                    R: do_xxxx()
 56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 57 */
 58 #include <linux/types.h>
 59 #include "dlm_internal.h"
 60 #include <linux/dlm_device.h>
 61 #include "memory.h"
 62 #include "lowcomms.h"
 63 #include "requestqueue.h"
 64 #include "util.h"
 65 #include "dir.h"
 66 #include "member.h"
 67 #include "lockspace.h"
 68 #include "ast.h"
 69 #include "lock.h"
 70 #include "rcom.h"
 71 #include "recover.h"
 72 #include "lvb_table.h"
 73 #include "user.h"
 74 #include "config.h"
 75 
/* Forward declarations of file-local helpers so that they can be called
   from code that appears earlier in the file than their definitions. */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
 91 
 92 /*
  93  * Lock compatibility matrix - thanks Steve
 94  * UN = Unlocked state. Not really a state, used as a flag
 95  * PD = Padding. Used to make the matrix a nice power of two in size
 96  * Other states are the same as the VMS DLM.
 97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 98  */
 99 
/* Read through modes_compat() and dlm_modes_compat(), both of which add 1
   to each mode before indexing (row = first mode, column = second mode). */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
111 
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120 
/* Not static, so visible outside this file.  Each entry is one of the
   three codes (1, 0, -1) listed in the comment above. */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
132 
/* Compatibility of one lkb's granted mode (gr) against another lkb's
   requested mode (rq); the table lookup itself lives in
   dlm_modes_compat(). */
#define modes_compat(gr, rq) \
        dlm_modes_compat((gr)->lkb_grmode, (rq)->lkb_rqmode)
135 
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140 
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146 
/* Only the entries above the diagonal (plus PR->CW) are 1; the UN, EX
   and PD rows are all zero. */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
158 
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167 
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174 
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178 
179         dlm_print_rsb(r);
180 
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196 
197 /* Threads cannot use the lockspace while it's being recovered */
198 
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203 
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208 
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213 
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218 
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223 
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228 
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233 
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238 
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244 
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249 
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256 
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264 
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269 
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274 
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279 
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285 
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290 
291         del_timeout(lkb);
292 
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294 
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301 
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306 
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309 
310         dlm_add_ast(lkb, AST_COMP, 0);
311 }
312 
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318 
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322 
323         if (is_master_copy(lkb))
324                 send_bast(r, lkb, rqmode);
325         else
326                 dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336 
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340 
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345 
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352 
353         return r;
354 }
355 
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361 
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368 
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375 
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381 
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390 
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392 
393         if (dlm_no_directory(ls))
394                 goto out;
395 
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410 
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         spin_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         spin_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420 
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434 
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r = NULL, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441 
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444 
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447 
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451 
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455 
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458 
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462 
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467 
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472 
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480 
481         spin_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496 
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499 
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504 
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509 
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514 
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524 
 525 /* When all references to the rsb are gone it's transferred to
526    the tossed list for later disposal. */
527 
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532 
533         spin_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537 
/* Non-static entry point for dropping an rsb reference; simply forwards
   to put_rsb(), which does the locking. */
void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}
542 
543 /* See comment for unhold_lkb */
544 
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551 
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555 
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558 
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566 
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569 
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575 
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583 
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589 
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593 
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600 
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603 
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605 
606         /* counter can roll over so we must verify lkid is not in use */
607 
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610 
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619 
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623 
624         *lkb_ret = lkb;
625         return 0;
626 }
627 
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632 
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639 
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644 
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647 
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653 
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657 
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661 
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664 
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667 
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670 
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674 
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679 
680                 detach_lkb(lkb);
681 
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692 
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696 
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699 
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703 
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706 
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711 
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716 
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723 
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728 
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732 
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738 
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740 
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744 
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746 
747         lkb->lkb_timestamp = ktime_get();
748 
749         lkb->lkb_status = status;
750 
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774 
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781 
/* Move an lkb from its current state queue to the queue for status sts. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        /* extra reference held across the move; it is dropped again once
           add_lkb() has taken the new queue's reference */
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}
789 
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806 
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809 
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814 
815         mutex_lock(&ls->ls_waiters_mutex);
816 
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822 
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837 
838                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843 
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847 
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860 
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865 
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
867                                 struct dlm_message *ms)
868 {
869         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
870         int overlap_done = 0;
871 
872         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
873                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878 
879         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
880                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
881                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
882                 overlap_done = 1;
883                 goto out_del;
884         }
885 
886         /* Cancel state was preemptively cleared by a successful convert,
887            see next comment, nothing to do. */
888 
889         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
890             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
891                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
892                           lkb->lkb_id, lkb->lkb_wait_type);
893                 return -1;
894         }
895 
896         /* Remove for the convert reply, and premptively remove for the
897            cancel reply.  A convert has been granted while there's still
898            an outstanding cancel on it (the cancel is moot and the result
899            in the cancel reply should be 0).  We preempt the cancel reply
900            because the app gets the convert result and then can follow up
901            with another op, like convert.  This subsequent op would see the
902            lingering state of the cancel and fail with -EBUSY. */
903 
904         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
905             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
906             is_overlap_cancel(lkb) && ms && !ms->m_result) {
907                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
908                           lkb->lkb_id);
909                 lkb->lkb_wait_type = 0;
910                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
911                 lkb->lkb_wait_count--;
912                 goto out_del;
913         }
914 
915         /* N.B. type of reply may not always correspond to type of original
916            msg due to lookup->request optimization, verify others? */
917 
918         if (lkb->lkb_wait_type) {
919                 lkb->lkb_wait_type = 0;
920                 goto out_del;
921         }
922 
923         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
924                   lkb->lkb_id, mstype, lkb->lkb_flags);
925         return -1;
926 
927  out_del:
928         /* the force-unlock/cancel has completed and we haven't recvd a reply
929            to the op that was in progress prior to the unlock/cancel; we
930            give up on any reply to the earlier op.  FIXME: not sure when/how
931            this would happen */
932 
933         if (overlap_done && lkb->lkb_wait_type) {
934                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
935                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
936                 lkb->lkb_wait_count--;
937                 lkb->lkb_wait_type = 0;
938         }
939 
940         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
941 
942         lkb->lkb_flags &= ~DLM_IFL_RESEND;
943         lkb->lkb_wait_count--;
944         if (!lkb->lkb_wait_count)
945                 list_del_init(&lkb->lkb_wait_reply);
946         unhold_lkb(lkb);
947         return 0;
948 }
949 
950 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
951 {
952         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
953         int error;
954 
955         mutex_lock(&ls->ls_waiters_mutex);
956         error = _remove_from_waiters(lkb, mstype, NULL);
957         mutex_unlock(&ls->ls_waiters_mutex);
958         return error;
959 }
960 
961 /* Handles situations where we might be processing a "fake" or "stub" reply in
962    which we can't try to take waiters_mutex again. */
963 
964 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
965 {
966         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967         int error;
968 
969         if (ms != &ls->ls_stub_ms)
970                 mutex_lock(&ls->ls_waiters_mutex);
971         error = _remove_from_waiters(lkb, ms->m_type, ms);
972         if (ms != &ls->ls_stub_ms)
973                 mutex_unlock(&ls->ls_waiters_mutex);
974         return error;
975 }
976 
977 static void dir_remove(struct dlm_rsb *r)
978 {
979         int to_nodeid;
980 
981         if (dlm_no_directory(r->res_ls))
982                 return;
983 
984         to_nodeid = dlm_dir_nodeid(r);
985         if (to_nodeid != dlm_our_nodeid())
986                 send_remove(r);
987         else
988                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
989                                      r->res_name, r->res_length);
990 }
991 
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994 
995 static int shrink_bucket(struct dlm_ls *ls, int b)
996 {
997         struct dlm_rsb *r;
998         int count = 0, found;
999 
1000         for (;;) {
1001                 found = 0;
1002                 spin_lock(&ls->ls_rsbtbl[b].lock);
1003                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                             res_hashchain) {
1005                         if (!time_after_eq(jiffies, r->res_toss_time +
1006                                            dlm_config.ci_toss_secs * HZ))
1007                                 continue;
1008                         found = 1;
1009                         break;
1010                 }
1011 
1012                 if (!found) {
1013                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                         break;
1015                 }
1016 
1017                 if (kref_put(&r->res_ref, kill_rsb)) {
1018                         list_del(&r->res_hashchain);
1019                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1020 
1021                         if (is_master(r))
1022                                 dir_remove(r);
1023                         dlm_free_rsb(r);
1024                         count++;
1025                 } else {
1026                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                         log_error(ls, "tossed rsb in use %s", r->res_name);
1028                 }
1029         }
1030 
1031         return count;
1032 }
1033 
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036         int i;
1037 
1038         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                 shrink_bucket(ls, i);
1040                 if (dlm_locking_stopped(ls))
1041                         break;
1042                 cond_resched();
1043         }
1044 }
1045 
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049 
1050         if (is_master_copy(lkb))
1051                 return;
1052 
1053         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                 goto add_it;
1057         }
1058         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                 goto add_it;
1060         return;
1061 
1062  add_it:
1063         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064         mutex_lock(&ls->ls_timeout_mutex);
1065         hold_lkb(lkb);
1066         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067         mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069 
1070 static void del_timeout(struct dlm_lkb *lkb)
1071 {
1072         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073 
1074         mutex_lock(&ls->ls_timeout_mutex);
1075         if (!list_empty(&lkb->lkb_time_list)) {
1076                 list_del_init(&lkb->lkb_time_list);
1077                 unhold_lkb(lkb);
1078         }
1079         mutex_unlock(&ls->ls_timeout_mutex);
1080 }
1081 
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087 
1088 void dlm_scan_timeout(struct dlm_ls *ls)
1089 {
1090         struct dlm_rsb *r;
1091         struct dlm_lkb *lkb;
1092         int do_cancel, do_warn;
1093         s64 wait_us;
1094 
1095         for (;;) {
1096                 if (dlm_locking_stopped(ls))
1097                         break;
1098 
1099                 do_cancel = 0;
1100                 do_warn = 0;
1101                 mutex_lock(&ls->ls_timeout_mutex);
1102                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103 
1104                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                         lkb->lkb_timestamp));
1106 
1107                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                 do_cancel = 1;
1110 
1111                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                 do_warn = 1;
1114 
1115                         if (!do_cancel && !do_warn)
1116                                 continue;
1117                         hold_lkb(lkb);
1118                         break;
1119                 }
1120                 mutex_unlock(&ls->ls_timeout_mutex);
1121 
1122                 if (!do_cancel && !do_warn)
1123                         break;
1124 
1125                 r = lkb->lkb_resource;
1126                 hold_rsb(r);
1127                 lock_rsb(r);
1128 
1129                 if (do_warn) {
1130                         /* clear flag so we only warn once */
1131                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                 del_timeout(lkb);
1134                         dlm_timeout_warn(lkb);
1135                 }
1136 
1137                 if (do_cancel) {
1138                         log_debug(ls, "timeout cancel %x node %d %s",
1139                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                         del_timeout(lkb);
1143                         _cancel_lock(r, lkb);
1144                 }
1145 
1146                 unlock_rsb(r);
1147                 unhold_rsb(r);
1148                 dlm_put_lkb(lkb);
1149         }
1150 }
1151 
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154 
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157         struct dlm_lkb *lkb;
1158         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159 
1160         ls->ls_recover_begin = 0;
1161         mutex_lock(&ls->ls_timeout_mutex);
1162         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164         mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166 
1167 /* lkb is master or local copy */
1168 
1169 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171         int b, len = r->res_ls->ls_lvblen;
1172 
1173         /* b=1 lvb returned to caller
1174            b=0 lvb written to rsb or invalidated
1175            b=-1 do nothing */
1176 
1177         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178 
1179         if (b == 1) {
1180                 if (!lkb->lkb_lvbptr)
1181                         return;
1182 
1183                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                         return;
1185 
1186                 if (!r->res_lvbptr)
1187                         return;
1188 
1189                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                 lkb->lkb_lvbseq = r->res_lvbseq;
1191 
1192         } else if (b == 0) {
1193                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                         rsb_set_flag(r, RSB_VALNOTVALID);
1195                         return;
1196                 }
1197 
1198                 if (!lkb->lkb_lvbptr)
1199                         return;
1200 
1201                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                         return;
1203 
1204                 if (!r->res_lvbptr)
1205                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206 
1207                 if (!r->res_lvbptr)
1208                         return;
1209 
1210                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                 r->res_lvbseq++;
1212                 lkb->lkb_lvbseq = r->res_lvbseq;
1213                 rsb_clear_flag(r, RSB_VALNOTVALID);
1214         }
1215 
1216         if (rsb_flag(r, RSB_VALNOTVALID))
1217                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218 }
1219 
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222         if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                 return;
1224 
1225         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                 rsb_set_flag(r, RSB_VALNOTVALID);
1227                 return;
1228         }
1229 
1230         if (!lkb->lkb_lvbptr)
1231                 return;
1232 
1233         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                 return;
1235 
1236         if (!r->res_lvbptr)
1237                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238 
1239         if (!r->res_lvbptr)
1240                 return;
1241 
1242         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243         r->res_lvbseq++;
1244         rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246 
1247 /* lkb is process copy (pc) */
1248 
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                             struct dlm_message *ms)
1251 {
1252         int b;
1253 
1254         if (!lkb->lkb_lvbptr)
1255                 return;
1256 
1257         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                 return;
1259 
1260         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261         if (b == 1) {
1262                 int len = receive_extralen(ms);
1263                 if (len > DLM_RESNAME_MAXLEN)
1264                         len = DLM_RESNAME_MAXLEN;
1265                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                 lkb->lkb_lvbseq = ms->m_lvbseq;
1267         }
1268 }
1269 
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275 
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279 
1280 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281 {
1282         del_lkb(r, lkb);
1283         lkb->lkb_grmode = DLM_LOCK_IV;
1284         /* this unhold undoes the original ref from create_lkb()
1285            so this leads to the lkb being freed */
1286         unhold_lkb(lkb);
1287 }
1288 
/* Unlock on a master or local copy lkb: the lvb is handled first by
   set_lvb_unlock(), then the lkb is removed by _remove_lock(). */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);

        _remove_lock(r, lkb);
}
1294 
/* Unlock on a process copy lkb: same removal as remove_lock() but without
   any lvb handling. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1299 
1300 /* returns: 0 did nothing
1301             1 moved lock to granted
1302            -1 removed lock */
1303 
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         int rv = 0;
1307 
1308         lkb->lkb_rqmode = DLM_LOCK_IV;
1309 
1310         switch (lkb->lkb_status) {
1311         case DLM_LKSTS_GRANTED:
1312                 break;
1313         case DLM_LKSTS_CONVERT:
1314                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                 rv = 1;
1316                 break;
1317         case DLM_LKSTS_WAITING:
1318                 del_lkb(r, lkb);
1319                 lkb->lkb_grmode = DLM_LOCK_IV;
1320                 /* this unhold undoes the original ref from create_lkb()
1321                    so this leads to the lkb being freed */
1322                 unhold_lkb(lkb);
1323                 rv = -1;
1324                 break;
1325         default:
1326                 log_print("invalid status for revert %d", lkb->lkb_status);
1327         }
1328         return rv;
1329 }
1330 
/* Process copy variant of revert_lock(); it performs exactly the same
   change and returns the same 0 / 1 / -1 result. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = revert_lock(r, lkb);

        return rv;
}
1335 
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                 lkb->lkb_grmode = lkb->lkb_rqmode;
1340                 if (lkb->lkb_status)
1341                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                 else
1343                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344         }
1345 
1346         lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348 
1349 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         set_lvb_lock(r, lkb);
1352         _grant_lock(r, lkb);
1353         lkb->lkb_highbast = 0;
1354 }
1355 
/* Grant on a process copy lkb: the lvb comes from the message ms via
   set_lvb_lock_pc(), then _grant_lock() makes the mode/queue change. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);

        _grant_lock(r, lkb);
}
1362 
/* Grant an lkb that had been queued.  Called by grant_pending_locks(),
   which means that besides granting the lock an async grant message must be
   sent to the requesting node when the lkb belongs to a remote node (i.e.
   it is a master copy); otherwise the completion is queued locally with
   queue_cast(). */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        if (!is_master_copy(lkb)) {
                queue_cast(r, lkb, 0);
                return;
        }

        send_grant(r, lkb);
}
1375 
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383 
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                 log_print("munge_demoted %x invalid reply type %d",
1388                           lkb->lkb_id, ms->m_type);
1389                 return;
1390         }
1391 
1392         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                 return;
1396         }
1397 
1398         lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400 
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404             ms->m_type != DLM_MSG_GRANT) {
1405                 log_print("munge_altmode %x invalid reply type %d",
1406                           lkb->lkb_id, ms->m_type);
1407                 return;
1408         }
1409 
1410         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                 lkb->lkb_rqmode = DLM_LOCK_PR;
1412         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                 lkb->lkb_rqmode = DLM_LOCK_CW;
1414         else {
1415                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                 dlm_print_lkb(lkb);
1417         }
1418 }
1419 
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                            lkb_statequeue);
1424         if (lkb->lkb_id == first->lkb_id)
1425                 return 1;
1426 
1427         return 0;
1428 }
1429 
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431 
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434         struct dlm_lkb *this;
1435 
1436         list_for_each_entry(this, head, lkb_statequeue) {
1437                 if (this == lkb)
1438                         continue;
1439                 if (!modes_compat(this, lkb))
1440                         return 1;
1441         }
1442         return 0;
1443 }
1444 
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485 
1486 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487 {
1488         struct dlm_lkb *lkb1;
1489         int lkb_is_ahead = 0;
1490 
1491         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                 if (lkb1 == lkb2) {
1493                         lkb_is_ahead = 1;
1494                         continue;
1495                 }
1496 
1497                 if (!lkb_is_ahead) {
1498                         if (!modes_compat(lkb2, lkb1))
1499                                 return 1;
1500                 } else {
1501                         if (!modes_compat(lkb2, lkb1) &&
1502                             !modes_compat(lkb1, lkb2))
1503                                 return 1;
1504                 }
1505         }
1506         return 0;
1507 }
1508 
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521 
1522 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523 {
1524         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525 
1526         /*
1527          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528          * a new request for a NL mode lock being blocked.
1529          *
1530          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531          * request, then it would be granted.  In essence, the use of this flag
1532          * tells the Lock Manager to expedite theis request by not considering
1533          * what may be in the CONVERTING or WAITING queues...  As of this
1534          * writing, the EXPEDITE flag can be used only with new requests for NL
1535          * mode locks.  This flag is not valid for conversion requests.
1536          *
1537          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538          * conversion or used with a non-NL requested mode.  We also know an
1539          * EXPEDITE request is always granted immediately, so now must always
1540          * be 1.  The full condition to grant an expedite request: (now &&
1541          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542          * therefore be shortened to just checking the flag.
1543          */
1544 
1545         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                 return 1;
1547 
1548         /*
1549          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550          * added to the remaining conditions.
1551          */
1552 
1553         if (queue_conflict(&r->res_grantqueue, lkb))
1554                 goto out;
1555 
1556         /*
1557          * 6-3: By default, a conversion request is immediately granted if the
1558          * requested mode is compatible with the modes of all other granted
1559          * locks
1560          */
1561 
1562         if (queue_conflict(&r->res_convertqueue, lkb))
1563                 goto out;
1564 
1565         /*
1566          * 6-5: But the default algorithm for deciding whether to grant or
1567          * queue conversion requests does not by itself guarantee that such
1568          * requests are serviced on a "first come first serve" basis.  This, in
1569          * turn, can lead to a phenomenon known as "indefinate postponement".
1570          *
1571          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572          * the system service employed to request a lock conversion.  This flag
1573          * forces certain conversion requests to be queued, even if they are
1574          * compatible with the granted modes of other locks on the same
1575          * resource.  Thus, the use of this flag results in conversion requests
1576          * being ordered on a "first come first servce" basis.
1577          *
1578          * DCT: This condition is all about new conversions being able to occur
1579          * "in place" while the lock remains on the granted queue (assuming
1580          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581          * doesn't _have_ to go onto the convert queue where it's processed in
1582          * order.  The "now" variable is necessary to distinguish converts
1583          * being received and processed for the first time now, because once a
1584          * convert is moved to the conversion queue the condition below applies
1585          * requiring fifo granting.
1586          */
1587 
1588         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                 return 1;
1590 
1591         /*
1592          * The NOORDER flag is set to avoid the standard vms rules on grant
1593          * order.
1594          */
1595 
1596         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                 return 1;
1598 
1599         /*
1600          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601          * granted until all other conversion requests ahead of it are granted
1602          * and/or canceled.
1603          */
1604 
1605         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                 return 1;
1607 
1608         /*
1609          * 6-4: By default, a new request is immediately granted only if all
1610          * three of the following conditions are satisfied when the request is
1611          * issued:
1612          * - The queue of ungranted conversion requests for the resource is
1613          *   empty.
1614          * - The queue of ungranted new requests for the resource is empty.
1615          * - The mode of the new request is compatible with the most
1616          *   restrictive mode of all granted locks on the resource.
1617          */
1618 
1619         if (now && !conv && list_empty(&r->res_convertqueue) &&
1620             list_empty(&r->res_waitqueue))
1621                 return 1;
1622 
1623         /*
1624          * 6-4: Once a lock request is in the queue of ungranted new requests,
1625          * it cannot be granted until the queue of ungranted conversion
1626          * requests is empty, all ungranted new requests ahead of it are
1627          * granted and/or canceled, and it is compatible with the granted mode
1628          * of the most restrictive lock granted on the resource.
1629          */
1630 
1631         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632             first_in_list(lkb, &r->res_waitqueue))
1633                 return 1;
1634  out:
1635         return 0;
1636 }
1637 
1638 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                           int *err)
1640 {
1641         int rv;
1642         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644 
1645         if (err)
1646                 *err = 0;
1647 
1648         rv = _can_be_granted(r, lkb, now);
1649         if (rv)
1650                 goto out;
1651 
1652         /*
1653          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655          * cancels one of the locks.
1656          */
1657 
1658         if (is_convert && can_be_queued(lkb) &&
1659             conversion_deadlock_detect(r, lkb)) {
1660                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                         lkb->lkb_grmode = DLM_LOCK_NL;
1662                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                         if (err)
1665                                 *err = -EDEADLK;
1666                         else {
1667                                 log_print("can_be_granted deadlock %x now %d",
1668                                           lkb->lkb_id, now);
1669                                 dlm_dump_rsb(r);
1670                         }
1671                 }
1672                 goto out;
1673         }
1674 
1675         /*
1676          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677          * to grant a request in a mode other than the normal rqmode.  It's a
1678          * simple way to provide a big optimization to applications that can
1679          * use them.
1680          */
1681 
1682         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                 alt = DLM_LOCK_PR;
1684         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                 alt = DLM_LOCK_CW;
1686 
1687         if (alt) {
1688                 lkb->lkb_rqmode = alt;
1689                 rv = _can_be_granted(r, lkb, now);
1690                 if (rv)
1691                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                 else
1693                         lkb->lkb_rqmode = rqmode;
1694         }
1695  out:
1696         return rv;
1697 }
1698 
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705 
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708 
1709 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710 {
1711         struct dlm_lkb *lkb, *s;
1712         int hi, demoted, quit, grant_restart, demote_restart;
1713         int deadlk;
1714 
1715         quit = 0;
1716  restart:
1717         grant_restart = 0;
1718         demote_restart = 0;
1719         hi = DLM_LOCK_IV;
1720 
1721         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                 demoted = is_demoted(lkb);
1723                 deadlk = 0;
1724 
1725                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                         grant_lock_pending(r, lkb);
1727                         grant_restart = 1;
1728                         continue;
1729                 }
1730 
1731                 if (!demoted && is_demoted(lkb)) {
1732                         log_print("WARN: pending demoted %x node %d %s",
1733                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                         demote_restart = 1;
1735                         continue;
1736                 }
1737 
1738                 if (deadlk) {
1739                         log_print("WARN: pending deadlock %x node %d %s",
1740                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                         dlm_dump_rsb(r);
1742                         continue;
1743                 }
1744 
1745                 hi = max_t(int, lkb->lkb_rqmode, hi);
1746 
1747                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                         *cw = 1;
1749         }
1750 
1751         if (grant_restart)
1752                 goto restart;
1753         if (demote_restart && !quit) {
1754                 quit = 1;
1755                 goto restart;
1756         }
1757 
1758         return max_t(int, high, hi);
1759 }
1760 
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763         struct dlm_lkb *lkb, *s;
1764 
1765         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                 if (can_be_granted(r, lkb, 0, NULL))
1767                         grant_lock_pending(r, lkb);
1768                 else {
1769                         high = max_t(int, lkb->lkb_rqmode, high);
1770                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                 *cw = 1;
1772                 }
1773         }
1774 
1775         return high;
1776 }
1777 
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782 
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                 if (gr->lkb_highbast < DLM_LOCK_EX)
1787                         return 1;
1788                 return 0;
1789         }
1790 
1791         if (gr->lkb_highbast < high &&
1792             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                 return 1;
1794         return 0;
1795 }
1796 
1797 static void grant_pending_locks(struct dlm_rsb *r)
1798 {
1799         struct dlm_lkb *lkb, *s;
1800         int high = DLM_LOCK_IV;
1801         int cw = 0;
1802 
1803         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804 
1805         high = grant_pending_convert(r, high, &cw);
1806         high = grant_pending_wait(r, high, &cw);
1807 
1808         if (high == DLM_LOCK_IV)
1809                 return;
1810 
1811         /*
1812          * If there are locks left on the wait/convert queue then send blocking
1813          * ASTs to granted locks based on the largest requested mode (high)
1814          * found above.
1815          */
1816 
1817         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                         if (cw && high == DLM_LOCK_PR &&
1820                             lkb->lkb_grmode == DLM_LOCK_PR)
1821                                 queue_bast(r, lkb, DLM_LOCK_CW);
1822                         else
1823                                 queue_bast(r, lkb, high);
1824                         lkb->lkb_highbast = high;
1825                 }
1826         }
1827 }
1828 
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                 if (gr->lkb_highbast < DLM_LOCK_EX)
1834                         return 1;
1835                 return 0;
1836         }
1837 
1838         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                 return 1;
1840         return 0;
1841 }
1842 
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                             struct dlm_lkb *lkb)
1845 {
1846         struct dlm_lkb *gr;
1847 
1848         list_for_each_entry(gr, head, lkb_statequeue) {
1849                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                         queue_bast(r, gr, lkb->lkb_rqmode);
1851                         gr->lkb_highbast = lkb->lkb_rqmode;
1852                 }
1853         }
1854 }
1855 
1856 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857 {
1858         send_bast_queue(r, &r->res_grantqueue, lkb);
1859 }
1860 
1861 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862 {
1863         send_bast_queue(r, &r->res_grantqueue, lkb);
1864         send_bast_queue(r, &r->res_convertqueue, lkb);
1865 }
1866 
1867 /* set_master(r, lkb) -- set the master nodeid of a resource
1868 
1869    The purpose of this function is to set the nodeid field in the given
1870    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871    known, it can just be copied to the lkb and the function will return
1872    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873    before it can be copied to the lkb.
1874 
1875    When the rsb nodeid is being looked up remotely, the initial lkb
1876    causing the lookup is kept on the ls_waiters list waiting for the
1877    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878    on the rsb's res_lookup list until the master is verified.
1879 
1880    Return values:
1881    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882    1: the rsb master is not available and the lkb has been placed on
1883       a wait queue
1884 */
1885 
1886 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887 {
1888         struct dlm_ls *ls = r->res_ls;
1889         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890 
1891         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                 r->res_first_lkid = lkb->lkb_id;
1894                 lkb->lkb_nodeid = r->res_nodeid;
1895                 return 0;
1896         }
1897 
1898         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                 return 1;
1901         }
1902 
1903         if (r->res_nodeid == 0) {
1904                 lkb->lkb_nodeid = 0;
1905                 return 0;
1906         }
1907 
1908         if (r->res_nodeid > 0) {
1909                 lkb->lkb_nodeid = r->res_nodeid;
1910                 return 0;
1911         }
1912 
1913         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914 
1915         dir_nodeid = dlm_dir_nodeid(r);
1916 
1917         if (dir_nodeid != our_nodeid) {
1918                 r->res_first_lkid = lkb->lkb_id;
1919                 send_lookup(r, lkb);
1920                 return 1;
1921         }
1922 
1923         for (i = 0; i < 2; i++) {
1924                 /* It's possible for dlm_scand to remove an old rsb for
1925                    this same resource from the toss list, us to create
1926                    a new one, look up the master locally, and find it
1927                    already exists just before dlm_scand does the
1928                    dir_remove() on the previous rsb. */
1929 
1930                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                        r->res_length, &ret_nodeid);
1932                 if (!error)
1933                         break;
1934                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                 schedule();
1936         }
1937         if (error && error != -EEXIST)
1938                 return error;
1939 
1940         if (ret_nodeid == our_nodeid) {
1941                 r->res_first_lkid = 0;
1942                 r->res_nodeid = 0;
1943                 lkb->lkb_nodeid = 0;
1944         } else {
1945                 r->res_first_lkid = lkb->lkb_id;
1946                 r->res_nodeid = ret_nodeid;
1947                 lkb->lkb_nodeid = ret_nodeid;
1948         }
1949         return 0;
1950 }
1951 
1952 static void process_lookup_list(struct dlm_rsb *r)
1953 {
1954         struct dlm_lkb *lkb, *safe;
1955 
1956         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                 list_del_init(&lkb->lkb_rsb_lookup);
1958                 _request_lock(r, lkb);
1959                 schedule();
1960         }
1961 }
1962 
1963 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964 
1965 static void confirm_master(struct dlm_rsb *r, int error)
1966 {
1967         struct dlm_lkb *lkb;
1968 
1969         if (!r->res_first_lkid)
1970                 return;
1971 
1972         switch (error) {
1973         case 0:
1974         case -EINPROGRESS:
1975                 r->res_first_lkid = 0;
1976                 process_lookup_list(r);
1977                 break;
1978 
1979         case -EAGAIN:
1980         case -EBADR:
1981         case -ENOTBLK:
1982                 /* the remote request failed and won't be retried (it was
1983                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                    lkb the first_lkid */
1985 
1986                 r->res_first_lkid = 0;
1987 
1988                 if (!list_empty(&r->res_lookup)) {
1989                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                          lkb_rsb_lookup);
1991                         list_del_init(&lkb->lkb_rsb_lookup);
1992                         r->res_first_lkid = lkb->lkb_id;
1993                         _request_lock(r, lkb);
1994                 }
1995                 break;
1996 
1997         default:
1998                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1999         }
2000 }
2001 
2002 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                          int namelen, unsigned long timeout_cs,
2004                          void (*ast) (void *astparam),
2005                          void *astparam,
2006                          void (*bast) (void *astparam, int mode),
2007                          struct dlm_args *args)
2008 {
2009         int rv = -EINVAL;
2010 
2011         /* check for invalid arg usage */
2012 
2013         if (mode < 0 || mode > DLM_LOCK_EX)
2014                 goto out;
2015 
2016         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                 goto out;
2018 
2019         if (flags & DLM_LKF_CANCEL)
2020                 goto out;
2021 
2022         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                 goto out;
2024 
2025         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                 goto out;
2027 
2028         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                 goto out;
2030 
2031         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                 goto out;
2033 
2034         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                 goto out;
2036 
2037         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                 goto out;
2039 
2040         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                 goto out;
2042 
2043         if (!ast || !lksb)
2044                 goto out;
2045 
2046         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                 goto out;
2048 
2049         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                 goto out;
2051 
2052         /* these args will be copied to the lkb in validate_lock_args,
2053            it cannot be done now because when converting locks, fields in
2054            an active lkb cannot be modified before locking the rsb */
2055 
2056         args->flags = flags;
2057         args->astfn = ast;
2058         args->astparam = astparam;
2059         args->bastfn = bast;
2060         args->timeout = timeout_cs;
2061         args->mode = mode;
2062         args->lksb = lksb;
2063         rv = 0;
2064  out:
2065         return rv;
2066 }
2067 
2068 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069 {
2070         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                       DLM_LKF_FORCEUNLOCK))
2072                 return -EINVAL;
2073 
2074         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                 return -EINVAL;
2076 
2077         args->flags = flags;
2078         args->astparam = astarg;
2079         return 0;
2080 }
2081 
2082 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                               struct dlm_args *args)
2084 {
2085         int rv = -EINVAL;
2086 
2087         if (args->flags & DLM_LKF_CONVERT) {
2088                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                         goto out;
2090 
2091                 if (args->flags & DLM_LKF_QUECVT &&
2092                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                         goto out;
2094 
2095                 rv = -EBUSY;
2096                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                         goto out;
2098 
2099                 if (lkb->lkb_wait_type)
2100                         goto out;
2101 
2102                 if (is_overlap(lkb))
2103                         goto out;
2104         }
2105 
2106         lkb->lkb_exflags = args->flags;
2107         lkb->lkb_sbflags = 0;
2108         lkb->lkb_astfn = args->astfn;
2109         lkb->lkb_astparam = args->astparam;
2110         lkb->lkb_bastfn = args->bastfn;
2111         lkb->lkb_rqmode = args->mode;
2112         lkb->lkb_lksb = args->lksb;
2113         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114         lkb->lkb_ownpid = (int) current->pid;
2115         lkb->lkb_timeout_cs = args->timeout;
2116         rv = 0;
2117  out:
2118         if (rv)
2119                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                           lkb->lkb_status, lkb->lkb_wait_type,
2122                           lkb->lkb_resource->res_name);
2123         return rv;
2124 }
2125 
2126 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127    for success */
2128 
2129 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130    because there may be a lookup in progress and it's valid to do
2131    cancel/unlockf on it */
2132 
2133 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134 {
2135         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136         int rv = -EINVAL;
2137 
2138         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                 dlm_print_lkb(lkb);
2141                 goto out;
2142         }
2143 
2144         /* an lkb may still exist even though the lock is EOL'ed due to a
2145            cancel, unlock or failed noqueue request; an app can't use these
2146            locks; return same error as if the lkid had not been found at all */
2147 
2148         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                 rv = -ENOENT;
2151                 goto out;
2152         }
2153 
2154         /* an lkb may be waiting for an rsb lookup to complete where the
2155            lookup was initiated by another lock */
2156 
2157         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                         list_del_init(&lkb->lkb_rsb_lookup);
2161                         queue_cast(lkb->lkb_resource, lkb,
2162                                    args->flags & DLM_LKF_CANCEL ?
2163                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2164                         unhold_lkb(lkb); /* undoes create_lkb() */
2165                 }
2166                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                 rv = -EBUSY;
2168                 goto out;
2169         }
2170 
2171         /* cancel not allowed with another cancel/unlock in progress */
2172 
2173         if (args->flags & DLM_LKF_CANCEL) {
2174                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                         goto out;
2176 
2177                 if (is_overlap(lkb))
2178                         goto out;
2179 
2180                 /* don't let scand try to do a cancel */
2181                 del_timeout(lkb);
2182 
2183                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                         rv = -EBUSY;
2186                         goto out;
2187                 }
2188 
2189                 /* there's nothing to cancel */
2190                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2191                     !lkb->lkb_wait_type) {
2192                         rv = -EBUSY;
2193                         goto out;
2194                 }
2195 
2196                 switch (lkb->lkb_wait_type) {
2197                 case DLM_MSG_LOOKUP:
2198                 case DLM_MSG_REQUEST:
2199                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2200                         rv = -EBUSY;
2201                         goto out;
2202                 case DLM_MSG_UNLOCK:
2203                 case DLM_MSG_CANCEL:
2204                         goto out;
2205                 }
2206                 /* add_to_waiters() will set OVERLAP_CANCEL */
2207                 goto out_ok;
2208         }
2209 
2210         /* do we need to allow a force-unlock if there's a normal unlock
2211            already in progress?  in what conditions could the normal unlock
2212            fail such that we'd want to send a force-unlock to be sure? */
2213 
2214         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2215                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2216                         goto out;
2217 
2218                 if (is_overlap_unlock(lkb))
2219                         goto out;
2220 
2221                 /* don't let scand try to do a cancel */
2222                 del_timeout(lkb);
2223 
2224                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2225                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2226                         rv = -EBUSY;
2227                         goto out;
2228                 }
2229 
2230                 switch (lkb->lkb_wait_type) {
2231                 case DLM_MSG_LOOKUP:
2232                 case DLM_MSG_REQUEST:
2233                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2234                         rv = -EBUSY;
2235                         goto out;
2236                 case DLM_MSG_UNLOCK:
2237                         goto out;
2238                 }
2239                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2240                 goto out_ok;
2241         }
2242 
2243         /* normal unlock not allowed if there's any op in progress */
2244         rv = -EBUSY;
2245         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2246                 goto out;
2247 
2248  out_ok:
2249         /* an overlapping op shouldn't blow away exflags from other op */
2250         lkb->lkb_exflags |= args->flags;
2251         lkb->lkb_sbflags = 0;
2252         lkb->lkb_astparam = args->astparam;
2253         rv = 0;
2254  out:
2255         if (rv)
2256                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2257                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2258                           args->flags, lkb->lkb_wait_type,
2259                           lkb->lkb_resource->res_name);
2260         return rv;
2261 }
2262 
2263 /*
2264  * Four stage 4 varieties:
2265  * do_request(), do_convert(), do_unlock(), do_cancel()
2266  * These are called on the master node for the given lock and
2267  * from the central locking logic.
2268  */
2269 
2270 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271 {
2272         int error = 0;
2273 
2274         if (can_be_granted(r, lkb, 1, NULL)) {
2275                 grant_lock(r, lkb);
2276                 queue_cast(r, lkb, 0);
2277                 goto out;
2278         }
2279 
2280         if (can_be_queued(lkb)) {
2281                 error = -EINPROGRESS;
2282                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283                 send_blocking_asts(r, lkb);
2284                 add_timeout(lkb);
2285                 goto out;
2286         }
2287 
2288         error = -EAGAIN;
2289         if (force_blocking_asts(lkb))
2290                 send_blocking_asts_all(r, lkb);
2291         queue_cast(r, lkb, -EAGAIN);
2292 
2293  out:
2294         return error;
2295 }
2296 
2297 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2298 {
2299         int error = 0;
2300         int deadlk = 0;
2301 
2302         /* changing an existing lock may allow others to be granted */
2303 
2304         if (can_be_granted(r, lkb, 1, &deadlk)) {
2305                 grant_lock(r, lkb);
2306                 queue_cast(r, lkb, 0);
2307                 grant_pending_locks(r);
2308                 goto out;
2309         }
2310 
2311         /* can_be_granted() detected that this lock would block in a conversion
2312            deadlock, so we leave it on the granted queue and return EDEADLK in
2313            the ast for the convert. */
2314 
2315         if (deadlk) {
2316                 /* it's left on the granted queue */
2317                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2318                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2319                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2320                 revert_lock(r, lkb);
2321                 queue_cast(r, lkb, -EDEADLK);
2322                 error = -EDEADLK;
2323                 goto out;
2324         }
2325 
2326         /* is_demoted() means the can_be_granted() above set the grmode
2327            to NL, and left us on the granted queue.  This auto-demotion
2328            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2329            now grantable.  We have to try to grant other converting locks
2330            before we try again to grant this one. */
2331 
2332         if (is_demoted(lkb)) {
2333                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2334                 if (_can_be_granted(r, lkb, 1)) {
2335                         grant_lock(r, lkb);
2336                         queue_cast(r, lkb, 0);
2337                         grant_pending_locks(r);
2338                         goto out;
2339                 }
2340                 /* else fall through and move to convert queue */
2341         }
2342 
2343         if (can_be_queued(lkb)) {
2344                 error = -EINPROGRESS;
2345                 del_lkb(r, lkb);
2346                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2347                 send_blocking_asts(r, lkb);
2348                 add_timeout(lkb);
2349                 goto out;
2350         }
2351 
2352         error = -EAGAIN;
2353         if (force_blocking_asts(lkb))
2354                 send_blocking_asts_all(r, lkb);
2355         queue_cast(r, lkb, -EAGAIN);
2356 
2357  out:
2358         return error;
2359 }
2360 
2361 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2362 {
2363         remove_lock(r, lkb);
2364         queue_cast(r, lkb, -DLM_EUNLOCK);
2365         grant_pending_locks(r);
2366         return -DLM_EUNLOCK;
2367 }
2368 
2369 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2370  
2371 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2372 {
2373         int error;
2374 
2375         error = revert_lock(r, lkb);
2376         if (error) {
2377                 queue_cast(r, lkb, -DLM_ECANCEL);
2378                 grant_pending_locks(r);
2379                 return -DLM_ECANCEL;
2380         }
2381         return 0;
2382 }
2383 
2384 /*
2385  * Four stage 3 varieties:
2386  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2387  */
2388 
2389 /* add a new lkb to a possibly new rsb, called by requesting process */
2390 
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* set_master: sets lkb nodeid from r */
        int rv = set_master(r, lkb);

        /* negative: set_master failed, pass the error up */
        if (rv < 0)
                return rv;

        /* positive: set_master returned without a usable master;
           nothing more to do here */
        if (rv > 0)
                return 0;

        /* receive_request() calls do_request() on remote node */
        if (is_remote(r))
                return send_request(r, lkb);

        return do_request(r, lkb);
}
2413 
2414 /* change some property of an existing lkb, e.g. mode */
2415 
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* receive_convert() calls do_convert() on remote node */
        if (is_remote(r))
                return send_convert(r, lkb);

        return do_convert(r, lkb);
}
2428 
2429 /* remove an existing lkb from the granted queue */
2430 
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* receive_unlock() calls do_unlock() on remote node */
        if (is_remote(r))
                return send_unlock(r, lkb);

        return do_unlock(r, lkb);
}
2443 
2444 /* remove an existing lkb from the convert or wait queue */
2445 
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* receive_cancel() calls do_cancel() on remote node */
        if (is_remote(r))
                return send_cancel(r, lkb);

        return do_cancel(r, lkb);
}
2458 
2459 /*
2460  * Four stage 2 varieties:
2461  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2462  */
2463 
2464 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2465                         int len, struct dlm_args *args)
2466 {
2467         struct dlm_rsb *r;
2468         int error;
2469 
2470         error = validate_lock_args(ls, lkb, args);
2471         if (error)
2472                 goto out;
2473 
2474         error = find_rsb(ls, name, len, R_CREATE, &r);
2475         if (error)
2476                 goto out;
2477 
2478         lock_rsb(r);
2479 
2480         attach_lkb(r, lkb);
2481         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2482 
2483         error = _request_lock(r, lkb);
2484 
2485         unlock_rsb(r);
2486         put_rsb(r);
2487 
2488  out:
2489         return error;
2490 }
2491 
2492 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2493                         struct dlm_args *args)
2494 {
2495         struct dlm_rsb *r;
2496         int error;
2497 
2498         r = lkb->lkb_resource;
2499 
2500         hold_rsb(r);
2501         lock_rsb(r);
2502 
2503         error = validate_lock_args(ls, lkb, args);
2504         if (error)
2505                 goto out;
2506 
2507         error = _convert_lock(r, lkb);
2508  out:
2509         unlock_rsb(r);
2510         put_rsb(r);
2511         return error;
2512 }
2513 
2514 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2515                        struct dlm_args *args)
2516 {
2517         struct dlm_rsb *r;
2518         int error;
2519 
2520         r = lkb->lkb_resource;
2521 
2522         hold_rsb(r);
2523         lock_rsb(r);
2524 
2525         error = validate_unlock_args(lkb, args);
2526         if (error)
2527                 goto out;
2528 
2529         error = _unlock_lock(r, lkb);
2530  out:
2531         unlock_rsb(r);
2532         put_rsb(r);
2533         return error;
2534 }
2535 
2536 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2537                        struct dlm_args *args)
2538 {
2539         struct dlm_rsb *r;
2540         int error;
2541 
2542         r = lkb->lkb_resource;
2543 
2544         hold_rsb(r);
2545         lock_rsb(r);
2546 
2547         error = validate_unlock_args(lkb, args);
2548         if (error)
2549                 goto out;
2550 
2551         error = _cancel_lock(r, lkb);
2552  out:
2553         unlock_rsb(r);
2554         put_rsb(r);
2555         return error;
2556 }
2557 
2558 /*
2559  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2560  */
2561 
/*
 * dlm_lock() - stage 1 entry point: request a new lock or convert an
 * existing one.
 *
 * When DLM_LKF_CONVERT is set in flags the lkb is looked up by
 * lksb->sb_lkid and passed to convert_lock(); otherwise a new lkb is
 * created and passed to request_lock() together with name/namelen.
 * parent_lkid is accepted but not referenced in this function.
 *
 * Returns -EINVAL if the lockspace cannot be found, otherwise the error
 * from find_lkb()/create_lkb(), set_lock_args() or the stage 2 call,
 * except that -EINPROGRESS from stage 2 and -EAGAIN/-EDEADLK reaching
 * out_put are returned as 0.
 */
2562 int dlm_lock(dlm_lockspace_t *lockspace,
2563              int mode,
2564              struct dlm_lksb *lksb,
2565              uint32_t flags,
2566              void *name,
2567              unsigned int namelen,
2568              uint32_t parent_lkid,
2569              void (*ast) (void *astarg),
2570              void *astarg,
2571              void (*bast) (void *astarg, int mode))
2572 {
2573         struct dlm_ls *ls;
2574         struct dlm_lkb *lkb;
2575         struct dlm_args args;
2576         int error, convert = flags & DLM_LKF_CONVERT;
2577 
2578         ls = dlm_find_lockspace_local(lockspace);
2579         if (!ls)
2580                 return -EINVAL;
2581 
              /* paired with dlm_unlock_recovery() at the out label */
2582         dlm_lock_recovery(ls);
2583 
2584         if (convert)
2585                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2586         else
2587                 error = create_lkb(ls, &lkb);
2588 
2589         if (error)
2590                 goto out;
2591 
              /* the fifth argument is passed as a constant 0 from this entry point */
2592         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2593                               astarg, bast, &args);
2594         if (error)
2595                 goto out_put;
2596 
2597         if (convert)
2598                 error = convert_lock(ls, lkb, &args);
2599         else
2600                 error = request_lock(ls, lkb, name, namelen, &args);
2601 
2602         if (error == -EINPROGRESS)
2603                 error = 0;
2604  out_put:
              /*
               * The lkb is put for every convert and for any failed request.
               * This test runs before -EAGAIN/-EDEADLK are turned into 0
               * below, so those results also put the lkb.
               */
2605         if (convert || error)
2606                 __put_lkb(ls, lkb);
2607         if (error == -EAGAIN || error == -EDEADLK)
2608                 error = 0;
2609  out:
2610         dlm_unlock_recovery(ls);
2611         dlm_put_lockspace(ls);
2612         return error;
2613 }
2614 
/*
 * dlm_unlock() - stage 1 entry point: unlock a lock, or cancel an
 * operation on it when DLM_LKF_CANCEL is set in flags.
 *
 * The lkb is looked up by lkid.  lksb is accepted but not referenced in
 * this function.
 *
 * Returns -EINVAL if the lockspace cannot be found, otherwise the error
 * from find_lkb(), set_unlock_args() or the stage 2 call.  -DLM_EUNLOCK
 * and -DLM_ECANCEL are returned as 0, and so is -EBUSY when the caller
 * passed DLM_LKF_CANCEL or DLM_LKF_FORCEUNLOCK.
 */
2615 int dlm_unlock(dlm_lockspace_t *lockspace,
2616                uint32_t lkid,
2617                uint32_t flags,
2618                struct dlm_lksb *lksb,
2619                void *astarg)
2620 {
2621         struct dlm_ls *ls;
2622         struct dlm_lkb *lkb;
2623         struct dlm_args args;
2624         int error;
2625 
2626         ls = dlm_find_lockspace_local(lockspace);
2627         if (!ls)
2628                 return -EINVAL;
2629 
              /* paired with dlm_unlock_recovery() at the out label */
2630         dlm_lock_recovery(ls);
2631 
2632         error = find_lkb(ls, lkid, &lkb);
2633         if (error)
2634                 goto out;
2635 
2636         error = set_unlock_args(flags, astarg, &args);
2637         if (error)
2638                 goto out_put;
2639 
2640         if (flags & DLM_LKF_CANCEL)
2641                 error = cancel_lock(ls, lkb, &args);
2642         else
2643                 error = unlock_lock(ls, lkb, &args);
2644 
              /* these results are not reported to the caller as failures */
2645         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2646                 error = 0;
2647         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2648                 error = 0;
2649  out_put:
              /* the lkb obtained from find_lkb() is put on success and failure alike */
2650         dlm_put_lkb(lkb);
2651  out:
2652         dlm_unlock_recovery(ls);
2653         dlm_put_lockspace(ls);
2654         return error;
2655 }
2656 
2657 /*
2658  * send/receive routines for remote operations and replies
2659  *
2660  * send_args
2661  * send_common
2662  * send_request                 receive_request
2663  * send_convert                 receive_convert
2664  * send_unlock                  receive_unlock
2665  * send_cancel                  receive_cancel
2666  * send_grant                   receive_grant
2667  * send_bast                    receive_bast
2668  * send_lookup                  receive_lookup
2669  * send_remove                  receive_remove
2670  *
2671  *                              send_common_reply
2672  * receive_request_reply        send_request_reply
2673  * receive_convert_reply        send_convert_reply
2674  * receive_unlock_reply         send_unlock_reply
2675  * receive_cancel_reply         send_cancel_reply
2676  * receive_lookup_reply         send_lookup_reply
2677  */
2678 
/*
 * _create_message() - get an mb_len byte buffer from lowcomms for
 * to_nodeid, zero it and fill in the common header and m_type.
 *
 * On success returns 0 and stores the message handle in *mh_ret and the
 * message in *ms_ret.  Returns -ENOBUFS, leaving both untouched, when no
 * buffer could be obtained.
 */
2679 static int _create_message(struct dlm_ls *ls, int mb_len,
2680                            int to_nodeid, int mstype,
2681                            struct dlm_message **ms_ret,
2682                            struct dlm_mhandle **mh_ret)
2683 {
2684         struct dlm_message *ms;
2685         struct dlm_mhandle *mh;
2686         char *mb;
2687 
2688         /* get_buffer gives us a message handle (mh) that we need to
2689            pass into lowcomms_commit and a message buffer (mb) that we
2690            write our data into */
2691 
2692         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2693         if (!mh)
2694                 return -ENOBUFS;
2695 
              /* every field not set explicitly below, or by the caller, is zero */
2696         memset(mb, 0, mb_len);
2697 
2698         ms = (struct dlm_message *) mb;
2699 
2700         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2701         ms->m_header.h_lockspace = ls->ls_global_id;
2702         ms->m_header.h_nodeid = dlm_our_nodeid();
2703         ms->m_header.h_length = mb_len;
2704         ms->m_header.h_cmd = DLM_MSG;
2705 
2706         ms->m_type = mstype;
2707 
2708         *mh_ret = mh;
2709         *ms_ret = ms;
2710         return 0;
2711 }
2712 
/*
 * create_message() - work out how large a message of type mstype must be
 * for this rsb/lkb and allocate it through _create_message().
 *
 * REQUEST, LOOKUP and REMOVE messages get room for the resource name
 * (r->res_length).  CONVERT, UNLOCK, REQUEST_REPLY, CONVERT_REPLY and
 * GRANT messages get room for a lock value block (ls_lvblen) when an lkb
 * with an lvb pointer is supplied.  All other types carry no extra data.
 * lkb may be NULL.  Returns whatever _create_message() returns.
 */
2713 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2714                           int to_nodeid, int mstype,
2715                           struct dlm_message **ms_ret,
2716                           struct dlm_mhandle **mh_ret)
2717 {
2718         int size = sizeof(struct dlm_message);
2719         int carries_name = mstype == DLM_MSG_REQUEST ||
2720                            mstype == DLM_MSG_LOOKUP ||
2721                            mstype == DLM_MSG_REMOVE;
2722         int may_carry_lvb = mstype == DLM_MSG_CONVERT ||
2723                             mstype == DLM_MSG_UNLOCK ||
2724                             mstype == DLM_MSG_REQUEST_REPLY ||
2725                             mstype == DLM_MSG_CONVERT_REPLY ||
2726                             mstype == DLM_MSG_GRANT;
2727 
2728         if (carries_name)
2729                 size += r->res_length;
2730         else if (may_carry_lvb && lkb && lkb->lkb_lvbptr)
2731                 size += r->res_ls->ls_lvblen;
2732 
2733         return _create_message(r->res_ls, size, to_nodeid, mstype,
2734                                ms_ret, mh_ret);
2735 }
2739 
2740 /* further lowcomms enhancements or alternate implementations may make
2741    the return value from this function useful at some point */
2742 
2743 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2744 {
              /*
               * dlm_message_out() is applied to the message before the buffer
               * is committed -- presumably the outbound byte-order conversion;
               * confirm in its definition.  Always returns 0.
               */
2745         dlm_message_out(ms);
2746         dlm_lowcomms_commit_buffer(mh);
2747         return 0;
2748 }
2749 
/*
 * send_args() - copy the lkb's state into an outgoing message and append
 * the type-specific payload to m_extra.
 *
 * REQUEST and LOOKUP messages carry the resource name (res_length
 * bytes); CONVERT, UNLOCK, REQUEST_REPLY, CONVERT_REPLY and GRANT carry
 * the lock value block (ls_lvblen bytes) when the lkb has one.  ms->m_type
 * must already be set, and the message must have been sized for the
 * payload by create_message().
 */
2750 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2751                       struct dlm_message *ms)
2752 {
2753         ms->m_nodeid   = lkb->lkb_nodeid;
2754         ms->m_pid      = lkb->lkb_ownpid;
2755         ms->m_lkid     = lkb->lkb_id;
2756         ms->m_remid    = lkb->lkb_remid;
2757         ms->m_exflags  = lkb->lkb_exflags;
2758         ms->m_sbflags  = lkb->lkb_sbflags;
2759         ms->m_flags    = lkb->lkb_flags;
2760         ms->m_lvbseq   = lkb->lkb_lvbseq;
2761         ms->m_status   = lkb->lkb_status;
2762         ms->m_grmode   = lkb->lkb_grmode;
2763         ms->m_rqmode   = lkb->lkb_rqmode;
2764         ms->m_hash     = r->res_hash;
2765 
2766         /* m_result and m_bastmode are set from function args,
2767            not from lkb fields */
2768 
              /*
               * m_asts is OR-ed into rather than assigned, so it relies on the
               * message having been zeroed when it was created.  Only the
               * presence of each callback is sent, not the pointer.
               */
2769         if (lkb->lkb_bastfn)
2770                 ms->m_asts |= AST_BAST;
2771         if (lkb->lkb_astfn)
2772                 ms->m_asts |= AST_COMP;
2773 
2774         /* compare with switch in create_message; send_remove() doesn't
2775            use send_args() */
2776 
2777         switch (ms->m_type) {
2778         case DLM_MSG_REQUEST:
2779         case DLM_MSG_LOOKUP:
2780                 memcpy(ms->m_extra, r->res_name, r->res_length);
2781                 break;
2782         case DLM_MSG_CONVERT:
2783         case DLM_MSG_UNLOCK:
2784         case DLM_MSG_REQUEST_REPLY:
2785         case DLM_MSG_CONVERT_REPLY:
2786         case DLM_MSG_GRANT:
2787                 if (!lkb->lkb_lvbptr)
2788                         break;
2789                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2790                 break;
2791         }
2792 }
2793 
/*
 * send_common() - send a message of type mstype about lkb to the node
 * recorded in r->res_nodeid.
 *
 * The lkb is put on the waiters list first; if building or sending the
 * message then fails, it is taken off again using the matching reply
 * type.  Returns 0 on success or the first error encountered.
 */
2794 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2795 {
2796         struct dlm_message *msg;
2797         struct dlm_mhandle *handle;
2798         int rv;
2799 
2800         rv = add_to_waiters(lkb, mstype);
2801         if (rv)
2802                 return rv;
2803 
2804         rv = create_message(r, lkb, r->res_nodeid, mstype, &msg, &handle);
2805         if (!rv) {
2806                 send_args(r, lkb, msg);
2807                 rv = send_message(handle, msg);
2808         }
2809 
              /* undo the waiters entry when nothing went out */
2810         if (rv)
2811                 remove_from_waiters(lkb, msg_reply_type(mstype));
2812         return rv;
2813 }
2821 
/* Send a DLM_MSG_REQUEST for lkb via send_common(); returns its result. */
2822 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2823 {
2824         return send_common(r, lkb, DLM_MSG_REQUEST);
2825 }
2826 
/*
 * send_convert() - send a DLM_MSG_CONVERT for lkb via send_common().
 *
 * For a down-conversion that was sent successfully, no reply is awaited:
 * the lkb is taken off the waiters list and a successful convert reply
 * is synthesized in the lockspace's stub message and processed locally.
 * Returns the result of send_common().
 */
2827 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2828 {
2829         int error;
2830 
2831         error = send_common(r, lkb, DLM_MSG_CONVERT);
2832 
2833         /* down conversions go without a reply from the master */
2834         if (!error && down_conversion(lkb)) {
2835                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
                      /* only the stub fields assigned here are set up for the reply */
2836                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2837                 r->res_ls->ls_stub_ms.m_result = 0;
2838                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2839                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2840         }
2841 
2842         return error;
2843 }
2844 
2845 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2846    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2847    that the master is still correct. */
2848 
/* Send a DLM_MSG_UNLOCK for lkb via send_common(); returns its result. */
2849 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2850 {
2851         return send_common(r, lkb, DLM_MSG_UNLOCK);
2852 }
2853 
/* Send a DLM_MSG_CANCEL for lkb via send_common(); returns its result. */
2854 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855 {
2856         return send_common(r, lkb, DLM_MSG_CANCEL);
2857 }
2858 
/*
 * send_grant() - send a DLM_MSG_GRANT with m_result 0 to the node stored
 * in lkb->lkb_nodeid.  The lkb is not added to the waiters list.
 * Returns 0 or the error from creating/sending the message.
 */
2859 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2860 {
2861         struct dlm_message *msg;
2862         struct dlm_mhandle *handle;
2863         int rv;
2864 
2865         rv = create_message(r, lkb, lkb->lkb_nodeid, DLM_MSG_GRANT,
2866                             &msg, &handle);
2867         if (rv)
2868                 return rv;
2869 
2870         send_args(r, lkb, msg);
2871         msg->m_result = 0;
2872 
2873         return send_message(handle, msg);
2874 }
2879 
/*
 * send_bast() - send a DLM_MSG_BAST carrying the given mode in
 * m_bastmode to the node stored in lkb->lkb_nodeid.  The message is
 * sized without reference to the lkb (NULL is passed to
 * create_message()).  Returns 0 or the error from creating/sending.
 */
2880 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2881 {
2882         struct dlm_message *msg;
2883         struct dlm_mhandle *handle;
2884         int rv;
2885 
2886         rv = create_message(r, NULL, lkb->lkb_nodeid, DLM_MSG_BAST,
2887                             &msg, &handle);
2888         if (rv)
2889                 return rv;
2890 
2891         send_args(r, lkb, msg);
2892         msg->m_bastmode = mode;
2893 
2894         return send_message(handle, msg);
2895 }
2900 
/*
 * send_lookup() - send a DLM_MSG_LOOKUP for lkb to the node returned by
 * dlm_dir_nodeid(r).
 *
 * The lkb is put on the waiters list first; if building or sending the
 * message fails it is taken off again as DLM_MSG_LOOKUP_REPLY.
 * Returns 0 on success or the first error encountered.
 */
2901 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902 {
2903         struct dlm_message *msg;
2904         struct dlm_mhandle *handle;
2905         int rv;
2906 
2907         rv = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2908         if (rv)
2909                 return rv;
2910 
2911         rv = create_message(r, NULL, dlm_dir_nodeid(r), DLM_MSG_LOOKUP,
2912                             &msg, &handle);
2913         if (!rv) {
2914                 send_args(r, lkb, msg);
2915                 rv = send_message(handle, msg);
2916         }
2917 
              /* undo the waiters entry when nothing went out */
2918         if (rv)
2919                 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2920         return rv;
2921 }
2928 
/*
 * send_remove() - send a DLM_MSG_REMOVE for this rsb to the node
 * returned by dlm_dir_nodeid(r).  Only the resource name and hash are
 * filled in; send_args() is not used.  Returns 0 or the error from
 * creating/sending the message.
 */
2929 static int send_remove(struct dlm_rsb *r)
2930 {
2931         struct dlm_message *msg;
2932         struct dlm_mhandle *handle;
2933         int rv;
2934 
2935         rv = create_message(r, NULL, dlm_dir_nodeid(r), DLM_MSG_REMOVE,
2936                             &msg, &handle);
2937         if (rv)
2938                 return rv;
2939 
2940         memcpy(msg->m_extra, r->res_name, r->res_length);
2941         msg->m_hash = r->res_hash;
2942 
2943         return send_message(handle, msg);
2944 }
2948 
/*
 * send_common_reply() - send a reply of type mstype, carrying rv in
 * m_result, to the node stored in lkb->lkb_nodeid.
 * Returns 0 or the error from creating/sending the message.
 */
2949 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2950                              int mstype, int rv)
2951 {
2952         struct dlm_message *reply;
2953         struct dlm_mhandle *handle;
2954         int err;
2955 
2956         err = create_message(r, lkb, lkb->lkb_nodeid, mstype,
2957                              &reply, &handle);
2958         if (err)
2959                 return err;
2960 
2961         send_args(r, lkb, reply);
2962         reply->m_result = rv;
2963 
2964         return send_message(handle, reply);
2965 }
2970 
/* Reply to a request with result rv (DLM_MSG_REQUEST_REPLY). */
2971 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2972 {
2973         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2974 }
2975 
/* Reply to a convert with result rv (DLM_MSG_CONVERT_REPLY). */
2976 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2977 {
2978         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2979 }
2980 
/* Reply to an unlock with result rv (DLM_MSG_UNLOCK_REPLY). */
2981 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2982 {
2983         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2984 }
2985 
/* Reply to a cancel with result rv (DLM_MSG_CANCEL_REPLY). */
2986 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2987 {
2988         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2989 }
2990 
/*
 * send_lookup_reply() - answer the lookup in ms_in.
 *
 * The reply goes to the node named in ms_in's header and carries the
 * requester's m_lkid, the result rv and ret_nodeid in m_nodeid.  The
 * lockspace's stub rsb is used to create the message.
 * Returns 0 or the error from creating/sending the message.
 */
2991 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2992                              int ret_nodeid, int rv)
2993 {
2994         struct dlm_message *reply;
2995         struct dlm_mhandle *handle;
2996         int err;
2997 
2998         err = create_message(&ls->ls_stub_rsb, NULL,
2999                              ms_in->m_header.h_nodeid,
3000                              DLM_MSG_LOOKUP_REPLY, &reply, &handle);
3001         if (err)
3002                 return err;
3003 
3004         reply->m_lkid = ms_in->m_lkid;
3005         reply->m_result = rv;
3006         reply->m_nodeid = ret_nodeid;
3007 
3008         return send_message(handle, reply);
3009 }
3011 
3012 /* which args we save from a received message depends heavily on the type
3013    of message, unlike the send side where we can safely send everything about
3014    the lkb for any type of message */
3015 
/*
 * receive_flags() - take exflags, sbflags and flags from a received
 * message.  Only the low 16 bits of lkb_flags are replaced from the
 * message; the high 16 bits keep their local value.
 */
3016 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3017 {
3018         lkb->lkb_exflags = ms->m_exflags;
3019         lkb->lkb_sbflags = ms->m_sbflags;
3020         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3021                          (ms->m_flags & 0x0000FFFF);
3022 }
3023 
/*
 * receive_flags_reply() - like receive_flags() but for replies: sbflags
 * and the low 16 bits of lkb_flags are taken from the message, while
 * lkb_exflags is left untouched.
 */
3024 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3025 {
3026         lkb->lkb_sbflags = ms->m_sbflags;
3027         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3028                          (ms->m_flags & 0x0000FFFF);
3029 }
3030 
/*
 * receive_extralen() - number of payload bytes that follow the fixed
 * part of a received message, computed from the header length.
 * NOTE(review): h_length is not checked here; if it were smaller than
 * sizeof(struct dlm_message) the result would not be a valid length --
 * confirm the header is validated before messages reach this point.
 */
3031 static int receive_extralen(struct dlm_message *ms)
3032 {
3033         return (ms->m_header.h_length - sizeof(struct dlm_message));
3034 }
3035 
/*
 * receive_lvb() - copy the lock value block carried in a received
 * message into the lkb.
 *
 * Does nothing unless DLM_LKF_VALBLK is set in the lkb's exflags.  An
 * lvb buffer is allocated for the lkb if it does not have one yet.
 * The copy is limited to the lockspace's lvb length (ls->ls_lvblen),
 * which is the amount the send side reserves and copies for an lvb, so a
 * message with a larger payload cannot write past the buffer; a
 * non-positive payload length copies nothing.
 *
 * Returns 0 on success or -ENOMEM if the lvb could not be allocated.
 */
3036 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3037                        struct dlm_message *ms)
3038 {
3039         int len;
3040 
3041         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3042                 if (!lkb->lkb_lvbptr)
3043                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3044                 if (!lkb->lkb_lvbptr)
3045                         return -ENOMEM;
3046                 len = receive_extralen(ms);
                      /* bound by the lvb size, not by the resource name limit */
3047                 if (len > ls->ls_lvblen)
3048                         len = ls->ls_lvblen;
                      if (len > 0)
3049                         memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3050         }
3051         return 0;
3052 }
3053 
/*
 * Placeholder blocking-AST callback; receive_request_args() installs it
 * when the sender's message has AST_BAST set.  It only logs a message.
 */
3054 static void fake_bastfn(void *astparam, int mode)
3055 {
3056         log_print("fake_bastfn should not be called");
3057 }
3058 
/*
 * Placeholder completion-AST callback; receive_request_args() installs
 * it when the sender's message has AST_COMP set.  It only logs a message.
 */
3059 static void fake_astfn(void *astparam)
3060 {
3061         log_print("fake_astfn should not be called");
3062 }
3063 
/*
 * receive_request_args() - initialise a newly created lkb from a
 * received request message.
 *
 * The sender's node id, pid, lock id and requested mode are recorded and
 * the granted mode is set to DLM_LOCK_IV.  The sender's AST bits are
 * turned into placeholder callbacks (or NULL), and an lvb is allocated
 * when DLM_LKF_VALBLK is set in lkb_exflags, which the caller is
 * expected to have filled in already.
 *
 * Returns 0 on success or -ENOMEM if the lvb could not be allocated.
 */
3064 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3065                                 struct dlm_message *ms)
3066 {
3067         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3068         lkb->lkb_ownpid = ms->m_pid;
              /* the sender's lock id becomes this lkb's remote id */
3069         lkb->lkb_remid = ms->m_lkid;
3070         lkb->lkb_grmode = DLM_LOCK_IV;
3071         lkb->lkb_rqmode = ms->m_rqmode;
3072 
3073         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3074         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3075 
3076         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3077                 /* lkb was just created so there won't be an lvb yet */
3078                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3079                 if (!lkb->lkb_lvbptr)
3080                         return -ENOMEM;
3081         }
3082 
3083         return 0;
3084 }
3085 
/*
 * receive_convert_args() - apply the arguments of a received convert
 * message to the lkb.
 *
 * Returns -EBUSY unless the lkb's status is DLM_LKSTS_GRANTED, -ENOMEM
 * if receive_lvb() fails, otherwise 0 after recording the requested
 * mode and lvb sequence number from the message.
 */
3086 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3087                                 struct dlm_message *ms)
3088 {
3089         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3090                 return -EBUSY;
3091 
3092         if (receive_lvb(ls, lkb, ms))
3093                 return -ENOMEM;
3094 
3095         lkb->lkb_rqmode = ms->m_rqmode;
3096         lkb->lkb_lvbseq = ms->m_lvbseq;
3097 
3098         return 0;
3099 }
3100 
/*
 * receive_unlock_args() - apply the arguments of a received unlock
 * message: only the lock value block is taken.  Any failure of
 * receive_lvb() is reported as -ENOMEM; otherwise returns 0.
 */
3101 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3102                                struct dlm_message *ms)
3103 {
3104         return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
3105 }
3108 
3109 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3110    uses to send a reply and that the remote end uses to process the reply. */
3111 
3112 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3113 {
              /* reply goes back to the sender, addressed by the sender's own lock id */
3114         ls->ls_stub_lkb.lkb_nodeid = ms->m_header.h_nodeid;
3115         ls->ls_stub_lkb.lkb_remid = ms->m_lkid;
3116 }
3118 
3119 /* This is called after the rsb is locked so that we can safely inspect
3120    fields in the lkb. */
3121 
/*
 * validate_message() - check that a received message is plausible for
 * the lkb it names.
 *
 * CONVERT/UNLOCK/CANCEL must target a master copy owned by the sender.
 * CONVERT_REPLY/UNLOCK_REPLY/CANCEL_REPLY/GRANT/BAST must target a
 * process copy whose lkb_nodeid is the sender.  REQUEST_REPLY must
 * target a process copy whose lkb_nodeid is either -1 or the sender.
 * Every other message type is rejected.
 *
 * Returns 0 if the message is acceptable, otherwise logs it and returns
 * -EINVAL.
 */
3122 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3123 {
3124         int sender = ms->m_header.h_nodeid;
3125         int ok;
3126 
3127         switch (ms->m_type) {
3128         case DLM_MSG_CONVERT:
3129         case DLM_MSG_UNLOCK:
3130         case DLM_MSG_CANCEL:
3131                 ok = is_master_copy(lkb) && lkb->lkb_nodeid == sender;
3132                 break;
3133 
3134         case DLM_MSG_CONVERT_REPLY:
3135         case DLM_MSG_UNLOCK_REPLY:
3136         case DLM_MSG_CANCEL_REPLY:
3137         case DLM_MSG_GRANT:
3138         case DLM_MSG_BAST:
3139                 ok = is_process_copy(lkb) && lkb->lkb_nodeid == sender;
3140                 break;
3141 
3142         case DLM_MSG_REQUEST_REPLY:
3143                 ok = is_process_copy(lkb) &&
3144                      (lkb->lkb_nodeid == -1 || lkb->lkb_nodeid == sender);
3145                 break;
3146 
3147         default:
3148                 ok = 0;
3149                 break;
3150         }
3151 
3152         if (ok)
3153                 return 0;
3154 
3155         log_error(lkb->lkb_resource->res_ls,
3156                   "ignore invalid message %d from %d %x %x %x %d",
3157                   ms->m_type, sender, lkb->lkb_id, lkb->lkb_remid,
3158                   lkb->lkb_flags, lkb->lkb_nodeid);
3159         return -EINVAL;
3160 }
3162 
/*
 * receive_request() - handle a request message from another node.
 *
 * A master-copy lkb is created and filled in from the message, the rsb
 * named in m_extra is looked up with R_MASTER, and do_request() is run
 * with the rsb locked.  The result is always sent back with
 * send_request_reply(); when no lkb/rsb could be set up the reply is
 * sent through the lockspace's stub rsb and lkb.
 */
3163 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3164 {
3165         struct dlm_lkb *lkb;
3166         struct dlm_rsb *r;
3167         int error, namelen;
3168 
3169         error = create_lkb(ls, &lkb);
3170         if (error)
3171                 goto fail;
3172 
              /* flags first: receive_request_args() tests lkb_exflags */
3173         receive_flags(lkb, ms);
3174         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3175         error = receive_request_args(ls, lkb, ms);
3176         if (error) {
3177                 __put_lkb(ls, lkb);
3178                 goto fail;
3179         }
3180 
              /* NOTE(review): namelen comes straight from the header length and
                 is not bounded here -- confirm find_rsb() validates it */
3181         namelen = receive_extralen(ms);
3182 
3183         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3184         if (error) {
3185                 __put_lkb(ls, lkb);
3186                 goto fail;
3187         }
3188 
3189         lock_rsb(r);
3190 
3191         attach_lkb(r, lkb);
3192         error = do_request(r, lkb);
3193         send_request_reply(r, lkb, error);
3194 
3195         unlock_rsb(r);
3196         put_rsb(r);
3197 
              /* -EINPROGRESS counts as success; any other error drops the lkb */
3198         if (error == -EINPROGRESS)
3199                 error = 0;
3200         if (error)
3201                 dlm_put_lkb(lkb);
3202         return;
3203 
3204  fail:
3205         setup_stub_lkb(ls, ms);
3206         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3207 }
3208 
/*
 * receive_convert() - handle a convert message from another node.
 *
 * The lkb is looked up by the message's m_remid and checked with
 * validate_message() under the rsb lock before any of its fields are
 * changed.  A reply is sent unless the conversion is a down-conversion
 * or the message failed validation.  If the lkb cannot be found the
 * error is sent back through the lockspace's stub rsb and lkb.
 */
3209 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3210 {
3211         struct dlm_lkb *lkb;
3212         struct dlm_rsb *r;
3213         int error, reply = 1;
3214 
3215         error = find_lkb(ls, ms->m_remid, &lkb);
3216         if (error)
3217                 goto fail;
3218 
3219         r = lkb->lkb_resource;
3220 
3221         hold_rsb(r);
3222         lock_rsb(r);
3223 
              /* an invalid message is dropped without a reply */
3224         error = validate_message(lkb, ms);
3225         if (error)
3226                 goto out;
3227 
3228         receive_flags(lkb, ms);
3229         error = receive_convert_args(ls, lkb, ms);
3230         if (error)
3231                 goto out_reply;
              /* evaluated after the new rqmode has been stored in the lkb */
3232         reply = !down_conversion(lkb);
3233 
3234         error = do_convert(r, lkb);
3235  out_reply:
3236         if (reply)
3237                 send_convert_reply(r, lkb, error);
3238  out:
3239         unlock_rsb(r);
3240         put_rsb(r);
3241         dlm_put_lkb(lkb);
3242         return;
3243 
3244  fail:
3245         setup_stub_lkb(ls, ms);
3246         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3247 }
3248 
/*
 * receive_unlock() - handle an unlock message from another node.
 *
 * The lkb is looked up by the message's m_remid and checked with
 * validate_message() under the rsb lock before any of its fields are
 * changed.  A reply carrying the result is sent unless the message
 * failed validation.  If the lkb cannot be found the error is sent back
 * through the lockspace's stub rsb and lkb.
 */
3249 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3250 {
3251         struct dlm_lkb *lkb;
3252         struct dlm_rsb *r;
3253         int error;
3254 
3255         error = find_lkb(ls, ms->m_remid, &lkb);
3256         if (error)
3257                 goto fail;
3258 
3259         r = lkb->lkb_resource;
3260 
3261         hold_rsb(r);
3262         lock_rsb(r);
3263 
              /* an invalid message is dropped without a reply */
3264         error = validate_message(lkb, ms);
3265         if (error)
3266                 goto out;
3267 
3268         receive_flags(lkb, ms);
3269         error = receive_unlock_args(ls, lkb, ms);
3270         if (error)
3271                 goto out_reply;
3272 
3273         error = do_unlock(r, lkb);
3274  out_reply:
3275         send_unlock_reply(r, lkb, error);
3276  out:
3277         unlock_rsb(r);
3278         put_rsb(r);
3279         dlm_put_lkb(lkb);
3280         return;
3281 
3282  fail:
3283         setup_stub_lkb(ls, ms);
3284         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3285 }
3286 
/*
 * receive_cancel() - handle a cancel message from another node.
 *
 * The lkb is looked up by the message's m_remid and checked with
 * validate_message() under the rsb lock.  The flags from the message
 * are applied to the lkb only after that check has passed, as in
 * receive_convert() and receive_unlock(), so a message that is rejected
 * leaves the lkb unchanged.  A reply carrying the result of do_cancel()
 * is sent unless the message failed validation.  If the lkb cannot be
 * found the error is sent back through the lockspace's stub rsb and lkb.
 */
3287 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3288 {
3289         struct dlm_lkb *lkb;
3290         struct dlm_rsb *r;
3291         int error;
3292 
3293         error = find_lkb(ls, ms->m_remid, &lkb);
3294         if (error)
3295                 goto fail;
3296 
3297         r = lkb->lkb_resource;
3298 
3299         hold_rsb(r);
3300         lock_rsb(r);
3301 
              /* an invalid message is dropped without a reply */
3302         error = validate_message(lkb, ms);
3303         if (error)
3304                 goto out;
3305 
              /* lkb fields are only modified with the rsb locked and the
                 sender verified */
3306         receive_flags(lkb, ms);
3307 
3308         error = do_cancel(r, lkb);
3309         send_cancel_reply(r, lkb, error);
3310  out:
3311         unlock_rsb(r);
3312         put_rsb(r);
3313         dlm_put_lkb(lkb);
3314         return;
3315 
3316  fail:
3317         setup_stub_lkb(ls, ms);
3318         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3319 }
3320 
/*
 * receive_grant() - handle a grant message for one of our locks.
 *
 * The lkb is looked up by m_remid; a missing lkb is only logged.  After
 * validation under the rsb lock the reply flags are applied, the lock is
 * granted locally with grant_lock_pc() and a completion callback with
 * status 0 is queued.  No reply is sent.
 */
3321 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3322 {
3323         struct dlm_lkb *lkb;
3324         struct dlm_rsb *r;
3325         int error;
3326 
3327         error = find_lkb(ls, ms->m_remid, &lkb);
3328         if (error) {
3329                 log_debug(ls, "receive_grant from %d no lkb %x",
3330                           ms->m_header.h_nodeid, ms->m_remid);
3331                 return;
3332         }
3333 
3334         r = lkb->lkb_resource;
3335 
3336         hold_rsb(r);
3337         lock_rsb(r);
3338 
3339         error = validate_message(lkb, ms);
3340         if (error)
3341                 goto out;
3342 
3343         receive_flags_reply(lkb, ms);
3344         if (is_altmode(lkb))
3345                 munge_altmode(lkb, ms);
3346         grant_lock_pc(r, lkb, ms);
3347         queue_cast(r, lkb, 0);
3348  out:
3349         unlock_rsb(r);
3350         put_rsb(r);
3351         dlm_put_lkb(lkb);
3352 }
3353 
/*
 * receive_bast() - handle a blocking-AST message for one of our locks.
 *
 * The lkb is looked up by m_remid; a missing lkb is only logged.  After
 * validation under the rsb lock a blocking callback for the mode in
 * m_bastmode is queued.  No reply is sent.
 */
3354 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3355 {
3356         struct dlm_lkb *lkb;
3357         struct dlm_rsb *r;
3358         int error;
3359 
3360         error = find_lkb(ls, ms->m_remid, &lkb);
3361         if (error) {
3362                 log_debug(ls, "receive_bast from %d no lkb %x",
3363                           ms->m_header.h_nodeid, ms->m_remid);
3364                 return;
3365         }
3366 
3367         r = lkb->lkb_resource;
3368 
3369         hold_rsb(r);
3370         lock_rsb(r);
3371 
3372         error = validate_message(lkb, ms);
3373         if (error)
3374                 goto out;
3375 
3376         queue_bast(r, lkb, ms->m_bastmode);
3377  out:
3378         unlock_rsb(r);
3379         put_rsb(r);
3380         dlm_put_lkb(lkb);
3381 }
3382 
/*
 * receive_lookup() - handle a directory lookup from another node.
 *
 * If this node is not the directory node for the message's hash, the
 * lookup is answered with -EINVAL and node id -1.  Otherwise the
 * directory is consulted; when it names this node as master the message
 * is processed as a request instead (and answered with a request
 * reply), else a lookup reply with the directory's answer is sent.
 */
3383 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3384 {
3385         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3386 
3387         from_nodeid = ms->m_header.h_nodeid;
3388         our_nodeid = dlm_our_nodeid();
3389 
              /* the payload is the resource name */
3390         len = receive_extralen(ms);
3391 
3392         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3393         if (dir_nodeid != our_nodeid) {
3394                 log_error(ls, "lookup dir_nodeid %d from %d",
3395                           dir_nodeid, from_nodeid);
3396                 error = -EINVAL;
3397                 ret_nodeid = -1;
3398                 goto out;
3399         }
3400 
3401         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3402 
3403         /* Optimization: we're master so treat lookup as a request */
3404         if (!error && ret_nodeid == our_nodeid) {
3405                 receive_request(ls, ms);
3406                 return;
3407         }
3408  out:
3409         send_lookup_reply(ls, ms, ret_nodeid, error);
3410 }
3411 
/*
 * receive_remove() - handle a request to remove a directory entry.
 *
 * The entry named in m_extra is removed on behalf of the sending node,
 * but only if this node is the directory node for the message's hash;
 * otherwise the message is logged and ignored.  No reply is sent.
 */
3412 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3413 {
3414         int len, dir_nodeid, from_nodeid;
3415 
3416         from_nodeid = ms->m_header.h_nodeid;
3417 
3418         len = receive_extralen(ms);
3419 
3420         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3421         if (dir_nodeid != dlm_our_nodeid()) {
3422                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3423                           dir_nodeid, from_nodeid);
3424                 return;
3425         }
3426 
3427         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3428 }
3429 
/* Handle a purge message: run do_purge() for the node id and pid it names. */
3430 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3431 {
3432         do_purge(ls, ms->m_nodeid, ms->m_pid);
3433 }
3434 
/*
 * receive_request_reply() - handle the master's reply to a request we
 * sent (or to a lookup that the directory node treated as a request).
 *
 * The lkb is looked up by m_remid; a missing lkb is only logged.  After
 * validation under the rsb lock the lkb is taken off the waiters list
 * and the result carried in m_result is acted upon.  Finally, an unlock
 * or cancel that overlapped the request is sent if the result allows it,
 * and both overlap flags are cleared in every case.
 */
3435 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3436 {
3437         struct dlm_lkb *lkb;
3438         struct dlm_rsb *r;
3439         int error, mstype, result;
3440 
3441         error = find_lkb(ls, ms->m_remid, &lkb);
3442         if (error) {
3443                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3444                           ms->m_header.h_nodeid, ms->m_remid);
3445                 return;
3446         }
3447 
3448         r = lkb->lkb_resource;
3449         hold_rsb(r);
3450         lock_rsb(r);
3451 
3452         error = validate_message(lkb, ms);
3453         if (error)
3454                 goto out;
3455 
              /* sampled before remove_from_waiters(), which presumably resets
                 lkb_wait_type -- confirm in its definition */
3456         mstype = lkb->lkb_wait_type;
3457         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3458         if (error)
3459                 goto out;
3460 
3461         /* Optimization: the dir node was also the master, so it took our
3462            lookup as a request and sent request reply instead of lookup reply */
3463         if (mstype == DLM_MSG_LOOKUP) {
3464                 r->res_nodeid = ms->m_header.h_nodeid;
3465                 lkb->lkb_nodeid = r->res_nodeid;
3466         }
3467 
3468         /* this is the value returned from do_request() on the master */
3469         result = ms->m_result;
3470 
3471         switch (result) {
3472         case -EAGAIN:
3473                 /* request would block (be queued) on remote master */
3474                 queue_cast(r, lkb, -EAGAIN);
3475                 confirm_master(r, -EAGAIN);
3476                 unhold_lkb(lkb); /* undoes create_lkb() */
3477                 break;
3478 
3479         case -EINPROGRESS:
3480         case 0:
3481                 /* request was queued or granted on remote master */
3482                 receive_flags_reply(lkb, ms);
                      /* the master's lock id for this lock */
3483                 lkb->lkb_remid = ms->m_lkid;
3484                 if (is_altmode(lkb))
3485                         munge_altmode(lkb, ms);
                      /* non-zero here can only be -EINPROGRESS: queued, not granted */
3486                 if (result) {
3487                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3488                         add_timeout(lkb);
3489                 } else {
3490                         grant_lock_pc(r, lkb, ms);
3491                         queue_cast(r, lkb, 0);
3492                 }
3493                 confirm_master(r, result);
3494                 break;
3495 
3496         case -EBADR:
3497         case -ENOTBLK:
3498                 /* find_rsb failed to find rsb or rsb wasn't master */
3499                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3500                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
                      /* forget the master we believed in */
3501                 r->res_nodeid = -1;
3502                 lkb->lkb_nodeid = -1;
3503 
3504                 if (is_overlap(lkb)) {
3505                         /* we'll ignore error in cancel/unlock reply */
3506                         queue_cast_overlap(r, lkb);
3507                         confirm_master(r, result);
3508                         unhold_lkb(lkb); /* undoes create_lkb() */
3509                 } else
                              /* start the request over via stage 3 */
3510                         _request_lock(r, lkb);
3511                 break;
3512 
3513         default:
3514                 log_error(ls, "receive_request_reply %x error %d",
3515                           lkb->lkb_id, result);
3516         }
3517 
              /*
               * An overlapping unlock is sent once the request was granted or
               * queued; an overlapping cancel only when it was queued.  Both
               * overlap flags are cleared on every branch.
               */
3518         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3519                 log_debug(ls, "receive_request_reply %x result %d unlock",
3520                           lkb->lkb_id, result);
3521                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3522                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3523                 send_unlock(r, lkb);
3524         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3525                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3526                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3527                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3528                 send_cancel(r, lkb);
3529         } else {
3530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3531                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3532         }
3533  out:
3534         unlock_rsb(r);
3535         put_rsb(r);
3536         dlm_put_lkb(lkb);
3537 }
3538 
/*
 * __receive_convert_reply() - act on the result of a convert.
 *
 * Does no locking, validation or waiters handling of its own.  Besides
 * real replies, send_convert() passes it a locally built reply for
 * down-conversions.  Unknown results are only logged.
 */
3539 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3540                                     struct dlm_message *ms)
3541 {
3542         /* this is the value returned from do_convert() on the master */
3543         switch (ms->m_result) {
3544         case -EAGAIN:
3545                 /* convert would block (be queued) on remote master */
3546                 queue_cast(r, lkb, -EAGAIN);
3547                 break;
3548 
3549         case -EDEADLK:
                      /* conversion refused: revert_lock_pc() is applied to the
                         local copy before completing with -EDEADLK */
3550                 receive_flags_reply(lkb, ms);
3551                 revert_lock_pc(r, lkb);
3552                 queue_cast(r, lkb, -EDEADLK);
3553                 break;
3554 
3555         case -EINPROGRESS:
3556                 /* convert was queued on remote master */
3557                 receive_flags_reply(lkb, ms);
3558                 if (is_demoted(lkb))
3559                         munge_demoted(lkb, ms);
                      /* move the lkb to the convert queue */
3560                 del_lkb(r, lkb);
3561                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3562                 add_timeout(lkb);
3563                 break;
3564 
3565         case 0:
3566                 /* convert was granted on remote master */
3567                 receive_flags_reply(lkb, ms);
3568                 if (is_demoted(lkb))
3569                         munge_demoted(lkb, ms);
3570                 grant_lock_pc(r, lkb, ms);
3571                 queue_cast(r, lkb, 0);
3572                 break;
3573 
3574         default:
3575                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3576                           lkb->lkb_id, ms->m_result);
3577         }
3578 }
3579 
/* Process a convert reply (real or stub) for lkb.  The lkb's rsb is held
   and locked for the duration.  The reply is acted on by
   __receive_convert_reply() only if both validate_message() and
   remove_from_waiters_ms() return 0; otherwise it is silently dropped here.
   The rsb is unlocked and put on every path.  This function does not drop
   any lkb reference itself. */
3580 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3581 {
3582         struct dlm_rsb *r = lkb->lkb_resource;
3583         int error;
3584 
3585         hold_rsb(r);
3586         lock_rsb(r);
3587 
3588         error = validate_message(lkb, ms);
3589         if (error)
3590                 goto out;
3591 
3592         /* stub reply can happen with waiters_mutex held */
3593         error = remove_from_waiters_ms(lkb, ms);
3594         if (error)
3595                 goto out;
3596 
3597         __receive_convert_reply(r, lkb, ms);
3598  out:
3599         unlock_rsb(r);
3600         put_rsb(r);
3601 }
3602 
/* Handler for DLM_MSG_CONVERT_REPLY (see _receive_message()).  Looks up the
   local lkb by ms->m_remid; if find_lkb() fails the reply is logged at debug
   level and ignored. */
3603 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3604 {
3605         struct dlm_lkb *lkb;
3606         int error;
3607 
3608         error = find_lkb(ls, ms->m_remid, &lkb);
3609         if (error) {
3610                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3611                           ms->m_header.h_nodeid, ms->m_remid);
3612                 return;
3613         }
3614 
3615         _receive_convert_reply(lkb, ms);
             /* presumably balances the reference taken by find_lkb() - confirm */
3616         dlm_put_lkb(lkb);
3617 }
3618 
/* Process an unlock reply (real or stub) for lkb.  The lkb's rsb is held and
   locked for the duration.  ms->m_result is examined only if
   validate_message() and remove_from_waiters_ms() both return 0.
   The rsb is unlocked and put on every path. */
3619 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3620 {
3621         struct dlm_rsb *r = lkb->lkb_resource;
3622         int error;
3623 
3624         hold_rsb(r);
3625         lock_rsb(r);
3626 
3627         error = validate_message(lkb, ms);
3628         if (error)
3629                 goto out;
3630 
3631         /* stub reply can happen with waiters_mutex held */
3632         error = remove_from_waiters_ms(lkb, ms);
3633         if (error)
3634                 goto out;
3635 
3636         /* this is the value returned from do_unlock() on the master */
3637 
3638         switch (ms->m_result) {
3639         case -DLM_EUNLOCK:
                     /* take the flags from the reply, call remove_lock_pc()
                        and complete with -DLM_EUNLOCK */
3640                 receive_flags_reply(lkb, ms);
3641                 remove_lock_pc(r, lkb);
3642                 queue_cast(r, lkb, -DLM_EUNLOCK);
3643                 break;
3644         case -ENOENT:
                     /* nothing to do locally; dlm_recover_waiters_pre() uses
                        this value as the stub result when lkb_grmode is
                        DLM_LOCK_IV */
3645                 break;
3646         default:
3647                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3648                           lkb->lkb_id, ms->m_result);
3649         }
3650  out:
3651         unlock_rsb(r);
3652         put_rsb(r);
3653 }
3654 
/* Handler for DLM_MSG_UNLOCK_REPLY (see _receive_message()).  Looks up the
   local lkb by ms->m_remid; if find_lkb() fails the reply is logged at debug
   level and ignored. */
3655 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3656 {
3657         struct dlm_lkb *lkb;
3658         int error;
3659 
3660         error = find_lkb(ls, ms->m_remid, &lkb);
3661         if (error) {
3662                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3663                           ms->m_header.h_nodeid, ms->m_remid);
3664                 return;
3665         }
3666 
3667         _receive_unlock_reply(lkb, ms);
             /* presumably balances the reference taken by find_lkb() - confirm */
3668         dlm_put_lkb(lkb);
3669 }
3670 
/* Process a cancel reply (real or stub) for lkb.  The lkb's rsb is held and
   locked for the duration.  ms->m_result is examined only if
   validate_message() and remove_from_waiters_ms() both return 0.
   The rsb is unlocked and put on every path. */
3671 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3672 {
3673         struct dlm_rsb *r = lkb->lkb_resource;
3674         int error;
3675 
3676         hold_rsb(r);
3677         lock_rsb(r);
3678 
3679         error = validate_message(lkb, ms);
3680         if (error)
3681                 goto out;
3682 
3683         /* stub reply can happen with waiters_mutex held */
3684         error = remove_from_waiters_ms(lkb, ms);
3685         if (error)
3686                 goto out;
3687 
3688         /* this is the value returned from do_cancel() on the master */
3689 
3690         switch (ms->m_result) {
3691         case -DLM_ECANCEL:
                     /* take the flags from the reply, call revert_lock_pc()
                        and complete with -DLM_ECANCEL */
3692                 receive_flags_reply(lkb, ms);
3693                 revert_lock_pc(r, lkb);
3694                 queue_cast(r, lkb, -DLM_ECANCEL);
3695                 break;
3696         case 0:
                     /* nothing to do locally; dlm_recover_waiters_pre() uses
                        this value as the stub result when lkb_grmode is
                        DLM_LOCK_IV */
3697                 break;
3698         default:
3699                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3700                           lkb->lkb_id, ms->m_result);
3701         }
3702  out:
3703         unlock_rsb(r);
3704         put_rsb(r);
3705 }
3706 
/* Handler for DLM_MSG_CANCEL_REPLY (see _receive_message()).  Looks up the
   local lkb by ms->m_remid; if find_lkb() fails the reply is logged at debug
   level and ignored. */
3707 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3708 {
3709         struct dlm_lkb *lkb;
3710         int error;
3711 
3712         error = find_lkb(ls, ms->m_remid, &lkb);
3713         if (error) {
3714                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3715                           ms->m_header.h_nodeid, ms->m_remid);
3716                 return;
3717         }
3718 
3719         _receive_cancel_reply(lkb, ms);
             /* presumably balances the reference taken by find_lkb() - confirm */
3720         dlm_put_lkb(lkb);
3721 }
3722 
/* Handler for DLM_MSG_LOOKUP_REPLY (see _receive_message()).  Unlike the
   other reply handlers in this section, the lkb is looked up by ms->m_lkid
   and a lookup failure is logged with log_error().  The master nodeid
   carried in ms->m_nodeid is recorded in the rsb, then the request is either
   completed as an overlap or passed on to _request_lock(). */
3723 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3724 {
3725         struct dlm_lkb *lkb;
3726         struct dlm_rsb *r;
3727         int error, ret_nodeid;
3728 
3729         error = find_lkb(ls, ms->m_lkid, &lkb);
3730         if (error) {
3731                 log_error(ls, "receive_lookup_reply no lkb");
3732                 return;
3733         }
3734 
3735         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3736            FIXME: will a non-zero error ever be returned? */
3737 
3738         r = lkb->lkb_resource;
3739         hold_rsb(r);
3740         lock_rsb(r);
3741 
3742         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3743         if (error)
3744                 goto out;
3745 
3746         ret_nodeid = ms->m_nodeid;
3747         if (ret_nodeid == dlm_our_nodeid()) {
                     /* the reply names this node: res_nodeid, ret_nodeid and
                        res_first_lkid are all set to 0 */
3748                 r->res_nodeid = 0;
3749                 ret_nodeid = 0;
3750                 r->res_first_lkid = 0;
3751         } else {
3752                 /* set_master() will copy res_nodeid to lkb_nodeid */
3753                 r->res_nodeid = ret_nodeid;
3754         }
3755 
3756         if (is_overlap(lkb)) {
                     /* an overlapping op is flagged on this lkb: complete it
                        via queue_cast_overlap() and skip _request_lock() */
3757                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3758                           lkb->lkb_id, lkb->lkb_flags);
3759                 queue_cast_overlap(r, lkb);
3760                 unhold_lkb(lkb); /* undoes create_lkb() */
3761                 goto out_list;
3762         }
3763 
3764         _request_lock(r, lkb);
3765 
3766  out_list:
             /* process_lookup_list() runs only when ret_nodeid is 0, i.e.
                when the reply named this node */
3767         if (!ret_nodeid)
3768                 process_lookup_list(r);
3769  out:
3770         unlock_rsb(r);
3771         put_rsb(r);
3772         dlm_put_lkb(lkb);
3773 }
3774 
/* Dispatch one dlm_message to the handler for its ms->m_type.
   Messages whose sender (ms->m_header.h_nodeid) fails dlm_is_member() are
   logged at debug level and dropped before dispatch.  dlm_astd_wake() is
   called after every dispatched message, including one of unknown type, but
   not for a dropped non-member message. */
3775 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3776 {
3777         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3778                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3779                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3780                           ms->m_remid, ms->m_result);
3781                 return;
3782         }
3783 
3784         switch (ms->m_type) {
3785 
3786         /* messages sent to a master node */
3787 
3788         case DLM_MSG_REQUEST:
3789                 receive_request(ls, ms);
3790                 break;
3791 
3792         case DLM_MSG_CONVERT:
3793                 receive_convert(ls, ms);
3794                 break;
3795 
3796         case DLM_MSG_UNLOCK:
3797                 receive_unlock(ls, ms);
3798                 break;
3799 
3800         case DLM_MSG_CANCEL:
3801                 receive_cancel(ls, ms);
3802                 break;
3803 
3804         /* messages sent from a master node (replies to above) */
3805 
3806         case DLM_MSG_REQUEST_REPLY:
3807                 receive_request_reply(ls, ms);
3808                 break;
3809 
3810         case DLM_MSG_CONVERT_REPLY:
3811                 receive_convert_reply(ls, ms);
3812                 break;
3813 
3814         case DLM_MSG_UNLOCK_REPLY:
3815                 receive_unlock_reply(ls, ms);
3816                 break;
3817 
3818         case DLM_MSG_CANCEL_REPLY:
3819                 receive_cancel_reply(ls, ms);
3820                 break;
3821 
3822         /* messages sent from a master node (only two types of async msg) */
3823 
3824         case DLM_MSG_GRANT:
3825                 receive_grant(ls, ms);
3826                 break;
3827 
3828         case DLM_MSG_BAST:
3829                 receive_bast(ls, ms);
3830                 break;
3831 
3832         /* messages sent to a dir node */
3833 
3834         case DLM_MSG_LOOKUP:
3835                 receive_lookup(ls, ms);
3836                 break;
3837 
3838         case DLM_MSG_REMOVE:
3839                 receive_remove(ls, ms);
3840                 break;
3841 
3842         /* messages sent from a dir node (remove has no reply) */
3843 
3844         case DLM_MSG_LOOKUP_REPLY:
3845                 receive_lookup_reply(ls, ms);
3846                 break;
3847 
3848         /* other messages */
3849 
3850         case DLM_MSG_PURGE:
3851                 receive_purge(ls, ms);
3852                 break;
3853 
3854         default:
3855                 log_error(ls, "unknown message type %d", ms->m_type);
3856         }
3857 
3858         dlm_astd_wake();
3859 }
3860 
3861 /* If the lockspace is in recovery mode (locking stopped), then normal
3862    messages are saved on the requestqueue for processing after recovery is
3863    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3864    messages off the requestqueue before we process new ones. This occurs right
3865    after recovery completes when we transition from saving all messages on
3866    requestqueue, to processing all the saved messages, to processing new
3867    messages as they arrive. */
3868 
/* Entry point for a normal (DLM_MSG) message, called from
   dlm_receive_buffer().  While dlm_locking_stopped(ls) is true the message
   is handed to dlm_add_requestqueue() together with the sender nodeid;
   otherwise dlm_wait_requestqueue() is called first and the message is then
   processed immediately. */
3869 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3870                                 int nodeid)
3871 {
3872         if (dlm_locking_stopped(ls)) {
3873                 dlm_add_requestqueue(ls, nodeid, ms);
3874         } else {
3875                 dlm_wait_requestqueue(ls);
3876                 _receive_message(ls, ms);
3877         }
3878 }
3879 
3880 /* This is called by dlm_recoverd to process messages that were saved on
3881    the requestqueue. */
3882 
/* Process a previously saved message.  This is a direct call to
   _receive_message(): unlike dlm_receive_message() it checks neither
   dlm_locking_stopped() nor waits on the requestqueue. */
3883 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3884 {
3885         _receive_message(ls, ms);
3886 }
3887 
3888 /* This is called by the midcomms layer when something is received for
3889    the lockspace.  It could be either a MSG (normal message sent as part of
3890    standard locking activity) or an RCOM (recovery message sent as part of
3891    lockspace recovery). */
3892 
/* Receive one packet from node 'nodeid'.
   The packet is converted in place with dlm_message_in() or dlm_rcom_in()
   according to hd->h_cmd, then dropped (with a log_print) if h_cmd is
   neither DLM_MSG nor DLM_RCOM, or if the header's h_nodeid does not match
   'nodeid'.  The lockspace is looked up by hd->h_lockspace; when none is
   found, the only action taken is dlm_send_ls_not_ready() for an RCOM of
   type DLM_RCOM_STATUS.  Otherwise the packet is passed to
   dlm_receive_message() or dlm_receive_rcom() with ls_recv_active held for
   read, and the lockspace is put afterwards. */
3893 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3894 {
3895         struct dlm_header *hd = &p->header;
3896         struct dlm_ls *ls;
3897         int type = 0;
3898 
3899         switch (hd->h_cmd) {
3900         case DLM_MSG:
3901                 dlm_message_in(&p->message);
3902                 type = p->message.m_type;
3903                 break;
3904         case DLM_RCOM:
3905                 dlm_rcom_in(&p->rcom);
3906                 type = p->rcom.rc_type;
3907                 break;
3908         default:
                     /* NOTE(review): nodeid is an int but is printed with %u
                        here, while the other messages below use %d */
3909                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3910                 return;
3911         }
3912 
3913         if (hd->h_nodeid != nodeid) {
3914                 log_print("invalid h_nodeid %d from %d lockspace %x",
3915                           hd->h_nodeid, nodeid, hd->h_lockspace);
3916                 return;
3917         }
3918 
3919         ls = dlm_find_lockspace_global(hd->h_lockspace);
3920         if (!ls) {
                     /* the message is printed only when ci_log_debug is set */
3921                 if (dlm_config.ci_log_debug)
3922                         log_print("invalid lockspace %x from %d cmd %d type %d",
3923                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3924 
3925                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3926                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3927                 return;
3928         }
3929 
3930         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3931            be inactive (in this ls) before transitioning to recovery mode */
3932 
3933         down_read(&ls->ls_recv_active);
3934         if (hd->h_cmd == DLM_MSG)
3935                 dlm_receive_message(ls, &p->message, nodeid);
3936         else
3937                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3938         up_read(&ls->ls_recv_active);
3939 
3940         dlm_put_lockspace(ls);
3941 }
3942 
/* Recovery handling for an lkb that is waiting for a convert reply; called
   from dlm_recover_waiters_pre() for wait_type DLM_MSG_CONVERT.
   For a middle_conversion() lkb, a DLM_MSG_CONVERT_REPLY with result
   -EINPROGRESS is faked through ls->ls_stub_ms, after which lkb_grmode is
   set to DLM_LOCK_IV and RSB_RECOVER_CONVERT is set on the rsb.
   Otherwise, if lkb_rqmode >= lkb_grmode, the lkb is flagged
   DLM_IFL_RESEND.  Nothing is done in the remaining case. */
3943 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3944 {
3945         if (middle_conversion(lkb)) {
                     /* extra lkb reference around the stub reply, dropped by
                        the unhold_lkb() below */
3946                 hold_lkb(lkb);
3947                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3948                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3949                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3950                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3951                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3952 
3953                 /* Same special case as in receive_rcom_lock_args() */
3954                 lkb->lkb_grmode = DLM_LOCK_IV;
3955                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3956                 unhold_lkb(lkb);
3957 
3958         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3959                 lkb->lkb_flags |= DLM_IFL_RESEND;
3960         }
3961 
3962         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3963            conversions are async; there's no reply from the remote master */
3964 }
3965 
3966 /* A waiting lkb needs recovery if the master node has failed, or
3967    the master node is changing (only when no directory is used) */
3968 
/* Decide whether a waiting lkb needs recovery.
   Returns 1 if dlm_is_removed() reports the lkb's node (lkb_nodeid) as
   removed.  Otherwise returns 0 unless dlm_no_directory(ls) is true, in
   which case it returns 1 when dlm_dir_nodeid() for the lkb's resource
   differs from lkb_nodeid, and 0 when they match. */
3969 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3970 {
3971         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3972                 return 1;
3973 
3974         if (!dlm_no_directory(ls))
3975                 return 0;
3976 
3977         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3978                 return 1;
3979 
3980         return 0;
3981 }
3982 
3983 /* Recovery for locks that are waiting for replies from nodes that are now
3984    gone.  We can just complete unlocks and cancels by faking a reply from the
3985    dead node.  Requests and up-conversions we flag to be resent after
3986    recovery.  Down-conversions can just be completed with a fake reply like
3987    unlocks.  Conversions between PR and CW need special attention. */
3988 
/* Walk ls->ls_waiters with ls_waiters_mutex held for the whole loop and
   prepare each waiting lkb for recovery:
     - lookups are always flagged DLM_IFL_RESEND;
     - lkbs for which waiter_needs_recovery() returns 0 are left alone;
     - requests are flagged DLM_IFL_RESEND;
     - converts go through recover_convert_waiter();
     - unlocks and cancels are completed with a fake reply built in
       ls->ls_stub_ms.
   The _safe list iterator is used; the stub replies call
   remove_from_waiters_ms() on the lkb being visited. */
3989 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3990 {
3991         struct dlm_lkb *lkb, *safe;
3992         int wait_type, stub_unlock_result, stub_cancel_result;
3993 
3994         mutex_lock(&ls->ls_waiters_mutex);
3995 
3996         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3997                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3998                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3999 
4000                 /* all outstanding lookups, regardless of destination, will be
4001                    resent after recovery is done */
4002 
4003                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4004                         lkb->lkb_flags |= DLM_IFL_RESEND;
4005                         continue;
4006                 }
4007 
4008                 if (!waiter_needs_recovery(ls, lkb))
4009                         continue;
4010 
                     /* default results for a faked unlock/cancel reply */
4011                 wait_type = lkb->lkb_wait_type;
4012                 stub_unlock_result = -DLM_EUNLOCK;
4013                 stub_cancel_result = -DLM_ECANCEL;
4014 
4015                 /* Main reply may have been received leaving a zero wait_type,
4016                    but a reply for the overlapping op may not have been
4017                    received.  In that case we need to fake the appropriate
4018                    reply for the overlap op. */
4019 
4020                 if (!wait_type) {
                             /* overlap unlock is tested after overlap cancel,
                                so it decides wait_type when both are set */
4021                         if (is_overlap_cancel(lkb)) {
4022                                 wait_type = DLM_MSG_CANCEL;
4023                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4024                                         stub_cancel_result = 0;
4025                         }
4026                         if (is_overlap_unlock(lkb)) {
4027                                 wait_type = DLM_MSG_UNLOCK;
4028                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4029                                         stub_unlock_result = -ENOENT;
4030                         }
4031 
4032                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4033                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4034                                   stub_cancel_result, stub_unlock_result);
4035                 }
4036 
4037                 switch (wait_type) {
4038 
4039                 case DLM_MSG_REQUEST:
4040                         lkb->lkb_flags |= DLM_IFL_RESEND;
4041                         break;
4042 
4043                 case DLM_MSG_CONVERT:
4044                         recover_convert_waiter(ls, lkb);
4045                         break;
4046 
4047                 case DLM_MSG_UNLOCK:
                             /* fake an unlock reply from lkb_nodeid; the lkb
                                is held across the call and put afterwards */
4048                         hold_lkb(lkb);
4049                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4050                         ls->ls_stub_ms.m_result = stub_unlock_result;
4051                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4052                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4053                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4054                         dlm_put_lkb(lkb);
4055                         break;
4056 
4057                 case DLM_MSG_CANCEL:
                             /* fake a cancel reply from lkb_nodeid; the lkb
                                is held across the call and put afterwards */
4058                         hold_lkb(lkb);
4059                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4060                         ls->ls_stub_ms.m_result = stub_cancel_result;
4061                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4062                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4063                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4064                         dlm_put_lkb(lkb);
4065                         break;
4066 
4067                 default:
4068                         log_error(ls, "invalid lkb wait_type %d %d",
4069                                   lkb->lkb_wait_type, wait_type);
4070                 }
                     /* reached only for entries handled by the switch above;
                        the two 'continue' paths skip it */
4071                 schedule();
4072         }
4073         mutex_unlock(&ls->ls_waiters_mutex);
4074 }
4075 
/* Return the first lkb on ls->ls_waiters that has DLM_IFL_RESEND set, or
   NULL if there is none.  The list is scanned under ls_waiters_mutex and
   hold_lkb() is called on the lkb that is returned, so the caller is
   expected to drop that reference (dlm_recover_waiters_post() does so with
   dlm_put_lkb()). */
4076 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4077 {
4078         struct dlm_lkb *lkb;
4079         int found = 0;
4080 
4081         mutex_lock(&ls->ls_waiters_mutex);
4082         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4083                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4084                         hold_lkb(lkb);
4085                         found = 1;
4086                         break;
4087                 }
4088         }
4089         mutex_unlock(&ls->ls_waiters_mutex);
4090 
             /* when the loop runs to completion lkb is not a valid entry, so
                'found' decides what is returned */
4091         if (!found)
4092                 lkb = NULL;
4093         return lkb;
4094 }
4095 
4096 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4097    master or dir-node for r.  Processing the lkb may result in it being placed
4098    back on waiters. */
4099 
4100 /* We do this after normal locking has been enabled and any saved messages
4101    (in requestqueue) have been processed.  We should be confident that at
4102    this point we won't get or process a reply to any of these waiting
4103    operations.  But, new ops may be coming in on the rsbs/locks here from
4104    userspace or remotely. */
4105 
4106 /* there may have been an overlap unlock/cancel prior to recovery or after
4107    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4108    overlap flag would just have been set and nothing new sent.  we can be
4109    confident here that any replies to either the initial op or overlap ops
4110    prior to recovery have been received. */
4111 
/* Re-drive every waiting lkb that is flagged DLM_IFL_RESEND.
   Each pass takes one lkb from find_resend_waiter(), clears its RESEND and
   overlap flags and its wait state, removes it from the waiters list, and
   then either completes it as an unlock/cancel (when an overlap flag was
   set) or resends the original operation.
   Returns 0 when no flagged lkb remains, or -EINTR if
   dlm_locking_stopped(ls) is seen at the top of a pass. */
4112 int dlm_recover_waiters_post(struct dlm_ls *ls)
4113 {
4114         struct dlm_lkb *lkb;
4115         struct dlm_rsb *r;
4116         int error = 0, mstype, err, oc, ou;
4117 
4118         while (1) {
4119                 if (dlm_locking_stopped(ls)) {
4120                         log_debug(ls, "recover_waiters_post aborted");
4121                         error = -EINTR;
4122                         break;
4123                 }
4124 
4125                 lkb = find_resend_waiter(ls);
4126                 if (!lkb)
4127                         break;
4128 
4129                 r = lkb->lkb_resource;
4130                 hold_rsb(r);
4131                 lock_rsb(r);
4132 
                     /* remember the wait type and overlap flags before they
                        are cleared below */
4133                 mstype = lkb->lkb_wait_type;
4134                 oc = is_overlap_cancel(lkb);
4135                 ou = is_overlap_unlock(lkb);
4136                 err = 0;
4137 
4138                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4139                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4140 
4141                 /* At this point we assume that we won't get a reply to any
4142                    previous op or overlap op on this lock.  First, do a big
4143                    remove_from_waiters() for all previous ops. */
4144 
4145                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4146                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4147                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4148                 lkb->lkb_wait_type = 0;
4149                 lkb->lkb_wait_count = 0;
4150                 mutex_lock(&ls->ls_waiters_mutex);
4151                 list_del_init(&lkb->lkb_wait_reply);
4152                 mutex_unlock(&ls->ls_waiters_mutex);
4153                 unhold_lkb(lkb); /* for waiters list */
4154 
4155                 if (oc || ou) {
4156                         /* do an unlock or cancel instead of resending */
4157                         switch (mstype) {
4158                         case DLM_MSG_LOOKUP:
4159                         case DLM_MSG_REQUEST:
                                     /* overlap unlock takes precedence over
                                        overlap cancel for the cast status */
4160                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4161                                                         -DLM_ECANCEL);
4162                                 unhold_lkb(lkb); /* undoes create_lkb() */
4163                                 break;
4164                         case DLM_MSG_CONVERT:
4165                                 if (oc) {
4166                                         queue_cast(r, lkb, -DLM_ECANCEL);
4167                                 } else {
4168                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4169                                         _unlock_lock(r, lkb);
4170                                 }
4171                                 break;
4172                         default:
4173                                 err = 1;
4174                         }
4175                 } else {
                             /* no overlap: resend the original operation */
4176                         switch (mstype) {
4177                         case DLM_MSG_LOOKUP:
4178                         case DLM_MSG_REQUEST:
4179                                 _request_lock(r, lkb);
4180                                 if (is_master(r))
4181                                         confirm_master(r, 0);
4182                                 break;
4183                         case DLM_MSG_CONVERT:
4184                                 _convert_lock(r, lkb);
4185                                 break;
4186                         default:
4187                                 err = 1;
4188                         }
4189                 }
4190 
                     /* an unexpected mstype is only logged; the loop goes on */
4191                 if (err)
4192                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4193                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4194                 unlock_rsb(r);
4195                 put_rsb(r);
                     /* drops the hold_lkb() done in find_resend_waiter() */
4196                 dlm_put_lkb(lkb);
4197         }
4198 
4199         return error;
4200 }
4201 
/* Remove from 'queue' every lkb for which test(ls, lkb) returns non-zero.
   For each such lkb, RSB_LOCKS_PURGED is set on r, the lkb is removed with
   del_lkb() and then put; a zero return from dlm_put_lkb() is logged as an
   error.  The _safe iterator is used because entries are removed while
   walking the list. */
4202 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4203                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4204 {
4205         struct dlm_ls *ls = r->res_ls;
4206         struct dlm_lkb *lkb, *safe;
4207 
4208         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4209                 if (test(ls, lkb)) {
4210                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4211                         del_lkb(r, lkb);
4212                         /* this put should free the lkb */
4213                         if (!dlm_put_lkb(lkb))
4214                                 log_error(ls, "purged lkb not released");
4215                 }
4216         }
4217 }
4218 
/* purge_queue() predicate: non-zero when the lkb is a master copy and
   dlm_is_removed() reports its lkb_nodeid as removed. */
4219 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4220 {
4221         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4222 }
4223 
/* purge_queue() predicate: the result of is_master_copy(lkb).  'ls' is
   unused; it is present to match the callback signature. */
4224 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4225 {
4226         return is_master_copy(lkb);
4227 }
4228 
/* Run purge_queue() with purge_dead_test over the grant, convert and wait
   queues of r, in that order. */
4229 static void purge_dead_locks(struct dlm_rsb *r)
4230 {
4231         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4232         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4233         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4234 }
4235 
/* Run purge_queue() with purge_mstcpy_test over the grant, convert and wait
   queues of r, in that order, i.e. purge every master-copy lkb on r.
   No rsb locking is done here.
   NOTE(review): presumably the caller holds the rsb lock - confirm. */
4236 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4237 {
4238         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4239         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4240         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4241 }
4242 
4243 /* Get rid of locks held by nodes that are gone. */
4244 
/* Walk ls->ls_root_list with ls_root_sem held for write and, for every rsb
   for which is_master() is true, purge the locks selected by
   purge_dead_test.  Each rsb is held and locked while it is examined, and
   schedule() is called after each one.  Always returns 0. */
4245 int dlm_purge_locks(struct dlm_ls *ls)
4246 {
4247         struct dlm_rsb *r;
4248 
4249         log_debug(ls, "dlm_purge_locks");
4250 
4251         down_write(&ls->ls_root_sem);
4252         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4253                 hold_rsb(r);
4254                 lock_rsb(r);
4255                 if (is_master(r))
4256                         purge_dead_locks(r);
4257                 unlock_rsb(r);
                     /* note: released with unhold_rsb(), not put_rsb() */
4258                 unhold_rsb(r);
4259 
4260                 schedule();
4261         }
4262         up_write(&ls->ls_root_sem);
4263 
4264         return 0;
4265 }
4266 
/* Return the first rsb in hash bucket 'bucket' that has RSB_LOCKS_PURGED
   set, or NULL if the bucket has none.  The bucket is scanned under its
   spinlock.  The returned rsb has had hold_rsb() called on it and its
   RSB_LOCKS_PURGED flag cleared, so a later call will not return it again
   unless the flag is set again. */
4267 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4268 {
4269         struct dlm_rsb *r, *r_ret = NULL;
4270 
4271         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4272         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4273                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4274                         continue;
4275                 hold_rsb(r);
4276                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4277                 r_ret = r;
4278                 break;
4279         }
4280         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4281         return r_ret;
4282 }
4283 
/* Visit every rsb flagged RSB_LOCKS_PURGED, bucket by bucket from 0 to
   ls_rsbtbl_size - 1.  For each one on which is_master() is true,
   grant_pending_locks() and confirm_master(r, 0) are called under the rsb
   lock.  A bucket is rescanned until find_purged_rsb() returns NULL for it;
   only then does the loop move to the next bucket.  schedule() is called
   after each rsb processed. */
4284 void dlm_grant_after_purge(struct dlm_ls *ls)
4285 {
4286         struct dlm_rsb *r;
4287         int bucket = 0;
4288 
4289         while (1) {
4290                 r = find_purged_rsb(ls, bucket);
4291                 if (!r) {
                             /* nothing (left) in this bucket: stop after the
                                last bucket, otherwise advance */
4292                         if (bucket == ls->ls_rsbtbl_size - 1)
4293                                 break;
4294                         bucket++;
4295                         continue;
4296                 }
4297                 lock_rsb(r);
4298                 if (is_master(r)) {
4299                         grant_pending_locks(r);
4300                         confirm_master(r, 0);
4301                 }
4302                 unlock_rsb(r);
                     /* drops the hold_rsb() done in find_purged_rsb() */
4303                 put_rsb(r);
4304                 schedule();
4305         }
4306 }
4307 
/* Return the first lkb on the list 'head' (linked through lkb_statequeue)
   whose lkb_nodeid equals 'nodeid' and whose lkb_remid equals 'remid', or
   NULL if no entry matches.  No locking or reference counting is done
   here. */
4308 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4309                                          uint32_t remid)
4310 {
4311         struct dlm_lkb *lkb;
4312 
4313         list_for_each_entry(lkb, head, lkb_statequeue) {
4314                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4315                         return lkb;
4316         }
4317         return NULL;
4318 }
4319 
/* Search the grant, convert and wait queues of r, in that order, for an lkb
   matching (nodeid, remid) using search_remid_list().  Returns the first
   match, or NULL if none of the three queues holds one. */
4320 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4321                                     uint32_t remid)
4322 {
4323         struct dlm_lkb *lkb;
4324 
4325         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4326         if (lkb)
4327                 return lkb;
4328         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4329         if (lkb)
4330                 return lkb;
4331         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4332         if (lkb)
4333                 return lkb;
4334         return NULL;
4335 }
4336 
4337 /* needs at least dlm_rcom + rcom_lock */
4338 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4339                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4340 {
4341         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4342 
4343         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4344         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4345         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4346         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4347         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4348         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4349         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4350         lkb->lkb_rqmode = rl->rl_rqmode;
4351         lkb->lkb_grmode = rl->rl_grmode;
4352         /* don't set lkb_status because add_lkb wants to itself */
4353 
4354         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4355         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4356 
4357         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4358                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4359                          sizeof(struct rcom_lock);
4360                 if (lvblen > ls->ls_lvblen)
4361                         return -EINVAL;
4362                 lkb->lkb_lvbptr = dlm_allocate_lv