HAMMER 21/many: B-Tree node locking finalization.
[dragonfly.git] / sys / vfs / hammer / hammer_object.c
index e0db60a..ce571f1 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  * 
- * $DragonFly: src/sys/vfs/hammer/hammer_object.c,v 1.11 2007/12/30 00:47:22 dillon Exp $
+ * $DragonFly: src/sys/vfs/hammer/hammer_object.c,v 1.21 2008/01/18 07:02:41 dillon Exp $
  */
 
 #include "hammer.h"
@@ -40,7 +40,6 @@ static int hammer_mem_add(hammer_transaction_t trans,
                             hammer_record_t record);
 static int hammer_mem_lookup(hammer_cursor_t cursor, hammer_inode_t ip);
 static int hammer_mem_first(hammer_cursor_t cursor, hammer_inode_t ip);
-static void hammer_free_mem_record(hammer_record_t record);
 
 /*
  * Red-black tree support.
@@ -58,9 +57,17 @@ hammer_rec_rb_compare(hammer_record_t rec1, hammer_record_t rec2)
        if (rec1->rec.base.base.key > rec2->rec.base.base.key)
                return(1);
 
-       if (rec1->rec.base.base.create_tid < rec2->rec.base.base.create_tid)
+       if (rec1->rec.base.base.delete_tid == 0) {
+               if (rec2->rec.base.base.delete_tid == 0)
+                       return(0);
+               return(1);
+       }
+       if (rec2->rec.base.base.delete_tid == 0)
                return(-1);
-       if (rec1->rec.base.base.create_tid > rec2->rec.base.base.create_tid)
+
+       if (rec1->rec.base.base.delete_tid < rec2->rec.base.base.delete_tid)
+               return(-1);
+       if (rec1->rec.base.base.delete_tid > rec2->rec.base.base.delete_tid)
                return(1);
         return(0);
 }
@@ -78,26 +85,17 @@ hammer_rec_compare(hammer_base_elm_t info, hammer_record_t rec)
         if (info->key > rec->rec.base.base.key)
                 return(2);
 
-        /*
-         * This test has a number of special cases.  create_tid in key1 is
-         * the as-of transction id, and delete_tid in key1 is NOT USED.
-         *
-         * A key1->create_tid of 0 matches any record regardles of when
-         * it was created or destroyed.  0xFFFFFFFFFFFFFFFFULL should be
-         * used to search for the most current state of the object.
-         *
-         * key2->create_tid is a HAMMER record and will never be
-         * 0.   key2->delete_tid is the deletion transaction id or 0 if
-         * the record has not yet been deleted.
-         */
-        if (info->create_tid) {
-                if (info->create_tid < rec->rec.base.base.create_tid)
-                        return(-1);
-                if (rec->rec.base.base.delete_tid &&
-                   info->create_tid >= rec->rec.base.base.delete_tid) {
-                        return(1);
-               }
-        }
+       if (info->delete_tid == 0) {
+               if (rec->rec.base.base.delete_tid == 0)
+                       return(0);
+               return(1);
+       }
+       if (rec->rec.base.base.delete_tid == 0)
+               return(-1);
+       if (info->delete_tid < rec->rec.base.base.delete_tid)
+               return(-1);
+       if (info->delete_tid > rec->rec.base.base.delete_tid)
+               return(1);
         return(0);
 }
 
@@ -121,8 +119,6 @@ hammer_rec_scan_cmp(hammer_record_t rec, void *data)
        r = hammer_rec_compare(&cursor->key_beg, rec);
        if (r > 1)
                return(-1);
-       if (r == 0)
-               return(0);
        r = hammer_rec_compare(&cursor->key_end, rec);
        if (r < -1)
                return(1);
