From: Linux Kernel Mailing List To: bk-commits-head@vger.kernel.org Subject: Make pipe data structure be a circular list of pages, rather than Date: Fri, 07 Jan 2005 00:29:13 +0000 Archive-link: Article, Thread ChangeSet 1.2229.1.1, 2005/01/06 16:29:13-08:00, torvalds@ppc970.osdl.org Make pipe data structure be a circular list of pages, rather than a circular list of one page. This improves pipe throughput, and allows us to (eventually) use these lists of page buffers for moving data around efficiently. fs/pipe.c | 130 ++++++++++++++++++++++++++++++---------------- include/linux/pipe_fs_i.h | 19 +++--- 2 files changed, 97 insertions(+), 52 deletions(-) diff -Nru a/fs/pipe.c b/fs/pipe.c --- a/fs/pipe.c 2005-01-06 19:13:23 -08:00 +++ b/fs/pipe.c 2005-01-06 19:13:23 -08:00 @@ -14,6 +14,8 @@ #include <linux/mount.h> #include <linux/pipe_fs_i.h> #include <linux/uio.h> +#include <linux/highmem.h> + #include <asm/uaccess.h> #include <asm/ioctls.h> @@ -89,6 +91,7 @@ unsigned long nr_segs, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; + struct pipe_inode_info *info; int do_wakeup; ssize_t ret; struct iovec *iov = (struct iovec *)_iov; @@ -102,32 +105,40 @@ do_wakeup = 0; ret = 0; down(PIPE_SEM(*inode)); + info = inode->i_pipe; for (;;) { - int size = PIPE_LEN(*inode); - if (size) { - char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode); - ssize_t chars = PIPE_MAX_RCHUNK(*inode); + int bufs = info->nrbufs; + if (bufs) { + int curbuf = info->curbuf; + struct pipe_buffer *buf = info->bufs + curbuf; + size_t chars = buf->len; + int error; if (chars > total_len) chars = total_len; - if (chars > size) - chars = size; - if (pipe_iov_copy_to_user(iov, pipebuf, chars)) { + error = pipe_iov_copy_to_user(iov, kmap(buf->page) + buf->offset, chars); + kunmap(buf->page); + if (unlikely(error)) { if (!ret) ret = -EFAULT; break; } ret += chars; - - PIPE_START(*inode) += chars; - PIPE_START(*inode) &= (PIPE_SIZE - 1); - PIPE_LEN(*inode) -= chars; + buf->offset += chars; + buf->len -= chars; + if (!buf->len) { + __free_page(buf->page); + buf->page = NULL; + curbuf = 
(curbuf + 1) & (PIPE_BUFFERS-1); + info->curbuf = curbuf; + info->nrbufs = --bufs; + do_wakeup = 1; + } total_len -= chars; - do_wakeup = 1; if (!total_len) break; /* common path: read succeeded */ } - if (PIPE_LEN(*inode)) /* test for cyclic buffers */ + if (bufs) /* More to do? */ continue; if (!PIPE_WRITERS(*inode)) break; @@ -177,8 +188,8 @@ unsigned long nr_segs, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; + struct pipe_inode_info *info; ssize_t ret; - size_t min; int do_wakeup; struct iovec *iov = (struct iovec *)_iov; size_t total_len; @@ -190,48 +201,58 @@ do_wakeup = 0; ret = 0; - min = total_len; - if (min > PIPE_BUF) - min = 1; down(PIPE_SEM(*inode)); + info = inode->i_pipe; for (;;) { - int free; + int bufs; if (!PIPE_READERS(*inode)) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } - free = PIPE_FREE(*inode); - if (free >= min) { - /* transfer data */ - ssize_t chars = PIPE_MAX_WCHUNK(*inode); - char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode); + bufs = info->nrbufs; + if (bufs < PIPE_BUFFERS) { + ssize_t chars; + int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1); + struct pipe_buffer *buf = info->bufs + newbuf; + struct page *page = alloc_page(GFP_USER); + int error; + + if (unlikely(!page)) { + ret = ret ? : -ENOMEM; + break; + } /* Always wakeup, even if the copy fails. Otherwise * we lock up (O_NONBLOCK-)readers that sleep due to * syscall merging. + * FIXME! Is this really true? 
*/ do_wakeup = 1; + chars = PAGE_SIZE; if (chars > total_len) chars = total_len; - if (chars > free) - chars = free; - if (pipe_iov_copy_from_user(pipebuf, iov, chars)) { + error = pipe_iov_copy_from_user(kmap(page), iov, chars); + kunmap(page); + if (unlikely(error)) { if (!ret) ret = -EFAULT; + __free_page(page); break; } ret += chars; - PIPE_LEN(*inode) += chars; + /* Insert it into the buffer array */ + buf->page = page; + buf->offset = 0; + buf->len = chars; + info->nrbufs = ++bufs; + total_len -= chars; if (!total_len) break; } - if (PIPE_FREE(*inode) && ret) { - /* handle cyclic data buffers */ - min = 1; + if (bufs < PIPE_BUFFERS) continue; - } if (filp->f_flags & O_NONBLOCK) { if (!ret) ret = -EAGAIN; break; @@ -283,9 +304,23 @@ pipe_ioctl(struct inode *pino, struct file *filp, unsigned int cmd, unsigned long arg) { + struct inode *inode = filp->f_dentry->d_inode; + struct pipe_inode_info *info; + int count, buf, nrbufs; + switch (cmd) { case FIONREAD: - return put_user(PIPE_LEN(*pino), (int __user *)arg); + down(PIPE_SEM(*inode)); + info = inode->i_pipe; + count = 0; + buf = info->curbuf; + nrbufs = info->nrbufs; + while (--nrbufs >= 0) { + count += info->bufs[buf].len; + buf = (buf+1) & (PIPE_BUFFERS-1); + } + up(PIPE_SEM(*inode)); + return put_user(count, (int __user *)arg); default: return -EINVAL; } @@ -297,13 +332,16 @@ { unsigned int mask; struct inode *inode = filp->f_dentry->d_inode; + struct pipe_inode_info *info = inode->i_pipe; + int nrbufs; poll_wait(filp, PIPE_WAIT(*inode), wait); /* Reading only -- no need for acquiring the semaphore. */ - mask = POLLIN | POLLRDNORM; - if (PIPE_EMPTY(*inode)) - mask = POLLOUT | POLLWRNORM; + nrbufs = info->nrbufs; + mask = (nrbufs > 0) ? POLLIN | POLLRDNORM : 0; + mask |= (nrbufs < PIPE_BUFFERS) ? 
POLLOUT | POLLWRNORM : 0; + if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode)) mask |= POLLHUP; if (!PIPE_READERS(*inode)) @@ -529,31 +567,37 @@ void free_pipe_info(struct inode *inode) { + int i; struct pipe_inode_info *info = inode->i_pipe; + inode->i_pipe = NULL; - free_page((unsigned long)info->base); + for (i = 0; i < PIPE_BUFFERS; i++) { + struct page *page = info->bufs[i].page; + + /* We'll make this a data-dependent free some day .. */ + if (page) + __free_page(page); + } kfree(info); } struct inode* pipe_new(struct inode* inode) { unsigned long page; + struct pipe_inode_info *info; page = __get_free_page(GFP_USER); if (!page) return NULL; - inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL); - if (!inode->i_pipe) + info = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL); + if (!info) goto fail_page; + memset(info, 0, sizeof(*info)); + inode->i_pipe = info; init_waitqueue_head(PIPE_WAIT(*inode)); - PIPE_BASE(*inode) = (char*) page; - PIPE_START(*inode) = PIPE_LEN(*inode) = 0; - PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0; - PIPE_WAITING_WRITERS(*inode) = 0; PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1; - *PIPE_FASYNC_READERS(*inode) = *PIPE_FASYNC_WRITERS(*inode) = NULL; return inode; fail_page: diff -Nru a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h --- a/include/linux/pipe_fs_i.h 2005-01-06 19:13:23 -08:00 +++ b/include/linux/pipe_fs_i.h 2005-01-06 19:13:23 -08:00 @@ -2,10 +2,18 @@ #define _LINUX_PIPE_FS_I_H #define PIPEFS_MAGIC 0x50495045 + +#define PIPE_BUFFERS (16) + +struct pipe_buffer { + struct page *page; + unsigned short offset, len; +}; + struct pipe_inode_info { wait_queue_head_t wait; - char *base; - unsigned int len; + unsigned int nrbufs, curbuf; + struct pipe_buffer bufs[PIPE_BUFFERS]; unsigned int start; unsigned int readers; unsigned int writers; @@ -32,13 +40,6 @@ #define PIPE_WCOUNTER(inode) ((inode).i_pipe->w_counter) #define PIPE_FASYNC_READERS(inode) 
(&((inode).i_pipe->fasync_readers)) #define PIPE_FASYNC_WRITERS(inode) (&((inode).i_pipe->fasync_writers)) - -#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0) -#define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE) -#define PIPE_FREE(inode) (PIPE_SIZE - PIPE_LEN(inode)) -#define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1)) -#define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode)) -#define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode)) /* Drop the inode semaphore and wait for a pipe event, atomically */ void pipe_wait(struct inode * inode);