From 4f05dd5426c277cd1b14f7d20f678983f88b2c0a Mon Sep 17 00:00:00 2001 From: "bernard.xiong@gmail.com" Date: Sun, 30 Sep 2012 07:33:13 +0000 Subject: [PATCH] Add pipe, data queue implementation; Fix the issue which leaks one item in the available data of ring buffer. git-svn-id: https://rt-thread.googlecode.com/svn/trunk@2313 bbd45198-f89e-11dd-88c7-29a3b14d5316 --- components/drivers/include/rtdevice.h | 76 ++++++- components/drivers/src/completion.c | 229 ++++++++++--------- components/drivers/src/dataqueue.c | 315 ++++++++++++++++++++++++++ components/drivers/src/pipe.c | 196 ++++++++++++++++ components/drivers/src/ringbuffer.c | 119 +++++----- 5 files changed, 754 insertions(+), 181 deletions(-) create mode 100644 components/drivers/src/dataqueue.c create mode 100644 components/drivers/src/pipe.c diff --git a/components/drivers/include/rtdevice.h b/components/drivers/include/rtdevice.h index afc4ac7da..84705cbf4 100644 --- a/components/drivers/include/rtdevice.h +++ b/components/drivers/include/rtdevice.h @@ -14,6 +14,8 @@ struct rt_completion rt_list_t suspended_list; }; +#define RT_RINGBUFFER_SIZE(rb) ((rb)->write_index - (rb)->read_index) +#define RT_RINGBUFFER_EMPTY(rb) ((rb)->buffer_size - RT_RINGBUFFER_SIZE(rb)) /* ring buffer */ struct rt_ringbuffer { @@ -22,6 +24,47 @@ struct rt_ringbuffer rt_uint16_t buffer_size; }; +/* pipe device */ +#define PIPE_DEVICE(device) ((struct rt_pipe_device*)(device)) +struct rt_pipe_device +{ + struct rt_device parent; + + /* ring buffer in pipe device */ + struct rt_ringbuffer ringbuffer; + + /* suspended list */ + rt_list_t suspended_read_list; + rt_list_t suspended_write_list; +}; + +#define RT_DATAQUEUE_EVENT_UNKNOWN 0x00 +#define RT_DATAQUEUE_EVENT_POP 0x01 +#define RT_DATAQUEUE_EVENT_PUSH 0x02 +#define RT_DATAQUEUE_EVENT_LWM 0x03 + +struct rt_data_item; +#define RT_DATAQUEUE_SIZE(dq) ((dq)->put_index - (dq)->get_index) +#define RT_DATAQUEUE_EMPTY(dq) ((dq)->size - RT_DATAQUEUE_SIZE(dq)) +/* data queue implementation */ +struct rt_data_queue +{ + rt_uint16_t size; + rt_uint16_t lwm; + rt_bool_t waiting_lwm; + + rt_uint16_t get_index; + rt_uint16_t put_index; + + struct rt_data_item *queue; + + rt_list_t suspended_push_list; + rt_list_t suspended_pop_list; + + /* event notify */ + void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event); +}; + /** * Completion */ @@ -30,12 +73,11 @@ rt_err_t rt_completion_wait(struct rt_completion* completion, rt_int32_t timeout); void rt_completion_done(struct rt_completion* completion); -/** - * DataLink for DeviceDriver - */ - /** * RingBuffer for DeviceDriver + * + * Please note that the ring buffer implementation of RT-Thread + * has no thread wait or resume feature. */ void rt_ringbuffer_init(struct rt_ringbuffer* rb, rt_uint8_t *pool, @@ -49,8 +91,29 @@ rt_size_t rt_ringbuffer_get(struct rt_ringbuffer* rb, rt_uint8_t *ptr, rt_uint16_t length); rt_size_t rt_ringbuffer_getchar(struct rt_ringbuffer* rb, rt_uint8_t *ch); -rt_size_t rt_ringbuffer_available_size(struct rt_ringbuffer* rb); -rt_size_t rt_ringbuffer_emptry_size(struct rt_ringbuffer* rb); +rt_inline rt_uint16_t rt_ringbuffer_get_size(struct rt_ringbuffer* rb) +{ + RT_ASSERT(rb != RT_NULL); + return rb->buffer_size; +} + +/** + * Pipe Device + */ +rt_err_t rt_pipe_create(const char* name, rt_size_t size); +void rt_pipe_destroy(struct rt_pipe_device* pipe); + +/** + * DataQueue for DeviceDriver + */ +rt_err_t rt_data_queue_init(struct rt_data_queue* queue, rt_uint16_t size, rt_uint16_t lwm, + void (*evt_notify)(struct rt_data_queue* queue, rt_uint32_t event)); +rt_err_t rt_data_queue_push(struct rt_data_queue* queue, void* data_ptr, rt_size_t data_size, + rt_int32_t timeout); +rt_err_t rt_data_queue_pop(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size, + rt_int32_t timeout); +rt_err_t rt_data_queue_peak(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size); +void rt_data_queue_reset(struct rt_data_queue* queue); #ifdef RT_USING_SPI #include "drivers/spi.h" @@ -93,3 +156,4 @@ rt_size_t rt_ringbuffer_emptry_size(struct rt_ringbuffer* rb); #endif #endif /* __RT_DEVICE_H__ */ + diff --git a/components/drivers/src/completion.c b/components/drivers/src/completion.c index 220fc04a8..d35da7a10 100644 --- a/components/drivers/src/completion.c +++ b/components/drivers/src/completion.c @@ -1,110 +1,121 @@ -/** - * Complete implementation in Device Drivers - */ -#include -#include -#include - -#define RT_COMPLETED 1 -#define RT_UNCOMPLETED 0 - -void rt_completion_init(struct rt_completion* completion) -{ - rt_base_t level; - RT_ASSERT(completion != RT_NULL); - - level = rt_hw_interrupt_disable(); - completion->flag = RT_UNCOMPLETED; - rt_list_init(&completion->suspended_list); - rt_hw_interrupt_enable(level); -} - -rt_err_t rt_completion_wait(struct rt_completion* completion, rt_int32_t timeout) -{ - rt_err_t result; - rt_base_t level; - rt_thread_t thread; - RT_ASSERT(completion != RT_NULL); - - result = RT_EOK; - thread = rt_thread_self(); - - level = rt_hw_interrupt_disable(); - if (completion->flag != RT_COMPLETED) - { - /* only one thread can suspend on complete */ - RT_ASSERT(rt_list_isempty(&(completion->suspended_list))); - - if (timeout == 0) - { - result = -RT_ETIMEOUT; - goto __exit; - } - else - { +/* + * File : completion.c + * This file is part of RT-Thread RTOS + * COPYRIGHT (C) 2012, RT-Thread Development Team + * + * The license and distribution terms for this file may be + * found in the file LICENSE in this distribution or at + * http://www.rt-thread.org/license/LICENSE + * + * Change Logs: + * Date Author Notes + * 2012-09-30 Bernard first version. + */ + +#include +#include +#include + +#define RT_COMPLETED 1 +#define RT_UNCOMPLETED 0 + +void rt_completion_init(struct rt_completion* completion) +{ + rt_base_t level; + RT_ASSERT(completion != RT_NULL); + + level = rt_hw_interrupt_disable(); + completion->flag = RT_UNCOMPLETED; + rt_list_init(&completion->suspended_list); + rt_hw_interrupt_enable(level); +} + +rt_err_t rt_completion_wait(struct rt_completion* completion, rt_int32_t timeout) +{ + rt_err_t result; + rt_base_t level; + rt_thread_t thread; + RT_ASSERT(completion != RT_NULL); + + result = RT_EOK; + thread = rt_thread_self(); + + level = rt_hw_interrupt_disable(); + if (completion->flag != RT_COMPLETED) + { + /* only one thread can suspend on complete */ + RT_ASSERT(rt_list_isempty(&(completion->suspended_list))); + + if (timeout == 0) + { + result = -RT_ETIMEOUT; + goto __exit; + } + else + { /* reset thread error number */ - thread->error = RT_EOK; - - /* suspend thread */ - rt_thread_suspend(thread); - /* add to suspended list */ - rt_list_insert_before(&(completion->suspended_list), &(thread->tlist)); - - /* current context checking */ - RT_DEBUG_NOT_IN_INTERRUPT; - - /* start timer */ - if (timeout > 0) - { - /* reset the timeout of thread timer and start it */ - rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); - rt_timer_start(&(thread->thread_timer)); - } - /* enable interrupt */ - rt_hw_interrupt_enable(level); - - /* do schedule */ - rt_schedule(); - - /* thread is waked up */ - result = thread->error; - - level = rt_hw_interrupt_disable(); - /* clean completed flag */ - completion->flag = RT_UNCOMPLETED; - } - } - -__exit: - rt_hw_interrupt_enable(level); - return result; -} - -void rt_completion_done(struct rt_completion* completion) -{ - rt_base_t level; - RT_ASSERT(completion != RT_NULL); - - level = rt_hw_interrupt_disable(); - completion->flag = RT_COMPLETED; - - if (!rt_list_isempty(&(completion->suspended_list))) - { - /* there is one thread in suspended list */ - struct rt_thread *thread; - - /* get thread entry */ - thread = rt_list_entry(completion->suspended_list.next, struct rt_thread, tlist); - - /* resume it */ - rt_thread_resume(thread); - rt_hw_interrupt_enable(level); - - /* perform a schedule */ - rt_schedule(); - } - else - { - rt_hw_interrupt_enable(level); - } -} + thread->error = RT_EOK; + + /* suspend thread */ + rt_thread_suspend(thread); + /* add to suspended list */ + rt_list_insert_before(&(completion->suspended_list), &(thread->tlist)); + + /* current context checking */ + RT_DEBUG_NOT_IN_INTERRUPT; + + /* start timer */ + if (timeout > 0) + { + /* reset the timeout of thread timer and start it */ + rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); + rt_timer_start(&(thread->thread_timer)); + } + /* enable interrupt */ + rt_hw_interrupt_enable(level); + + /* do schedule */ + rt_schedule(); + + /* thread is waked up */ + result = thread->error; + + level = rt_hw_interrupt_disable(); + /* clean completed flag */ + completion->flag = RT_UNCOMPLETED; + } + } + +__exit: + rt_hw_interrupt_enable(level); + return result; +} + +void rt_completion_done(struct rt_completion* completion) +{ + rt_base_t level; + RT_ASSERT(completion != RT_NULL); + + level = rt_hw_interrupt_disable(); + completion->flag = RT_COMPLETED; + + if (!rt_list_isempty(&(completion->suspended_list))) + { + /* there is one thread in suspended list */ + struct rt_thread *thread; + + /* get thread entry */ + thread = rt_list_entry(completion->suspended_list.next, struct rt_thread, tlist); + + /* resume it */ + rt_thread_resume(thread); + rt_hw_interrupt_enable(level); + + /* perform a schedule */ + rt_schedule(); + } + else + { + rt_hw_interrupt_enable(level); + } +} diff --git a/components/drivers/src/dataqueue.c b/components/drivers/src/dataqueue.c new file mode 100644 index 000000000..353934e26 --- /dev/null +++ b/components/drivers/src/dataqueue.c @@ -0,0 +1,315 @@ +/* + * File : dataqueue.c + * This file is part of RT-Thread RTOS + * COPYRIGHT (C) 2012, RT-Thread Development Team + * + * The license and distribution terms for this file may be + * found in the file LICENSE in this distribution or at + * http://www.rt-thread.org/license/LICENSE + * + * Change Logs: + * Date Author Notes + * 2012-09-30 Bernard first version. + */ +#include +#include +#include + +struct rt_data_item +{ + void* data_ptr; + rt_size_t data_size; +}; + +rt_err_t rt_data_queue_init(struct rt_data_queue* queue, rt_uint16_t size, rt_uint16_t lwm, + void (*evt_notify)(struct rt_data_queue* queue, rt_uint32_t event)) +{ + RT_ASSERT(queue != RT_NULL); + + queue->evt_notify = evt_notify; + + queue->size = size; + queue->lwm = lwm; + queue->waiting_lwm = RT_FALSE; + + queue->get_index = 0; + queue->put_index = 0; + + rt_list_init(&(queue->suspended_push_list)); + rt_list_init(&(queue->suspended_pop_list)); + + queue->queue = (struct rt_data_item*) rt_malloc(sizeof(struct rt_data_item) * size); + if (queue->queue == RT_NULL) + { + return -RT_ENOMEM; + } + + return RT_EOK; +} +RTM_EXPORT(rt_data_queue_init); + +rt_err_t rt_data_queue_push(struct rt_data_queue* queue, void* data_ptr, rt_size_t data_size, rt_int32_t timeout) +{ + rt_uint16_t mask; + rt_ubase_t level; + rt_thread_t thread; + rt_err_t result; + + RT_ASSERT(queue != RT_NULL); + + result = RT_EOK; + thread = rt_thread_self(); + mask = queue->size - 1; + + level = rt_hw_interrupt_disable(); + while (queue->put_index - queue->get_index == queue->size) + { + queue->waiting_lwm = RT_TRUE; + + /* queue is full */ + if (timeout == 0) + { + result = -RT_ETIMEOUT; + + goto __exit; + } + + /* current context checking */ + RT_DEBUG_NOT_IN_INTERRUPT; + + /* reset thread error number */ + thread->error = RT_EOK; + + /* suspend thread on the push list */ + rt_thread_suspend(thread); + rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist)); + /* start timer */ + if (timeout > 0) + { + /* reset the timeout of thread timer and start it */ + rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); + rt_timer_start(&(thread->thread_timer)); + } + + /* enable interrupt */ + rt_hw_interrupt_enable(level); + + /* do schedule */ + rt_schedule(); + + /* thread is waked up */ + result = thread->error; + level = rt_hw_interrupt_disable(); + if (result != RT_EOK) goto __exit; + } + + queue->queue[queue->put_index & mask].data_ptr = data_ptr; + queue->queue[queue->put_index & mask].data_size = data_size; + queue->put_index += 1; + + if (!rt_list_isempty(&(queue->suspended_pop_list))) + { + /* there is at least one thread in suspended list */ + + /* get thread entry */ + thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist); + + /* resume it */ + rt_thread_resume(thread); + rt_hw_interrupt_enable(level); + + /* perform a schedule */ + rt_schedule(); + + return result; + } + +__exit: + rt_hw_interrupt_enable(level); + if ((result == RT_EOK) && queue->evt_notify != RT_NULL) + { + queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH); + } + + return result; +} +RTM_EXPORT(rt_data_queue_push); + +rt_err_t rt_data_queue_pop(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size, + rt_int32_t timeout) +{ + rt_ubase_t level; + rt_thread_t thread; + rt_err_t result; + rt_uint16_t mask; + + RT_ASSERT(queue != RT_NULL); + RT_ASSERT(data_ptr != RT_NULL); + RT_ASSERT(size != RT_NULL); + + result = RT_EOK; + thread = rt_thread_self(); + mask = queue->size - 1; + + level = rt_hw_interrupt_disable(); + while (queue->get_index == queue->put_index) + { + /* queue is empty */ + if (timeout == 0) + { + result = -RT_ETIMEOUT; + goto __exit; + } + + /* current context checking */ + RT_DEBUG_NOT_IN_INTERRUPT; + + /* reset thread error number */ + thread->error = RT_EOK; + + /* suspend thread on the pop list */ + rt_thread_suspend(thread); + rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist)); + /* start timer */ + if (timeout > 0) + { + /* reset the timeout of thread timer and start it */ + rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); + rt_timer_start(&(thread->thread_timer)); + } + + /* enable interrupt */ + rt_hw_interrupt_enable(level); + + /* do schedule */ + rt_schedule(); + + /* thread is waked up */ + result = thread->error; + level = rt_hw_interrupt_disable(); + if (result != RT_EOK) goto __exit; + } + + *data_ptr = queue->queue[queue->get_index & mask].data_ptr; + *size = queue->queue[queue->get_index & mask].data_size; + + queue->get_index += 1; + + if ((queue->waiting_lwm == RT_TRUE) && + (queue->put_index - queue->get_index) <= queue->lwm) + { + queue->waiting_lwm = RT_FALSE; + + /* there is at least one thread in suspended list and less than low water mark */ + if (!rt_list_isempty(&(queue->suspended_push_list))) + { + /* get thread entry */ + thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist); + + /* resume it */ + rt_thread_resume(thread); + rt_hw_interrupt_enable(level); + + /* perform a schedule */ + rt_schedule(); + } + + if (queue->evt_notify != RT_NULL) + queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM); + + return result; + } + +__exit: + rt_hw_interrupt_enable(level); + if ((result == RT_EOK) && (queue->evt_notify != RT_NULL)) + { + queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP); + } + + return result; +} +RTM_EXPORT(rt_data_queue_pop); + +rt_err_t rt_data_queue_peak(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size) +{ + rt_ubase_t level; + rt_uint16_t mask; + + RT_ASSERT(queue != RT_NULL); + + mask = queue->size - 1; + + level = rt_hw_interrupt_disable(); + + if (queue->get_index == queue->put_index) + { + rt_hw_interrupt_enable(level); + return -RT_EEMPTY; + } + + *data_ptr = queue->queue[queue->get_index & mask].data_ptr; + *size = queue->queue[queue->get_index & mask].data_size; + + rt_hw_interrupt_enable(level); + return RT_EOK; +} +RTM_EXPORT(rt_data_queue_peak); + +void rt_data_queue_reset(struct rt_data_queue* queue) +{ + struct rt_thread *thread; + register rt_ubase_t temp; + + rt_enter_critical(); + /* wakeup all suspend threads */ + + /* resume on pop list */ + while (!rt_list_isempty(&(queue->suspended_pop_list))) + { + /* disable interrupt */ + temp = rt_hw_interrupt_disable(); + + /* get next suspend thread */ + thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist); + /* set error code to RT_ERROR */ + thread->error = -RT_ERROR; + + /* + * resume thread + * In rt_thread_resume function, it will remove current thread from + * suspend list + */ + rt_thread_resume(thread); + + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + } + + /* resume on push list */ + while (!rt_list_isempty(&(queue->suspended_push_list))) + { + /* disable interrupt */ + temp = rt_hw_interrupt_disable(); + + /* get next suspend thread */ + thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist); + /* set error code to RT_ERROR */ + thread->error = -RT_ERROR; + + /* + * resume thread + * In rt_thread_resume function, it will remove current thread from + * suspend list + */ + rt_thread_resume(thread); + + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + } + rt_exit_critical(); + + rt_schedule(); +} +RTM_EXPORT(rt_data_queue_reset); + diff --git a/components/drivers/src/pipe.c b/components/drivers/src/pipe.c new file mode 100644 index 000000000..142f1d939 --- /dev/null +++ b/components/drivers/src/pipe.c @@ -0,0 +1,196 @@ +/* + * File : pipe.c + * This file is part of RT-Thread RTOS + * COPYRIGHT (C) 2012, RT-Thread Development Team + * + * The license and distribution terms for this file may be + * found in the file LICENSE in this distribution or at + * http://www.rt-thread.org/license/LICENSE + * + * Change Logs: + * Date Author Notes + * 2012-09-30 Bernard first version. + */ +#include +#include +#include + +static rt_size_t rt_pipe_read(rt_device_t dev, rt_off_t pos, void *buffer, rt_size_t size) +{ + rt_uint32_t level; + rt_thread_t thread; + struct rt_pipe_device *pipe; + rt_size_t read_nbytes; + + pipe = PIPE_DEVICE(dev); + RT_ASSERT(pipe != RT_NULL); + + thread = rt_thread_self(); + + /* current context checking */ + RT_DEBUG_NOT_IN_INTERRUPT; + + do + { + level = rt_hw_interrupt_disable(); + read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); + if (read_nbytes == 0) + { + rt_thread_suspend(thread); + /* waiting on suspended read list */ + rt_list_insert_before(&(pipe->suspended_read_list), &(thread->tlist)); + rt_hw_interrupt_enable(level); + + rt_schedule(); + } + else + { + if (!rt_list_isempty(&pipe->suspended_write_list)) + { + /* get suspended thread */ + thread = rt_list_entry(pipe->suspended_write_list.next, + struct rt_thread, tlist); + + /* resume the write thread */ + rt_thread_resume(thread); + rt_hw_interrupt_enable(level); + + rt_schedule(); + } + else + { + rt_hw_interrupt_enable(level); + } + break; + } + } while (read_nbytes == 0); + + return read_nbytes; +} + +struct rt_pipe_device *_pipe = RT_NULL; +static rt_size_t rt_pipe_write(rt_device_t dev, rt_off_t pos, const void *buffer, rt_size_t size) +{ + rt_uint32_t level; + rt_thread_t thread; + struct rt_pipe_device *pipe; + rt_size_t write_nbytes; + + pipe = PIPE_DEVICE(dev); + RT_ASSERT(pipe != RT_NULL); + if (_pipe == RT_NULL) + _pipe = pipe; + + thread = rt_thread_self(); + + /* current context checking */ + RT_DEBUG_NOT_IN_INTERRUPT; + + do + { + level = rt_hw_interrupt_disable(); + write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size); + if (write_nbytes == 0) + { + /* pipe full, waiting on suspended write list */ + rt_thread_suspend(thread); + /* waiting on suspended read list */ + rt_list_insert_before(&(pipe->suspended_write_list), &(thread->tlist)); + rt_hw_interrupt_enable(level); + + rt_schedule(); + } + else + { + if (!rt_list_isempty(&pipe->suspended_read_list)) + { + /* get suspended thread */ + thread = rt_list_entry(pipe->suspended_read_list.next, + struct rt_thread, tlist); + + /* resume the read thread */ + rt_thread_resume(thread); + rt_hw_interrupt_enable(level); + + rt_schedule(); + } + else + { + rt_hw_interrupt_enable(level); + } + break; + } + }while (write_nbytes == 0); + + return write_nbytes; +} + +static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args) +{ + return RT_EOK; +} + +rt_err_t rt_pipe_create(const char* name, rt_size_t size) +{ + rt_err_t result = RT_EOK; + rt_uint8_t* rb_memptr = RT_NULL; + struct rt_pipe_device *pipe = RT_NULL; + + /* get aligned size */ + size = RT_ALIGN(size, RT_ALIGN_SIZE); + pipe = (struct rt_pipe_device*) rt_calloc (1, sizeof(struct rt_pipe_device)); + if (pipe != RT_NULL) + { + /* create ring buffer of pipe */ + rb_memptr = rt_malloc(size); + if (rb_memptr == RT_NULL) + { + result = -RT_ENOMEM; + goto __exit; + } + /* initialize suspended list */ + rt_list_init(&pipe->suspended_read_list); + rt_list_init(&pipe->suspended_write_list); + + /* initialize ring buffer */ + rt_ringbuffer_init(&pipe->ringbuffer, rb_memptr, size); + + /* create device */ + pipe->parent.type = RT_Device_Class_Char; + pipe->parent.init = RT_NULL; + pipe->parent.open = RT_NULL; + pipe->parent.close = RT_NULL; + pipe->parent.read = rt_pipe_read; + pipe->parent.write = rt_pipe_write; + pipe->parent.control = rt_pipe_control; + + return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR); + } + else + { + result = -RT_ENOMEM; + } + +__exit: + if (pipe != RT_NULL) rt_free(pipe); + if (rb_memptr != RT_NULL) rt_free(rb_memptr); + + return result; +} +RTM_EXPORT(rt_pipe_create); + +void rt_pipe_destroy(struct rt_pipe_device* pipe) +{ + if (pipe == RT_NULL) return; + + /* un-register pipe device */ + rt_device_unregister(&(pipe->parent)); + + /* release memory */ + rt_free(pipe->ringbuffer.buffer_ptr); + rt_free(pipe); + + return; +} +RTM_EXPORT(rt_pipe_destroy); + diff --git a/components/drivers/src/ringbuffer.c b/components/drivers/src/ringbuffer.c index 80606469a..cc6484aed 100644 --- a/components/drivers/src/ringbuffer.c +++ b/components/drivers/src/ringbuffer.c @@ -1,3 +1,17 @@ +/* + * File : ringbuffer.c + * This file is part of RT-Thread RTOS + * COPYRIGHT (C) 2012, RT-Thread Development Team + * + * The license and distribution terms for this file may be + * found in the file LICENSE in this distribution or at + * http://www.rt-thread.org/license/LICENSE + * + * Change Logs: + * Date Author Notes + * 2012-09-30 Bernard first version. + */ + #include #include #include @@ -13,71 +27,64 @@ void rt_ringbuffer_init(struct rt_ringbuffer* rb, rt_uint8_t *pool, rt_uint16_t rb->buffer_ptr = pool; rb->buffer_size = RT_ALIGN_DOWN(size, RT_ALIGN_SIZE); } +RTM_EXPORT(rt_ringbuffer_init); rt_size_t rt_ringbuffer_put(struct rt_ringbuffer* rb, const rt_uint8_t *ptr, rt_uint16_t length) { rt_uint16_t size; rt_uint16_t mask; - + rt_uint16_t write_position; + RT_ASSERT(rb != RT_NULL); mask = rb->buffer_size - 1; /* whether has enough space */ - size = rb->buffer_size - ((rb->write_index - rb->read_index) & mask); + size = rb->buffer_size - (rb->write_index - rb->read_index); /* no space */ if (size == 0) return 0; /* drop some data */ if (size < length) length = size; - if (rb->read_index > rb->write_index) + write_position = (rb->write_index & mask); + if (rb->buffer_size - write_position> length) { /* read_index - write_index = empty space */ - memcpy(&rb->buffer_ptr[rb->write_index], ptr, length); - rb->write_index += length; + memcpy(&rb->buffer_ptr[write_position], ptr, length); } else { - if (rb->buffer_size - rb->write_index >= length) - { - /* there is enough space after write_index */ - memcpy(&rb->buffer_ptr[rb->write_index], ptr, length); - rb->write_index = (rb->write_index + length) & mask; - } - else - { - memcpy(&rb->buffer_ptr[rb->write_index], ptr, - rb->buffer_size - rb->write_index); - memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - rb->write_index], - length - (rb->buffer_size - rb->write_index)); - rb->write_index = length - (rb->buffer_size - rb->write_index); - } + memcpy(&rb->buffer_ptr[write_position], ptr, rb->buffer_size - write_position); + memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - write_position], + length - (rb->buffer_size - write_position)); } + rb->write_index += length; return length; } +RTM_EXPORT(rt_ringbuffer_put); /** * put a character into ring buffer */ rt_size_t rt_ringbuffer_putchar(struct rt_ringbuffer* rb, const rt_uint8_t ch) { - rt_uint16_t next; rt_uint16_t mask; RT_ASSERT(rb != RT_NULL); /* whether has enough space */ mask = rb->buffer_size - 1; - next = (rb->write_index + 1) & mask; - - if (next == rb->read_index) return 0; + + /* whether has enough space */ + if (rb->write_index - rb->read_index == rb->buffer_size) return 0; /* put character */ - rb->buffer_ptr[rb->write_index] = ch; - rb->write_index = next; + rb->buffer_ptr[rb->write_index & mask] = ch; + rb->write_index += 1; return 1; } +RTM_EXPORT(rt_ringbuffer_putchar); /** * get data from ring buffer @@ -86,47 +93,42 @@ rt_size_t rt_ringbuffer_get(struct rt_ringbuffer* rb, rt_uint8_t *ptr, rt_uint16 { rt_size_t size; rt_uint16_t mask; + rt_uint16_t read_position; RT_ASSERT(rb != RT_NULL); /* whether has enough data */ mask = rb->buffer_size - 1; - size = (rb->write_index - rb->read_index) & mask; + size = rb->write_index - rb->read_index; /* no data */ if (size == 0) return 0; /* less data */ if (size < length) length = size; - if (rb->read_index > rb->write_index) + read_position = rb->read_index & mask; + if (rb->buffer_size - read_position >= length) { - if (rb->buffer_size - rb->read_index >= length) - { - /* copy directly */ - memcpy(ptr, &rb->buffer_ptr[rb->read_index], length); - rb->read_index = (rb->read_index + length) & mask; - } - else - { - /* copy first and second */ - memcpy(ptr, &rb->buffer_ptr[rb->read_index], - rb->buffer_size - rb->read_index); - memcpy(&ptr[rb->buffer_size - rb->read_index], &rb->buffer_ptr[0], - length - rb->buffer_size + rb->read_index); - rb->read_index = length - rb->buffer_size + rb->read_index; - } + /* copy all of data */ + memcpy(ptr, &rb->buffer_ptr[read_position], length); } else { - memcpy(ptr, &rb->buffer_ptr[rb->read_index], length); - rb->read_index += length; + /* copy first and second */ + memcpy(ptr, &rb->buffer_ptr[read_position], rb->buffer_size - read_position); + memcpy(&ptr[rb->buffer_size - read_position], &rb->buffer_ptr[0], + length - rb->buffer_size + read_position); } + rb->read_index += length; return length; } +RTM_EXPORT(rt_ringbuffer_get); +/** + * get a character from a ringbuffer + */ rt_size_t rt_ringbuffer_getchar(struct rt_ringbuffer* rb, rt_uint8_t *ch) { - rt_uint16_t next; rt_uint16_t mask; RT_ASSERT(rb != RT_NULL); @@ -134,40 +136,25 @@ rt_size_t rt_ringbuffer_getchar(struct rt_ringbuffer* rb, rt_uint8_t *ch) /* ringbuffer is empty */ if (rb->read_index == rb->write_index) return 0; - /* whether has data */ mask = rb->buffer_size - 1; - next = (rb->read_index + 1) & mask; /* put character */ - *ch = rb->buffer_ptr[rb->read_index]; - rb->read_index = next; + *ch = rb->buffer_ptr[rb->read_index & mask]; + rb->read_index += 1; return 1; } +RTM_EXPORT(rt_ringbuffer_getchar); /** * get available data size */ -rt_size_t rt_ringbuffer_available_size(struct rt_ringbuffer* rb) +rt_size_t rt_ringbuffer_get_datasize(struct rt_ringbuffer* rb) { - rt_size_t size; - rt_uint16_t mask; - RT_ASSERT(rb != RT_NULL); - mask = rb->buffer_size - 1; - size = (rb->write_index - rb->read_index) & mask; - /* return the available size */ - return size; + return rb->write_index - rb->read_index; } +RTM_EXPORT(rt_data_queue_reset); -/** - * get empty space size - */ -rt_size_t rt_ringbuffer_emptry_size(struct rt_ringbuffer* rb) -{ - RT_ASSERT(rb != RT_NULL); - - return rb->buffer_size - rt_ringbuffer_available_size(rb); -}