diff --git a/components/drivers/include/ipc/pipe.h b/components/drivers/include/ipc/pipe.h index f9f2118f59..08aa287c6a 100644 --- a/components/drivers/include/ipc/pipe.h +++ b/components/drivers/include/ipc/pipe.h @@ -7,8 +7,6 @@ #include #include -#if defined(RT_USING_POSIX) - #ifndef RT_PIPE_BUFSZ #define PIPE_BUFSZ 512 #else @@ -21,6 +19,7 @@ struct rt_pipe_device /* ring buffer in pipe device */ struct rt_ringbuffer *fifo; + rt_uint16_t bufsz; rt_uint8_t readers; rt_uint8_t writers; @@ -32,7 +31,6 @@ struct rt_pipe_device }; typedef struct rt_pipe_device rt_pipe_t; -rt_pipe_t *rt_pipe_create(const char *name); +rt_pipe_t *rt_pipe_create(const char *name, int bufsz); -#endif /* RT_USING_POSIX */ #endif /* PIPE_H__ */ diff --git a/components/drivers/src/pipe.c b/components/drivers/src/pipe.c index 83f47cc27a..223337f81b 100644 --- a/components/drivers/src/pipe.c +++ b/components/drivers/src/pipe.c @@ -43,7 +43,7 @@ static int pipe_fops_open(struct dfs_fd *fd) if (device->ref_count == 0) { - pipe->fifo = rt_ringbuffer_create(PIPE_BUFSZ); + pipe->fifo = rt_ringbuffer_create(pipe->bufsz); } switch (fd->flags & O_ACCMODE) @@ -182,7 +182,6 @@ static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count) out: rt_mutex_release(&pipe->lock); - return len; } @@ -245,15 +244,14 @@ static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count) rt_wqueue_wait(&(pipe->writer_queue), 0, -1); rt_mutex_take(&pipe->lock, -1); } - rt_mutex_release(&pipe->lock); + if (wakeup) { rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN); } out: - return ret; } @@ -319,8 +317,97 @@ static const struct dfs_file_ops pipe_fops = RT_NULL, pipe_fops_poll, }; +#endif /* end of RT_USING_POSIX */ -rt_pipe_t *rt_pipe_create(const char *name) +rt_err_t rt_pipe_open (rt_device_t device, rt_uint16_t oflag) +{ + rt_pipe_t *pipe = (rt_pipe_t *)device; + + if (device == RT_NULL) return -RT_EINVAL; + rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); + + if (pipe->fifo == RT_NULL) + { + pipe->fifo = rt_ringbuffer_create(pipe->bufsz); + } + + rt_mutex_release(&(pipe->lock)); + + return RT_EOK; +} + +rt_err_t rt_pipe_close (rt_device_t device) +{ + rt_pipe_t *pipe = (rt_pipe_t *)device; + + if (device == RT_NULL) return -RT_EINVAL; + rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); + + if (device->ref_count == 1) + { + rt_ringbuffer_destroy(pipe->fifo); + pipe->fifo = RT_NULL; + } + + rt_mutex_release(&(pipe->lock)); + + return RT_EOK; +} + +rt_size_t rt_pipe_read (rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count) +{ + uint8_t *pbuf; + int read_bytes = 0; + rt_pipe_t *pipe = (rt_pipe_t *)device; + + if (device == RT_NULL) return -EINVAL; + if (count == 0) return 0; + + pbuf = (uint8_t*)buffer; + rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); + + while (read_bytes < count) + { + int len = rt_ringbuffer_get(pipe->fifo, &pbuf[read_bytes], count - read_bytes); + if (len <= 0) break; + + read_bytes += len; + } + rt_mutex_release(&pipe->lock); + + return read_bytes; +} + +rt_size_t rt_pipe_write (rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count) +{ + uint8_t *pbuf; + int write_bytes = 0; + rt_pipe_t *pipe = (rt_pipe_t *)device; + + if (device == RT_NULL) return -EINVAL; + if (count == 0) return 0; + + pbuf = (uint8_t*)buffer; + rt_mutex_take(&pipe->lock, -1); + + while (write_bytes < count) + { + int len = rt_ringbuffer_put(pipe->fifo, &pbuf[write_bytes], count - write_bytes); + if (len <= 0) break; + + write_bytes += len; + } + rt_mutex_release(&pipe->lock); + + return write_bytes; +} + +rt_err_t rt_pipe_control(rt_device_t dev, int cmd, void *args) +{ + return RT_EOK; +} + +rt_pipe_t *rt_pipe_create(const char *name, int bufsz) { rt_pipe_t *pipe; rt_device_t dev; @@ -333,15 +420,29 @@ rt_pipe_t *rt_pipe_create(const char *name) rt_list_init(&(pipe->reader_queue)); rt_list_init(&(pipe->writer_queue)); + RT_ASSERT(bufsz < 0xFFFF); + pipe->bufsz = bufsz; + dev = &(pipe->parent); dev->type = RT_Device_Class_Pipe; + dev->init = RT_NULL; + dev->open = rt_pipe_open; + dev->read = rt_pipe_read; + dev->write = rt_pipe_write; + dev->close = rt_pipe_close; + dev->control = rt_pipe_control; + + dev->rx_indicate = RT_NULL; + dev->tx_complete = RT_NULL; if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0) { rt_free(pipe); return RT_NULL; } +#ifdef RT_USING_POSIX dev->fops = (void*)&pipe_fops; +#endif return pipe; } @@ -368,21 +469,28 @@ int rt_pipe_delete(const char *name) rt_mutex_detach(&(pipe->lock)); rt_device_unregister(device); + /* close fifo ringbuffer */ + if (pipe->fifo) + { + rt_ringbuffer_destroy(pipe->fifo); + pipe->fifo = RT_NULL; + } rt_free(pipe); } else { - result = -1; + result = -ENODEV; } } else { - result = -1; + result = -ENODEV; } return result; } +#ifdef RT_USING_POSIX int pipe(int fildes[2]) { rt_pipe_t *pipe; @@ -392,7 +500,7 @@ int pipe(int fildes[2]) rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++); - pipe = rt_pipe_create(dname); + pipe = rt_pipe_create(dname, PIPE_BUFSZ); if (pipe == RT_NULL) { return -1; @@ -419,7 +527,7 @@ int mkfifo(const char *path, mode_t mode) { rt_pipe_t *pipe; - pipe = rt_pipe_create(path); + pipe = rt_pipe_create(path, PIPE_BUFSZ); if (pipe == RT_NULL) { return -1;