fedora core 6 1.2949 + vserver 2.2.0
[linux-2.6.git] / fs / ocfs2 / dlm / userdlm.c
index c3764f4..7d2f578 100644 (file)
@@ -102,10 +102,10 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
        spin_unlock(&lockres->l_lock);
 }
 
-#define user_log_dlm_error(_func, _stat, _lockres) do {                \
-       mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
-               "resource %s: %s\n", dlm_errname(_stat), _func, \
-               _lockres->l_name, dlm_errmsg(_stat));           \
+#define user_log_dlm_error(_func, _stat, _lockres) do {                        \
+       mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "          \
+               "resource %.*s: %s\n", dlm_errname(_stat), _func,       \
+               _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
 } while (0)
 
 /* WARNING: This function lives in a world where the only three lock
@@ -127,18 +127,23 @@ static void user_ast(void *opaque)
        struct user_lock_res *lockres = opaque;
        struct dlm_lockstatus *lksb;
 
-       mlog(0, "AST fired for lockres %s\n", lockres->l_name);
+       mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
+            lockres->l_name);
 
        spin_lock(&lockres->l_lock);
 
        lksb = &(lockres->l_lksb);
        if (lksb->status != DLM_NORMAL) {
-               mlog(ML_ERROR, "lksb status value of %u on lockres %s\n",
-                    lksb->status, lockres->l_name);
+               mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
+                    lksb->status, lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                return;
        }
 
+       mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
+                       "Lockres %.*s, requested ivmode. flags 0x%x\n",
+                       lockres->l_namelen, lockres->l_name, lockres->l_flags);
+
        /* we're downconverting. */
        if (lockres->l_requested < lockres->l_level) {
                if (lockres->l_requested <=
@@ -166,15 +171,14 @@ static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
                BUG();
 }
 
-static void user_dlm_unblock_lock(void *opaque);
+static void user_dlm_unblock_lock(struct work_struct *work);
 
 static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
 {
        if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
                user_dlm_grab_inode_ref(lockres);
 
-               INIT_WORK(&lockres->l_work, user_dlm_unblock_lock,
-                         lockres);
+               INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);
 
                queue_work(user_dlm_worker, &lockres->l_work);
                lockres->l_flags |= USER_LOCK_QUEUED;
@@ -209,8 +213,8 @@ static void user_bast(void *opaque, int level)
 {
        struct user_lock_res *lockres = opaque;
 
-       mlog(0, "Blocking AST fired for lockres %s. Blocking level %d\n",
-               lockres->l_name, level);
+       mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
+            lockres->l_namelen, lockres->l_name, level);
 
        spin_lock(&lockres->l_lock);
        lockres->l_flags |= USER_LOCK_BLOCKED;
@@ -227,25 +231,41 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
 {
        struct user_lock_res *lockres = opaque;
 
-       mlog(0, "UNLOCK AST called on lock %s\n", lockres->l_name);
+       mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
+            lockres->l_name);
 
-       if (status != DLM_NORMAL)
+       if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
                mlog(ML_ERROR, "Dlm returns status %d\n", status);
 
        spin_lock(&lockres->l_lock);
-       if (lockres->l_flags & USER_LOCK_IN_TEARDOWN)
+       /* The teardown flag gets set early during the unlock process,
+        * so test the cancel flag to make sure that this ast isn't
+        * for a concurrent cancel. */
+       if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
+           && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
                lockres->l_level = LKM_IVMODE;
-       else {
+       } else if (status == DLM_CANCELGRANT) {
+               /* We tried to cancel a convert request, but it was
+                * already granted. Don't clear the busy flag - the
+                * ast should've done this already. */
+               BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
+               lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
+               goto out_noclear;
+       } else {
+               BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
+               /* Cancel succeeded, we want to re-queue */
                lockres->l_requested = LKM_IVMODE; /* cancel an
                                                    * upconvert
                                                    * request. */
                lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
                /* we want the unblock thread to look at it again
                 * now. */
-               __user_dlm_queue_lockres(lockres);
+               if (lockres->l_flags & USER_LOCK_BLOCKED)
+                       __user_dlm_queue_lockres(lockres);
        }
 
        lockres->l_flags &= ~USER_LOCK_BUSY;
+out_noclear:
        spin_unlock(&lockres->l_lock);
 
        wake_up(&lockres->l_event);
@@ -258,31 +278,42 @@ static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
        iput(inode);
 }
 
