[Kernel] message 可以返回消息的实际大小 (#7709)

This commit is contained in:
zhkag 2023-06-21 10:22:42 +00:00 committed by GitHub
parent b4e59bac4e
commit 33f550cb65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 73 additions and 56 deletions

View File

@ -150,7 +150,7 @@ OS_Status OS_QueueReceive(OS_Queue_t *queue, void *item, OS_Time_t waitMS)
OS_HANDLE_ASSERT(OS_QueueIsValid(queue), queue->handle); OS_HANDLE_ASSERT(OS_QueueIsValid(queue), queue->handle);
ret = rt_mq_recv(queue->handle, item, queue->itemSize, OS_CalcWaitTicks(waitMS)); ret = rt_mq_recv(queue->handle, item, queue->itemSize, OS_CalcWaitTicks(waitMS));
if (ret != RT_EOK) { if (ret < 0) {
OS_DBG("%s() fail @ %d, %"OS_TIME_F" ms\n", __func__, __LINE__, (unsigned int)waitMS); OS_DBG("%s() fail @ %d, %"OS_TIME_F" ms\n", __func__, __LINE__, (unsigned int)waitMS);
return OS_FAIL; return OS_FAIL;
} }

View File

@ -182,7 +182,7 @@ int hal_queue_recv(hal_queue_t queue, void *buffer, int timeout)
} }
ret = rt_mq_recv(queue, buffer, queue->msg_size, timeout); ret = rt_mq_recv(queue, buffer, queue->msg_size, timeout);
if (ret != RT_EOK) if (ret < 0)
{ {
return -2; return -2;
} }

View File

