#include "netclient.h"

/* Block device network subsystem code for the client module.
 * The code is exclusively used by the client to send out requests to anemone
 * servers.
 * It includes an output queue, flow control, and retransmission handling.
 */

/* Globals */
extern long nsectors;

/* num_requests: packets currently linked on the output queue (pq).
 * inflight:     packets that have been transmitted and not yet completed.
 * timerset:     cleared when the queue timer fires; tested before the
 *               timer is re-armed from the request path.
 */
atomic64_t num_requests = ATOMIC_INIT(0),
	   inflight = ATOMIC_INIT(0),
	   timerset = ATOMIC_INIT(0);

/* Statistics and bookkeeping counters. */
ulong captured = 0,
      packet_drops = 0,
      retrans_drops = 0,
      retransmissions = 0,
      success = 0,
      writes = 0,
      oldreads = 0,
      oldwrites = 0,
      reads = 0,
      rband = 0,
      wband = 0,
      readlat = 0,
      writelat = 0,
      spaced = 0,
      sequence = 0,
      ctrlcount = 0,
      servers = 0;

timeval lasttransmit;
timeval started;
ulong max_requests = REQ_THRESH_UPPER;
ulong session_id;
u8 dest[ETH_ALEN];

/* Maps a page offset to the Server that holds it. */
HashTable mapping;
int map_size = 129000;

static char report_data[4000];
struct proc_dir_entry * aprocfile2;
kmem_cache_t * pcache;

/* Output request queue and the list of known servers. */
static LIST_HEAD(pq);
LIST_HEAD(server_list);

struct timer_list pq_timer;
struct timer_list broadcast_recv_timer;
struct timer_list session_timer;

/* Completion hooks registered by the block device in client_network_init(). */
void (*read_callback) (void *, sk_buff *, ulong, bool, void *) = NULL;
void (*write_callback) (void *, void *, bool) = NULL;
void * meta_back = NULL;

spinlock_t timerlock;

/*
 * proc write handler: every write flips the global debug flag.
 *
 * The written bytes themselves are ignored.  Returns count so that the
 * whole write is reported as consumed.
 */
int toggle_debug(struct file* file, const char* buffer, ulong count, void* data)
{
	if (debug) {
		printk(KERN_ALERT "Anemone: Turning debugging OFF.\n");
		debug = false;
	} else {
		printk(KERN_ALERT "Anemone: Turning debugging ON.\n");
		debug = true;
	}

	return count;
}

/*
 * Key comparison for the mapping table: keys are page offsets stored
 * directly in the pointer value, so two keys match when the pointer
 * values are equal.
 */
bool map_equal(void *x, void *y)
{
	return (ulong)x == (ulong)y;
}

/*
 * Hash function for the mapping table.
 *
 * The key is a page offset carried in a pointer.  An offset past nsectors
 * is only reported, not rejected; the hash is still computed from it.
 */
ulong map_hash(void *offset)
{
	ulong sector = (ulong) offset;

	/* The offset is unsigned, so it is printed with %lu (it used to be
	 * printed with %ld); nsectors is a signed long. */
	if (sector > nsectors)
		printk(KERN_ALERT "Anemone_Client: Beyond-end write %lu of %ld !!!!!!!!!!!!\n",
		       sector, nsectors);

	return MOD(sector, mapping.ibits);
}