-static void user_dlm_unblock_lock(void *opaque)
+static void user_dlm_unblock_lock(struct work_struct *work)
 {
        int new_level, status;
-       struct user_lock_res *lockres = (struct user_lock_res *) opaque;
+       struct user_lock_res *lockres =
+               container_of(work, struct user_lock_res, l_work);
        struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
-       mlog(0, "processing lockres %s\n", lockres->l_name);
+       mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
+            lockres->l_name);
 
        spin_lock(&lockres->l_lock);
 
-       BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));
-       BUG_ON(!(lockres->l_flags & USER_LOCK_QUEUED));
+       mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
+                       "Lockres %.*s, flags 0x%x\n",
+                       lockres->l_namelen, lockres->l_name, lockres->l_flags);
 
-       /* notice that we don't clear USER_LOCK_BLOCKED here. That's
-        * for user_ast to do. */
+       /* notice that we don't clear USER_LOCK_BLOCKED here. If it's
+        * set, we want user_ast clear it. */
        lockres->l_flags &= ~USER_LOCK_QUEUED;
 
+       /* It's valid to get here and no longer be blocked - if we get
+        * several basts in a row, we might be queued by the first
+        * one, the unblock thread might run and clear the queued
+        * flag, and finally we might get another bast which re-queues
+        * us before our ast for the downconvert is called. */
+       if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
+               spin_unlock(&lockres->l_lock);
+               goto drop_ref;
+       }
+
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
-               mlog(0, "lock is in teardown so we do nothing\n");
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }
 
        if (lockres->l_flags & USER_LOCK_BUSY) {
-               mlog(0, "BUSY flag detected...\n");
                if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
                        spin_unlock(&lockres->l_lock);
                        goto drop_ref;
@@ -296,14 +327,7 @@ static void user_dlm_unblock_lock(void *opaque)
                                   LKM_CANCEL,
                                   user_unlock_ast,
                                   lockres);
-               if (status == DLM_CANCELGRANT) {
-                       /* If we got this, then the ast was fired
-                        * before we could cancel. We cleanup our
-                        * state, and restart the function. */
-                       spin_lock(&lockres->l_lock);
-                       lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
-                       spin_unlock(&lockres->l_lock);
-               } else if (status != DLM_NORMAL)
+               if (status != DLM_NORMAL)
                        user_log_dlm_error("dlmunlock", status, lockres);
                goto drop_ref;
        }
@@ -341,6 +365,7 @@ static void user_dlm_unblock_lock(void *opaque)
                         &lockres->l_lksb,
                         LKM_CONVERT|LKM_VALBLK,
                         lockres->l_name,
+                        lockres->l_namelen,
                         user_ast,
                         lockres,
                         user_bast);
@@ -389,16 +414,16 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
 
        if (level != LKM_EXMODE &&
            level != LKM_PRMODE) {
-               mlog(ML_ERROR, "lockres %s: invalid request!\n",
-                    lockres->l_name);
+               mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
+                    lockres->l_namelen, lockres->l_name);
                status = -EINVAL;
                goto bail;
        }
 