@ -74,7 +74,7 @@ void rt_init_thread_entry(void *parameter)
while(1) while(1)
{ {
if (rt_mq_recv(&mq, &msg, sizeof(msg), RT_WAITING_FOREVER) == RT_EOK) if (rt_mq_recv(&mq, &msg, sizeof(msg), RT_WAITING_FOREVER) >= 0)
{ {
switch(msg.type) switch(msg.type)
{ {

View File

@ -788,7 +788,7 @@ osa_status_t OSA_MsgQGet(osa_msgq_handle_t msgqHandle, osa_msg_handle_t pMessage
{ {
timeoutTicks = rt_tick_from_millisecond(millisec); timeoutTicks = rt_tick_from_millisecond(millisec);
} }
if (RT_EOK != rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks)) if (rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks) < 0)
{ {
osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */ osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */
} }

View File

@ -788,7 +788,7 @@ osa_status_t OSA_MsgQGet(osa_msgq_handle_t msgqHandle, osa_msg_handle_t pMessage
{ {
timeoutTicks = rt_tick_from_millisecond(millisec); timeoutTicks = rt_tick_from_millisecond(millisec);
} }
if (RT_EOK != rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks)) if (rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks) < 0)
{ {
osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */ osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */
} }

View File

@ -788,7 +788,7 @@ osa_status_t OSA_MsgQGet(osa_msgq_handle_t msgqHandle, osa_msg_handle_t pMessage
{ {
timeoutTicks = rt_tick_from_millisecond(millisec); timeoutTicks = rt_tick_from_millisecond(millisec);
} }
if (RT_EOK != rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks)) if (rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks) < 0)
{ {
osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */ osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */
} }

View File

@ -788,7 +788,7 @@ osa_status_t OSA_MsgQGet(osa_msgq_handle_t msgqHandle, osa_msg_handle_t pMessage
{ {
timeoutTicks = rt_tick_from_millisecond(millisec); timeoutTicks = rt_tick_from_millisecond(millisec);
} }
if (RT_EOK != rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks)) if (rt_mq_recv(handler, pMessage, handler->msg_size, timeoutTicks) < 0)
{ {
osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */ osaStatus = KOSA_StatusTimeout; /* not able to send it to the queue? */
} }

View File

@ -50,7 +50,7 @@ static void serial_thread_entry(void *parameter)
{ {
rt_memset(&msg, 0, sizeof(msg)); rt_memset(&msg, 0, sizeof(msg));
result = rt_mq_recv(&rx_mq, &msg, sizeof(msg), RT_WAITING_FOREVER); result = rt_mq_recv(&rx_mq, &msg, sizeof(msg), RT_WAITING_FOREVER);
if (result == RT_EOK) if (result >= 0)
{ {
rx_length = rt_device_read(msg.dev, 0, rx_buffer, msg.size); rx_length = rt_device_read(msg.dev, 0, rx_buffer, msg.size);
rx_buffer[rx_length] = '\0'; rx_buffer[rx_length] = '\0';

View File

@ -627,7 +627,7 @@ static void hid_thread_entry(void* parameter)
hiddev = (struct hid_s *)parameter; hiddev = (struct hid_s *)parameter;
while(1) while(1)
{ {
if(rt_mq_recv(&hiddev->hid_mq, &report, sizeof(report),RT_WAITING_FOREVER) != RT_EOK ) if(rt_mq_recv(&hiddev->hid_mq, &report, sizeof(report),RT_WAITING_FOREVER) < 0)
continue; continue;
HID_Report_Received(&report); HID_Report_Received(&report);
} }

View File

@ -2160,7 +2160,7 @@ static void rt_usbd_thread_entry(void* parameter)
/* receive message */ /* receive message */
if(rt_mq_recv(&usb_mq, &msg, sizeof(struct udev_msg), if(rt_mq_recv(&usb_mq, &msg, sizeof(struct udev_msg),
RT_WAITING_FOREVER) != RT_EOK ) RT_WAITING_FOREVER) < 0)
continue; continue;
device = rt_usbd_find_device(msg.dcd); device = rt_usbd_find_device(msg.dcd);

View File

@ -659,8 +659,8 @@ static void rt_usbh_hub_thread_entry(void* parameter)
struct uhost_msg msg; struct uhost_msg msg;
/* receive message */ /* receive message */
if(rt_mq_recv(hcd->usb_mq, &msg, sizeof(struct uhost_msg), RT_WAITING_FOREVER) if (rt_mq_recv(hcd->usb_mq, &msg, sizeof(struct uhost_msg), RT_WAITING_FOREVER) < 0)
!= RT_EOK ) continue; continue;
//RT_DEBUG_LOG(RT_DEBUG_USB, ("msg type %d\n", msg.type)); //RT_DEBUG_LOG(RT_DEBUG_USB, ("msg type %d\n", msg.type));

View File

@ -62,7 +62,7 @@ public:
else else
tick = rt_tick_from_millisecond(millisec); tick = rt_tick_from_millisecond(millisec);
return rt_mq_recv(&mID, &data, sizeof(data), tick) == RT_EOK; return rt_mq_recv(&mID, &data, sizeof(data), tick) >= 0;
} }
private: private:

View File

@ -233,7 +233,7 @@ ssize_t mq_receive(mqd_t id, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
} }
result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER); result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
if (result == RT_EOK) if (result >= 0)
return rt_strlen(msg_ptr); return rt_strlen(msg_ptr);
rt_set_errno(EBADF); rt_set_errno(EBADF);
@ -288,7 +288,7 @@ ssize_t mq_timedreceive(mqd_t id,
tick = rt_timespec_to_tick(abs_timeout); tick = rt_timespec_to_tick(abs_timeout);
result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick); result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick);
if (result == RT_EOK) if (result >= 0)
return rt_strlen(msg_ptr); return rt_strlen(msg_ptr);
if (result == -RT_ETIMEOUT) if (result == -RT_ETIMEOUT)

View File

@ -575,7 +575,7 @@ static void serial_thread_entry(void *parameter)
rt_memset(&msg, 0, sizeof(msg)); rt_memset(&msg, 0, sizeof(msg));
/* Read messages from the message queue*/ /* Read messages from the message queue*/
result = rt_mq_recv(&rx_mq, &msg, sizeof(msg), RT_WAITING_FOREVER); result = rt_mq_recv(&rx_mq, &msg, sizeof(msg), RT_WAITING_FOREVER);
if (result == RT_EOK) if (result >= 0)
{ {
/*Read data from the serial port*/ /*Read data from the serial port*/
rx_length = rt_device_read(msg.dev, 0, rx_buffer, msg.size); rx_length = rt_device_read(msg.dev, 0, rx_buffer, msg.size);

View File

@ -595,7 +595,7 @@ static void thread1_entry(void *parameter)
while (1) while (1)
{ {
/* Receive messages from the message queue */ /* Receive messages from the message queue */
if (rt_mq_recv(&mq, &buf, sizeof(buf), RT_WAITING_FOREVER) == RT_EOK) if (rt_mq_recv(&mq, &buf, sizeof(buf), RT_WAITING_FOREVER) >= 0)
{ {
rt_kprintf("thread1: recv msg from msg queue, the content:%c\n", buf); rt_kprintf("thread1: recv msg from msg queue, the content:%c\n", buf);
if (cnt == 19) if (cnt == 19)
@ -784,7 +784,7 @@ void message_handler()
struct msg msg_ptr; /* Local variable used to place the message */ struct msg msg_ptr; /* Local variable used to place the message */
/* Receive messages from the message queue into msg_ptr */ /* Receive messages from the message queue into msg_ptr */
if (rt_mq_recv(mq, (void*)&msg_ptr, sizeof(struct msg)) == RT_EOK) if (rt_mq_recv(mq, (void*)&msg_ptr, sizeof(struct msg)) >= 0)
{ {
/* Successfully received the message, corresponding data processing is performed */ /* Successfully received the message, corresponding data processing is performed */
} }

View File

@ -15,7 +15,7 @@
#define MAX_MSGS 5 #define MAX_MSGS 5
static struct rt_messagequeue static_mq; static struct rt_messagequeue static_mq;
static rt_uint8_t mq_buf[(MSG_SIZE + (rt_uint8_t)sizeof(rt_ubase_t)) * MAX_MSGS]; static rt_uint8_t mq_buf[RT_MQ_BUF_SIZE(MSG_SIZE, MAX_MSGS)];
static struct rt_thread mq_send_thread; static struct rt_thread mq_send_thread;
static struct rt_thread mq_recv_thread; static struct rt_thread mq_recv_thread;
@ -111,14 +111,14 @@ static void mq_recv_case(rt_mq_t testmq)
for (int var = 0; var < MAX_MSGS + 1; ++var) for (int var = 0; var < MAX_MSGS + 1; ++var)
{ {
ret = rt_mq_recv(testmq, &recv_buf[var], sizeof(recv_buf[0]), RT_WAITING_FOREVER); ret = rt_mq_recv(testmq, &recv_buf[var], sizeof(recv_buf[0]), RT_WAITING_FOREVER);
uassert_true(ret == RT_EOK); uassert_true(ret >= 0);
uassert_true(recv_buf[var] == (var + 1)); uassert_true(recv_buf[var] == (var + 1));
} }
for (int var = 0; var < 3; ++var) for (int var = 0; var < 3; ++var)
{ {
ret = rt_mq_recv(testmq, &recv_buf[var], sizeof(recv_buf[0]), RT_WAITING_FOREVER); ret = rt_mq_recv(testmq, &recv_buf[var], sizeof(recv_buf[0]), RT_WAITING_FOREVER);
uassert_true(ret == RT_EOK); uassert_true(ret >= 0);
uassert_true(recv_buf[var] == (var + 1)); uassert_true(recv_buf[var] == (var + 1));
} }
} }

View File

@ -458,6 +458,16 @@ rt_err_t rt_mb_control(rt_mailbox_t mb, int cmd, void *arg);
#endif #endif
#ifdef RT_USING_MESSAGEQUEUE #ifdef RT_USING_MESSAGEQUEUE
struct rt_mq_message
{
struct rt_mq_message *next;
rt_ssize_t length;
};
#define RT_MQ_BUF_SIZE(msg_size, max_msgs) \
((RT_ALIGN((msg_size), RT_ALIGN_SIZE) + sizeof(struct rt_mq_message)) * (max_msgs))
/* /*
* message queue interface * message queue interface
*/ */
@ -492,15 +502,15 @@ rt_err_t rt_mq_send_wait_killable(rt_mq_t mq,
rt_size_t size, rt_size_t size,
rt_int32_t timeout); rt_int32_t timeout);
rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size); rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size);
rt_err_t rt_mq_recv(rt_mq_t mq, rt_ssize_t rt_mq_recv(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout); rt_int32_t timeout);
rt_err_t rt_mq_recv_interruptible(rt_mq_t mq, rt_ssize_t rt_mq_recv_interruptible(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout); rt_int32_t timeout);
rt_err_t rt_mq_recv_killable(rt_mq_t mq, rt_ssize_t rt_mq_recv_killable(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout); rt_int32_t timeout);

View File

@ -44,6 +44,7 @@
* 2022-04-08 Stanley Correct descriptions * 2022-04-08 Stanley Correct descriptions
* 2022-10-15 Bernard add nested mutex feature * 2022-10-15 Bernard add nested mutex feature
* 2022-10-16 Bernard add prioceiling feature in mutex * 2022-10-16 Bernard add prioceiling feature in mutex
* 2023-04-16 Xin-zheqi redesigen queue recv and send function return real message size
*/ */
#include <rtthread.h> #include <rtthread.h>
@ -59,6 +60,7 @@
#define __on_rt_object_put_hook(parent) __ON_HOOK_ARGS(rt_object_put_hook, (parent)) #define __on_rt_object_put_hook(parent) __ON_HOOK_ARGS(rt_object_put_hook, (parent))
#endif #endif
#define GET_MESSAGEBYTE_ADDR(msg) ((struct rt_mq_message *) msg + 1)
#if defined(RT_USING_HOOK) && defined(RT_HOOK_USING_FUNC_PTR) #if defined(RT_USING_HOOK) && defined(RT_HOOK_USING_FUNC_PTR)
extern void (*rt_object_trytake_hook)(struct rt_object *object); extern void (*rt_object_trytake_hook)(struct rt_object *object);
extern void (*rt_object_take_hook)(struct rt_object *object); extern void (*rt_object_take_hook)(struct rt_object *object);
@ -2858,12 +2860,6 @@ RTM_EXPORT(rt_mb_control);
* @{ * @{
*/ */
struct rt_mq_message
{
struct rt_mq_message *next;
};
/** /**
* @brief Initialize a static messagequeue object. * @brief Initialize a static messagequeue object.
* *
@ -3300,8 +3296,11 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq,
/* the msg is the new tailer of list, the next shall be NULL */ /* the msg is the new tailer of list, the next shall be NULL */
msg->next = RT_NULL; msg->next = RT_NULL;
/* add the length */
((struct rt_mq_message *)msg)->length = size;
/* copy buffer */ /* copy buffer */
rt_memcpy(msg + 1, buffer, size); rt_memcpy(GET_MESSAGEBYTE_ADDR(msg), buffer, size);
/* disable interrupt */ /* disable interrupt */
level = rt_hw_interrupt_disable(); level = rt_hw_interrupt_disable();
@ -3355,7 +3354,7 @@ rt_err_t rt_mq_send_wait(rt_mq_t mq,
{ {
return _rt_mq_send_wait(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE); return _rt_mq_send_wait(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE);
} }
RTM_EXPORT(rt_mq_send_wait) RTM_EXPORT(rt_mq_send_wait);
rt_err_t rt_mq_send_wait_interruptible(rt_mq_t mq, rt_err_t rt_mq_send_wait_interruptible(rt_mq_t mq,
const void *buffer, const void *buffer,
@ -3364,7 +3363,7 @@ rt_err_t rt_mq_send_wait_interruptible(rt_mq_t mq,
{ {
return _rt_mq_send_wait(mq, buffer, size, timeout, RT_INTERRUPTIBLE); return _rt_mq_send_wait(mq, buffer, size, timeout, RT_INTERRUPTIBLE);
} }
RTM_EXPORT(rt_mq_send_wait_interruptible) RTM_EXPORT(rt_mq_send_wait_interruptible);
rt_err_t rt_mq_send_wait_killable(rt_mq_t mq, rt_err_t rt_mq_send_wait_killable(rt_mq_t mq,
const void *buffer, const void *buffer,
@ -3373,7 +3372,7 @@ rt_err_t rt_mq_send_wait_killable(rt_mq_t mq,
{ {
return _rt_mq_send_wait(mq, buffer, size, timeout, RT_KILLABLE); return _rt_mq_send_wait(mq, buffer, size, timeout, RT_KILLABLE);
} }
RTM_EXPORT(rt_mq_send_wait_killable) RTM_EXPORT(rt_mq_send_wait_killable);
/** /**
* @brief This function will send a message to the messagequeue object. * @brief This function will send a message to the messagequeue object.
* If there is a thread suspended on the messagequeue, the thread will be resumed. * If there is a thread suspended on the messagequeue, the thread will be resumed.
@ -3467,8 +3466,10 @@ rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size)
/* enable interrupt */ /* enable interrupt */
rt_hw_interrupt_enable(level); rt_hw_interrupt_enable(level);
/* add the length */
((struct rt_mq_message *)msg)->length = size;
/* copy buffer */ /* copy buffer */
rt_memcpy(msg + 1, buffer, size); rt_memcpy(GET_MESSAGEBYTE_ADDR(msg), buffer, size);
/* disable interrupt */ /* disable interrupt */
level = rt_hw_interrupt_disable(); level = rt_hw_interrupt_disable();
@ -3536,10 +3537,10 @@ RTM_EXPORT(rt_mq_urgent);
* If use macro RT_WAITING_NO to set this parameter, which means that this * If use macro RT_WAITING_NO to set this parameter, which means that this
* function is non-blocking and will return immediately. * function is non-blocking and will return immediately.
* *
* @return Return the operation status. When the return value is RT_EOK, the operation is successful. * @return Return the real length of the message. When the return value is larger than zero, the operation is successful.
* If the return value is any other values, it means that the mailbox release failed. * If the return value is any other values, it means that the mailbox release failed.
*/ */
static rt_err_t _rt_mq_recv(rt_mq_t mq, static rt_ssize_t _rt_mq_recv(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout, rt_int32_t timeout,
@ -3550,6 +3551,7 @@ static rt_err_t _rt_mq_recv(rt_mq_t mq,
struct rt_mq_message *msg; struct rt_mq_message *msg;
rt_uint32_t tick_delta; rt_uint32_t tick_delta;
rt_err_t ret; rt_err_t ret;
rt_size_t len;
/* parameter check */ /* parameter check */
RT_ASSERT(mq != RT_NULL); RT_ASSERT(mq != RT_NULL);
@ -3665,8 +3667,13 @@ static rt_err_t _rt_mq_recv(rt_mq_t mq,
/* enable interrupt */ /* enable interrupt */
rt_hw_interrupt_enable(level); rt_hw_interrupt_enable(level);
/* get real message length */
len = ((struct rt_mq_message *)msg)->length;
if (len > size)
len = size;
/* copy message */ /* copy message */
rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size); rt_memcpy(buffer, GET_MESSAGEBYTE_ADDR(msg), len);
/* disable interrupt */ /* disable interrupt */
level = rt_hw_interrupt_disable(); level = rt_hw_interrupt_disable();
@ -3686,7 +3693,7 @@ static rt_err_t _rt_mq_recv(rt_mq_t mq,
rt_schedule(); rt_schedule();
return RT_EOK; return len;
} }
/* enable interrupt */ /* enable interrupt */
@ -3694,10 +3701,10 @@ static rt_err_t _rt_mq_recv(rt_mq_t mq,
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));
return RT_EOK; return len;
} }
rt_err_t rt_mq_recv(rt_mq_t mq, rt_ssize_t rt_mq_recv(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout) rt_int32_t timeout)
@ -3706,7 +3713,7 @@ rt_err_t rt_mq_recv(rt_mq_t mq,
} }
RTM_EXPORT(rt_mq_recv); RTM_EXPORT(rt_mq_recv);
rt_err_t rt_mq_recv_interruptible(rt_mq_t mq, rt_ssize_t rt_mq_recv_interruptible(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout) rt_int32_t timeout)
@ -3715,7 +3722,7 @@ rt_err_t rt_mq_recv_interruptible(rt_mq_t mq,
} }
RTM_EXPORT(rt_mq_recv_interruptible); RTM_EXPORT(rt_mq_recv_interruptible);
rt_err_t rt_mq_recv_killable(rt_mq_t mq, rt_ssize_t rt_mq_recv_killable(rt_mq_t mq,
void *buffer, void *buffer,
rt_size_t size, rt_size_t size,
rt_int32_t timeout) rt_int32_t timeout)