/* Networking initialization by the client during module load-time.
*/ int client_network_init(void (*read_back) (void *, sk_buff *, ulong, bool, void *), void (*write_back) (void *, void *, bool), void * meta) { int ret; /* Initializations */ pcache = kmem_cache_create("pcache", sizeof(Packet), 0, 0, NULL, NULL); if(!pcache) { printk(KERN_ALERT "Error: packet slab cache allocation failed.\n"); kmem_cache_destroy(pcache); dev_put(NIC); return -ENOMEM; } ret = net_init(anemone_client_recv); if(ret < 0) { kmem_cache_destroy(pcache); return ret; } if(init_table(&mapping, &map_size, map_hash, map_equal, "Map_Table") != 0) return -ENOMEM; printk(KERN_ALERT "Init mapping hashtable sucessfully. ibits = %lu \n",mapping.ibits); do_gettimeofday(&lasttransmit); do_gettimeofday(&started); session_id = (ulong)started.tv_sec; spin_lock_init(&timerlock); read_callback = read_back; write_callback = write_back; meta_back = meta; /* Timer. */ init_timer(&pq_timer); pq_timer.function = traverse_replies; pq_timer.data = TRAVERSE_TIMER; add_timer(&pq_timer); mod_timer(&pq_timer, jiffies); init_timer(&broadcast_recv_timer); broadcast_recv_timer.function = broadcast_recv_timeout; broadcast_recv_timer.expires = jiffies + SERVER_BROADCAST_TIMEOUT; broadcast_recv_timer.data = SERVER_BROADCAST_TIMEOUT; add_timer(&broadcast_recv_timer); init_timer(&session_timer); session_timer.function = session_timeout; session_timer.expires = jiffies + SESSION_TIMEOUT; session_timer.data = SESSION_TIMEOUT; add_timer(&session_timer); /* Procfile(s) */ aprocfile2 = create_proc_entry(PROCNAME2, S_IFREG | S_IRUGO, NULL); aprocfile2->write_proc = toggle_debug; aprocfile2->data = report_data; aprocfile2->owner = THIS_MODULE; aprocfile2->read_proc = NULL; return 0; } /* Cleanup during module exit */ void client_network_exit(void) { list_head *x, *y; int cleaned = 0; remove_proc_entry(PROCNAME2, NULL); net_exit(); /* get rid of the timer */ del_timer(&pq_timer); del_timer(&session_timer); del_timer(&broadcast_recv_timer); /* clean the mapping hashtable*/ clean_table(&mapping); /* 
Cleanup the packet queue. */ list_for_each_safe(x, y, &pq) { callback(list_entry(x, Packet, queue)); cleaned++; } printk(KERN_ALERT "Cleaned %d queued packets.\n", cleaned); if(kmem_cache_destroy(pcache)) printk(KERN_ALERT "Anemone Error: Packet slab cache failed to empty.\n"); } /* * Process ACK and callback to device driver. */ inline void process_ack(Packet * p) { list_head *y, *x; Packet *oldp; Server *s; bool found = false; p->m = *p->mp; if(debug) printk(KERN_ALERT "Handling Reply... offset: %lu\n", (ulong) p->m.offset); list_for_each_safe(x, y, &pq) { oldp = list_entry(x, Packet, queue); if(p->m.seq == oldp->m.seq) { /* * Complete the request. */ found = true; s = lookup_table(&mapping, (void *)((ulong) oldp->m.offset)); if(p->m.status == REPLY_GOOD) { oldp->success = true; success++; if(oldp->m.type == PAGE_IN) { reads++; memcpy(oldp->skb->data + PREALLOC_RW, p->mp + 1, p->m.count); } else writes++; } else if(p->m.status == REPLY_FULL) { /* Reset this packet to be re-scheduled on a new server. * Several things must happen here: * 1. We remove the old mapping. * 2. We increase the number of times this packet has * been rescheduled for record-keeping. * 3. We decrement the window so that new requests can * still be pulled from the packet queue. * 4. Reset the number of retransmissions. * 5. Signal that this server should not be considered for * scheduling in the future. update_server_status will * toggle this flag in the future. 
*/ if(oldp->rescheduled < servers) { remove_table(&mapping, (void *)((ulong) oldp->m.offset)); atomic64_set(&oldp->retry, 0); atomic64_dec(&inflight); oldp->rescheduled++; s->consider = false; printk(KERN_ALERT "Packet rescheduled, offset: %lu\n", (ulong)oldp->m.offset); break; } } if(!oldp->success && net_ratelimit()) printk(KERN_ALERT "ACK ERROR (%d): server: %x avail_mem : %lu MB\n", p->m.type, s->mac[5], MB(s->avail_mem)); callback(oldp); break; } } if (!found) { printk(KERN_ALERT "ACK ERROR(%lu): offset %lu not found.\n",(ulong)p->m.status, (ulong)(p->m.offset)); dump_server_status(); s = get_least_utilized(); printk(KERN_ALERT "process_ack, get least utilized server %x avail_memory is: %lu MB\n", s->mac[5], (s->avail_mem)*4/1024); } } /* * Update client session status to server, * to avoid being taken as dead client */ void session_timeout(ulong timer) { if (timer != SESSION_TIMEOUT) return; broadcast_session_status(); mod_timer(&session_timer, jiffies + SESSION_TIMEOUT); } void broadcast_session_status() { Message *m; struct sk_buff *skb; ethhdr *eth; u8 bc_addr[ETH_ALEN]; skb = dev_alloc_skb(headers); if(!skb) BUG(); skb->len = headers; skb->dev = NIC; eth = (ethhdr *) skb->data; m = (Message *) (eth + 1); m->seq = 0; m->status = 0; m->count = 0; m->offset = 0; m->vpid = 0; m->serv_internal_time = 0; m->fragbyte = 0; m->fragment = 0; m->fragleft = 0; m->session_id = session_id; m->type = TYPE_CLIENT_BROADCAST; inet_mac(BROADCAST_MAC, bc_addr); local_irq_enable(); anemone_send(skb, bc_addr); local_irq_disable(); ctrlcount++; } /* * check whether there is timeout server, * if yes,kick out the node after 3 retries */ void broadcast_recv_timeout(ulong recv_timer) { Server *p, *q; list_head *x, *y; timeval curr, sub; ulong flags; if (recv_timer != SERVER_BROADCAST_TIMEOUT) return; local_irq_save(flags); local_irq_disable(); list_for_each_safe(x, y, &server_list) { p = list_entry(x, struct Server, queue); do_gettimeofday(&curr); timersub(&curr, &p->timestamp, 
&sub); /* Give server 3 retry times */ if (USEC(sub) > (3* SERVER_BROADCAST_TIMEOUT*10000) ) { list_del(x); servers--; rewind_table(&mapping); while ( (q = next_in_table(&mapping)) ) { if ( p==q ) { remove_current_in_table(&mapping); } } kfree(p); printk(KERN_ALERT "Delete Server from list with mac addr = %02x:%02x:%02x:%02x:%02x:%02x.\n", mac_ntoa(p->mac)); } } local_irq_restore(flags); mod_timer(&broadcast_recv_timer, jiffies + SERVER_BROADCAST_TIMEOUT); } Server * get_least_utilized() { Server *p=NULL,*s=NULL; struct list_head *pos; list_for_each(pos, &server_list) { p = list_entry(pos, struct Server, queue); /*pick the server that has the most mem*/ // tmp = (int)(100*p->avail_mem/p->total_mem); if(s && ((p->avail_mem <= s->avail_mem) || !s->consider)) continue; s = p; } if ( (!s) && (!list_empty(&server_list)) ) { printk(KERN_ALERT "damn it.seems no available server,actually there is one %lu \n",p->avail_mem); s = p ; } return s; } void dump_server_status(){ Server *p = NULL; struct list_head *x, *y; list_for_each_safe(x, y, &server_list) { p = list_entry(x, struct Server, queue); if (!p) printk(KERN_ALERT "dump_server_status: server %x available with avail_mem = %luMB, percent= %lu.\n", p->mac[5], (ulong)((p->avail_mem)*4/1024), (ulong)(100*(p->avail_mem)/(p->total_mem))); } } void update_server_status(sk_buff* skb, ulong avail_mem, ulong total_mem, ulong status){ Server *server,*p = NULL; struct list_head *pos, *y; bool found = false; int total = 0; list_for_each_safe(pos, y, &server_list) { p = list_entry(pos, struct Server, queue); if ( memcmp(skb->mac.raw + ETH_ALEN, p->mac, ETH_ALEN) == 0) { if(!p->consider && p->avail_mem < avail_mem) p->consider = true; if ( avail_mem < 1000 ) { /* something is wrong ,we had to deal with it * we consider if it is used up of memory */ //if (net_ratelimit()) printk(KERN_ALERT "update_status:seems nearly runs out of memory for server mac %x avail_mem %luPages total_mem %luPages\n",p->mac[5], avail_mem, total_mem); found = 
true; return; } do_gettimeofday(&p->timestamp); p->avail_mem = avail_mem; p->total_mem = total_mem; p->status = status; found = true; } } if (!found) { servers++; server = (struct Server*)kmalloc(sizeof(struct Server), GFP_ATOMIC); do_gettimeofday(&server->timestamp); memcpy(server->mac, skb->mac.raw + ETH_ALEN, ETH_ALEN); server->avail_mem = avail_mem; server->total_mem = total_mem; server->reads = 0; server->writes = 0; server->consider = true; server->status = status; list_add_tail(&server->queue, &server_list); list_for_each(y, &server_list) { total += list_entry(y, struct Server, queue)->avail_mem; } printk(KERN_ALERT "\nNew Server %x, avail_mem = %luMB, %lu pages, percent= %lu, Summary: %lu, %d pages.\n", server->mac[5], avail_mem*4/1024, avail_mem, (int)100*avail_mem/total_mem, servers, total); } } /* Incoming packets are delivered to this routing immediately when they're * handled by the NIC's device driver if the protocol ID matches 0x9000 */ int anemone_client_recv(sk_buff * skb) { Packet * p = kmem_cache_alloc(pcache, GFP_ATOMIC); ulong flags; local_irq_save(flags); local_irq_disable(); if (debug) printk(KERN_ALERT "Received a packet.\n"); if(p) { p->success = false; p->skb = skb; p->mp = (Message *) skb->data; } else { printk(KERN_ALERT "Bad problem: Could not allocate packet.\n"); return 0; } /*each packet contains server status information except session broadcast msg*/ switch(p->mp->type) { case TYPE_CLIENT_BROADCAST: goto bad; case TYPE_SERVER_BROADCAST: ctrlcount++; update_server_status(skb, p->mp->avail_mem, p->mp->total_mem, p->mp->server_status); goto out; case PAGE_IN_REPLY: case PAGE_OUT_REPLY: update_server_status(skb, p->mp->avail_mem, p->mp->total_mem, p->mp->server_status); process_ack(p); traverse_replies(TRAVERSE_REPLY); break; default: printk(KERN_ALERT "Anemone_client_recv: unrecognized msg type\n"); goto bad; } //if(!valid_checksum()) /* * sk_buffs are pre-allocated by the block device. * So, we free *all* reply packet skbuffs. 
*/ out: kfree_skb(skb); kmem_cache_free(pcache, p); local_irq_restore(flags); return NET_RX_SUCCESS; bad: kfree_skb(skb); kmem_cache_free(pcache, p); local_irq_restore(flags); return NET_RX_DROP; } /* The client module calls this function to send off a single request, either * a read or write (i.e. Page-IN or Page-OUT) */ int request_anemone(bool write, ulong offset, sk_buff *skb, void *backinfo) { Packet * p; int ret = 1; Server *s; if(offset > nsectors) printk(KERN_ALERT "Anemone_Client: Beyond-end write %ld of %ld !!!!!!!!!!!!\n", offset, nsectors); if(!irqs_disabled()) { printk(KERN_ALERT "request_ME(): irq is not disabled!\n"); //while(1); BUG(); } /*Eliminate the need to dd before mkswap*/ if (!write) { if (!(s = (struct Server *)lookup_table(&mapping, (void *)offset))) return -1; else { // printk(KERN_ALERT "request_ME(): wj Yeah get offset in hashtable server mac = %x!\n", s->mac[5]); } } if(atomic64_read(&num_requests) < max_requests) { p = kmem_cache_alloc(pcache, GFP_ATOMIC); /* Initializations */ p->skb = skb; p->skb->len = headers; p->skb->len += write ? PAGE_SIZE : 0; p->skb->dev = NIC; p->meta_back = backinfo; p->success = false; atomic64_set(&p->retry, 0); p->mp = (Message *) (p->skb->data + LINK_HDR); /* Setup the message */ p->m.type = write ? PAGE_OUT : PAGE_IN; p->m.seq = sequence++; p->m.status = 0; p->m.count = PAGE_SIZE; p->m.offset = offset; p->m.vpid = 0; p->serv_internal_time = 0; if(!atomic64_read(&timerset)) RESET_TIMER(); list_add_tail(&p->queue, &pq); atomic64_inc(&num_requests); if(debug) { if(write) printk(KERN_ALERT " Sending page @ %lu\n", (ulong) p->m.offset); else printk(KERN_ALERT " Requesting page @ %lu\n", (ulong) p->m.offset); } *p->mp = p->m; } else { printk(KERN_ALERT " Queue is full. 
Dropping Request.\n"); packet_drops++; ret = 0; } traverse_replies(TRAVERSE_REQUEST); if(!ret) printk(KERN_ALERT "About to fail: %lu requests\n", (ulong)atomic64_read(&num_requests)); return ret; } /* Transmit a particular request */ void transmit(Packet * p) { Server *s=NULL; #ifdef SPACING try_to_add_space(); #endif if(debug) printk(KERN_ALERT " Transmitting request, seq: %d\n", p->m.seq); atomic_inc(&p->skb->users); if(p->m.type == PAGE_OUT) { if(!(s = (struct Server *) lookup_table(&mapping, (void *)((ulong) p->m.offset)))) { s = (Server *)get_least_utilized(); if (s == NULL ) { printk(KERN_ALERT "No server available for use\n"); if(list_empty(&server_list)) printk(KERN_ALERT "Server list is NULL list\n"); BUG(); } insert_table(&mapping, (void *)((ulong) p->m.offset), s); } if ( 0 && net_ratelimit() && memcmp(dest, s->mac, ETH_ALEN) != 0) { printk(KERN_ALERT " Switch from server %x to server %x\n",dest[5], s->mac[5]); dump_server_status(); } memcpy(dest, s->mac, ETH_ALEN); s->writes++; } else if(p->m.type == PAGE_IN) { s = (struct Server *)lookup_table(&mapping, (void *)((ulong)p->m.offset)); if (!s) { printk(KERN_ALERT "No mapping for this offset %d server %x\n", p->m.offset, s->mac[5]); dump_server_status(); BUG(); } s->reads++; } local_irq_enable(); anemone_send(p->skb, s->mac); local_irq_disable(); p->timestamp = *(timeval *) &p->skb->tstamp; } /* Callback routine to the block-device code. Used to cleanup succesffully * acknowledged requests or to signal asynchronous failure. 
*/ void callback(Packet * p) { timeval curr, sub; ulong flags; local_irq_save(flags); local_irq_disable(); atomic64_dec(&inflight); atomic64_dec(&num_requests); do_gettimeofday(&curr); timersub(&p->timestamp, &curr, &sub); if(p->m.type == PAGE_OUT) { writelat = USEC(sub); write_callback(meta_back, p->meta_back, p->success); } else { readlat = USEC(sub); read_callback(meta_back, p->skb, p->m.offset, p->success, p->meta_back); } list_del(&p->queue); kmem_cache_free(pcache, p); if(debug && !atomic64_read(&num_requests) && net_ratelimit()) printk(KERN_ALERT "Done: queue drops: %lu, retry drops: %lu success: %lu, retransmits: %lu\n", packet_drops, retrans_drops, success, retransmissions); local_irq_restore(flags); } /* Timeout routine to handle retransmission events */ void timeout(ulong traverse) { ulong flags; local_irq_save(flags); local_irq_disable(); traverse_replies(traverse); local_irq_restore(flags); } /* * Loop through the output request queue * and process the contents in a round-robin manner. * * This function is called from the kernel timer * and re-sechedules itself if there is still work to be done. */ void traverse_replies(ulong traverse) { Packet *p = NULL; list_head *x, *y; int totalits = 0; if(debug) printk(KERN_ALERT "Traversing(%lu). drops: %lu rexmits: %lu, qsize: %lu, flight: %lu\n", (ulong) traverse, packet_drops, retransmissions, (ulong)atomic64_read(&num_requests), (ulong)atomic64_read(&inflight)); if(traverse == TRAVERSE_TIMER) { atomic64_set(&timerset, 0); rband = (reads - oldreads) * 4096 * 8 / TIMEOUT / 1000; oldreads = reads; wband = (writes - oldwrites) * 4096 * 8 / TIMEOUT / 1000; oldwrites = writes; } if(!atomic64_read(&num_requests)) return; /* Starting at the front of the FIFO here, * and transmit packets until our window is full. 
*/ list_for_each_safe(x, y, &pq) { totalits++; if(atomic64_read(&inflight) >= WINDOWSIZE) break; p = list_entry(x, Packet, queue); switch(atomic64_read(&p->retry)) { case 0: /* We're sending this packet for the first time */ if(debug) printk(KERN_ALERT "New request...\n"); do_gettimeofday(&p->timestamp); atomic64_set(&p->halflife, jiffies); atomic64_inc(&inflight); atomic64_set(&p->retry, 1); transmit(p); break; case MAX_RETRIES: /* To many retransmissions. Request attempt has failed. */ printk(KERN_ALERT "Too many retries. Dropping packet type %d\n",p->mp->type); retrans_drops++; callback(p); break; default: /* Retransmission */ if((jiffies - atomic64_read(&p->halflife)) < RETRANS_TIMEOUT) break; if(debug) printk(KERN_ALERT "Retransmitting...\n"); atomic64_set(&p->halflife, jiffies); atomic64_inc(&p->retry); retransmissions++; transmit(p); break; } /* Concurrency is not working. Traverse_requests() is * getting intervleaved when reply softirqs come in on top * of this. As such when list_del() occurs, it messes up * the local pointers of the above queue traversal. * * One cannot set spinlocks inside soft_irqs because the * kernel will not issue more than one of the same soft_irq * at the same time. * * Although the concurrency should be fixed, I found that * it doesn't happen that often. Fortunately list_del sets * the next pointer in the LL to a magic value, which * I detect here. If I find it, I just break the loop * and let traverse() get called again later. */ if((void *)(y->next) == LIST_POISON1) { printk(KERN_ALERT "Anemone Client: Breaking at iteration %d out of %lu during %lu\n", totalits, (ulong)atomic64_read(&num_requests), traverse); break; } } /* * If the queue still has packets in it and the timer fired, * then re-schedule the timer. 
*/ if(atomic64_read(&num_requests) && traverse == TRAVERSE_TIMER) RESET_TIMER(); } /* An attempt to add a lower-bounded amount of time between the * transmission of two individual requests */ #ifdef SPACING void try_to_add_space(void) { timeval curr, diff; ulong usec, spin, x = 0, lat = 0; ulong pr = true; do_gettimeofday(&curr); timersub(&curr, &lasttransmit, &diff); pr = net_ratelimit(); if((usec = USEC(diff)) < SPACING) { spaced++; do_gettimeofday(&curr); for(spin = 0; spin < ((SPACING - usec) * MICROSECOND); spin++); x++; do_gettimeofday(&lasttransmit); timersub(&lasttransmit, &curr, &diff); lat = USEC(diff); if(pr) printk(KERN_ALERT "Added Latency: %lu usec\n", lat); } if(pr) printk(KERN_ALERT "Time since last transmit: %lu usec\n", usec); do_gettimeofday(&lasttransmit); } #endif