Merge pull request #3078 from enkiller/pr

[kernel][ipc] add send wait support for message queue
This commit is contained in:
Bernard Xiong 2019-09-20 06:26:27 +08:00 committed by GitHub
commit 71346926c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 131 additions and 5 deletions

View File

@ -742,6 +742,8 @@ struct rt_messagequeue
void *msg_queue_head; /**< list head */ void *msg_queue_head; /**< list head */
void *msg_queue_tail; /**< list tail */ void *msg_queue_tail; /**< list tail */
void *msg_queue_free; /**< pointer indicated the free node of queue */ void *msg_queue_free; /**< pointer indicated the free node of queue */
rt_list_t suspend_sender_thread; /**< sender thread suspended on this message queue */
}; };
typedef struct rt_messagequeue *rt_mq_t; typedef struct rt_messagequeue *rt_mq_t;
#endif #endif

View File

@ -379,6 +379,10 @@ rt_mq_t rt_mq_create(const char *name,
rt_err_t rt_mq_delete(rt_mq_t mq); rt_err_t rt_mq_delete(rt_mq_t mq);
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size); rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size);
rt_err_t rt_mq_send_wait(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t timeout);
rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size); rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size);
rt_err_t rt_mq_recv(rt_mq_t mq, rt_err_t rt_mq_recv(rt_mq_t mq,
void *buffer, void *buffer,

130
src/ipc.c
View File

@ -33,6 +33,7 @@
* 2011-12-18 Bernard add more parameter checking in message queue * 2011-12-18 Bernard add more parameter checking in message queue
* 2013-09-14 Grissiom add an option check in rt_event_recv * 2013-09-14 Grissiom add an option check in rt_event_recv
* 2018-10-02 Bernard add 64bit support for mailbox * 2018-10-02 Bernard add 64bit support for mailbox
* 2019-09-16 tyx add send wait support for message queue
*/ */
#include <rtthread.h> #include <rtthread.h>
@ -1823,6 +1824,9 @@ rt_err_t rt_mq_init(rt_mq_t mq,
/* the initial entry is zero */ /* the initial entry is zero */
mq->entry = 0; mq->entry = 0;
/* init an additional list of sender suspend thread */
rt_list_init(&(mq->suspend_sender_thread));
return RT_EOK; return RT_EOK;
} }
RTM_EXPORT(rt_mq_init); RTM_EXPORT(rt_mq_init);
@ -1843,6 +1847,8 @@ rt_err_t rt_mq_detach(rt_mq_t mq)
/* resume all suspended thread */ /* resume all suspended thread */
rt_ipc_list_resume_all(&mq->parent.suspend_thread); rt_ipc_list_resume_all(&mq->parent.suspend_thread);
/* also resume all message queue private suspended thread */
rt_ipc_list_resume_all(&(mq->suspend_sender_thread));
/* detach message queue object */ /* detach message queue object */
rt_object_detach(&(mq->parent.parent)); rt_object_detach(&(mq->parent.parent));
@ -1916,6 +1922,9 @@ rt_mq_t rt_mq_create(const char *name,
/* the initial entry is zero */ /* the initial entry is zero */
mq->entry = 0; mq->entry = 0;
/* init an additional list of sender suspend thread */
rt_list_init(&(mq->suspend_sender_thread));
return mq; return mq;
} }
RTM_EXPORT(rt_mq_create); RTM_EXPORT(rt_mq_create);
@ -1938,6 +1947,8 @@ rt_err_t rt_mq_delete(rt_mq_t mq)
/* resume all suspended thread */ /* resume all suspended thread */
rt_ipc_list_resume_all(&(mq->parent.suspend_thread)); rt_ipc_list_resume_all(&(mq->parent.suspend_thread));
/* also resume all message queue private suspended thread */
rt_ipc_list_resume_all(&(mq->suspend_sender_thread));
/* free message queue pool */ /* free message queue pool */
RT_KERNEL_FREE(mq->msg_pool); RT_KERNEL_FREE(mq->msg_pool);
@ -1951,19 +1962,25 @@ RTM_EXPORT(rt_mq_delete);
#endif #endif
/** /**
* This function will send a message to message queue object, if there are * This function will send a message to message queue object. If the message queue is full,
* threads suspended on message queue object, it will be waked up. * current thread will be suspended until timeout.
* *
* @param mq the message queue object * @param mq the message queue object
* @param buffer the message * @param buffer the message
* @param size the size of buffer * @param size the size of buffer
* @param timeout the waiting time
* *
* @return the error code * @return the error code
*/ */
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) rt_err_t rt_mq_send_wait(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t timeout)
{ {
register rt_ubase_t temp; register rt_ubase_t temp;
struct rt_mq_message *msg; struct rt_mq_message *msg;
rt_uint32_t tick_delta;
struct rt_thread *thread;
/* parameter check */ /* parameter check */
RT_ASSERT(mq != RT_NULL); RT_ASSERT(mq != RT_NULL);
@ -1975,6 +1992,11 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
if (size > mq->msg_size) if (size > mq->msg_size)
return -RT_ERROR; return -RT_ERROR;
/* initialize delta tick */
tick_delta = 0;
/* get current thread */
thread = rt_thread_self();
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));
/* disable interrupt */ /* disable interrupt */
@ -1982,14 +2004,78 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
/* get a free list, there must be an empty item */ /* get a free list, there must be an empty item */
msg = (struct rt_mq_message *)mq->msg_queue_free; msg = (struct rt_mq_message *)mq->msg_queue_free;
/* message queue is full */ /* for non-blocking call */
if (msg == RT_NULL) if (msg == RT_NULL && timeout == 0)
{ {
/* enable interrupt */ /* enable interrupt */
rt_hw_interrupt_enable(temp); rt_hw_interrupt_enable(temp);
return -RT_EFULL; return -RT_EFULL;
} }
/* message queue is full */
while ((msg = mq->msg_queue_free) == RT_NULL)
{
/* reset error number in thread */
thread->error = RT_EOK;
/* no waiting, return timeout */
if (timeout == 0)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
}
RT_DEBUG_IN_THREAD_CONTEXT;
/* suspend current thread */
rt_ipc_list_suspend(&(mq->suspend_sender_thread),
thread,
mq->parent.parent.flag);
/* has waiting time, start thread timer */
if (timeout > 0)
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* re-schedule */
rt_schedule();
/* resume from suspend state */
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
if (timeout > 0)
{
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
}
}
/* move free list pointer */ /* move free list pointer */
mq->msg_queue_free = msg->next; mq->msg_queue_free = msg->next;
@ -2037,6 +2123,22 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
return RT_EOK; return RT_EOK;
} }
RTM_EXPORT(rt_mq_send_wait)
/**
* This function will send a message to message queue object, if there are
* threads suspended on message queue object, it will be waked up.
*
* @param mq the message queue object
* @param buffer the message
* @param size the size of buffer
*
* @return the error code
*/
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
{
return rt_mq_send_wait(mq, buffer, size, 0);
}
RTM_EXPORT(rt_mq_send); RTM_EXPORT(rt_mq_send);
/** /**
@ -2257,6 +2359,22 @@ rt_err_t rt_mq_recv(rt_mq_t mq,
/* put message to free list */ /* put message to free list */
msg->next = (struct rt_mq_message *)mq->msg_queue_free; msg->next = (struct rt_mq_message *)mq->msg_queue_free;
mq->msg_queue_free = msg; mq->msg_queue_free = msg;
/* resume suspended thread */
if (!rt_list_isempty(&(mq->suspend_sender_thread)))
{
rt_ipc_list_resume(&(mq->suspend_sender_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));
rt_schedule();
return RT_EOK;
}
/* enable interrupt */ /* enable interrupt */
rt_hw_interrupt_enable(temp); rt_hw_interrupt_enable(temp);
@ -2292,6 +2410,8 @@ rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg)
/* resume all waiting thread */ /* resume all waiting thread */
rt_ipc_list_resume_all(&mq->parent.suspend_thread); rt_ipc_list_resume_all(&mq->parent.suspend_thread);
/* also resume all message queue private suspended thread */
rt_ipc_list_resume_all(&(mq->suspend_sender_thread));
/* release all message in the queue */ /* release all message in the queue */
while (mq->msg_queue_head != RT_NULL) while (mq->msg_queue_head != RT_NULL)