diff --git a/components/libc/posix/ipc/Kconfig b/components/libc/posix/ipc/Kconfig index a1e4fc1632..cf3135e53a 100644 --- a/components/libc/posix/ipc/Kconfig +++ b/components/libc/posix/ipc/Kconfig @@ -24,6 +24,7 @@ config RT_USING_POSIX_PIPE_SIZE config RT_USING_POSIX_MESSAGE_QUEUE bool "Enable posix message queue " select RT_USING_POSIX_CLOCK + select RT_USING_MESSAGEQUEUE_PRIORITY default n config RT_USING_POSIX_MESSAGE_SEMAPHORE diff --git a/components/libc/posix/ipc/mqueue.c b/components/libc/posix/ipc/mqueue.c index 22d2854770..49ce6bf98d 100644 --- a/components/libc/posix/ipc/mqueue.c +++ b/components/libc/posix/ipc/mqueue.c @@ -232,7 +232,7 @@ ssize_t mq_receive(mqd_t id, char *msg_ptr, size_t msg_len, unsigned *msg_prio) return -1; } - result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER); + result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, RT_WAITING_FOREVER, RT_UNINTERRUPTIBLE); if (result >= 0) return rt_strlen(msg_ptr); @@ -255,7 +255,7 @@ int mq_send(mqd_t id, const char *msg_ptr, size_t msg_len, unsigned msg_prio) return -1; } - result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len); + result = rt_mq_send_wait_prio(mqdes->mq, (void *)msg_ptr, msg_len, msg_prio, 0, RT_UNINTERRUPTIBLE); if (result == RT_EOK) return 0; @@ -287,7 +287,8 @@ ssize_t mq_timedreceive(mqd_t id, if (abs_timeout != RT_NULL) tick = rt_timespec_to_tick(abs_timeout); - result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick); + result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, tick, RT_UNINTERRUPTIBLE); + if (result >= 0) return rt_strlen(msg_ptr); diff --git a/examples/utest/testcases/kernel/messagequeue_tc.c b/examples/utest/testcases/kernel/messagequeue_tc.c index 08d4049875..6d44c0042b 100644 --- a/examples/utest/testcases/kernel/messagequeue_tc.c +++ b/examples/utest/testcases/kernel/messagequeue_tc.c @@ -82,6 +82,25 @@ static void mq_send_case(rt_mq_t testmq) rt_thread_delay(100); } +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + ret = rt_mq_send_wait_prio(testmq, &send_buf[3], sizeof(send_buf[0]), 3, 0, RT_UNINTERRUPTIBLE); + uassert_true(ret == RT_EOK); + ret = rt_mq_send_wait_prio(testmq, &send_buf[0], sizeof(send_buf[0]), 0, 0, RT_UNINTERRUPTIBLE); + uassert_true(ret == RT_EOK); + + ret = rt_mq_send_wait_prio(testmq, &send_buf[2], sizeof(send_buf[0]), 1, 0, RT_UNINTERRUPTIBLE); + uassert_true(ret == RT_EOK); + ret = rt_mq_send_wait_prio(testmq, &send_buf[4], sizeof(send_buf[0]), 4, 0, RT_UNINTERRUPTIBLE); + uassert_true(ret == RT_EOK); + ret = rt_mq_send_wait_prio(testmq, &send_buf[1], sizeof(send_buf[0]), 1, 0, RT_UNINTERRUPTIBLE); + uassert_true(ret == RT_EOK); + + while (testmq->entry != 0) + { + rt_thread_delay(100); + } +#endif + ret = rt_mq_send(testmq, &send_buf[1], sizeof(send_buf[0])); uassert_true(ret == RT_EOK); ret = rt_mq_control(testmq, RT_IPC_CMD_RESET, RT_NULL); @@ -121,6 +140,20 @@ static void mq_recv_case(rt_mq_t testmq) uassert_true(ret >= 0); uassert_true(recv_buf[var] == (var + 1)); } +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + rt_int32_t msg_prio; + while (testmq->entry == MAX_MSGS) + { + rt_thread_delay(100); + } + for (int var = 0; var < MAX_MSGS; ++var) + { + ret = rt_mq_recv_prio(testmq, &recv_buf[var], sizeof(recv_buf[0]), &msg_prio, RT_WAITING_FOREVER, RT_UNINTERRUPTIBLE); + rt_kprintf("msg_prio = %d\r\n", msg_prio); + uassert_true(ret >= 0); + uassert_true(recv_buf[var] == (MAX_MSGS - var)); + } +#endif } static void mq_recv_entry(void *param) diff --git a/include/rtthread.h b/include/rtthread.h index dfcd1c413c..ea3ee6f81a 100644 --- a/include/rtthread.h +++ b/include/rtthread.h @@ -463,6 +463,9 @@ struct rt_mq_message { struct rt_mq_message *next; rt_ssize_t length; +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + rt_int32_t prio; +#endif }; #define RT_MQ_BUF_SIZE(msg_size, max_msgs) \ @@ -515,6 +518,21 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t mq, rt_size_t size, rt_int32_t timeout); rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg); + +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY +rt_err_t rt_mq_send_wait_prio(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t prio, + rt_int32_t timeout, + int suspend_flag); +rt_err_t rt_mq_recv_prio(rt_mq_t mq, + void *buffer, + rt_size_t size, + rt_int32_t *prio, + rt_int32_t timeout, + int suspend_flag); +#endif #endif /* defunct */ diff --git a/src/Kconfig b/src/Kconfig index 5cabfdbd72..f88f806f74 100644 --- a/src/Kconfig +++ b/src/Kconfig @@ -317,6 +317,12 @@ menu "Inter-Thread communication" bool "Enable message queue" default y + if RT_USING_MESSAGEQUEUE + config RT_USING_MESSAGEQUEUE_PRIORITY + bool "Enable message queue priority" + default n + endif + config RT_USING_SIGNALS bool "Enable signals" select RT_USING_MEMPOOL diff --git a/src/ipc.c b/src/ipc.c index 3a5a0efa61..806400fa12 100644 --- a/src/ipc.c +++ b/src/ipc.c @@ -3144,7 +3144,6 @@ rt_err_t rt_mq_delete(rt_mq_t mq) RTM_EXPORT(rt_mq_delete); #endif /* RT_USING_HEAP */ - /** * @brief This function will send a message to the messagequeue object. If * there is a thread suspended on the messagequeue, the thread will be @@ -3154,10 +3153,10 @@ RTM_EXPORT(rt_mq_delete); * fully used, the current thread will wait for a timeout. If reaching * the timeout and there is still no space available, the sending * thread will be resumed and an error code will be returned. By - * contrast, the rt_mq_send() function will return an error code + * contrast, the _rt_mq_send_wait() function will return an error code * immediately without waiting when the messagequeue if fully used. * - * @see rt_mq_send() + * @see _rt_mq_send_wait() * * @param mq is a pointer to the messagequeue object to be sent. * @@ -3165,8 +3164,12 @@ RTM_EXPORT(rt_mq_delete); * * @param size is the length of the message(Unit: Byte). * + * @param prio is message priority, A larger value indicates a higher priority + * * @param timeout is a timeout period (unit: an OS tick). * + * @param suspend_flag status flag of the thread to be suspended. + * * @return Return the operation status. When the return value is RT_EOK, the * operation is successful. If the return value is any other values, * it means that the messagequeue detach failed. @@ -3174,11 +3177,12 @@ RTM_EXPORT(rt_mq_delete); * @warning This function can be called in interrupt context and thread * context. */ -static rt_err_t _rt_mq_send_wait(rt_mq_t mq, - const void *buffer, - rt_size_t size, - rt_int32_t timeout, - int suspend_flag) +static rt_err_t _rt_mq_send_wait(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t prio, + rt_int32_t timeout, + int suspend_flag) { rt_base_t level; struct rt_mq_message *msg; @@ -3304,6 +3308,33 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq, /* disable interrupt */ level = rt_hw_interrupt_disable(); +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + msg->prio = prio; + if (mq->msg_queue_head == RT_NULL) + mq->msg_queue_head = msg; + + struct rt_mq_message *node, *prev_node = RT_NULL; + for (node = mq->msg_queue_head; node != RT_NULL; node = node->next) + { + if (node->prio < msg->prio) + { + if (prev_node == RT_NULL) + mq->msg_queue_head = msg; + else + prev_node->next = msg; + msg->next = node; + break; + } + if (node->next == RT_NULL) + { + if (node != msg) + node->next = msg; + mq->msg_queue_tail = msg; + break; + } + prev_node = node; + } +#else /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { @@ -3316,6 +3347,7 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq, /* if the head is empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg; +#endif if(mq->entry < RT_MQ_ENTRY_MAX) { @@ -3352,7 +3384,7 @@ rt_err_t rt_mq_send_wait(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_send_wait(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE); + return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE); } RTM_EXPORT(rt_mq_send_wait); @@ -3361,7 +3393,7 @@ rt_err_t rt_mq_send_wait_interruptible(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_send_wait(mq, buffer, size, timeout, RT_INTERRUPTIBLE); + return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE); } RTM_EXPORT(rt_mq_send_wait_interruptible); @@ -3370,7 +3402,7 @@ rt_err_t rt_mq_send_wait_killable(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_send_wait(mq, buffer, size, timeout, RT_KILLABLE); + return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_KILLABLE); } RTM_EXPORT(rt_mq_send_wait_killable); /** @@ -3513,7 +3545,6 @@ rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size) } RTM_EXPORT(rt_mq_urgent); - /** * @brief This function will receive a message from message queue object, * if there is no message in messagequeue object, the thread shall wait for a specified time. @@ -3526,11 +3557,15 @@ RTM_EXPORT(rt_mq_urgent); * * @param buffer is the content of the message. * + * @param prio is message priority, A larger value indicates a higher priority + * * @param size is the length of the message(Unit: Byte). * * @param timeout is a timeout period (unit: an OS tick). If the message is unavailable, the thread will wait for * the message in the queue up to the amount of time specified by this parameter. * + * @param suspend_flag status flag of the thread to be suspended. + * * NOTE: * If use Macro RT_WAITING_FOREVER to set this parameter, which means that when the * message is unavailable in the queue, the thread will be waiting forever. @@ -3540,11 +3575,12 @@ RTM_EXPORT(rt_mq_urgent); * @return Return the real length of the message. When the return value is larger than zero, the operation is successful. * If the return value is any other values, it means that the mailbox release failed. */ -static rt_ssize_t _rt_mq_recv(rt_mq_t mq, - void *buffer, - rt_size_t size, - rt_int32_t timeout, - int suspend_flag) +static rt_ssize_t _rt_mq_recv(rt_mq_t mq, + void *buffer, + rt_size_t size, + rt_int32_t *prio, + rt_int32_t timeout, + int suspend_flag) { struct rt_thread *thread; rt_base_t level; @@ -3675,6 +3711,10 @@ static rt_ssize_t _rt_mq_recv(rt_mq_t mq, /* copy message */ rt_memcpy(buffer, GET_MESSAGEBYTE_ADDR(msg), len); +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + if (prio != RT_NULL) + *prio = msg->prio; +#endif /* disable interrupt */ level = rt_hw_interrupt_disable(); /* put message to free list */ @@ -3709,7 +3749,7 @@ rt_ssize_t rt_mq_recv(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_recv(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE); + return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE); } RTM_EXPORT(rt_mq_recv); @@ -3718,7 +3758,7 @@ rt_ssize_t rt_mq_recv_interruptible(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_recv(mq, buffer, size, timeout, RT_INTERRUPTIBLE); + return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE); } RTM_EXPORT(rt_mq_recv_interruptible); @@ -3727,8 +3767,28 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_recv(mq, buffer, size, timeout, RT_KILLABLE); + return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_KILLABLE); } +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY +rt_err_t rt_mq_send_wait_prio(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t prio, + rt_int32_t timeout, + int suspend_flag) +{ + return _rt_mq_send_wait(mq, buffer, size, prio, timeout, suspend_flag); +} +rt_err_t rt_mq_recv_prio(rt_mq_t mq, + void *buffer, + rt_size_t size, + rt_int32_t *prio, + rt_int32_t timeout, + int suspend_flag) +{ + return _rt_mq_recv(mq, buffer, size, prio, timeout, suspend_flag); +} +#endif RTM_EXPORT(rt_mq_recv_killable); /** * @brief This function will set some extra attributions of a messagequeue object.