-       mlog(0, "lockres %s: asking for %s lock, passed flags = 0x%x\n",
-               lockres->l_name,
-               (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
-               lkm_flags);
+       mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
+            lockres->l_namelen, lockres->l_name,
+            (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
+            lkm_flags);
 
 again:
        if (signal_pending(current)) {
@@ -443,15 +468,13 @@ again:
                BUG_ON(level == LKM_IVMODE);
                BUG_ON(level == LKM_NLMODE);
 
-               mlog(0, "lock %s, get lock from %d to level = %d\n",
-                       lockres->l_name, lockres->l_level, level);
-
                /* call dlm_lock to upgrade lock now */
                status = dlmlock(dlm,
                                 level,
                                 &lockres->l_lksb,
                                 local_flags,
                                 lockres->l_name,
+                                lockres->l_namelen,
                                 user_ast,
                                 lockres,
                                 user_bast);
@@ -467,9 +490,6 @@ again:
                        goto bail;
                }
 
-               mlog(0, "lock %s, successfull return from dlmlock\n",
-                       lockres->l_name);
-
                user_wait_on_busy_lock(lockres);
                goto again;
        }
@@ -477,9 +497,6 @@ again:
        user_dlm_inc_holders(lockres, level);
        spin_unlock(&lockres->l_lock);
 
-       mlog(0, "lockres %s: Got %s lock!\n", lockres->l_name,
-               (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE");
-
        status = 0;
 bail:
        return status;
@@ -507,13 +524,11 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres,
 {
        if (level != LKM_EXMODE &&
            level != LKM_PRMODE) {
-               mlog(ML_ERROR, "lockres %s: invalid request!\n", lockres->l_name);
+               mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
+                    lockres->l_namelen, lockres->l_name);
                return;
        }
 
-       mlog(0, "lockres %s: dropping %s lock\n", lockres->l_name,
-               (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE");
-
        spin_lock(&lockres->l_lock);
        user_dlm_dec_holders(lockres, level);
        __user_dlm_cond_queue_lockres(lockres);
@@ -571,6 +586,7 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
        memcpy(lockres->l_name,
               dentry->d_name.name,
               dentry->d_name.len);
+       lockres->l_namelen = dentry->d_name.len;
 }
 
 int user_dlm_destroy_lock(struct user_lock_res *lockres)
@@ -578,13 +594,18 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
        int status = -EBUSY;
        struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
-       mlog(0, "asked to destroy %s\n", lockres->l_name);
+       mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
 
        spin_lock(&lockres->l_lock);
-       while (lockres->l_flags & USER_LOCK_BUSY) {
+       if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
                spin_unlock(&lockres->l_lock);
+               return 0;
+       }
 
-               mlog(0, "lock %s is busy\n", lockres->l_name);
+       lockres->l_flags |= USER_LOCK_IN_TEARDOWN;
+
+       while (lockres->l_flags & USER_LOCK_BUSY) {
+               spin_unlock(&lockres->l_lock);
 
                user_wait_on_busy_lock(lockres);
 
@@ -593,23 +614,19 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
 
        if (lockres->l_ro_holders || lockres->l_ex_holders) {
                spin_unlock(&lockres->l_lock);
-               mlog(0, "lock %s has holders\n", lockres->l_name);
                goto bail;
        }
 
        status = 0;
        if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
                spin_unlock(&lockres->l_lock);
-               mlog(0, "lock %s is not attached\n", lockres->l_name);
                goto bail;
        }
 
        lockres->l_flags &= ~USER_LOCK_ATTACHED;
        lockres->l_flags |= USER_LOCK_BUSY;
-       lockres->l_flags |= USER_LOCK_IN_TEARDOWN;
        spin_unlock(&lockres->l_lock);
 
-       mlog(0, "unlocking lockres %s\n", lockres->l_name);
        status = dlmunlock(dlm,
                           &lockres->l_lksb,
                           LKM_VALBLK,
@@ -634,7 +651,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name)
        u32 dlm_key;
        char *domain;
 
-       domain = kmalloc(name->len + 1, GFP_KERNEL);
+       domain = kmalloc(name->len + 1, GFP_NOFS);
        if (!domain) {
                mlog_errno(-ENOMEM);
                return ERR_PTR(-ENOMEM);