#include "cache.h"

/* Module parameter: number of pages the cache should hold. */
ulong cache_size = CACHE_SIZE;
module_param(cache_size, ulong, 0);
MODULE_PARM_DESC(cache_size, "Default: 4096 pages. Size of the cache.");

/*
 * Number of network requests that have been issued and not yet answered.
 * This is a 64-bit atomic, so it must be initialised with the 64-bit
 * initialiser rather than ATOMIC_INIT().
 */
atomic64_t outstanding_reqs = ATOMIC64_INIT(0);

/* Statistics maintained by cache_retrieve(). */
ulong cache_hits;
ulong cache_reads;

/* Completion callbacks handed to client_network_init() by cache_init(). */
#define WRITE_REPLY mru_finish
#define READ_REPLY  cache_retrieve_finish

/**
 * cache_hash - hash callback for the cache's hashtable
 * @offset: pointer to the ulong page offset that is used as the key
 *
 * Description:
 *   Reduces the offset with MOD() against binary_hash_size, which
 *   cache_init() sets from the hashtable after it has been created.
 *
 * Return:
 *   The index in the hashtable
 **/
ulong cache_hash(void *offset)
{
	return MOD(*(ulong *)offset, binary_hash_size);
}

/**
 * cache_entry_equal - key comparison callback used by the hashtable
 * @x: pointer to the first ulong key
 * @y: pointer to the second ulong key
 *
 * Description:
 *   Two keys are equal when the ulong values they point to are equal.
 *
 * Return:
 *   true  - the keys are equal
 *   false - the keys differ
 **/
bool cache_entry_equal(void *x, void *y)
{
	const ulong *lhs = x;
	const ulong *rhs = y;

	return *lhs == *rhs;
}

