diff --git a/Documentation/filesystems/caching/backend-api.txt b/Documentation/filesystems/caching/backend-api.txt index d78bab9622c6..277d1e810670 100644 --- a/Documentation/filesystems/caching/backend-api.txt +++ b/Documentation/filesystems/caching/backend-api.txt @@ -299,6 +299,15 @@ performed on the denizens of the cache. These are held in a structure of type: enough space in the cache to permit this. + (*) Check coherency state of an object [mandatory]: + + int (*check_consistency)(struct fscache_object *object) + + This method is called to have the cache check the saved auxiliary data of + the object against the netfs's idea of the state. 0 should be returned + if they're consistent and -ESTALE otherwise. -ENOMEM and -ERESTARTSYS + may also be returned. + (*) Update object [mandatory]: int (*update_object)(struct fscache_object *object) diff --git a/Documentation/filesystems/caching/netfs-api.txt b/Documentation/filesystems/caching/netfs-api.txt index 97e6c0ecc5ef..11a0a40ce445 100644 --- a/Documentation/filesystems/caching/netfs-api.txt +++ b/Documentation/filesystems/caching/netfs-api.txt @@ -32,7 +32,7 @@ This document contains the following sections: (9) Setting the data file size (10) Page alloc/read/write (11) Page uncaching - (12) Index and data file update + (12) Index and data file consistency (13) Miscellaneous cookie operations (14) Cookie unregistration (15) Index invalidation @@ -433,7 +433,7 @@ to the caller. The attribute adjustment excludes read and write operations. ===================== -PAGE READ/ALLOC/WRITE +PAGE ALLOC/READ/WRITE ===================== And the sixth step is to store and retrieve pages in the cache. There are @@ -499,7 +499,7 @@ Else if there's a copy of the page resident in the cache: (*) An argument that's 0 on success or negative for an error code. If an error occurs, it should be assumed that the page contains no usable - data. + data. fscache_readpages_cancel() may need to be called. end_io_func() will be called in process context if the read is results in an error, but it might be called in interrupt context if the read is @@ -623,6 +623,22 @@ some of the pages being read and some being allocated. Those pages will have been marked appropriately and will need uncaching. +CANCELLATION OF UNREAD PAGES +---------------------------- + +If one or more pages are passed to fscache_read_or_alloc_pages() but not then +read from the cache and also not read from the underlying filesystem then +those pages will need to have any marks and reservations removed. This can be +done by calling: + + void fscache_readpages_cancel(struct fscache_cookie *cookie, + struct list_head *pages); + +prior to returning to the caller. The cookie argument should be as passed to +fscache_read_or_alloc_pages(). Every page in the pages list will be examined +and any that have PG_fscache set will be uncached. + + ============== PAGE UNCACHING ============== @@ -690,9 +706,18 @@ written to the cache and for the cache to finish with the page generally. No error is returned. -========================== -INDEX AND DATA FILE UPDATE -========================== +=============================== +INDEX AND DATA FILE CONSISTENCY +=============================== + +To find out whether auxiliary data for an object is up to data within the +cache, the following function can be called: + + int fscache_check_consistency(struct fscache_cookie *cookie) + +This will call back to the netfs to check whether the auxiliary data associated +with a cookie is correct. It returns 0 if it is and -ESTALE if it isn't; it +may also return -ENOMEM and -ERESTARTSYS. To request an update of the index data for an index or other object, the following function should be called: diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 191cd177fef2..39c51cc7fabc 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1561,11 +1561,12 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) obj_request, obj_request->img_request, obj_request->result, xferred, length); /* - * ENOENT means a hole in the image. We zero-fill the - * entire length of the request. A short read also implies - * zero-fill to the end of the request. Either way we - * update the xferred count to indicate the whole request - * was satisfied. + * ENOENT means a hole in the image. We zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. An error requires the whole + * length of the request to be reported finished with an error + * to the block layer. In each case we update the xferred + * count to indicate the whole request was satisfied. */ rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); if (obj_request->result == -ENOENT) { @@ -1574,14 +1575,13 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) else zero_pages(obj_request->pages, 0, length); obj_request->result = 0; - obj_request->xferred = length; } else if (xferred < length && !obj_request->result) { if (obj_request->type == OBJ_REQUEST_BIO) zero_bio_chain(obj_request->bio_list, xferred); else zero_pages(obj_request->pages, xferred, length); - obj_request->xferred = length; } + obj_request->xferred = length; obj_request_done_set(obj_request); } @@ -2167,9 +2167,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, struct rbd_obj_request *obj_request = NULL; struct rbd_obj_request *next_obj_request; bool write_request = img_request_write_test(img_request); - struct bio *bio_list = 0; + struct bio *bio_list = NULL; unsigned int bio_offset = 0; - struct page **pages = 0; + struct page **pages = NULL; u64 img_offset; u64 resid; u16 opcode; @@ -2207,6 +2207,11 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, rbd_segment_name_free(object_name); if (!obj_request) goto out_unwind; + /* + * set obj_request->img_request before creating the + * osd_request so that it gets the right snapc + */ + rbd_img_obj_request_add(img_request, obj_request); if (type == OBJ_REQUEST_BIO) { unsigned int clone_size; @@ -2248,11 +2253,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, obj_request->pages, length, offset & ~PAGE_MASK, false, false); - /* - * set obj_request->img_request before formatting - * the osd_request so that it gets the right snapc - */ - rbd_img_obj_request_add(img_request, obj_request); if (write_request) rbd_osd_req_format_write(obj_request); else @@ -3706,12 +3706,14 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, if (ret < sizeof (size_buf)) return -ERANGE; - if (order) + if (order) { *order = size_buf.order; + dout(" order %u", (unsigned int)*order); + } *snap_size = le64_to_cpu(size_buf.size); - dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", - (unsigned long long)snap_id, (unsigned int)*order, + dout(" snap_id 0x%016llx snap_size = %llu\n", + (unsigned long long)snap_id, (unsigned long long)*snap_size); return 0; diff --git a/fs/cachefiles/interface.c b/fs/cachefiles/interface.c index d4c1206af9fc..43eb5592cdea 100644 --- a/fs/cachefiles/interface.c +++ b/fs/cachefiles/interface.c @@ -377,6 +377,31 @@ static void cachefiles_sync_cache(struct fscache_cache *_cache) ret); } +/* + * check if the backing cache is updated to FS-Cache + * - called by FS-Cache when evaluates if need to invalidate the cache + */ +static bool cachefiles_check_consistency(struct fscache_operation *op) +{ + struct cachefiles_object *object; + struct cachefiles_cache *cache; + const struct cred *saved_cred; + int ret; + + _enter("{OBJ%x}", op->object->debug_id); + + object = container_of(op->object, struct cachefiles_object, fscache); + cache = container_of(object->fscache.cache, + struct cachefiles_cache, cache); + + cachefiles_begin_secure(cache, &saved_cred); + ret = cachefiles_check_auxdata(object); + cachefiles_end_secure(cache, saved_cred); + + _leave(" = %d", ret); + return ret; +} + /* * notification the attributes on an object have changed * - called with reads/writes excluded by FS-Cache @@ -522,4 +547,5 @@ const struct fscache_cache_ops cachefiles_cache_ops = { .write_page = cachefiles_write_page, .uncache_page = cachefiles_uncache_page, .dissociate_pages = cachefiles_dissociate_pages, + .check_consistency = cachefiles_check_consistency, }; diff --git a/fs/cachefiles/internal.h b/fs/cachefiles/internal.h index 49382519907a..5349473df1b1 100644 --- a/fs/cachefiles/internal.h +++ b/fs/cachefiles/internal.h @@ -235,6 +235,7 @@ extern int cachefiles_set_object_xattr(struct cachefiles_object *object, struct cachefiles_xattr *auxdata); extern int cachefiles_update_object_xattr(struct cachefiles_object *object, struct cachefiles_xattr *auxdata); +extern int cachefiles_check_auxdata(struct cachefiles_object *object); extern int cachefiles_check_object_xattr(struct cachefiles_object *object, struct cachefiles_xattr *auxdata); extern int cachefiles_remove_object_xattr(struct cachefiles_cache *cache, diff --git a/fs/cachefiles/xattr.c b/fs/cachefiles/xattr.c index 2476e5162609..34c88b83e39f 100644 --- a/fs/cachefiles/xattr.c +++ b/fs/cachefiles/xattr.c @@ -156,6 +156,42 @@ int cachefiles_update_object_xattr(struct cachefiles_object *object, return ret; } +/* + * check the consistency between the backing cache and the FS-Cache cookie + */ +int cachefiles_check_auxdata(struct cachefiles_object *object) +{ + struct cachefiles_xattr *auxbuf; + struct dentry *dentry = object->dentry; + unsigned int dlen; + int ret; + + ASSERT(dentry); + ASSERT(dentry->d_inode); + ASSERT(object->fscache.cookie->def->check_aux); + + auxbuf = kmalloc(sizeof(struct cachefiles_xattr) + 512, GFP_KERNEL); + if (!auxbuf) + return -ENOMEM; + + auxbuf->len = vfs_getxattr(dentry, cachefiles_xattr_cache, + &auxbuf->type, 512 + 1); + if (auxbuf->len < 1) + return -ESTALE; + + if (auxbuf->type != object->fscache.cookie->def->type) + return -ESTALE; + + dlen = auxbuf->len - 1; + ret = fscache_check_aux(&object->fscache, &auxbuf->data, dlen); + + kfree(auxbuf); + if (ret != FSCACHE_CHECKAUX_OKAY) + return -ESTALE; + + return 0; +} + /* * check the state xattr on a cache file * - return -ESTALE if the object should be deleted diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index 49bc78243db9..ac9a2ef5bb9b 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -16,3 +16,12 @@ config CEPH_FS If unsure, say N. +if CEPH_FS +config CEPH_FSCACHE + bool "Enable Ceph client caching support" + depends on CEPH_FS=m && FSCACHE || CEPH_FS=y && FSCACHE=y + help + Choose Y here to enable persistent, read-only local + caching support for Ceph clients using FS-Cache + +endif diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index bd352125e829..32e30106a2f0 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -9,3 +9,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ mds_client.o mdsmap.o strings.o ceph_frag.o \ debugfs.o +ceph-$(CONFIG_CEPH_FSCACHE) += cache.o diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5318a3b704f6..6df8bd481425 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -11,6 +11,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include /* @@ -70,15 +71,16 @@ static int ceph_set_page_dirty(struct page *page) struct address_space *mapping = page->mapping; struct inode *inode; struct ceph_inode_info *ci; - int undo = 0; struct ceph_snap_context *snapc; + int ret; if (unlikely(!mapping)) return !TestSetPageDirty(page); - if (TestSetPageDirty(page)) { + if (PageDirty(page)) { dout("%p set_page_dirty %p idx %lu -- already dirty\n", mapping->host, page, page->index); + BUG_ON(!PagePrivate(page)); return 0; } @@ -107,35 +109,19 @@ static int ceph_set_page_dirty(struct page *page) snapc, snapc->seq, snapc->num_snaps); spin_unlock(&ci->i_ceph_lock); - /* now adjust page */ - spin_lock_irq(&mapping->tree_lock); - if (page->mapping) { /* Race with truncate? */ - WARN_ON_ONCE(!PageUptodate(page)); - account_page_dirtied(page, page->mapping); - radix_tree_tag_set(&mapping->page_tree, - page_index(page), PAGECACHE_TAG_DIRTY); + /* + * Reference snap context in page->private. Also set + * PagePrivate so that we get invalidatepage callback. + */ + BUG_ON(PagePrivate(page)); + page->private = (unsigned long)snapc; + SetPagePrivate(page); - /* - * Reference snap context in page->private. Also set - * PagePrivate so that we get invalidatepage callback. - */ - page->private = (unsigned long)snapc; - SetPagePrivate(page); - } else { - dout("ANON set_page_dirty %p (raced truncate?)\n", page); - undo = 1; - } + ret = __set_page_dirty_nobuffers(page); + WARN_ON(!PageLocked(page)); + WARN_ON(!page->mapping); - spin_unlock_irq(&mapping->tree_lock); - - if (undo) - /* whoops, we failed to dirty the page */ - ceph_put_wrbuffer_cap_refs(ci, 1, snapc); - - __mark_inode_dirty(mapping->host, I_DIRTY_PAGES); - - BUG_ON(!PageDirty(page)); - return 1; + return ret; } /* @@ -150,11 +136,19 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset, struct ceph_inode_info *ci; struct ceph_snap_context *snapc = page_snap_context(page); - BUG_ON(!PageLocked(page)); - BUG_ON(!PagePrivate(page)); - BUG_ON(!page->mapping); - inode = page->mapping->host; + ci = ceph_inode(inode); + + if (offset != 0 || length != PAGE_CACHE_SIZE) { + dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n", + inode, page, page->index, offset, length); + return; + } + + ceph_invalidate_fscache_page(inode, page); + + if (!PagePrivate(page)) + return; /* * We can get non-dirty pages here due to races between @@ -164,31 +158,28 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset, if (!PageDirty(page)) pr_err("%p invalidatepage %p page not dirty\n", inode, page); - if (offset == 0 && length == PAGE_CACHE_SIZE) - ClearPageChecked(page); + ClearPageChecked(page); - ci = ceph_inode(inode); - if (offset == 0 && length == PAGE_CACHE_SIZE) { - dout("%p invalidatepage %p idx %lu full dirty page\n", - inode, page, page->index); - ceph_put_wrbuffer_cap_refs(ci, 1, snapc); - ceph_put_snap_context(snapc); - page->private = 0; - ClearPagePrivate(page); - } else { - dout("%p invalidatepage %p idx %lu partial dirty page %u(%u)\n", - inode, page, page->index, offset, length); - } + dout("%p invalidatepage %p idx %lu full dirty page\n", + inode, page, page->index); + + ceph_put_wrbuffer_cap_refs(ci, 1, snapc); + ceph_put_snap_context(snapc); + page->private = 0; + ClearPagePrivate(page); } -/* just a sanity check */ static int ceph_releasepage(struct page *page, gfp_t g) { struct inode *inode = page->mapping ? page->mapping->host : NULL; dout("%p releasepage %p idx %lu\n", inode, page, page->index); WARN_ON(PageDirty(page)); - WARN_ON(PagePrivate(page)); - return 0; + + /* Can we release the page from the cache? */ + if (!ceph_release_fscache_page(page, g)) + return 0; + + return !PagePrivate(page); } /* @@ -198,11 +189,16 @@ static int readpage_nounlock(struct file *filp, struct page *page) { struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_osd_client *osdc = + struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; int err = 0; u64 len = PAGE_CACHE_SIZE; + err = ceph_readpage_from_fscache(inode, page); + + if (err == 0) + goto out; + dout("readpage inode %p file %p page %p index %lu\n", inode, filp, page, page->index); err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, @@ -220,6 +216,9 @@ static int readpage_nounlock(struct file *filp, struct page *page) } SetPageUptodate(page); + if (err == 0) + ceph_readpage_to_fscache(inode, page); + out: return err < 0 ? err : 0; } @@ -262,6 +261,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) page->index); flush_dcache_page(page); SetPageUptodate(page); + ceph_readpage_to_fscache(inode, page); unlock_page(page); page_cache_release(page); bytes -= PAGE_CACHE_SIZE; @@ -331,11 +331,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) page = list_entry(page_list->prev, struct page, lru); BUG_ON(PageLocked(page)); list_del(&page->lru); - + dout("start_read %p adding %p idx %lu\n", inode, page, page->index); if (add_to_page_cache_lru(page, &inode->i_data, page->index, GFP_NOFS)) { + ceph_fscache_uncache_page(inode, page); page_cache_release(page); dout("start_read %p add_to_page_cache failed %p\n", inode, page); @@ -378,6 +379,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, int rc = 0; int max = 0; + rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, + &nr_pages); + + if (rc == 0) + goto out; + if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; @@ -392,6 +399,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, BUG_ON(rc == 0); } out: + ceph_fscache_readpages_cancel(inode, page_list); + dout("readpages %p file %p ret %d\n", inode, file, rc); return rc; } @@ -497,6 +506,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC); + ceph_readpage_to_fscache(inode, page); + set_page_writeback(page); err = ceph_osdc_writepages(osdc, ceph_vino(inode), &ci->i_layout, snapc, @@ -552,7 +563,6 @@ static void ceph_release_pages(struct page **pages, int num) pagevec_release(&pvec); } - /* * async writeback completion handler. * diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c new file mode 100644 index 000000000000..6bfe65e0b038 --- /dev/null +++ b/fs/ceph/cache.c @@ -0,0 +1,398 @@ +/* + * Ceph cache definitions. + * + * Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved. + * Written by Milosz Tanski (milosz@adfin.com) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to: + * Free Software Foundation + * 51 Franklin Street, Fifth Floor + * Boston, MA 02111-1301 USA + * + */ + +#include "super.h" +#include "cache.h" + +struct ceph_aux_inode { + struct timespec mtime; + loff_t size; +}; + +struct fscache_netfs ceph_cache_netfs = { + .name = "ceph", + .version = 0, +}; + +static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data, + void *buffer, uint16_t maxbuf) +{ + const struct ceph_fs_client* fsc = cookie_netfs_data; + uint16_t klen; + + klen = sizeof(fsc->client->fsid); + if (klen > maxbuf) + return 0; + + memcpy(buffer, &fsc->client->fsid, klen); + return klen; +} + +static const struct fscache_cookie_def ceph_fscache_fsid_object_def = { + .name = "CEPH.fsid", + .type = FSCACHE_COOKIE_TYPE_INDEX, + .get_key = ceph_fscache_session_get_key, +}; + +int ceph_fscache_register(void) +{ + return fscache_register_netfs(&ceph_cache_netfs); +} + +void ceph_fscache_unregister(void) +{ + fscache_unregister_netfs(&ceph_cache_netfs); +} + +int ceph_fscache_register_fs(struct ceph_fs_client* fsc) +{ + fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index, + &ceph_fscache_fsid_object_def, + fsc); + + if (fsc->fscache == NULL) { + pr_err("Unable to resgister fsid: %p fscache cookie", fsc); + return 0; + } + + fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1); + if (fsc->revalidate_wq == NULL) + return -ENOMEM; + + return 0; +} + +static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data, + void *buffer, uint16_t maxbuf) +{ + const struct ceph_inode_info* ci = cookie_netfs_data; + uint16_t klen; + + /* use ceph virtual inode (id + snaphot) */ + klen = sizeof(ci->i_vino); + if (klen > maxbuf) + return 0; + + memcpy(buffer, &ci->i_vino, klen); + return klen; +} + +static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data, + void *buffer, uint16_t bufmax) +{ + struct ceph_aux_inode aux; + const struct ceph_inode_info* ci = cookie_netfs_data; + const struct inode* inode = &ci->vfs_inode; + + memset(&aux, 0, sizeof(aux)); + aux.mtime = inode->i_mtime; + aux.size = inode->i_size; + + memcpy(buffer, &aux, sizeof(aux)); + + return sizeof(aux); +} + +static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data, + uint64_t *size) +{ + const struct ceph_inode_info* ci = cookie_netfs_data; + const struct inode* inode = &ci->vfs_inode; + + *size = inode->i_size; +} + +static enum fscache_checkaux ceph_fscache_inode_check_aux( + void *cookie_netfs_data, const void *data, uint16_t dlen) +{ + struct ceph_aux_inode aux; + struct ceph_inode_info* ci = cookie_netfs_data; + struct inode* inode = &ci->vfs_inode; + + if (dlen != sizeof(aux)) + return FSCACHE_CHECKAUX_OBSOLETE; + + memset(&aux, 0, sizeof(aux)); + aux.mtime = inode->i_mtime; + aux.size = inode->i_size; + + if (memcmp(data, &aux, sizeof(aux)) != 0) + return FSCACHE_CHECKAUX_OBSOLETE; + + dout("ceph inode 0x%p cached okay", ci); + return FSCACHE_CHECKAUX_OKAY; +} + +static void ceph_fscache_inode_now_uncached(void* cookie_netfs_data) +{ + struct ceph_inode_info* ci = cookie_netfs_data; + struct pagevec pvec; + pgoff_t first; + int loop, nr_pages; + + pagevec_init(&pvec, 0); + first = 0; + + dout("ceph inode 0x%p now uncached", ci); + + while (1) { + nr_pages = pagevec_lookup(&pvec, ci->vfs_inode.i_mapping, first, + PAGEVEC_SIZE - pagevec_count(&pvec)); + + if (!nr_pages) + break; + + for (loop = 0; loop < nr_pages; loop++) + ClearPageFsCache(pvec.pages[loop]); + + first = pvec.pages[nr_pages - 1]->index + 1; + + pvec.nr = nr_pages; + pagevec_release(&pvec); + cond_resched(); + } +} + +static const struct fscache_cookie_def ceph_fscache_inode_object_def = { + .name = "CEPH.inode", + .type = FSCACHE_COOKIE_TYPE_DATAFILE, + .get_key = ceph_fscache_inode_get_key, + .get_attr = ceph_fscache_inode_get_attr, + .get_aux = ceph_fscache_inode_get_aux, + .check_aux = ceph_fscache_inode_check_aux, + .now_uncached = ceph_fscache_inode_now_uncached, +}; + +void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc, + struct ceph_inode_info* ci) +{ + struct inode* inode = &ci->vfs_inode; + + /* No caching for filesystem */ + if (fsc->fscache == NULL) + return; + + /* Only cache for regular files that are read only */ + if ((ci->vfs_inode.i_mode & S_IFREG) == 0) + return; + + /* Avoid multiple racing open requests */ + mutex_lock(&inode->i_mutex); + + if (ci->fscache) + goto done; + + ci->fscache = fscache_acquire_cookie(fsc->fscache, + &ceph_fscache_inode_object_def, + ci); +done: + mutex_unlock(&inode->i_mutex); + +} + +void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci) +{ + struct fscache_cookie* cookie; + + if ((cookie = ci->fscache) == NULL) + return; + + ci->fscache = NULL; + + fscache_uncache_all_inode_pages(cookie, &ci->vfs_inode); + fscache_relinquish_cookie(cookie, 0); +} + +static void ceph_vfs_readpage_complete(struct page *page, void *data, int error) +{ + if (!error) + SetPageUptodate(page); +} + +static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int error) +{ + if (!error) + SetPageUptodate(page); + + unlock_page(page); +} + +static inline int cache_valid(struct ceph_inode_info *ci) +{ + return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) && + (ci->i_fscache_gen == ci->i_rdcache_gen)); +} + + +/* Atempt to read from the fscache, + * + * This function is called from the readpage_nounlock context. DO NOT attempt to + * unlock the page here (or in the callback). + */ +int ceph_readpage_from_fscache(struct inode *inode, struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!cache_valid(ci)) + return -ENOBUFS; + + ret = fscache_read_or_alloc_page(ci->fscache, page, + ceph_vfs_readpage_complete, NULL, + GFP_KERNEL); + + switch (ret) { + case 0: /* Page found */ + dout("page read submitted\n"); + return 0; + case -ENOBUFS: /* Pages were not found, and can't be */ + case -ENODATA: /* Pages were not found */ + dout("page/inode not in cache\n"); + return ret; + default: + dout("%s: unknown error ret = %i\n", __func__, ret); + return ret; + } +} + +int ceph_readpages_from_fscache(struct inode *inode, + struct address_space *mapping, + struct list_head *pages, + unsigned *nr_pages) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!cache_valid(ci)) + return -ENOBUFS; + + ret = fscache_read_or_alloc_pages(ci->fscache, mapping, pages, nr_pages, + ceph_vfs_readpage_complete_unlock, + NULL, mapping_gfp_mask(mapping)); + + switch (ret) { + case 0: /* All pages found */ + dout("all-page read submitted\n"); + return 0; + case -ENOBUFS: /* Some pages were not found, and can't be */ + case -ENODATA: /* some pages were not found */ + dout("page/inode not in cache\n"); + return ret; + default: + dout("%s: unknown error ret = %i\n", __func__, ret); + return ret; + } +} + +void ceph_readpage_to_fscache(struct inode *inode, struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!PageFsCache(page)) + return; + + if (!cache_valid(ci)) + return; + + ret = fscache_write_page(ci->fscache, page, GFP_KERNEL); + if (ret) + fscache_uncache_page(ci->fscache, page); +} + +void ceph_invalidate_fscache_page(struct inode* inode, struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + fscache_wait_on_page_write(ci->fscache, page); + fscache_uncache_page(ci->fscache, page); +} + +void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc) +{ + if (fsc->revalidate_wq) + destroy_workqueue(fsc->revalidate_wq); + + fscache_relinquish_cookie(fsc->fscache, 0); + fsc->fscache = NULL; +} + +static void ceph_revalidate_work(struct work_struct *work) +{ + int issued; + u32 orig_gen; + struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, + i_revalidate_work); + struct inode *inode = &ci->vfs_inode; + + spin_lock(&ci->i_ceph_lock); + issued = __ceph_caps_issued(ci, NULL); + orig_gen = ci->i_rdcache_gen; + spin_unlock(&ci->i_ceph_lock); + + if (!(issued & CEPH_CAP_FILE_CACHE)) { + dout("revalidate_work lost cache before validation %p\n", + inode); + goto out; + } + + if (!fscache_check_consistency(ci->fscache)) + fscache_invalidate(ci->fscache); + + spin_lock(&ci->i_ceph_lock); + /* Update the new valid generation (backwards sanity check too) */ + if (orig_gen > ci->i_fscache_gen) { + ci->i_fscache_gen = orig_gen; + } + spin_unlock(&ci->i_ceph_lock); + +out: + iput(&ci->vfs_inode); +} + +void ceph_queue_revalidate(struct inode *inode) +{ + struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); + struct ceph_inode_info *ci = ceph_inode(inode); + + if (fsc->revalidate_wq == NULL || ci->fscache == NULL) + return; + + ihold(inode); + + if (queue_work(ceph_sb_to_client(inode->i_sb)->revalidate_wq, + &ci->i_revalidate_work)) { + dout("ceph_queue_revalidate %p\n", inode); + } else { + dout("ceph_queue_revalidate %p failed\n)", inode); + iput(inode); + } +} + +void ceph_fscache_inode_init(struct ceph_inode_info *ci) +{ + ci->fscache = NULL; + /* The first load is verifed cookie open time */ + ci->i_fscache_gen = 1; + INIT_WORK(&ci->i_revalidate_work, ceph_revalidate_work); +} diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h new file mode 100644 index 000000000000..ba949408a336 --- /dev/null +++ b/fs/ceph/cache.h @@ -0,0 +1,159 @@ +/* + * Ceph cache definitions. + * + * Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved. + * Written by Milosz Tanski (milosz@adfin.com) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to: + * Free Software Foundation + * 51 Franklin Street, Fifth Floor + * Boston, MA 02111-1301 USA + * + */ + +#ifndef _CEPH_CACHE_H +#define _CEPH_CACHE_H + +#ifdef CONFIG_CEPH_FSCACHE + +extern struct fscache_netfs ceph_cache_netfs; + +int ceph_fscache_register(void); +void ceph_fscache_unregister(void); + +int ceph_fscache_register_fs(struct ceph_fs_client* fsc); +void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc); + +void ceph_fscache_inode_init(struct ceph_inode_info *ci); +void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc, + struct ceph_inode_info* ci); +void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci); + +int ceph_readpage_from_fscache(struct inode *inode, struct page *page); +int ceph_readpages_from_fscache(struct inode *inode, + struct address_space *mapping, + struct list_head *pages, + unsigned *nr_pages); +void ceph_readpage_to_fscache(struct inode *inode, struct page *page); +void ceph_invalidate_fscache_page(struct inode* inode, struct page *page); +void ceph_queue_revalidate(struct inode *inode); + +static inline void ceph_fscache_invalidate(struct inode *inode) +{ + fscache_invalidate(ceph_inode(inode)->fscache); +} + +static inline void ceph_fscache_uncache_page(struct inode *inode, + struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + return fscache_uncache_page(ci->fscache, page); +} + +static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) +{ + struct inode* inode = page->mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + return fscache_maybe_release_page(ci->fscache, page, gfp); +} + +static inline void ceph_fscache_readpages_cancel(struct inode *inode, + struct list_head *pages) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + return fscache_readpages_cancel(ci->fscache, pages); +} + +#else + +static inline int ceph_fscache_register(void) +{ + return 0; +} + +static inline void ceph_fscache_unregister(void) +{ +} + +static inline int ceph_fscache_register_fs(struct ceph_fs_client* fsc) +{ + return 0; +} + +static inline void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc) +{ +} + +static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci) +{ +} + +static inline void ceph_fscache_register_inode_cookie(struct ceph_fs_client* parent_fsc, + struct ceph_inode_info* ci) +{ +} + +static inline void ceph_fscache_uncache_page(struct inode *inode, + struct page *pages) +{ +} + +static inline int ceph_readpage_from_fscache(struct inode* inode, + struct page *page) +{ + return -ENOBUFS; +} + +static inline int ceph_readpages_from_fscache(struct inode *inode, + struct address_space *mapping, + struct list_head *pages, + unsigned *nr_pages) +{ + return -ENOBUFS; +} + +static inline void ceph_readpage_to_fscache(struct inode *inode, + struct page *page) +{ +} + +static inline void ceph_fscache_invalidate(struct inode *inode) +{ +} + +static inline void ceph_invalidate_fscache_page(struct inode *inode, + struct page *page) +{ +} + +static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci) +{ +} + +static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) +{ + return 1; +} + +static inline void ceph_fscache_readpages_cancel(struct inode *inode, + struct list_head *pages) +{ +} + +static inline void ceph_queue_revalidate(struct inode *inode) +{ +} + +#endif + +#endif diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 25442b40c25a..13976c33332e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -10,6 +10,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include #include @@ -479,8 +480,9 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, * i_rdcache_gen. */ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) + (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) { ci->i_rdcache_gen++; + } /* * if we are newly issued FILE_SHARED, mark dir not complete; we @@ -2072,19 +2074,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, /* finish pending truncate */ while (ci->i_truncate_pending) { spin_unlock(&ci->i_ceph_lock); - if (!(need & CEPH_CAP_FILE_WR)) - mutex_lock(&inode->i_mutex); __ceph_do_pending_vmtruncate(inode); - if (!(need & CEPH_CAP_FILE_WR)) - mutex_unlock(&inode->i_mutex); spin_lock(&ci->i_ceph_lock); } - if (need & CEPH_CAP_FILE_WR) { + have = __ceph_caps_issued(ci, &implemented); + + if (have & need & CEPH_CAP_FILE_WR) { if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); - if (endoff > ci->i_wanted_max_size) { + if (endoff > ci->i_requested_max_size) { *check_max = 1; ret = 1; } @@ -2099,7 +2099,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, goto out; } } - have = __ceph_caps_issued(ci, &implemented); if ((have & need) == need) { /* @@ -2141,14 +2140,17 @@ static void check_max_size(struct inode *inode, loff_t endoff) /* do we need to explicitly request a larger max_size? */ spin_lock(&ci->i_ceph_lock); - if ((endoff >= ci->i_max_size || - endoff > (inode->i_size << 1)) && - endoff > ci->i_wanted_max_size) { + if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) { dout("write %p at large endoff %llu, req max_size\n", inode, endoff); ci->i_wanted_max_size = endoff; - check = 1; } + /* duplicate ceph_check_caps()'s logic */ + if (ci->i_auth_cap && + (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) && + ci->i_wanted_max_size > ci->i_max_size && + ci->i_wanted_max_size > ci->i_requested_max_size) + check = 1; spin_unlock(&ci->i_ceph_lock); if (check) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); @@ -2333,6 +2335,38 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, iput(inode); } +/* + * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. + */ +static void invalidate_aliases(struct inode *inode) +{ + struct dentry *dn, *prev = NULL; + + dout("invalidate_aliases inode %p\n", inode); + d_prune_aliases(inode); + /* + * For non-directory inode, d_find_alias() only returns + * connected dentry. After calling d_invalidate(), the + * dentry become disconnected. + * + * For directory inode, d_find_alias() can return + * disconnected dentry. But directory inode should have + * one alias at most. + */ + while ((dn = d_find_alias(inode))) { + if (dn == prev) { + dput(dn); + break; + } + d_invalidate(dn); + if (prev) + dput(prev); + prev = dn; + } + if (prev) + dput(prev); +} + /* * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) @@ -2361,8 +2395,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, int check_caps = 0; int wake = 0; int writeback = 0; - int revoked_rdcache = 0; int queue_invalidate = 0; + int deleted_inode = 0; + int queue_revalidate = 0; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); @@ -2377,9 +2412,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { - if (try_nonblocking_invalidate(inode) == 0) { - revoked_rdcache = 1; - } else { + if (try_nonblocking_invalidate(inode)) { /* there were locked pages.. invalidate later in a separate thread. */ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { @@ -2387,6 +2420,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ci->i_rdcache_revoking = ci->i_rdcache_gen; } } + + ceph_fscache_invalidate(inode); } /* side effects now are allowed */ @@ -2407,8 +2442,12 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, from_kgid(&init_user_ns, inode->i_gid)); } - if ((issued & CEPH_CAP_LINK_EXCL) == 0) + if ((issued & CEPH_CAP_LINK_EXCL) == 0) { set_nlink(inode, le32_to_cpu(grant->nlink)); + if (inode->i_nlink == 0 && + (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) + deleted_inode = 1; + } if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { int len = le32_to_cpu(grant->xattr_len); @@ -2424,6 +2463,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } } + /* Do we need to revalidate our fscache cookie. Don't bother on the + * first cache cap as we already validate at cookie creation time. */ + if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1) + queue_revalidate = 1; + /* size/ctime/mtime/atime? */ ceph_fill_file_size(inode, issued, le32_to_cpu(grant->truncate_seq), @@ -2508,6 +2552,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, BUG_ON(cap->issued & ~cap->implemented); spin_unlock(&ci->i_ceph_lock); + if (writeback) /* * queue inode for writeback: we can't actually call @@ -2517,6 +2562,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_queue_writeback(inode); if (queue_invalidate) ceph_queue_invalidate(inode); + if (deleted_inode) + invalidate_aliases(inode); + if (queue_revalidate) + ceph_queue_revalidate(inode); if (wake) wake_up_all(&ci->i_cap_wq); @@ -2673,8 +2722,10 @@ static void handle_cap_trunc(struct inode *inode, truncate_seq, truncate_size, size); spin_unlock(&ci->i_ceph_lock); - if (queue_trunc) + if (queue_trunc) { ceph_queue_vmtruncate(inode); + ceph_fscache_invalidate(inode); + } } /* diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index a40ceda47a32..868b61d56cac 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -793,6 +793,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_locked_dir = dir; req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + /* release LINK_SHARED on source inode (mds will lock it) */ + req->r_old_inode_drop = CEPH_CAP_LINK_SHARED; err = ceph_mdsc_do_request(mdsc, dir, req); if (err) { d_drop(dentry); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 2ddf061c1c4a..3de89829e2a1 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -8,9 +8,11 @@ #include #include #include +#include #include "super.h" #include "mds_client.h" +#include "cache.h" /* * Ceph file operations @@ -68,9 +70,23 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) { struct ceph_file_info *cf; int ret = 0; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); + struct ceph_mds_client *mdsc = fsc->mdsc; switch (inode->i_mode & S_IFMT) { case S_IFREG: + /* First file open request creates the cookie, we want to keep + * this cookie around for the filetime of the inode as not to + * have to worry about fscache register / revoke / operation + * races. + * + * Also, if we know the operation is going to invalidate data + * (non readonly) just nuke the cache right away. + */ + ceph_fscache_register_inode_cookie(mdsc->fsc, ci); + if ((fmode & CEPH_FILE_MODE_WR)) + ceph_fscache_invalidate(inode); case S_IFDIR: dout("init_file %p %p 0%o (regular)\n", inode, file, inode->i_mode); @@ -181,6 +197,7 @@ int ceph_open(struct inode *inode, struct file *file) spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } + spin_unlock(&ci->i_ceph_lock); dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); @@ -191,6 +208,7 @@ int ceph_open(struct inode *inode, struct file *file) } req->r_inode = inode; ihold(inode); + req->r_num_caps = 1; if (flags & (O_CREAT|O_TRUNC)) parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); @@ -313,9 +331,9 @@ static int striped_read(struct inode *inode, { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); - u64 pos, this_len; + u64 pos, this_len, left; int io_align, page_align; - int left, pages_left; + int pages_left; int read; struct page **page_pos; int ret; @@ -346,47 +364,40 @@ more: ret = 0; hit_stripe = this_len < left; was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read, + dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - if (ret > 0) { - int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; - - if (read < pos - off) { - dout(" zero gap %llu to %llu\n", off + read, pos); - ceph_zero_page_vector_range(page_align + read, - pos - off - read, pages); + if (ret >= 0) { + int didpages; + if (was_short && (pos + ret < inode->i_size)) { + u64 tmp = min(this_len - ret, + inode->i_size - pos - ret); + dout(" zero gap %llu to %llu\n", + pos + ret, pos + ret + tmp); + ceph_zero_page_vector_range(page_align + read + ret, + tmp, pages); + ret += tmp; } + + didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; pos += ret; read = pos - off; left -= ret; page_pos += didpages; pages_left -= didpages; - /* hit stripe? */ - if (left && hit_stripe) + /* hit stripe and need continue*/ + if (left && hit_stripe && pos < inode->i_size) goto more; } - if (was_short) { + if (read > 0) { + ret = read; /* did we bounce off eof? */ if (pos + left > inode->i_size) *checkeof = 1; - - /* zero trailing bytes (inside i_size) */ - if (left > 0 && pos < inode->i_size) { - if (pos + left > inode->i_size) - left = inode->i_size - pos; - - dout("zero tail %d\n", left); - ceph_zero_page_vector_range(page_align + read, left, - pages); - read += left; - } } - if (ret >= 0) - ret = read; dout("striped_read returns %d\n", ret); return ret; } @@ -618,6 +629,8 @@ out: if (check_caps) ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); + } else if (ret != -EOLDSNAPC && written > 0) { + ret = written; } return ret; } @@ -659,7 +672,6 @@ again: if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) /* hmm, this isn't really async... */ ret = ceph_sync_read(filp, base, len, ppos, &checkeof); @@ -711,13 +723,11 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, &ceph_sb_to_client(inode->i_sb)->client->osdc; ssize_t count, written = 0; int err, want, got; - bool hold_mutex; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; mutex_lock(&inode->i_mutex); - hold_mutex = true; err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ); if (err) @@ -763,18 +773,31 @@ retry_snap: if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); written = ceph_sync_write(file, iov->iov_base, count, pos, &iocb->ki_pos); + if (written == -EOLDSNAPC) { + dout("aio_write %p %llx.%llx %llu~%u" + "got EOLDSNAPC, retrying\n", + inode, ceph_vinop(inode), + pos, (unsigned)iov->iov_len); + mutex_lock(&inode->i_mutex); + goto retry_snap; + } } else { + /* + * No need to acquire the i_truncate_mutex. Because + * the MDS revokes Fwb caps before sending truncate + * message to us. We can't get Fwb cap while there + * are pending vmtruncate. So write and vmtruncate + * can not run at the same time + */ written = generic_file_buffered_write(iocb, iov, nr_segs, pos, &iocb->ki_pos, count, 0); mutex_unlock(&inode->i_mutex); } - hold_mutex = false; if (written >= 0) { int dirty; @@ -798,18 +821,12 @@ retry_snap: written = err; } - if (written == -EOLDSNAPC) { - dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len); - mutex_lock(&inode->i_mutex); - hold_mutex = true; - goto retry_snap; - } -out: - if (hold_mutex) - mutex_unlock(&inode->i_mutex); - current->backing_dev_info = NULL; + goto out_unlocked; +out: + mutex_unlock(&inode->i_mutex); +out_unlocked: + current->backing_dev_info = NULL; return written ? written : err; } @@ -822,7 +839,6 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) int ret; mutex_lock(&inode->i_mutex); - __ceph_do_pending_vmtruncate(inode); if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); @@ -871,6 +887,204 @@ out: return offset; } +static inline void ceph_zero_partial_page( + struct inode *inode, loff_t offset, unsigned size) +{ + struct page *page; + pgoff_t index = offset >> PAGE_CACHE_SHIFT; + + page = find_lock_page(inode->i_mapping, index); + if (page) { + wait_on_page_writeback(page); + zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size); + unlock_page(page); + page_cache_release(page); + } +} + +static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset, + loff_t length) +{ + loff_t nearly = round_up(offset, PAGE_CACHE_SIZE); + if (offset < nearly) { + loff_t size = nearly - offset; + if (length < size) + size = length; + ceph_zero_partial_page(inode, offset, size); + offset += size; + length -= size; + } + if (length >= PAGE_CACHE_SIZE) { + loff_t size = round_down(length, PAGE_CACHE_SIZE); + truncate_pagecache_range(inode, offset, offset + size - 1); + offset += size; + length -= size; + } + if (length) + ceph_zero_partial_page(inode, offset, length); +} + +static int ceph_zero_partial_object(struct inode *inode, + loff_t offset, loff_t *length) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_request *req; + int ret = 0; + loff_t zero = 0; + int op; + + if (!length) { + op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE; + length = &zero; + } else { + op = CEPH_OSD_OP_ZERO; + } + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), + offset, length, + 1, op, + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ONDISK, + NULL, 0, 0, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, + &inode->i_mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) { + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + if (ret == -ENOENT) + ret = 0; + } + ceph_osdc_put_request(req); + +out: + return ret; +} + +static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length) +{ + int ret = 0; + struct ceph_inode_info *ci = ceph_inode(inode); + s32 stripe_unit = ceph_file_layout_su(ci->i_layout); + s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout); + s32 object_size = ceph_file_layout_object_size(ci->i_layout); + u64 object_set_size = object_size * stripe_count; + u64 nearly, t; + + /* round offset up to next period boundary */ + nearly = offset + object_set_size - 1; + t = nearly; + nearly -= do_div(t, object_set_size); + + while (length && offset < nearly) { + loff_t size = length; + ret = ceph_zero_partial_object(inode, offset, &size); + if (ret < 0) + return ret; + offset += size; + length -= size; + } + while (length >= object_set_size) { + int i; + loff_t pos = offset; + for (i = 0; i < stripe_count; ++i) { + ret = ceph_zero_partial_object(inode, pos, NULL); + if (ret < 0) + return ret; + pos += stripe_unit; + } + offset += object_set_size; + length -= object_set_size; + } + while (length) { + loff_t size = length; + ret = ceph_zero_partial_object(inode, offset, &size); + if (ret < 0) + return ret; + offset += size; + length -= size; + } + return ret; +} + +static long ceph_fallocate(struct file *file, int mode, + loff_t offset, loff_t length) +{ + struct ceph_file_info *fi = file->private_data; + struct inode *inode = file->f_dentry->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_osd_client *osdc = + &ceph_inode_to_client(inode)->client->osdc; + int want, got = 0; + int dirty; + int ret = 0; + loff_t endoff = 0; + loff_t size; + + if (!S_ISREG(inode->i_mode)) + return -EOPNOTSUPP; + + if (IS_SWAPFILE(inode)) + return -ETXTBSY; + + mutex_lock(&inode->i_mutex); + + if (ceph_snap(inode) != CEPH_NOSNAP) { + ret = -EROFS; + goto unlock; + } + + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) && + !(mode & FALLOC_FL_PUNCH_HOLE)) { + ret = -ENOSPC; + goto unlock; + } + + size = i_size_read(inode); + if (!(mode & FALLOC_FL_KEEP_SIZE)) + endoff = offset + length; + + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); + if (ret < 0) + goto unlock; + + if (mode & FALLOC_FL_PUNCH_HOLE) { + if (offset < size) + ceph_zero_pagecache_range(inode, offset, length); + ret = ceph_zero_objects(inode, offset, length); + } else if (endoff > size) { + truncate_pagecache_range(inode, size, -1); + if (ceph_inode_set_size(inode, endoff)) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, NULL); + } + + if (!ret) { + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + } + + ceph_put_cap_refs(ci, got); +unlock: + mutex_unlock(&inode->i_mutex); + return ret; +} + const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, @@ -887,5 +1101,6 @@ const struct file_operations ceph_file_fops = { .splice_write = generic_file_splice_write, .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, + .fallocate = ceph_fallocate, }; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index f3a2abf28a77..8549a48115f7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -12,6 +12,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include /* @@ -344,6 +345,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) for (i = 0; i < CEPH_FILE_MODE_NUM; i++) ci->i_nr_by_mode[i] = 0; + mutex_init(&ci->i_truncate_mutex); ci->i_truncate_seq = 0; ci->i_truncate_size = 0; ci->i_truncate_pending = 0; @@ -377,6 +379,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); + ceph_fscache_inode_init(ci); + return &ci->vfs_inode; } @@ -396,6 +400,8 @@ void ceph_destroy_inode(struct inode *inode) dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode)); + ceph_fscache_unregister_inode_cookie(ci); + ceph_queue_caps_release(inode); /* @@ -430,7 +436,6 @@ void ceph_destroy_inode(struct inode *inode) call_rcu(&inode->i_rcu, ceph_i_callback); } - /* * Helpers to fill in size, ctime, mtime, and atime. We have to be * careful because either the client or MDS may have more up to date @@ -455,16 +460,20 @@ int ceph_fill_file_size(struct inode *inode, int issued, dout("truncate_seq %u -> %u\n", ci->i_truncate_seq, truncate_seq); ci->i_truncate_seq = truncate_seq; + + /* the MDS should have revoked these caps */ + WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL | + CEPH_CAP_FILE_RD | + CEPH_CAP_FILE_WR | + CEPH_CAP_FILE_LAZYIO)); /* * If we hold relevant caps, or in the case where we're * not the only client referencing this file and we * don't hold those caps, then we need to check whether * the file is either opened or mmaped */ - if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD| - CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER| - CEPH_CAP_FILE_EXCL| - CEPH_CAP_FILE_LAZYIO)) || + if ((issued & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_BUFFER)) || mapping_mapped(inode->i_mapping) || __ceph_caps_file_wanted(ci)) { ci->i_truncate_pending++; @@ -478,6 +487,10 @@ int ceph_fill_file_size(struct inode *inode, int issued, truncate_size); ci->i_truncate_size = truncate_size; } + + if (queue_trunc) + ceph_fscache_invalidate(inode); + return queue_trunc; } @@ -1066,7 +1079,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, * complete. */ ceph_set_dentry_offset(req->r_old_dentry); - dout("dn %p gets new offset %lld\n", req->r_old_dentry, + dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); dn = req->r_old_dentry; /* use old_dentry */ @@ -1419,18 +1432,20 @@ static void ceph_invalidate_work(struct work_struct *work) u32 orig_gen; int check = 0; + mutex_lock(&ci->i_truncate_mutex); spin_lock(&ci->i_ceph_lock); dout("invalidate_pages %p gen %d revoking %d\n", inode, ci->i_rdcache_gen, ci->i_rdcache_revoking); if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { /* nevermind! */ spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&ci->i_truncate_mutex); goto out; } orig_gen = ci->i_rdcache_gen; spin_unlock(&ci->i_ceph_lock); - truncate_inode_pages(&inode->i_data, 0); + truncate_inode_pages(inode->i_mapping, 0); spin_lock(&ci->i_ceph_lock); if (orig_gen == ci->i_rdcache_gen && @@ -1445,6 +1460,7 @@ static void ceph_invalidate_work(struct work_struct *work) ci->i_rdcache_revoking); } spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&ci->i_truncate_mutex); if (check) ceph_check_caps(ci, 0, NULL); @@ -1465,9 +1481,7 @@ static void ceph_vmtruncate_work(struct work_struct *work) struct inode *inode = &ci->vfs_inode; dout("vmtruncate_work %p\n", inode); - mutex_lock(&inode->i_mutex); __ceph_do_pending_vmtruncate(inode); - mutex_unlock(&inode->i_mutex); iput(inode); } @@ -1480,6 +1494,7 @@ void ceph_queue_vmtruncate(struct inode *inode) struct ceph_inode_info *ci = ceph_inode(inode); ihold(inode); + if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, &ci->i_vmtruncate_work)) { dout("ceph_queue_vmtruncate %p\n", inode); @@ -1500,11 +1515,13 @@ void __ceph_do_pending_vmtruncate(struct inode *inode) u64 to; int wrbuffer_refs, finish = 0; + mutex_lock(&ci->i_truncate_mutex); retry: spin_lock(&ci->i_ceph_lock); if (ci->i_truncate_pending == 0) { dout("__do_pending_vmtruncate %p none pending\n", inode); spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&ci->i_truncate_mutex); return; } @@ -1521,6 +1538,9 @@ retry: goto retry; } + /* there should be no reader or writer */ + WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref); + to = ci->i_truncate_size; wrbuffer_refs = ci->i_wrbuffer_ref; dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode, @@ -1538,13 +1558,14 @@ retry: if (!finish) goto retry; + mutex_unlock(&ci->i_truncate_mutex); + if (wrbuffer_refs == 0) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); wake_up_all(&ci->i_cap_wq); } - /* * symlinks */ @@ -1586,8 +1607,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; - __ceph_do_pending_vmtruncate(inode); - err = inode_change_ok(inode, attr); if (err != 0) return err; @@ -1768,7 +1787,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ceph_cap_string(dirtied), mask); ceph_mdsc_put_request(req); - __ceph_do_pending_vmtruncate(inode); + if (mask & CEPH_SETATTR_SIZE) + __ceph_do_pending_vmtruncate(inode); return err; out: spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index e0b4ef31d3c8..669622fd1ae3 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -196,8 +196,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, &dl.object_no, &dl.object_offset, &olen); - if (r < 0) + if (r < 0) { + up_read(&osdc->map_sem); return -EIO; + } dl.file_offset -= dl.object_offset; dl.object_size = ceph_file_layout_object_size(ci->i_layout); dl.block_size = ceph_file_layout_su(ci->i_layout); @@ -209,8 +211,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", ceph_ino(inode), dl.object_no); - ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, - ceph_file_layout_pg_pool(ci->i_layout)); + r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, + ceph_file_layout_pg_pool(ci->i_layout)); + if (r < 0) { + up_read(&osdc->map_sem); + return r; + } dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid); if (dl.osd >= 0) { diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 187bf214444d..b7bda5d9611d 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -414,6 +414,9 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, { struct ceph_mds_session *s; + if (mds >= mdsc->mdsmap->m_max_mds) + return ERR_PTR(-EINVAL); + s = kzalloc(sizeof(*s), GFP_NOFS); if (!s) return ERR_PTR(-ENOMEM); @@ -1028,6 +1031,37 @@ static void remove_session_caps(struct ceph_mds_session *session) { dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, NULL); + + spin_lock(&session->s_cap_lock); + if (session->s_nr_caps > 0) { + struct super_block *sb = session->s_mdsc->fsc->sb; + struct inode *inode; + struct ceph_cap *cap, *prev = NULL; + struct ceph_vino vino; + /* + * iterate_session_caps() skips inodes that are being + * deleted, we need to wait until deletions are complete. + * __wait_on_freeing_inode() is designed for the job, + * but it is not exported, so use lookup inode function + * to access it. + */ + while (!list_empty(&session->s_caps)) { + cap = list_entry(session->s_caps.next, + struct ceph_cap, session_caps); + if (cap == prev) + break; + prev = cap; + vino = cap->ci->i_vino; + spin_unlock(&session->s_cap_lock); + + inode = ceph_find_inode(sb, vino); + iput(inode); + + spin_lock(&session->s_cap_lock); + } + } + spin_unlock(&session->s_cap_lock); + BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); cleanup_cap_releases(session); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 6627b26a800c..6a0951e43044 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -17,6 +17,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include #include @@ -142,6 +143,8 @@ enum { Opt_nodcache, Opt_ino32, Opt_noino32, + Opt_fscache, + Opt_nofscache }; static match_table_t fsopt_tokens = { @@ -167,6 +170,8 @@ static match_table_t fsopt_tokens = { {Opt_nodcache, "nodcache"}, {Opt_ino32, "ino32"}, {Opt_noino32, "noino32"}, + {Opt_fscache, "fsc"}, + {Opt_nofscache, "nofsc"}, {-1, NULL} }; @@ -260,6 +265,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_noino32: fsopt->flags &= ~CEPH_MOUNT_OPT_INO32; break; + case Opt_fscache: + fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE; + break; + case Opt_nofscache: + fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE; + break; default: BUG_ON(token); } @@ -422,6 +433,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",dcache"); else seq_puts(m, ",nodcache"); + if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) + seq_puts(m, ",fsc"); + else + seq_puts(m, ",nofsc"); if (fsopt->wsize) seq_printf(m, ",wsize=%d", fsopt->wsize); @@ -530,11 +545,18 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, if (!fsc->wb_pagevec_pool) goto fail_trunc_wq; + /* setup fscache */ + if ((fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) && + (ceph_fscache_register_fs(fsc) != 0)) + goto fail_fscache; + /* caps */ fsc->min_caps = fsopt->max_readdir; return fsc; +fail_fscache: + ceph_fscache_unregister_fs(fsc); fail_trunc_wq: destroy_workqueue(fsc->trunc_wq); fail_pg_inv_wq: @@ -554,6 +576,8 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) { dout("destroy_fs_client %p\n", fsc); + ceph_fscache_unregister_fs(fsc); + destroy_workqueue(fsc->wb_wq); destroy_workqueue(fsc->pg_inv_wq); destroy_workqueue(fsc->trunc_wq); @@ -588,6 +612,8 @@ static void ceph_inode_init_once(void *foo) static int __init init_caches(void) { + int error = -ENOMEM; + ceph_inode_cachep = kmem_cache_create("ceph_inode_info", sizeof(struct ceph_inode_info), __alignof__(struct ceph_inode_info), @@ -611,15 +637,17 @@ static int __init init_caches(void) if (ceph_file_cachep == NULL) goto bad_file; - return 0; + if ((error = ceph_fscache_register())) + goto bad_file; + return 0; bad_file: kmem_cache_destroy(ceph_dentry_cachep); bad_dentry: kmem_cache_destroy(ceph_cap_cachep); bad_cap: kmem_cache_destroy(ceph_inode_cachep); - return -ENOMEM; + return error; } static void destroy_caches(void) @@ -629,10 +657,13 @@ static void destroy_caches(void) * destroy cache. */ rcu_barrier(); + kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_cap_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); + + ceph_fscache_unregister(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index cbded572345e..6014b0a3c405 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -16,6 +16,10 @@ #include +#ifdef CONFIG_CEPH_FSCACHE +#include +#endif + /* f_type in struct statfs */ #define CEPH_SUPER_MAGIC 0x00c36400 @@ -29,6 +33,7 @@ #define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */ #define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */ #define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ +#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES) @@ -90,6 +95,11 @@ struct ceph_fs_client { struct dentry *debugfs_bdi; struct dentry *debugfs_mdsc, *debugfs_mdsmap; #endif + +#ifdef CONFIG_CEPH_FSCACHE + struct fscache_cookie *fscache; + struct workqueue_struct *revalidate_wq; +#endif }; @@ -288,6 +298,7 @@ struct ceph_inode_info { int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ + struct mutex i_truncate_mutex; u32 i_truncate_seq; /* last truncate to smaller size */ u64 i_truncate_size; /* and the size we last truncated down to */ int i_truncate_pending; /* still need to call vmtruncate */ @@ -319,6 +330,12 @@ struct ceph_inode_info { struct work_struct i_vmtruncate_work; +#ifdef CONFIG_CEPH_FSCACHE + struct fscache_cookie *fscache; + u32 i_fscache_gen; /* sequence, for delayed fscache validate */ + struct work_struct i_revalidate_work; +#endif + struct inode vfs_inode; /* at end */ }; diff --git a/fs/fscache/cookie.c b/fs/fscache/cookie.c index 0e91a3c9fdb2..318e8433527c 100644 --- a/fs/fscache/cookie.c +++ b/fs/fscache/cookie.c @@ -558,3 +558,74 @@ void __fscache_cookie_put(struct fscache_cookie *cookie) _leave(""); } + +/* + * check the consistency between the netfs inode and the backing cache + * + * NOTE: it only serves no-index type + */ +int __fscache_check_consistency(struct fscache_cookie *cookie) +{ + struct fscache_operation *op; + struct fscache_object *object; + int ret; + + _enter("%p,", cookie); + + ASSERTCMP(cookie->def->type, ==, FSCACHE_COOKIE_TYPE_DATAFILE); + + if (fscache_wait_for_deferred_lookup(cookie) < 0) + return -ERESTARTSYS; + + if (hlist_empty(&cookie->backing_objects)) + return 0; + + op = kzalloc(sizeof(*op), GFP_NOIO | __GFP_NOMEMALLOC | __GFP_NORETRY); + if (!op) + return -ENOMEM; + + fscache_operation_init(op, NULL, NULL); + op->flags = FSCACHE_OP_MYTHREAD | + (1 << FSCACHE_OP_WAITING); + + spin_lock(&cookie->lock); + + if (hlist_empty(&cookie->backing_objects)) + goto inconsistent; + object = hlist_entry(cookie->backing_objects.first, + struct fscache_object, cookie_link); + if (test_bit(FSCACHE_IOERROR, &object->cache->flags)) + goto inconsistent; + + op->debug_id = atomic_inc_return(&fscache_op_debug_id); + + atomic_inc(&cookie->n_active); + if (fscache_submit_op(object, op) < 0) + goto submit_failed; + + /* the work queue now carries its own ref on the object */ + spin_unlock(&cookie->lock); + + ret = fscache_wait_for_operation_activation(object, op, + NULL, NULL, NULL); + if (ret == 0) { + /* ask the cache to honour the operation */ + ret = object->cache->ops->check_consistency(op); + fscache_op_complete(op, false); + } else if (ret == -ENOBUFS) { + ret = 0; + } + + fscache_put_operation(op); + _leave(" = %d", ret); + return ret; + +submit_failed: + atomic_dec(&cookie->n_active); +inconsistent: + spin_unlock(&cookie->lock); + kfree(op); + _leave(" = -ESTALE"); + return -ESTALE; +} +EXPORT_SYMBOL(__fscache_check_consistency); diff --git a/fs/fscache/internal.h b/fs/fscache/internal.h index 12d505bedb5c..4226f6680b06 100644 --- a/fs/fscache/internal.h +++ b/fs/fscache/internal.h @@ -130,6 +130,12 @@ extern void fscache_operation_gc(struct work_struct *); /* * page.c */ +extern int fscache_wait_for_deferred_lookup(struct fscache_cookie *); +extern int fscache_wait_for_operation_activation(struct fscache_object *, + struct fscache_operation *, + atomic_t *, + atomic_t *, + void (*)(struct fscache_operation *)); extern void fscache_invalidate_writes(struct fscache_cookie *); /* diff --git a/fs/fscache/page.c b/fs/fscache/page.c index d479ab3c63e4..8702b732109a 100644 --- a/fs/fscache/page.c +++ b/fs/fscache/page.c @@ -278,7 +278,7 @@ static struct fscache_retrieval *fscache_alloc_retrieval( /* * wait for a deferred lookup to complete */ -static int fscache_wait_for_deferred_lookup(struct fscache_cookie *cookie) +int fscache_wait_for_deferred_lookup(struct fscache_cookie *cookie) { unsigned long jif; @@ -322,42 +322,46 @@ static void fscache_do_cancel_retrieval(struct fscache_operation *_op) /* * wait for an object to become active (or dead) */ -static int fscache_wait_for_retrieval_activation(struct fscache_object *object, - struct fscache_retrieval *op, - atomic_t *stat_op_waits, - atomic_t *stat_object_dead) +int fscache_wait_for_operation_activation(struct fscache_object *object, + struct fscache_operation *op, + atomic_t *stat_op_waits, + atomic_t *stat_object_dead, + void (*do_cancel)(struct fscache_operation *)) { int ret; - if (!test_bit(FSCACHE_OP_WAITING, &op->op.flags)) + if (!test_bit(FSCACHE_OP_WAITING, &op->flags)) goto check_if_dead; _debug(">>> WT"); - fscache_stat(stat_op_waits); - if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING, + if (stat_op_waits) + fscache_stat(stat_op_waits); + if (wait_on_bit(&op->flags, FSCACHE_OP_WAITING, fscache_wait_bit_interruptible, TASK_INTERRUPTIBLE) != 0) { - ret = fscache_cancel_op(&op->op, fscache_do_cancel_retrieval); + ret = fscache_cancel_op(op, do_cancel); if (ret == 0) return -ERESTARTSYS; /* it's been removed from the pending queue by another party, * so we should get to run shortly */ - wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING, + wait_on_bit(&op->flags, FSCACHE_OP_WAITING, fscache_wait_bit, TASK_UNINTERRUPTIBLE); } _debug("<<< GO"); check_if_dead: - if (op->op.state == FSCACHE_OP_ST_CANCELLED) { - fscache_stat(stat_object_dead); + if (op->state == FSCACHE_OP_ST_CANCELLED) { + if (stat_object_dead) + fscache_stat(stat_object_dead); _leave(" = -ENOBUFS [cancelled]"); return -ENOBUFS; } if (unlikely(fscache_object_is_dead(object))) { - pr_err("%s() = -ENOBUFS [obj dead %d]\n", __func__, op->op.state); - fscache_cancel_op(&op->op, fscache_do_cancel_retrieval); - fscache_stat(stat_object_dead); + pr_err("%s() = -ENOBUFS [obj dead %d]\n", __func__, op->state); + fscache_cancel_op(op, do_cancel); + if (stat_object_dead) + fscache_stat(stat_object_dead); return -ENOBUFS; } return 0; @@ -432,10 +436,11 @@ int __fscache_read_or_alloc_page(struct fscache_cookie *cookie, /* we wait for the operation to become active, and then process it * *here*, in this thread, and not in the thread pool */ - ret = fscache_wait_for_retrieval_activation( - object, op, + ret = fscache_wait_for_operation_activation( + object, &op->op, __fscache_stat(&fscache_n_retrieval_op_waits), - __fscache_stat(&fscache_n_retrievals_object_dead)); + __fscache_stat(&fscache_n_retrievals_object_dead), + fscache_do_cancel_retrieval); if (ret < 0) goto error; @@ -557,10 +562,11 @@ int __fscache_read_or_alloc_pages(struct fscache_cookie *cookie, /* we wait for the operation to become active, and then process it * *here*, in this thread, and not in the thread pool */ - ret = fscache_wait_for_retrieval_activation( - object, op, + ret = fscache_wait_for_operation_activation( + object, &op->op, __fscache_stat(&fscache_n_retrieval_op_waits), - __fscache_stat(&fscache_n_retrievals_object_dead)); + __fscache_stat(&fscache_n_retrievals_object_dead), + fscache_do_cancel_retrieval); if (ret < 0) goto error; @@ -658,10 +664,11 @@ int __fscache_alloc_page(struct fscache_cookie *cookie, fscache_stat(&fscache_n_alloc_ops); - ret = fscache_wait_for_retrieval_activation( - object, op, + ret = fscache_wait_for_operation_activation( + object, &op->op, __fscache_stat(&fscache_n_alloc_op_waits), - __fscache_stat(&fscache_n_allocs_object_dead)); + __fscache_stat(&fscache_n_allocs_object_dead), + fscache_do_cancel_retrieval); if (ret < 0) goto error; @@ -693,6 +700,22 @@ nobufs: } EXPORT_SYMBOL(__fscache_alloc_page); +/* + * Unmark pages allocate in the readahead code path (via: + * fscache_readpages_or_alloc) after delegating to the base filesystem + */ +void __fscache_readpages_cancel(struct fscache_cookie *cookie, + struct list_head *pages) +{ + struct page *page; + + list_for_each_entry(page, pages, lru) { + if (PageFsCache(page)) + __fscache_uncache_page(cookie, page); + } +} +EXPORT_SYMBOL(__fscache_readpages_cancel); + /* * release a write op reference */ diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h index a9ff9a36b86d..7823e9ef995e 100644 --- a/include/linux/fscache-cache.h +++ b/include/linux/fscache-cache.h @@ -251,6 +251,10 @@ struct fscache_cache_ops { /* unpin an object in the cache */ void (*unpin_object)(struct fscache_object *object); + /* check the consistency between the backing cache and the FS-Cache + * cookie */ + bool (*check_consistency)(struct fscache_operation *op); + /* store the updated auxiliary data on an object */ void (*update_object)(struct fscache_object *object); diff --git a/include/linux/fscache.h b/include/linux/fscache.h index 7a086235da4b..19b46458e4e8 100644 --- a/include/linux/fscache.h +++ b/include/linux/fscache.h @@ -183,6 +183,7 @@ extern struct fscache_cookie *__fscache_acquire_cookie( const struct fscache_cookie_def *, void *); extern void __fscache_relinquish_cookie(struct fscache_cookie *, int); +extern int __fscache_check_consistency(struct fscache_cookie *); extern void __fscache_update_cookie(struct fscache_cookie *); extern int __fscache_attr_changed(struct fscache_cookie *); extern void __fscache_invalidate(struct fscache_cookie *); @@ -208,6 +209,8 @@ extern bool __fscache_maybe_release_page(struct fscache_cookie *, struct page *, gfp_t); extern void __fscache_uncache_all_inode_pages(struct fscache_cookie *, struct inode *); +extern void __fscache_readpages_cancel(struct fscache_cookie *cookie, + struct list_head *pages); /** * fscache_register_netfs - Register a filesystem as desiring caching services @@ -325,6 +328,25 @@ void fscache_relinquish_cookie(struct fscache_cookie *cookie, int retire) __fscache_relinquish_cookie(cookie, retire); } +/** + * fscache_check_consistency - Request that if the cache is updated + * @cookie: The cookie representing the cache object + * + * Request an consistency check from fscache, which passes the request + * to the backing cache. + * + * Returns 0 if consistent and -ESTALE if inconsistent. May also + * return -ENOMEM and -ERESTARTSYS. + */ +static inline +int fscache_check_consistency(struct fscache_cookie *cookie) +{ + if (fscache_cookie_valid(cookie)) + return __fscache_check_consistency(cookie); + else + return 0; +} + /** * fscache_update_cookie - Request that a cache object be updated * @cookie: The cookie representing the cache object @@ -569,6 +591,26 @@ int fscache_alloc_page(struct fscache_cookie *cookie, return -ENOBUFS; } +/** + * fscache_readpages_cancel - Cancel read/alloc on pages + * @cookie: The cookie representing the inode's cache object. + * @pages: The netfs pages that we canceled write on in readpages() + * + * Uncache/unreserve the pages reserved earlier in readpages() via + * fscache_readpages_or_alloc() and similar. In most successful caches in + * readpages() this doesn't do anything. In cases when the underlying netfs's + * readahead failed we need to clean up the pagelist (unmark and uncache). + * + * This function may sleep as it may have to clean up disk state. + */ +static inline +void fscache_readpages_cancel(struct fscache_cookie *cookie, + struct list_head *pages) +{ + if (fscache_cookie_valid(cookie)) + __fscache_readpages_cancel(cookie, pages); +} + /** * fscache_write_page - Request storage of a page in the cache * @cookie: The cookie representing the cache object diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3be308e14302..4a5df7b1cc9f 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -290,7 +290,7 @@ int ceph_msgr_init(void) if (ceph_msgr_slab_init()) return -ENOMEM; - ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); + ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0); if (ceph_msgr_wq) return 0; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index dd47889adc4a..1606f740d6ae 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -503,7 +503,9 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); size_t payload_len = 0; - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && + opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && + opcode != CEPH_OSD_OP_TRUNCATE); op->extent.offset = offset; op->extent.length = length; @@ -631,6 +633,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_ZERO: + case CEPH_OSD_OP_DELETE: + case CEPH_OSD_OP_TRUNCATE: if (src->op == CEPH_OSD_OP_WRITE) request_data_len = src->extent.length; dst->extent.offset = cpu_to_le64(src->extent.offset); @@ -715,7 +720,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u64 object_base; int r; - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && + opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && + opcode != CEPH_OSD_OP_TRUNCATE); req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); @@ -1488,14 +1495,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, req, result); - ceph_decode_need(&p, end, 4, bad); + ceph_decode_need(&p, end, 4, bad_put); numops = ceph_decode_32(&p); if (numops > CEPH_OSD_MAX_OP) goto bad_put; if (numops != req->r_num_ops) goto bad_put; payload_len = 0; - ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); + ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); for (i = 0; i < numops; i++) { struct ceph_osd_op *op = p; int len; @@ -1513,7 +1520,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, goto bad_put; } - ceph_decode_need(&p, end, 4 + numops * 4, bad); + ceph_decode_need(&p, end, 4 + numops * 4, bad_put); retry_attempt = ceph_decode_32(&p); for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); @@ -1786,6 +1793,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) nr_maps--; } + if (!osdc->osdmap) + goto bad; done: downgrade_write(&osdc->map_sem); ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); @@ -2129,6 +2138,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, dout("osdc_start_request failed map, " " will retry %lld\n", req->r_tid); rc = 0; + } else { + __unregister_request(osdc, req); } goto out_unlock; } @@ -2253,12 +2264,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (err < 0) goto out_msgpool; + err = -ENOMEM; osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); - if (IS_ERR(osdc->notify_wq)) { - err = PTR_ERR(osdc->notify_wq); - osdc->notify_wq = NULL; + if (!osdc->notify_wq) goto out_msgpool; - } return 0; out_msgpool: diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 603ddd92db19..dbd9a4792427 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1129,7 +1129,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, /* pg_temp? */ pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, - pool->pgp_num_mask); + pool->pg_num_mask); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { *num = pg->len;