diff --git a/include/rtdef.h b/include/rtdef.h index 7c05178e3c..b25a6a87d5 100644 --- a/include/rtdef.h +++ b/include/rtdef.h @@ -742,6 +742,8 @@ struct rt_messagequeue void *msg_queue_head; /**< list head */ void *msg_queue_tail; /**< list tail */ void *msg_queue_free; /**< pointer indicated the free node of queue */ + + rt_list_t suspend_sender_thread; /**< sender thread suspended on this message queue */ }; typedef struct rt_messagequeue *rt_mq_t; #endif diff --git a/include/rtthread.h b/include/rtthread.h index 04db77da46..d703f36268 100644 --- a/include/rtthread.h +++ b/include/rtthread.h @@ -379,6 +379,10 @@ rt_mq_t rt_mq_create(const char *name, rt_err_t rt_mq_delete(rt_mq_t mq); rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size); +rt_err_t rt_mq_send_wait(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t timeout); rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size); rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, diff --git a/src/ipc.c b/src/ipc.c index c6a05e96a4..16d714b3f4 100644 --- a/src/ipc.c +++ b/src/ipc.c @@ -33,6 +33,7 @@ * 2011-12-18 Bernard add more parameter checking in message queue * 2013-09-14 Grissiom add an option check in rt_event_recv * 2018-10-02 Bernard add 64bit support for mailbox + * 2019-09-16 tyx add send wait support for message queue */ #include @@ -1823,6 +1824,9 @@ rt_err_t rt_mq_init(rt_mq_t mq, /* the initial entry is zero */ mq->entry = 0; + /* init an additional list of sender suspend thread */ + rt_list_init(&(mq->suspend_sender_thread)); + return RT_EOK; } RTM_EXPORT(rt_mq_init); @@ -1843,6 +1847,8 @@ rt_err_t rt_mq_detach(rt_mq_t mq) /* resume all suspended thread */ rt_ipc_list_resume_all(&mq->parent.suspend_thread); + /* also resume all message queue private suspended thread */ + rt_ipc_list_resume_all(&(mq->suspend_sender_thread)); /* detach message queue object */ rt_object_detach(&(mq->parent.parent)); @@ -1916,6 +1922,9 @@ rt_mq_t rt_mq_create(const char *name, /* the initial entry is zero */ mq->entry = 0; + /* init an additional list of sender suspend thread */ + rt_list_init(&(mq->suspend_sender_thread)); + return mq; } RTM_EXPORT(rt_mq_create); @@ -1938,6 +1947,8 @@ rt_err_t rt_mq_delete(rt_mq_t mq) /* resume all suspended thread */ rt_ipc_list_resume_all(&(mq->parent.suspend_thread)); + /* also resume all message queue private suspended thread */ + rt_ipc_list_resume_all(&(mq->suspend_sender_thread)); /* free message queue pool */ RT_KERNEL_FREE(mq->msg_pool); @@ -1951,19 +1962,25 @@ RTM_EXPORT(rt_mq_delete); #endif /** - * This function will send a message to message queue object, if there are - * threads suspended on message queue object, it will be waked up. + * This function will send a message to message queue object. If the message queue is full, + * current thread will be suspended until timeout. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer + * @param timeout the waiting time * * @return the error code */ -rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) +rt_err_t rt_mq_send_wait(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t timeout) { register rt_ubase_t temp; struct rt_mq_message *msg; + rt_uint32_t tick_delta; + struct rt_thread *thread; /* parameter check */ RT_ASSERT(mq != RT_NULL); @@ -1975,6 +1992,11 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) if (size > mq->msg_size) return -RT_ERROR; + /* initialize delta tick */ + tick_delta = 0; + /* get current thread */ + thread = rt_thread_self(); + RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ @@ -1982,14 +2004,78 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; - /* message queue is full */ - if (msg == RT_NULL) + /* for non-blocking call */ + if (msg == RT_NULL && timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } + + /* message queue is full */ + while ((msg = mq->msg_queue_free) == RT_NULL) + { + /* reset error number in thread */ + thread->error = RT_EOK; + + /* no waiting, return timeout */ + if (timeout == 0) + { + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + + return -RT_EFULL; + } + + RT_DEBUG_IN_THREAD_CONTEXT; + /* suspend current thread */ + rt_ipc_list_suspend(&(mq->suspend_sender_thread), + thread, + mq->parent.parent.flag); + + /* has waiting time, start thread timer */ + if (timeout > 0) + { + /* get the start tick of timer */ + tick_delta = rt_tick_get(); + + RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n", + thread->name)); + + /* reset the timeout of thread timer and start it */ + rt_timer_control(&(thread->thread_timer), + RT_TIMER_CTRL_SET_TIME, + &timeout); + rt_timer_start(&(thread->thread_timer)); + } + + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + + /* re-schedule */ + rt_schedule(); + + /* resume from suspend state */ + if (thread->error != RT_EOK) + { + /* return error */ + return thread->error; + } + + /* disable interrupt */ + temp = rt_hw_interrupt_disable(); + + /* if it's not waiting forever and then re-calculate timeout tick */ + if (timeout > 0) + { + tick_delta = rt_tick_get() - tick_delta; + timeout -= tick_delta; + if (timeout < 0) + timeout = 0; + } + } + /* move free list pointer */ mq->msg_queue_free = msg->next; @@ -2037,6 +2123,22 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) return RT_EOK; } +RTM_EXPORT(rt_mq_send_wait) + +/** + * This function will send a message to message queue object, if there are + * threads suspended on message queue object, it will be waked up. + * + * @param mq the message queue object + * @param buffer the message + * @param size the size of buffer + * + * @return the error code + */ +rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) +{ + return rt_mq_send_wait(mq, buffer, size, 0); +} RTM_EXPORT(rt_mq_send); /** @@ -2257,6 +2359,22 @@ rt_err_t rt_mq_recv(rt_mq_t mq, /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; + + /* resume suspended thread */ + if (!rt_list_isempty(&(mq->suspend_sender_thread))) + { + rt_ipc_list_resume(&(mq->suspend_sender_thread)); + + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + + RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); + + rt_schedule(); + + return RT_EOK; + } + /* enable interrupt */ rt_hw_interrupt_enable(temp); @@ -2292,6 +2410,8 @@ rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg) /* resume all waiting thread */ rt_ipc_list_resume_all(&mq->parent.suspend_thread); + /* also resume all message queue private suspended thread */ + rt_ipc_list_resume_all(&(mq->suspend_sender_thread)); /* release all message in the queue */ while (mq->msg_queue_head != RT_NULL)