/**
 * cache_init - initialize a cache structure
 * @cache: the cache structure we wish to initialize
 * @cacheSize: the size we want the cache to be
 * @rep: the replacement function for the cache (LRU, CLOCK, ...)
 * @late: not used by cache_init()
 * @dev: opaque device pointer, stored in the cache
 * @lock: pointer to the device's lock
 * @queue: device's request queue (not used by cache_init())
 *
 * Description:
 *   This function is designed to initialize an object of type struct Cache.
* * Return: * >= 0 - Success * < 0 - Failed **/ int cache_init(Cache *cache, int cacheSize, int (*rep)(struct cache *), void (*late)(void *), void *dev, spinlock_t *lock, struct request_queue *queue) { int hash_size = cache_size * 2, i = 0, ret = 0; struct cache_entry *ce; printk(KERN_ALERT "Cache size: %lu pages\n", cache_size); cache_hits = cache_reads = 0; max_requests = cacheSize; /* Initialize Network connection */ if((ret = client_network_init(READ_REPLY, WRITE_REPLY, cache) < 0)) goto nonet; cache->cache_size = cacheSize; cache->cache_write_limit = cacheSize/2; cache->lock = lock; cache->stored_pages = 0; cache->stored_write_pages = 0; cache->cache = cache; /* Linked List */ INIT_LIST_HEAD(&cache->lh); INIT_LIST_HEAD(&cache->list_free); //cache->queue = create_singlethread_workqueue("delayed write"); /* Init kmem_cache */ if((cache->page_cache = kmem_cache_create("p_cache", sizeof(struct cache_entry), 0, 0, NULL, NULL)) == NULL) { ret = -ENOMEM; printk(KERN_ALERT "Error: Could not create cache slab cache.\n"); goto noslab; } ret = init_table(&cache->ht, &hash_size, cache_hash, cache_entry_equal, "Cache_Table"); if(ret < 0) goto notable; cache->replacement = rep; cache->dev = dev; binary_hash_size = cache->ht.ibits; /* Preallocate sk_buffs */ for(i = 0; i < CACHE_SIZE + REQ_THRESH_UPPER; i++) { ce = (struct cache_entry *) kmem_cache_alloc(cache->page_cache, GFP_ATOMIC); if(!ce) { printk(KERN_ALERT "Error: Could not allocate cache entries on setup.\n"); ret = -ENOMEM; break; } ce->skb = alloc_skb(PREALLOC_RW + PAGE_SIZE, GFP_ATOMIC); if(!ce->skb) { printk(KERN_ALERT "Error: Could not allocate socket buffers.\n"); kmem_cache_free(cache->page_cache, ce); ret = -ENOMEM; break; } ce->page = ce->skb->data + PREALLOC_RW; list_add_tail(&ce->queue, &cache->list_free); } if(ret >= 0) goto good; else { /* All of the cache entries did not succeed. 
Free them */ list_head *x, *y; list_for_each_safe(x, y, &cache->list_free) { ce = list_entry(x, struct cache_entry, queue); if(ce->skb) kfree_skb(ce->skb); kmem_cache_free(cache->page_cache, ce); } } clean_table(&cache->ht); notable: kmem_cache_destroy(cache->page_cache); noslab: client_network_exit(); nonet: good: return ret; } /** * cache_cleanup - * @cache: * * Description: * * Return: * 0 - * 1 - **/ int cache_cleanup(Cache *cache) { list_head *x; list_head *y; struct cache_entry *list_ce = NULL; /* Clean up delayed write queue */ /* Clean up network connection */ client_network_exit(); /* Cache memory cleanup */ list_for_each_safe(x, y, &cache->lh) { list_ce = (struct cache_entry *)list_entry(x, struct cache_entry, queue); cache_remove(cache, list_ce->offset); } list_for_each_safe(x, y, &cache->list_free) { list_ce = (struct cache_entry *)list_entry(x, struct cache_entry, queue); kfree_skb(list_ce->skb); kmem_cache_free(cache->page_cache, list_ce); } /* Hashtable cleanup */ clean_table(&cache->ht); if(kmem_cache_destroy(cache->page_cache)) { printk(KERN_ALERT "anemone_client(cache_cleanup()): kmem_cache_destroy() failed!\n"); return 1; } return 0; } /** * cache_add - adds the a buffer to the cache * @cache: cache we wish to add to * @buffer: buffer we wish to copy into the cache * @offset: use this offset to compute the hash value * @size: how many bytes we should transfer * * Description: * We explicitly add the data in @buffer to the cache. We use @offset to * index the buffer in the hashtable, and we always place the latest page * on the tail of the FIFO queue. If an entry with the same offset already * exists in the cache we remove the old copy from the cache, and insert the * new copy as if the old one did not exist. 
* * Return: * 0 - Added the buffer to the cache, and did not need to send anything over * the network * PAGE_OUT_OVER_NET - * Added the buffer to the cache, but needed to send an old cache entry * out over the net **/ int cache_add(Cache *cache, ulong offset, ulong size, void *bio) { /* Allocate memory for the page */ struct cache_entry *old_ce; struct cache_entry *ce = NULL; list_head *x, *y; ulong flags; char * data; #ifdef TRACE printk(KERN_ALERT "WRITE offset: %lu\n", offset); #endif if(!irqs_disabled()) { printk(KERN_ALERT "cache_add(): irqs are not disabled!\n"); BUG(); } if(list_empty(&cache->list_free)) { printk(KERN_ALERT "cache_add(): trying to allocate mem from an empty pool!\n"); BUG(); } /* Grabs first entry from mem pool (the one that's been free the longest) */ list_for_each_safe(x, y, &cache->list_free) { ce = (struct cache_entry *)list_entry(x, struct cache_entry, queue); if(!ce) BUG(); break; } /* Copy Packet */ if(debug) printk(KERN_ALERT "cache_add(): offset: %lu\n", offset ); ce->page = ce->skb->data + PREALLOC_RW; data = bio_kmap_irq((struct bio*)bio, &flags); memcpy(ce->page, data, size); bio_kunmap_irq(data, &flags); /* Add entry to Linked List */ ce->use_count = 1; ce->write = 1; ce->offset = offset; ce->answered = 1; ce->swapped = 0; ce->prefetch = 0; /* * Check for existing copy of page in cache. If one is there * then we must replace the packet and skb with the new, updated * versions. We must take care to increase the use count for * that packet. 
*/ if((old_ce = (struct cache_entry *)lookup_table(&cache->ht, &ce->offset)) != NULL) { ce->use_count = old_ce->use_count + 1; if(debug) printk(KERN_ALERT "cache_add(): removing offset %lu\n",offset); cache_remove(cache, old_ce->offset); } /* Increment Stored Pages */ cache->stored_pages++; cache->stored_write_pages++; if(debug) printk(KERN_ALERT "cache_add(): inserting offset%lu into hash table\n", offset ); /* Add entry to Hash Table */ insert_table(&cache->ht, &ce->offset, ce); list_move_tail(&ce->queue, &cache->lh); /* schedule delayed write */ if(!request_anemone(1, ce->offset, ce->skb, (void *)ce)) { printk(KERN_ALERT "cache_add(): outstanding_reqs: %lu, retransmissions: %lu\n", (ulong)atomic64_read(&outstanding_reqs), retransmissions); BUG(); } atomic64_inc(&outstanding_reqs); /* Run Replacement Function if stored_pages > cache_size */ if(cache->stored_write_pages > cache->cache_write_limit || cache->stored_pages > cache->cache_size) { cache->replacement(cache); return PAGE_OUT_OVER_NET; } if(debug) printk(KERN_ALERT "cache_add(): stored_pages = %d, size = %d, offset = %lu\n", cache->stored_pages, cache->cache_size, ce->offset); return 0; } void cache_prefetch(Cache * cache, ulong offset) { int ret,i = 0; struct cache_entry * pce = NULL; list_head *x, *y; if(offset == 0){ return; } while(i < MAX_PREFETCH){ offset++; i++; pce = (struct cache_entry *) lookup_table(&cache->ht, &offset); if(pce == NULL && atomic64_read(&outstanding_reqs) < (REQ_THRESH_UPPER - 3)) { list_for_each_safe(x,y,&cache->list_free){ pce = (struct cache_entry *)list_entry(x,struct cache_entry, queue); break; } list_move(&pce->queue, &cache->lh); pce->use_count = 1; pce->write = 0; pce->answered = 0; pce->offset = offset; pce->swapped = 0; pce->prefetch = 1; cache->stored_pages++; /* Add entry to Hash Table */ insert_table(&cache->ht, &pce->offset, pce); if(cache->stored_pages > cache->cache_size){ cache->replacement(cache); } if(debug) printk(KERN_ALERT "retrieve(): pageoffset %lu out: 
%lu, stored: %d\n", offset, (ulong) atomic64_read(&outstanding_reqs), cache->stored_pages); atomic64_inc(&outstanding_reqs); if((ret = request_anemone(0, offset, pce->skb, NULL))==0) { printk(KERN_ALERT "cache_retrieve(): ERROR! outstanding_reqs: %lu\n", (ulong)atomic64_read(&outstanding_reqs)); BUG(); } else if (ret<0){ /*ret <0 means cannot find map between offset and server mac*/ pce->answered = 1; pce->swapped = 1; pce->prefetch = 0; atomic64_dec(&outstanding_reqs); remove_table(&cache->ht, &pce->offset); if (debug) printk(KERN_ALERT "cache_retrieve(): cannot find read page offset = %lu\n", offset); } } } } /** * cache_retrieve - returns a page entry from the cache * @cache: the cache we wish to retrieve a page from * @offset: offset of the page we are looking for * * Description: * Looks through the cache for an entry at @offset. If it does not find one * it sends out a request to the memory engine, creates an empty entry in the cache * and returns NULL. If we find an entry, pop the entry off @cache->lh, and * add it to the tail of @cache->lh. * * Return: * page - the page we were looking for * NULL - the page was not in memory. If this returns NULL, we have send a page * request to the memory engine. 
**/ void *cache_retrieve(Cache *cache, ulong offset, void *b) { struct cache_entry *ce = (struct cache_entry *) lookup_table(&cache->ht, &offset); list_head *x, *y; struct bio * bio = (struct bio *)b; int ret; if(!bio_rw_ahead(bio)) cache_reads++; if(ce) cache_hits++; #ifdef TRACE printk(KERN_ALERT "READ offset: %lu, %s\n", offset, (ce == NULL)?"MISS":"HIT"); #endif if(!irqs_disabled()) { printk(KERN_ALERT "cache_retrieve(): irqs are not disabled!\n"); BUG(); } if(ce == NULL) { list_for_each_safe(x,y,&cache->list_free){ ce = (struct cache_entry *)list_entry(x,struct cache_entry, queue); break; } list_move_tail(&ce->queue, &cache->lh); ce->use_count = 1; ce->write = 0; ce->answered = 0; ce->offset = offset; ce->swapped = 0; ce->prefetch = 0; cache->stored_pages++; /* Add entry to Hash Table */ insert_table(&cache->ht, &ce->offset, ce); if(cache->stored_pages > cache->cache_size){ cache->replacement(cache); } if(debug) printk(KERN_ALERT "retrieve(): pageoffset %lu out: %lu, stored: %d\n", offset, (ulong) atomic64_read(&outstanding_reqs), cache->stored_pages); atomic64_inc(&outstanding_reqs); if((ret = request_anemone(0, offset, ce->skb, bio))==0) { printk(KERN_ALERT "cache_retrieve(): ERROR! outstanding_reqs: %lu\n", (ulong)atomic64_read(&outstanding_reqs)); BUG(); } else if (ret<0){ /*ret <0 means cannot find map between offset and server mac*/ ce->answered = 1; ce->swapped = 1; ce->prefetch = 0; atomic64_dec(&outstanding_reqs); if (debug) printk(KERN_ALERT "cache_retrieve(): cannot find read page offset = %lu\n", offset); return ce->page; } cache_prefetch(cache,offset); return NULL; } /* Move the cache_entry to the MRU position */ else { //page cached by upper level so we don't need to cache any more. list_del(&ce->queue); list_add_tail(&ce->queue, &cache->lh); ce->answered = 0; cache_prefetch(cache,offset); ce->answered = 1; } /* are there multiple requests for a page before the first page_in returns? 
*/ if(ce->answered == 0) { printk(KERN_ALERT "cache_retrieve(): HOLY MOTHER & FATHER OF GOD, WHAT IS THIS?!?!\n"); BUG(); } return ce->page; } /** * cache_retieve_no_net - queries the cache * @cache: the cache we want to retrieve a page from * @offset: offset of the page we are looking for * * Description: * Looks in the cache for an entry at @offset. If it does not find one * it returns NULL. This version does NOT communicate with the memory engine. * If we find an entry, pop the entry off @cache->lh, and add it to the tail * of @cache->lh. * * Return: * page - page we were looking for * NULL - page is not in cache **/ void *cache_retrieve_no_net(Cache *cache, ulong offset) { struct cache_entry *ce = (struct cache_entry *)lookup_table(&cache->ht, &offset); if(ce) { list_del(&ce->queue); list_add_tail(&ce->queue, &cache->lh); } return ce->page; } /** * cache_retrieve_finish - adds entry to cache, and calls function to remove requests * tmp queue * @c: cache to insert data into * @data: data that will be inserted into the cache * @success: 1 if true, 0 if false * * Description: * This function is responsible for adding @data to @c (cache). Once it has * added @data it must call a function that will alert the device that pages * have arrived and are ready for consumption. 
* * Return: * VOID **/ void cache_retrieve_finish(void *c, struct sk_buff *skb, ulong offset, bool success, void *b) { struct cache *cache = (struct cache *)c; struct bio *bio = (struct bio *)b; ulong flags; struct cache_entry *ce; char *data; if(bio != NULL && !PageLocked(bio->bi_io_vec[0].bv_page)) { BUG(); } if(!irqs_disabled()) { printk(KERN_ALERT "cache_retrieve_finish(): irqs are not disabled!\n"); BUG(); } atomic64_dec(&outstanding_reqs); if(debug) printk(KERN_ALERT "finish(): reqs = %lu, offset: %lu storedpages:%d\n", (ulong)atomic64_read(&outstanding_reqs), offset, cache->stored_pages); ce = (struct cache_entry *)lookup_table(&cache->ht, &offset); if(ce == NULL) { printk(KERN_ALERT "finish(): could not find offset %lu\n", offset); BUG(); } ce->answered = 1; ce->swapped = 1; if(ce->prefetch){ list_move(&ce->queue, &cache->lh); } if(success){ if(!ce->prefetch && bio != NULL) { data = bio_kmap_irq(bio, &flags); memcpy(data, ce->page, bio_sectors(bio) * KERNEL_SECTOR_SIZE); bio_kunmap_irq(data, &flags); bio_endio(bio, bio->bi_size, 0); /* remove from cache since page cache will hold this page */ cache_remove(cache,ce->offset); } } else { if(!ce->prefetch && bio != NULL) { printk(KERN_ALERT "cache_retrieve_finish(): we weren't successful\n"); printk(KERN_ALERT "Couldn't retreive: %lu \n", offset); bio_endio(bio, bio->bi_size, -EIO); } cache_remove(cache,ce->offset); } ce->prefetch = 0; return; } /** * cache_remove - removes an entry from the cache * @cache: cache we will remove an entry from * @offset: offset of the entry to remove * * Description: * * Return: * 0 - FAILED * 1 - SUCCEEDED **/ int cache_remove(Cache *cache, ulong offset) { struct cache_entry *ce = lookup_table(&cache->ht, &offset); if(ce == NULL) { printk(KERN_ALERT "remove(): reqs: %lu, retrans: %lu\n", (ulong)atomic64_read(&outstanding_reqs), retransmissions ); BUG(); } if(!ce->answered) { printk(KERN_ALERT "You can't remove an unanswered page!!\n"); BUG(); } if(debug) printk(KERN_ALERT 
"cache_remove(): Removing page @ offset: %lu\n", offset); if(!list_empty(&ce->queue)) list_del_init(&ce->queue); remove_table(&cache->ht, &offset); /* Place the freed container on the free queue */ list_add_tail(&ce->queue, &cache->list_free); if(ce->write) { cache->stored_write_pages--; } cache->stored_pages--; return 0; } int cache_remove_and_dec(Cache * cache, ulong offset) { cache->stored_pages--; return cache_remove(cache, offset); } /********************************************************************************/ /* Page Replacement Algorithms */ /********************************************************************************/ /** * mru_replace - * @cache: * * Description: * * Return: * 0 - Success **/ int mru_replace(struct cache *cache) { struct cache_entry *list_ce; ulong howmany = 1; while(howmany > 0) { list_ce = list_entry(cache->lh.prev, struct cache_entry, queue); if(cache->stored_write_pages > cache->cache_write_limit) { /* possible infinite (or just very long) loop here?? */ while(!list_ce->write || !list_ce->answered || !list_ce->swapped || list_ce->prefetch) { //printk(KERN_ALERT "WAIT! This page hasn't been answered yet!!!!\n"); list_ce = list_entry(list_ce->queue.prev, struct cache_entry, queue); } } else{ while(!list_ce->answered || !list_ce->swapped || list_ce->prefetch) { //printk(KERN_ALERT "WAIT! 
This page hasn't been answered yet!!!!\n"); list_ce = list_entry(list_ce->queue.prev, struct cache_entry, queue); } } if(!list_empty(&list_ce->queue)) list_del_init(&list_ce->queue); cache_remove(cache, list_ce->offset); if(debug) printk(KERN_ALERT "mru_replace(): replaced page @ offset: %lu, stored: %d\n", list_ce->offset, cache->stored_pages); #ifdef TRACE printk(KERN_ALERT "VICTIM offset: %lu\n", list_ce->offset); #endif howmany--; } return 0; } /** * mru_finish - * @c: * @m: * @success: * * Description: * * Return: * VOID **/ void mru_finish(void *c, void *e, bool success) { //ulong flags; struct cache_entry * ce = (struct cache_entry *)e; if(!irqs_disabled()) { printk(KERN_ALERT "mru_finish(): irqs are not disabled!\n"); BUG(); } if(!success) { printk(KERN_ALERT "ALL YOUR BASE ARE BELONG TO US!\n"); printk(KERN_ALERT "outstanding_reqs: %lu\n", (ulong)atomic64_read(&outstanding_reqs)); } atomic64_dec(&outstanding_reqs); if(debug) printk(KERN_ALERT "mru_finish(): outstanding_reqs = %lu\n", (ulong)atomic64_read(&outstanding_reqs)); /* Need to be passed a pointer to the cache entry */ ce->answered = 1; ce->prefetch = 0; ce->swapped = 1; // printk(KERN_ALERT "outstanding_reqs: %lu stored pages%lu\n", atomic64_read(&outstanding_reqs), cache->stored_pages); return; }