From d683d32bd5502c15be600145966400a41710a7f4 Mon Sep 17 00:00:00 2001 From: Grissiom Date: Mon, 19 Aug 2013 15:35:56 +0800 Subject: [PATCH] device/pipe: add nonblocking read/write and force write mode The previous implementation will always blocks the reader/writer. However, at least FinSh would expect the device to be nonblocking --- read should return 0 when there is no data in it. --- components/drivers/include/rtdevice.h | 19 +++- components/drivers/src/pipe.c | 127 +++++++++++++++++--------- 2 files changed, 104 insertions(+), 42 deletions(-) diff --git a/components/drivers/include/rtdevice.h b/components/drivers/include/rtdevice.h index bd6e243b0..734839e69 100644 --- a/components/drivers/include/rtdevice.h +++ b/components/drivers/include/rtdevice.h @@ -75,6 +75,20 @@ struct rt_ringbuffer /* pipe device */ #define PIPE_DEVICE(device) ((struct rt_pipe_device*)(device)) +enum rt_pipe_flag +{ + /* both read and write won't block */ + RT_PIPE_FLAG_NONBLOCK_RDWR = 0x00, + /* read would block */ + RT_PIPE_FLAG_BLOCK_RD = 0x01, + /* write would block */ + RT_PIPE_FLAG_BLOCK_WR = 0x02, + /* write to this pipe will discard some data when the pipe is full. + * When this flag is set, RT_PIPE_FLAG_BLOCK_WR will be ignored since write + * operation will always be success. */ + RT_PIPE_FLAG_FORCE_WR = 0x04, +}; + struct rt_pipe_device { struct rt_device parent; @@ -82,6 +96,8 @@ struct rt_pipe_device /* ring buffer in pipe device */ struct rt_ringbuffer ringbuffer; + enum rt_pipe_flag flag; + /* suspended list */ rt_list_t suspended_read_list; rt_list_t suspended_write_list; @@ -199,11 +215,12 @@ rt_inline rt_uint16_t rt_ringbuffer_data_len(struct rt_ringbuffer *rb) */ rt_err_t rt_pipe_init(struct rt_pipe_device *pipe, const char *name, + enum rt_pipe_flag flag, rt_uint8_t *buf, rt_size_t size); rt_err_t rt_pipe_detach(struct rt_pipe_device *pipe); #ifdef RT_USING_HEAP -rt_err_t rt_pipe_create(const char *name, rt_size_t size); +rt_err_t rt_pipe_create(const char *name, enum rt_pipe_flag flag, rt_size_t size); void rt_pipe_destroy(struct rt_pipe_device *pipe); #endif diff --git a/components/drivers/src/pipe.c b/components/drivers/src/pipe.c index 469c2d06e..f01244ce2 100644 --- a/components/drivers/src/pipe.c +++ b/components/drivers/src/pipe.c @@ -26,6 +26,26 @@ #include #include +static void _rt_pipe_resume_writer(struct rt_pipe_device *pipe) +{ + if (!rt_list_isempty(&pipe->suspended_write_list)) + { + rt_thread_t thread; + + RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR); + + /* get suspended thread */ + thread = rt_list_entry(pipe->suspended_write_list.next, + struct rt_thread, + tlist); + + /* resume the write thread */ + rt_thread_resume(thread); + + rt_schedule(); + } +} + static rt_size_t rt_pipe_read(rt_device_t dev, rt_off_t pos, void *buffer, @@ -39,13 +59,26 @@ static rt_size_t rt_pipe_read(rt_device_t dev, pipe = PIPE_DEVICE(dev); RT_ASSERT(pipe != RT_NULL); + if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD)) + { + level = rt_hw_interrupt_disable(); + read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); + + /* if the ringbuffer is empty, there won't be any writer waiting */ + if (read_nbytes) + _rt_pipe_resume_writer(pipe); + + rt_hw_interrupt_enable(level); + + return read_nbytes; + } + thread = rt_thread_self(); /* current context checking */ RT_DEBUG_NOT_IN_INTERRUPT; - do - { + do { level = rt_hw_interrupt_disable(); read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); if (read_nbytes == 0) @@ -60,23 +93,8 @@ static rt_size_t rt_pipe_read(rt_device_t dev, } else { - if (!rt_list_isempty(&pipe->suspended_write_list)) - { - /* get suspended thread */ - thread = rt_list_entry(pipe->suspended_write_list.next, - struct rt_thread, - tlist); - - /* resume the write thread */ - rt_thread_resume(thread); - rt_hw_interrupt_enable(level); - - rt_schedule(); - } - else - { - rt_hw_interrupt_enable(level); - } + _rt_pipe_resume_writer(pipe); + rt_hw_interrupt_enable(level); break; } } while (read_nbytes == 0); @@ -84,6 +102,26 @@ static rt_size_t rt_pipe_read(rt_device_t dev, return read_nbytes; } +static void _rt_pipe_resume_reader(struct rt_pipe_device *pipe) +{ + if (!rt_list_isempty(&pipe->suspended_read_list)) + { + rt_thread_t thread; + + RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD); + + /* get suspended thread */ + thread = rt_list_entry(pipe->suspended_read_list.next, + struct rt_thread, + tlist); + + /* resume the read thread */ + rt_thread_resume(thread); + + rt_schedule(); + } +} + struct rt_pipe_device *_pipe = RT_NULL; static rt_size_t rt_pipe_write(rt_device_t dev, rt_off_t pos, @@ -100,13 +138,31 @@ static rt_size_t rt_pipe_write(rt_device_t dev, if (_pipe == RT_NULL) _pipe = pipe; + if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) || + !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR)) + { + level = rt_hw_interrupt_disable(); + + if (pipe->flag & RT_PIPE_FLAG_FORCE_WR) + write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer), + buffer, size); + else + write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), + buffer, size); + + _rt_pipe_resume_reader(pipe); + + rt_hw_interrupt_enable(level); + + return write_nbytes; + } + thread = rt_thread_self(); /* current context checking */ RT_DEBUG_NOT_IN_INTERRUPT; - do - { + do { level = rt_hw_interrupt_disable(); write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size); if (write_nbytes == 0) @@ -122,26 +178,11 @@ static rt_size_t rt_pipe_write(rt_device_t dev, } else { - if (!rt_list_isempty(&pipe->suspended_read_list)) - { - /* get suspended thread */ - thread = rt_list_entry(pipe->suspended_read_list.next, - struct rt_thread, - tlist); - - /* resume the read thread */ - rt_thread_resume(thread); - rt_hw_interrupt_enable(level); - - rt_schedule(); - } - else - { - rt_hw_interrupt_enable(level); - } + _rt_pipe_resume_reader(pipe); + rt_hw_interrupt_enable(level); break; } - }while (write_nbytes == 0); + } while (write_nbytes == 0); return write_nbytes; } @@ -157,6 +198,7 @@ static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args) * * @param pipe the pipe device * @param name the name of pipe device + * @param flag the attribute of the pipe device * @param buf the buffer of pipe device * @param size the size of pipe device buffer * @@ -164,6 +206,7 @@ static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args) */ rt_err_t rt_pipe_init(struct rt_pipe_device *pipe, const char *name, + enum rt_pipe_flag flag, rt_uint8_t *buf, rt_size_t size) { @@ -177,6 +220,8 @@ rt_err_t rt_pipe_init(struct rt_pipe_device *pipe, /* initialize ring buffer */ rt_ringbuffer_init(&pipe->ringbuffer, buf, size); + pipe->flag = flag; + /* create pipe */ pipe->parent.type = RT_Device_Class_Char; pipe->parent.init = RT_NULL; @@ -204,7 +249,7 @@ rt_err_t rt_pipe_detach(struct rt_pipe_device *pipe) RTM_EXPORT(rt_pipe_detach); #ifdef RT_USING_HEAP -rt_err_t rt_pipe_create(const char *name, rt_size_t size) +rt_err_t rt_pipe_create(const char *name, enum rt_pipe_flag flag, rt_size_t size) { rt_uint8_t *rb_memptr = RT_NULL; struct rt_pipe_device *pipe = RT_NULL; @@ -223,7 +268,7 @@ rt_err_t rt_pipe_create(const char *name, rt_size_t size) return -RT_ENOMEM; } - return rt_pipe_init(pipe, name, rb_memptr, size); + return rt_pipe_init(pipe, name, flag, rb_memptr, size); } RTM_EXPORT(rt_pipe_create);