@@ -135,77 +131,47 @@ RB_GENERATE_XLOOKUP(hammer_rec_rb_tree, INFO, hammer_record, rb_node,
 
 /*
  * Allocate a record for the caller to finish filling in.  The record is
- * returned referenced and locked.
+ * returned referenced.
  */
 hammer_record_t
 hammer_alloc_mem_record(hammer_inode_t ip)
 {
        hammer_record_t record;
 
+       ++hammer_count_records;
        record = kmalloc(sizeof(*record), M_HAMMER, M_WAITOK|M_ZERO);
        record->ip = ip;
+       record->rec.base.base.btype = HAMMER_BTREE_TYPE_RECORD;
        hammer_ref(&record->lock);
-       hammer_lock_ex(&record->lock);
        return (record);
 }
 
 /*
- * Release a memory record.  If the record was marked for defered deletion,
- * and no references remain, the record is physically destroyed.
+ * Release a memory record.  Records marked for deletion are immediately
+ * removed from the RB-Tree but otherwise left intact until the last ref
+ * goes away.
  */
 void
-hammer_rel_mem_record(struct hammer_record **recordp)
+hammer_rel_mem_record(struct hammer_record *record)
 {
-       hammer_record_t rec;
-
-       if ((rec = *recordp) != NULL) {
-               hammer_unref(&rec->lock);
-               if (rec->lock.refs == 0) {
-                       if (rec->flags & HAMMER_RECF_DELETED)
-                               hammer_free_mem_record(rec);
+       hammer_unref(&record->lock);
+       if (record->flags & HAMMER_RECF_DELETED) {
+               if (record->flags & HAMMER_RECF_ONRBTREE) {
+                       RB_REMOVE(hammer_rec_rb_tree, &record->ip->rec_tree,
+                                 record);
+                       record->flags &= ~HAMMER_RECF_ONRBTREE;
+               }
+               if (record->lock.refs == 0) {
+                       if (record->flags & HAMMER_RECF_ALLOCDATA) {
+                               --hammer_count_record_datas;
+                               kfree(record->data, M_HAMMER);
+                               record->flags &= ~HAMMER_RECF_ALLOCDATA;
+                       }
+                       record->data = NULL;
+                       --hammer_count_records;
+                       kfree(record, M_HAMMER);
                }
-               *recordp = NULL;
-       }
-}
-
-/*
- * Drop a locked hammer in-memory record.  This function unlocks and
- * dereferences the record.  If delete != 0 the record is marked for
- * deletion.  Physical deletion only occurs when the last reference goes
- * away.
- */
-void
-hammer_drop_mem_record(hammer_record_t rec, int delete)
-{
-       if (delete)
-               rec->flags |= HAMMER_RECF_DELETED;
-       hammer_unlock(&rec->lock);
-       hammer_rel_mem_record(&rec);
-}
-
-/*
- * Free a record.  Clean the structure up even though we are throwing it
- * away as a sanity check.  The actual free operation is delayed while
- * the record is referenced.  However, the record is removed from the RB
- * tree immediately.
- */
-static void
-hammer_free_mem_record(hammer_record_t record)
-{
-       if (record->flags & HAMMER_RECF_ONRBTREE) {
-               RB_REMOVE(hammer_rec_rb_tree, &record->ip->rec_tree, record);
-               record->flags &= ~HAMMER_RECF_ONRBTREE;
-       }
-       if (record->lock.refs) {
-               record->flags |= HAMMER_RECF_DELETED;
-               return;
-       }
-       if (record->flags & HAMMER_RECF_ALLOCDATA) {
-               kfree(record->data, M_HAMMER);
-               record->flags &= ~HAMMER_RECF_ALLOCDATA;
        }
-       record->data = NULL;
-       kfree(record, M_HAMMER);
 }
 
 /*
@@ -221,8 +187,10 @@ hammer_mem_lookup(hammer_cursor_t cursor, hammer_inode_t ip)
 {
        int error;
 
-       if (cursor->iprec)
-               hammer_rel_mem_record(&cursor->iprec);
+       if (cursor->iprec) {
+               hammer_rel_mem_record(cursor->iprec);
+               cursor->iprec = NULL;
+       }
        if (cursor->ip) {
                hammer_rec_rb_tree_scan_info_done(&cursor->scan,
                                                  &cursor->ip->rec_tree);
@@ -257,12 +225,11 @@ hammer_rec_scan_callback(hammer_record_t rec, void *data)
        /*
         * Skip if not visible due to our as-of TID
         */
-        if (cursor->key_beg.create_tid) {
-                if (cursor->key_beg.create_tid < rec->rec.base.base.create_tid)
+        if (cursor->flags & HAMMER_CURSOR_ASOF) {
+                if (cursor->asof < rec->rec.base.base.create_tid)
                         return(0);
                 if (rec->rec.base.base.delete_tid &&
-                   cursor->key_beg.create_tid >=
-                    rec->rec.base.base.delete_tid) {
+                   cursor->asof >= rec->rec.base.base.delete_tid) {
                         return(0);
                }
         }
@@ -282,14 +249,17 @@ static
 int
 hammer_mem_first(hammer_cursor_t cursor, hammer_inode_t ip)
 {
-       if (cursor->iprec)
-               hammer_rel_mem_record(&cursor->iprec);
+       if (cursor->iprec) {
+               hammer_rel_mem_record(cursor->iprec);
+               cursor->iprec = NULL;
+       }
        if (cursor->ip) {
                hammer_rec_rb_tree_scan_info_done(&cursor->scan,
                                                  &cursor->ip->rec_tree);
        }
        cursor->ip = ip;
        hammer_rec_rb_tree_scan_info_link(&cursor->scan, &ip->rec_tree);
+
        cursor->scan.node = NULL;
        hammer_rec_rb_tree_RB_SCAN(&ip->rec_tree, hammer_rec_scan_cmp,
                                   hammer_rec_scan_callback, cursor);
@@ -313,8 +283,10 @@ hammer_mem_done(hammer_cursor_t cursor)
                                                  &cursor->ip->rec_tree);
                cursor->ip = NULL;
        }
-        if (cursor->iprec)
-               hammer_rel_mem_record(&cursor->iprec);
+        if (cursor->iprec) {
+               hammer_rel_mem_record(cursor->iprec);
+               cursor->iprec = NULL;
+       }
 }
 
 /************************************************************************
@@ -360,14 +332,14 @@ hammer_ip_add_directory(struct hammer_transaction *trans,
                record->data = (void *)record->rec.entry.den_name;
                record->flags |= HAMMER_RECF_EMBEDDED_DATA;
        } else {
+               ++hammer_count_record_datas;
                record->data = kmalloc(bytes, M_HAMMER, M_WAITOK);
                record->flags |= HAMMER_RECF_ALLOCDATA;
        }
        bcopy(ncp->nc_name, record->data, bytes);
        record->rec.entry.base.data_len = bytes;
        ++ip->ino_rec.ino_nlinks;
-       hammer_modify_inode(trans, ip,
-                           HAMMER_INODE_RDIRTY | HAMMER_INODE_TID);
+       hammer_modify_inode(trans, ip, HAMMER_INODE_RDIRTY);
        error = hammer_mem_add(trans, record);
        return(error);
 }
@@ -377,6 +349,9 @@ hammer_ip_add_directory(struct hammer_transaction *trans,
  * cursor must be seeked to the directory entry record being deleted.
  *
  * NOTE: HAMMER_CURSOR_DELETE may not have been set.  XXX remove flag.
+ *
+ * This function can return EDEADLK requiring the caller to terminate
+ * the cursor and retry.
  */
 int
 hammer_ip_del_directory(struct hammer_transaction *trans,
@@ -389,15 +364,20 @@ hammer_ip_del_directory(struct hammer_transaction *trans,
 
        /*
         * One less link.  The file may still be open in the OS even after
-        * all links have gone away so we don't destroy the inode's data
-        * here.
+        * all links have gone away so we only try to sync if the OS has
+        * no references and nlinks falls to 0.
+        *
+        * We have to terminate the cursor before syncing the inode to
+        * avoid deadlocking against ourselves.
         */
        if (error == 0) {
                --ip->ino_rec.ino_nlinks;
-               hammer_modify_inode(trans, ip,
-                                   HAMMER_INODE_RDIRTY | HAMMER_INODE_TID);
-               if (ip->vp == NULL || (ip->vp->v_flag & VINACTIVE))
+               hammer_modify_inode(trans, ip, HAMMER_INODE_RDIRTY);
+               if (ip->ino_rec.ino_nlinks == 0 &&
+                   (ip->vp == NULL || (ip->vp->v_flag & VINACTIVE))) {
+                       hammer_done_cursor(cursor);
                        hammer_sync_inode(ip, MNT_NOWAIT, 1);
+               }
 
        }
        return(error);
@@ -430,6 +410,7 @@ hammer_ip_add_record(struct hammer_transaction *trans, hammer_record_t record)
        if (record->data) {
                if ((char *)record->data < (char *)&record->rec ||
                    (char *)record->data >= (char *)(&record->rec + 1)) {
+                       ++hammer_count_record_datas;
                        data = kmalloc(bytes, M_HAMMER, M_WAITOK);
                        record->flags |= HAMMER_RECF_ALLOCDATA;
                        bcopy(record->data, data, bytes);
@@ -438,8 +419,7 @@ hammer_ip_add_record(struct hammer_transaction *trans, hammer_record_t record)
                        record->flags |= HAMMER_RECF_EMBEDDED_DATA;
                }
        }
-       hammer_modify_inode(trans, ip,
-                           HAMMER_INODE_RDIRTY | HAMMER_INODE_TID);
+       hammer_modify_inode(trans, ip, HAMMER_INODE_RDIRTY);
        error = hammer_mem_add(trans, record);
        return(error);
 }
@@ -460,15 +440,17 @@ hammer_ip_sync_data(hammer_transaction_t trans, hammer_inode_t ip,
        void *bdata;
        int error;
 
-       error = hammer_init_cursor_ip(&cursor, ip);
+retry:
+       error = hammer_init_cursor_hmp(&cursor, &ip->cache[0], ip->hmp);
        if (error)
                return(error);
        cursor.key_beg.obj_id = ip->obj_id;
        cursor.key_beg.key = offset + bytes;
-       cursor.key_beg.create_tid = trans->tid;
+       cursor.key_beg.create_tid = 0;
        cursor.key_beg.delete_tid = 0;
        cursor.key_beg.rec_type = HAMMER_RECTYPE_DATA;
-       cursor.flags = HAMMER_CURSOR_INSERT;
+       cursor.asof = trans->tid;
+       cursor.flags |= HAMMER_CURSOR_INSERT | HAMMER_CURSOR_ASOF;
 
        /*
         * Issue a lookup to position the cursor and locate the cluster
@@ -500,25 +482,39 @@ hammer_ip_sync_data(hammer_transaction_t trans, hammer_inode_t ip,
        /*
         * Fill everything in and insert our B-Tree node.
         */
-       rec->base.base = cursor.key_beg;
+       hammer_modify_buffer(cursor.record_buffer);
+       rec->base.base.btype = HAMMER_BTREE_TYPE_RECORD;
+       rec->base.base.obj_id = ip->obj_id;
+       rec->base.base.key = offset + bytes;
+       rec->base.base.create_tid = trans->tid;
+       rec->base.base.delete_tid = 0;
+       rec->base.base.rec_type = HAMMER_RECTYPE_DATA;
        rec->base.data_crc = crc32(data, bytes);
        rec->base.rec_id = 0;   /* XXX */
        rec->base.data_offset = hammer_bclu_offset(cursor.data_buffer, bdata);
        rec->base.data_len = bytes;
-       hammer_modify_buffer(cursor.record_buffer);
 
-       bcopy(data, bdata, bytes);
        hammer_modify_buffer(cursor.data_buffer);
+       bcopy(data, bdata, bytes);
 
-       elm.leaf.base = cursor.key_beg;
+       elm.leaf.base = rec->base.base;
        elm.leaf.rec_offset = hammer_bclu_offset(cursor.record_buffer, rec);
        elm.leaf.data_offset = rec->base.data_offset;
        elm.leaf.data_len = bytes;
        elm.leaf.data_crc = rec->base.data_crc;
 
+       /*
+        * Data records can wind up on-disk before the inode itself is
+        * on-disk.  One must assume data records may be on-disk if either
+        * HAMMER_INODE_DONDISK or HAMMER_INODE_ONDISK is set.
+        */
+       ip->flags |= HAMMER_INODE_DONDISK;
+
        error = hammer_btree_insert(&cursor, &elm);
-       if (error == 0)
+       if (error == 0) {
+               hammer_update_syncid(cursor.record_buffer->cluster, trans->tid);
                goto done;
+       }
 
        hammer_free_record_ptr(cursor.record_buffer, rec);
 fail1:
@@ -531,6 +527,8 @@ done:
        if (error == ENOSPC)
                hammer_load_spike(&cursor, spike);
        hammer_done_cursor(&cursor);
+       if (error == EDEADLK)
+               goto retry;
        return(error);
 }
 
@@ -544,33 +542,67 @@ hammer_ip_sync_record(hammer_record_t record, struct hammer_cursor **spike)
 {
        struct hammer_cursor cursor;
        hammer_record_ondisk_t rec;
+       hammer_mount_t hmp;
        union hammer_btree_elm elm;
        void *bdata;
        int error;
 
-       error = hammer_init_cursor_ip(&cursor, record->ip);
+retry:
+       error = hammer_init_cursor_hmp(&cursor, &record->ip->cache[0],
+                                      record->ip->hmp);
        if (error)
                return(error);
        cursor.key_beg = record->rec.base.base;
-       cursor.flags = HAMMER_CURSOR_INSERT;
+       cursor.flags |= HAMMER_CURSOR_INSERT;
 
        /*
         * Issue a lookup to position the cursor and locate the cluster.  The
-        * target key should not exist.
+        * target key should not exist.  If we are creating a directory entry
+        * we may have to iterate the low 32 bits of the key to find an unused
+        * key.
         *
         * If we run out of space trying to adjust the B-Tree for the
         * insert, re-lookup without the insert flag so the cursor
         * is properly positioned for the spike.
         */
-       error = hammer_btree_lookup(&cursor);
-       if (error == 0) {
-               kprintf("hammer_ip_sync_record: duplicate rec at (%016llx)\n",
-                       record->rec.base.base.key);
-               error = EIO;
+       for (;;) {
+               error = hammer_btree_lookup(&cursor);
+               if (error)
+                       break;
+               if (record->rec.base.base.rec_type != HAMMER_RECTYPE_DIRENTRY) {
+                       kprintf("hammer_ip_sync_record: duplicate rec "
+                               "at (%016llx)\n", record->rec.base.base.key);
+                       Debugger("duplicate record1");
+                       error = EIO;
+                       break;
+               }
+               hmp = cursor.node->cluster->volume->hmp;
+               if (++hmp->namekey_iterator == 0)
+                       ++hmp->namekey_iterator;
+               record->rec.base.base.key &= ~(0xFFFFFFFFLL);
+               record->rec.base.base.key |= hmp->namekey_iterator;
+               cursor.key_beg.key = record->rec.base.base.key;
        }
        if (error != ENOENT)
                goto done;
 
+       /*
+        * Mark the record as undergoing synchronization.  Our cursor is
+        * holding a locked B-Tree node for the insertion which interlocks
+        * anyone trying to access this record.
+        *
+        * XXX There is still a race present related to iterations.  An
+        * iteration may process the in-memory record, a sync may occur, and
+        * the iteration may later process the B-Tree element for that record.
+        *
+        * We do not try to synchronize a deleted record.
+        */
+       if (record->flags & (HAMMER_RECF_DELETED | HAMMER_RECF_SYNCING)) {
+               error = 0;
+               goto done;
+       }
+       record->flags |= HAMMER_RECF_SYNCING;
+
        /*
         * Allocate record and data space now that we know which cluster
         * the B-Tree node ended up in.
@@ -583,7 +615,7 @@ hammer_ip_sync_record(hammer_record_t record, struct hammer_cursor **spike)
                                          record->rec.base.data_len, &error,
                                          &cursor.data_buffer);
                if (bdata == NULL)
-                       goto done;
+                       goto fail2;
        }
        rec = hammer_alloc_record(cursor.node->cluster, &error,
                                  &cursor.record_buffer);
@@ -595,6 +627,7 @@ hammer_ip_sync_record(hammer_record_t record, struct hammer_cursor **spike)
         *
         * XXX assign rec_id here
         */
+       hammer_modify_buffer(cursor.record_buffer);
        *rec = record->rec;
        if (bdata) {
                rec->base.data_crc = crc32(record->data,
@@ -614,23 +647,30 @@ hammer_ip_sync_record(hammer_record_t record, struct hammer_cursor **spike)
                         * Data separate from record
                         */
                        rec->base.data_offset = hammer_bclu_offset(cursor.data_buffer,bdata);
-                       bcopy(record->data, bdata, rec->base.data_len);
                        hammer_modify_buffer(cursor.data_buffer);
+                       bcopy(record->data, bdata, rec->base.data_len);
                }
        }
        rec->base.rec_id = 0;   /* XXX */
 
-       hammer_modify_buffer(cursor.record_buffer);
-
-       elm.leaf.base = cursor.key_beg;
+       elm.leaf.base = record->rec.base.base;
        elm.leaf.rec_offset = hammer_bclu_offset(cursor.record_buffer, rec);
        elm.leaf.data_offset = rec->base.data_offset;
        elm.leaf.data_len = rec->base.data_len;
        elm.leaf.data_crc = rec->base.data_crc;
 
        error = hammer_btree_insert(&cursor, &elm);
-       if (error == 0)
+
+       /*
+        * Clean up on success, or fall through on error.
+        */
+       if (error == 0) {
+               record->flags |= HAMMER_RECF_DELETED;
+               record->flags &= ~HAMMER_RECF_SYNCING;
+               hammer_update_syncid(cursor.record_buffer->cluster,
+                                    record->rec.base.base.create_tid);
                goto done;
+       }
 
        hammer_free_record_ptr(cursor.record_buffer, rec);
 fail1:
@@ -638,6 +678,8 @@ fail1:
                hammer_free_data_ptr(cursor.data_buffer, bdata,
                                     record->rec.base.data_len);
        }
+fail2:
+       record->flags &= ~HAMMER_RECF_SYNCING;
 done:
        /*
         * If ENOSPC in cluster fill in the spike structure and return
@@ -646,6 +688,8 @@ done:
        if (error == ENOSPC)
                hammer_load_spike(&cursor, spike);
        hammer_done_cursor(&cursor);
+       if (error == EDEADLK)
+               goto retry;
        return(error);
 }
 
@@ -656,6 +700,9 @@ done:
  *
  * The target cursor will be modified by this call.  Note in particular
  * that HAMMER_CURSOR_INSERT is set.
+ *
+ * NOTE: This can return EDEADLK, requiring the caller to release its cursor
+ * and retry the operation.
  */
 int
 hammer_write_record(hammer_cursor_t cursor, hammer_record_ondisk_t orec,
@@ -681,6 +728,7 @@ hammer_write_record(hammer_cursor_t cursor, hammer_record_ondisk_t orec,
        if (error == 0) {
                kprintf("hammer_ip_sync_record: duplicate rec at (%016llx)\n",
                        orec->base.base.key);
+               Debugger("duplicate record2");
                error = EIO;
        }
        if (error != ENOENT)
@@ -710,6 +758,7 @@ hammer_write_record(hammer_cursor_t cursor, hammer_record_ondisk_t orec,
         *
         * XXX assign rec_id here
         */
+       hammer_modify_buffer(cursor->record_buffer);
        *nrec = *orec;
        nrec->base.data_offset = 0;
        if (bdata) {
@@ -728,14 +777,12 @@ hammer_write_record(hammer_cursor_t cursor, hammer_record_ondisk_t orec,
                         * Data separate from record
                         */
                        nrec->base.data_offset = hammer_bclu_offset(cursor->data_buffer, bdata);
-                       bcopy(data, bdata, nrec->base.data_len);
                        hammer_modify_buffer(cursor->data_buffer);
+                       bcopy(data, bdata, nrec->base.data_len);
                }
        }
        nrec->base.rec_id = 0;  /* XXX */
 
-       hammer_modify_buffer(cursor->record_buffer);
-
        elm.leaf.base = nrec->base.base;
        elm.leaf.rec_offset = hammer_bclu_offset(cursor->record_buffer, nrec);
        elm.leaf.data_offset = nrec->base.data_offset;
@@ -743,8 +790,11 @@ hammer_write_record(hammer_cursor_t cursor, hammer_record_ondisk_t orec,
        elm.leaf.data_crc = nrec->base.data_crc;
 
        error = hammer_btree_insert(cursor, &elm);
-       if (error == 0)
+       if (error == 0) {
+               hammer_update_syncid(cursor->record_buffer->cluster,
+                                    nrec->base.base.create_tid);
                goto done;
+       }
 
        hammer_free_record_ptr(cursor->record_buffer, nrec);
 fail1:
@@ -763,8 +813,8 @@ done:
  * A unique 64 bit key is generated in-memory and may be regenerated a
  * second time when the directory record is flushed to the on-disk B-Tree.
  *
- * A locked and referenced record is passed to this function.  This function
- * eats the lock and reference.
+ * A referenced record is passed to this function.  This function
+ * eats the reference.  If an error occurs the record will be deleted.
  */
 static
 int
@@ -772,7 +822,8 @@ hammer_mem_add(struct hammer_transaction *trans, hammer_record_t record)
 {
        while (RB_INSERT(hammer_rec_rb_tree, &record->ip->rec_tree, record)) {
                if (record->rec.base.base.rec_type != HAMMER_RECTYPE_DIRENTRY){
-                       hammer_drop_mem_record(record, 1);
+                       record->flags |= HAMMER_RECF_DELETED;
+                       hammer_rel_mem_record(record);
                        return (EEXIST);
                }
                if (++trans->hmp->namekey_iterator == 0)
@@ -781,7 +832,8 @@ hammer_mem_add(struct hammer_transaction *trans, hammer_record_t record)
                record->rec.base.base.key |= trans->hmp->namekey_iterator;
        }
        record->flags |= HAMMER_RECF_ONRBTREE;
-       hammer_drop_mem_record(record, 0);
+       hammer_modify_inode(trans, record->ip, HAMMER_INODE_XDIRTY);
+       hammer_rel_mem_record(record);
        return(0);
 }
 
@@ -819,7 +871,7 @@ hammer_ip_lookup(hammer_cursor_t cursor, struct hammer_inode *ip)
        /*
         * If the inode has on-disk components search the on-disk B-Tree.
         */
-       if ((ip->flags & HAMMER_INODE_ONDISK) == 0)
+       if ((ip->flags & (HAMMER_INODE_ONDISK|HAMMER_INODE_DONDISK)) == 0)
                return(error);
        error = hammer_btree_lookup(cursor);
        if (error == 0)
@@ -834,6 +886,9 @@ hammer_ip_lookup(hammer_cursor_t cursor, struct hammer_inode *ip)
  *
  * When 0 is returned hammer_ip_next() may be used to iterate additional
  * records within the requested range.
+ *
+ * This function can return EDEADLK, requiring the caller to terminate
+ * the cursor and try again.
  */
 int
 hammer_ip_first(hammer_cursor_t cursor, struct hammer_inode *ip)
@@ -846,8 +901,10 @@ hammer_ip_first(hammer_cursor_t cursor, struct hammer_inode *ip)
        cursor->flags &= ~HAMMER_CURSOR_DELBTREE;
        cursor->flags |= HAMMER_CURSOR_ATEDISK | HAMMER_CURSOR_ATEMEM;
        cursor->flags |= HAMMER_CURSOR_DISKEOF | HAMMER_CURSOR_MEMEOF;
-       if (cursor->iprec)
-               hammer_rel_mem_record(&cursor->iprec);
+       if (cursor->iprec) {
+               hammer_rel_mem_record(cursor->iprec);
+               cursor->iprec = NULL;
+       }
 
        /*
         * Search the on-disk B-Tree.  hammer_btree_lookup() only does an
@@ -857,10 +914,14 @@ hammer_ip_first(hammer_cursor_t cursor, struct hammer_inode *ip)
         * The ATEDISK flag is used by hammer_btree_iterate to determine
         * whether it must index forwards or not.  It is also used here
         * to select the next record from in-memory or on-disk.
+        *
+        * EDEADLK can only occur if the lookup hit an empty internal
+        * element and couldn't delete it.  Since this could only occur
+        * in-range, we can just iterate from the failure point.
         */
-       if (ip->flags & HAMMER_INODE_ONDISK) {
+       if (ip->flags & (HAMMER_INODE_ONDISK|HAMMER_INODE_DONDISK)) {
                error = hammer_btree_lookup(cursor);
-               if (error == ENOENT) {
+               if (error == ENOENT || error == EDEADLK) {
                        cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
                        error = hammer_btree_iterate(cursor);
                }
@@ -923,6 +984,7 @@ hammer_ip_next(hammer_cursor_t cursor)
        if (cursor->flags & (HAMMER_CURSOR_ATEDISK|HAMMER_CURSOR_DELBTREE)) {
                if ((cursor->flags & HAMMER_CURSOR_DISKEOF) == 0) {
                        error = hammer_btree_iterate(cursor);
+                       cursor->flags &= ~HAMMER_CURSOR_DELBTREE;
                        if (error == 0)
                                cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
                        else
@@ -942,7 +1004,10 @@ hammer_ip_next(hammer_cursor_t cursor)
         */
        if (cursor->flags & HAMMER_CURSOR_ATEMEM) {
                if ((cursor->flags & HAMMER_CURSOR_MEMEOF) == 0) {
-                       hammer_rel_mem_record(&cursor->iprec);
+                       if (cursor->iprec) {
+                               hammer_rel_mem_record(cursor->iprec);
+                               cursor->iprec = NULL;
+                       }
                        rec = cursor->scan.node;        /* next node */
                        while (rec) {
                                if (hammer_rec_scan_cmp(rec, cursor) != 0)
@@ -952,8 +1017,8 @@ hammer_ip_next(hammer_cursor_t cursor)
                                rec = hammer_rec_rb_tree_RB_NEXT(rec);
                        }
                        if (cursor->iprec) {
+                               KKASSERT(cursor->iprec == rec);
                                cursor->flags &= ~HAMMER_CURSOR_ATEMEM;
-                               hammer_ref(&cursor->iprec->lock);
                                cursor->scan.node =
                                        hammer_rec_rb_tree_RB_NEXT(rec);
                        } else {
@@ -1050,12 +1115,15 @@ hammer_ip_delete_range(hammer_transaction_t trans, hammer_inode_t ip,
        int error;
        int64_t off;
 
-       hammer_init_cursor_ip(&cursor, ip);
+retry:
+       hammer_init_cursor_hmp(&cursor, &ip->cache[0], ip->hmp);
 
        cursor.key_beg.obj_id = ip->obj_id;
-       cursor.key_beg.create_tid = ip->obj_asof;
+       cursor.key_beg.create_tid = 0;
        cursor.key_beg.delete_tid = 0;
        cursor.key_beg.obj_type = 0;
+       cursor.asof = ip->obj_asof;
+       cursor.flags |= HAMMER_CURSOR_ASOF;
 
        cursor.key_end = cursor.key_beg;
        if (ip->ino_rec.base.base.obj_type == HAMMER_OBJTYPE_DBFILE) {
@@ -1130,11 +1198,9 @@ hammer_ip_delete_range(hammer_transaction_t trans, hammer_inode_t ip,
                         * we missing a + 1 somewhere?  Note that ran_end
                         * could overflow.
                         */
-                       if (base->key > ran_end) {
-                               if (base->key - rec->base.data_len > ran_end) {
-                                       kprintf("right edge OOB\n");
+                       if (base->key - 1 > ran_end) {
+                               if (base->key - rec->base.data_len > ran_end)
                                        break;
-                               }
                                panic("hammer right edge case\n");
                        }
                }
@@ -1152,11 +1218,17 @@ hammer_ip_delete_range(hammer_transaction_t trans, hammer_inode_t ip,
                error = hammer_ip_next(&cursor);
        }
        hammer_done_cursor(&cursor);
+       if (error == EDEADLK)
+               goto retry;
        if (error == ENOENT)
                error = 0;
        return(error);
 }
 
+/*
+ * Delete all records associated with an inode except the inode record
+ * itself.
+ */
 int
 hammer_ip_delete_range_all(hammer_transaction_t trans, hammer_inode_t ip)
 {
@@ -1165,20 +1237,22 @@ hammer_ip_delete_range_all(hammer_transaction_t trans, hammer_inode_t ip)
        hammer_base_elm_t base;
        int error;
 
-       hammer_init_cursor_ip(&cursor, ip);
+retry:
+       hammer_init_cursor_hmp(&cursor, &ip->cache[0], ip->hmp);
 
        cursor.key_beg.obj_id = ip->obj_id;
-       cursor.key_beg.create_tid = ip->obj_asof;
+       cursor.key_beg.create_tid = 0;
        cursor.key_beg.delete_tid = 0;
        cursor.key_beg.obj_type = 0;
-       cursor.key_beg.rec_type = 0;
+       cursor.key_beg.rec_type = HAMMER_RECTYPE_INODE + 1;
        cursor.key_beg.key = HAMMER_MIN_KEY;
 
        cursor.key_end = cursor.key_beg;
        cursor.key_end.rec_type = 0xFFFF;
        cursor.key_end.key = HAMMER_MAX_KEY;
 
-       cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE;
+       cursor.asof = ip->obj_asof;
+       cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE | HAMMER_CURSOR_ASOF;
 
        error = hammer_ip_first(&cursor, ip);
 
@@ -1204,13 +1278,18 @@ hammer_ip_delete_range_all(hammer_transaction_t trans, hammer_inode_t ip)
                error = hammer_ip_next(&cursor);
        }
        hammer_done_cursor(&cursor);
+       if (error == EDEADLK)
+               goto retry;
        if (error == ENOENT)
                error = 0;
        return(error);
 }
 
 /*
- * Delete the record at the current cursor
+ * Delete the record at the current cursor.
+ *
+ * NOTE: This can return EDEADLK, requiring the caller to terminate the
+ * cursor and retry.
  */
 int
 hammer_ip_delete_record(hammer_cursor_t cursor, hammer_tid_t tid)
@@ -1222,9 +1301,8 @@ hammer_ip_delete_record(hammer_cursor_t cursor, hammer_tid_t tid)
        /*
         * In-memory (unsynchronized) records can simply be freed.
         */
-       cursor->flags &= ~HAMMER_CURSOR_DELBTREE;
        if (cursor->record == &cursor->iprec->rec) {
-               hammer_free_mem_record(cursor->iprec); /* XXX */
+               cursor->iprec->flags |= HAMMER_RECF_DELETED;
                return(0);
        }
 
@@ -1236,11 +1314,17 @@ hammer_ip_delete_record(hammer_cursor_t cursor, hammer_tid_t tid)
        hmp = cursor->node->cluster->volume->hmp;
 
        if (error == 0) {
-               elm = &cursor->node->ondisk->elms[cursor->index];
-               cursor->record->base.base.delete_tid = tid;
-               elm->leaf.base.delete_tid = tid;
                hammer_modify_buffer(cursor->record_buffer);
-               hammer_modify_node(cursor->node);
+               cursor->record->base.base.delete_tid = tid;
+
+               error = hammer_cursor_upgrade(cursor);
+               if (error == 0) {
+                       hammer_modify_node(cursor->node);
+                       elm = &cursor->node->ondisk->elms[cursor->index];
+                       elm->leaf.base.delete_tid = tid;
+                       hammer_update_syncid(cursor->record_buffer->cluster,
+                                            tid);
+               }
        }
 
        /*
@@ -1282,10 +1366,45 @@ hammer_ip_delete_record(hammer_cursor_t cursor, hammer_tid_t tid)
                }
                hammer_rel_cluster(cluster, 0);
                if (error) {
-                       kprintf("hammer_ip_delete_record: unable to physically delete the record!\n");
+                       panic("hammer_ip_delete_record: unable to physically delete the record!\n");
                        error = 0;
                }
        }
        return(error);
 }
 
+/*
+ * Determine whether a directory is empty or not.  Returns 0 if the directory
+ * is empty, ENOTEMPTY if it isn't, plus other possible errors.
+ */
+int
+hammer_ip_check_directory_empty(hammer_transaction_t trans, hammer_inode_t ip)
+{
+       struct hammer_cursor cursor;
+       int error;
+
+       hammer_init_cursor_hmp(&cursor, &ip->cache[0], ip->hmp);
+
+       cursor.key_beg.obj_id = ip->obj_id;
+       cursor.key_beg.create_tid = 0;
+       cursor.key_beg.delete_tid = 0;
+       cursor.key_beg.obj_type = 0;
+       cursor.key_beg.rec_type = HAMMER_RECTYPE_INODE + 1;
+       cursor.key_beg.key = HAMMER_MIN_KEY;
+
+       cursor.key_end = cursor.key_beg;
+       cursor.key_end.rec_type = 0xFFFF;
+       cursor.key_end.key = HAMMER_MAX_KEY;
+
+       cursor.asof = ip->obj_asof;
+       cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE | HAMMER_CURSOR_ASOF;
+
+       error = hammer_ip_first(&cursor, ip);
+       if (error == ENOENT)
+               error = 0;
+       else if (error == 0)
+               error = ENOTEMPTY;
+       hammer_done_cursor(&cursor);
+       return(error);
+}
+