From 252bc41a2ca1dff8e2f8306f3e92c0bab5111d98 Mon Sep 17 00:00:00 2001 From: "mbbill@gmail.com" Date: Wed, 8 Jun 2011 16:40:07 +0000 Subject: [PATCH] add a blocking mailbox post function git-svn-id: https://rt-thread.googlecode.com/svn/trunk@1471 bbd45198-f89e-11dd-88c7-29a3b14d5316 --- include/rtdef.h | 2 + src/ipc.c | 205 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 154 insertions(+), 53 deletions(-) diff --git a/include/rtdef.h b/include/rtdef.h index 898bf91399..f5d1449d45 100644 --- a/include/rtdef.h +++ b/include/rtdef.h @@ -489,6 +489,8 @@ struct rt_mailbox rt_uint16_t entry; /**< index of messages in msg_pool. */ rt_uint16_t in_offset, out_offset; /**< in/output offset of the message buffer. */ + + rt_list_t suspend_sender_thread; /**< sender thread suspended on this mb */ }; typedef struct rt_mailbox* rt_mailbox_t; #endif diff --git a/src/ipc.c b/src/ipc.c index e012a54aa6..2721d8591e 100644 --- a/src/ipc.c +++ b/src/ipc.c @@ -71,23 +71,23 @@ rt_inline rt_err_t rt_ipc_object_init(struct rt_ipc_object *ipc) } /** - * This function will suspend a thread for a specified IPC object and put the - * thread into suspend queue of IPC object + * This function will suspend a thread to a specified list. IPC object or some double-queue + * object (mailbox etc.) contains this kind of list. * * @param ipc the IPC object * @param thread the thread object to be suspended * * @return the operation status, RT_EOK on successful */ -rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_thread *thread) +rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t *list, struct rt_thread *thread, rt_uint8_t flag) { /* suspend thread */ rt_thread_suspend(thread); - switch (ipc->parent.flag) + switch (flag) { case RT_IPC_FLAG_FIFO: - rt_list_insert_before(&(ipc->suspend_thread), &(thread->tlist)); + rt_list_insert_before(list, &(thread->tlist)); break; case RT_IPC_FLAG_PRIO: @@ -96,8 +96,7 @@ rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_th struct rt_thread* sthread; /* find a suitable position */ - for (n = ipc->suspend_thread.next; n != &(ipc->suspend_thread); - n = n->next) + for (n = list->next; n != list; n = n->next) { sthread = rt_list_entry(n, struct rt_thread, tlist); @@ -111,8 +110,8 @@ rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_th } /* not found a suitable position, append to the end of suspend_thread list */ - if (n == &(ipc->suspend_thread)) - rt_list_insert_before(&(ipc->suspend_thread), &(thread->tlist)); + if (n == list) + rt_list_insert_before(list, &(thread->tlist)); } break; } @@ -121,20 +120,20 @@ rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_th } /** - * This function will resume a thread from an IPC object: + * This function will resume the first thread in the list of a IPC object: * - remove the thread from suspend queue of IPC object * - put the thread into system ready queue * - * @param ipc the IPC object + * @param list the thread list * * @return the operation status, RT_EOK on successful */ -rt_inline rt_err_t rt_ipc_object_resume(struct rt_ipc_object* ipc) +rt_inline rt_err_t rt_ipc_list_resume(rt_list_t *list) { struct rt_thread *thread; /* get thread entry */ - thread = rt_list_entry(ipc->suspend_thread.next, struct rt_thread, tlist); + thread = rt_list_entry(list->next, struct rt_thread, tlist); #ifdef RT_IPC_DEBUG rt_kprintf("resume thread:%s\n", thread->name); @@ -147,25 +146,26 @@ rt_inline rt_err_t rt_ipc_object_resume(struct rt_ipc_object* ipc) } /** - * This function will resume all suspended threads in an IPC object. + * This function will resume all suspended threads in a list, including + * suspend list of IPC object and private list of mailbox etc. * - * @param ipc the IPC object + * @param list of the threads to resume * * @return the operation status, RT_EOK on successful */ -rt_inline rt_err_t rt_ipc_object_resume_all(struct rt_ipc_object* ipc) +rt_inline rt_err_t rt_ipc_list_resume_all(rt_list_t *list) { struct rt_thread* thread; register rt_ubase_t temp; /* wakeup all suspend threads */ - while (!rt_list_isempty(&(ipc->suspend_thread))) + while (!rt_list_isempty(list)) { /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get next suspend thread */ - thread = rt_list_entry(ipc->suspend_thread.next, struct rt_thread, tlist); + thread = rt_list_entry(list->next, struct rt_thread, tlist); /* set error code to RT_ERROR */ thread->error = -RT_ERROR; @@ -229,7 +229,7 @@ rt_err_t rt_sem_detach (rt_sem_t sem) RT_ASSERT(sem != RT_NULL); /* wakeup all suspend threads */ - rt_ipc_object_resume_all(&(sem->parent)); + rt_ipc_list_resume_all(&(sem->parent.suspend_thread)); /* detach semaphore object */ rt_object_detach(&(sem->parent.parent)); @@ -283,7 +283,7 @@ rt_err_t rt_sem_delete (rt_sem_t sem) RT_ASSERT(sem != RT_NULL); /* wakeup all suspend threads */ - rt_ipc_object_resume_all(&(sem->parent)); + rt_ipc_list_resume_all(&(sem->parent.suspend_thread)); /* delete semaphore object */ rt_object_delete(&(sem->parent.parent)); @@ -315,7 +315,7 @@ rt_err_t rt_sem_take (rt_sem_t sem, rt_int32_t time) /* disable interrupt */ temp = rt_hw_interrupt_disable(); -#ifdef RT_IPC_DEBUG +#ifdef RT_IPC_DEBU rt_kprintf("thread %s take sem:%s, which value is: %d\n", rt_thread_self()->name, ((struct rt_object*)sem)->name, sem->value); #endif @@ -349,7 +349,8 @@ rt_err_t rt_sem_take (rt_sem_t sem, rt_int32_t time) #endif /* suspend thread */ - rt_ipc_object_suspend(&(sem->parent), thread); + rt_ipc_list_suspend(&(sem->parent.suspend_thread), + thread, sem->parent.parent.flag); /* has waiting time, start thread timer */ if (time > 0) @@ -424,7 +425,7 @@ rt_err_t rt_sem_release(rt_sem_t sem) if ( !rt_list_isempty(&sem->parent.suspend_thread) ) { /* resume the suspended thread */ - rt_ipc_object_resume(&(sem->parent)); + rt_ipc_list_resume(&(sem->parent.suspend_thread)); need_schedule = RT_TRUE; } else sem->value ++; /* increase value */ @@ -457,12 +458,12 @@ rt_err_t rt_sem_control(rt_sem_t sem, rt_uint8_t cmd, void* arg) rt_uint32_t value; /* get value */ - value = (rt_uint32_t)arg; + value = (rt_uint32_t)arg; /* disable interrupt */ level = rt_hw_interrupt_disable(); /* resume all waiting thread */ - rt_ipc_object_resume_all(&sem->parent); + rt_ipc_list_resume_all(&sem->parent.suspend_thread); /* set new value */ sem->value = (rt_uint16_t)value; @@ -525,7 +526,7 @@ rt_err_t rt_mutex_detach (rt_mutex_t mutex) RT_ASSERT(mutex != RT_NULL); /* wakeup all suspend threads */ - rt_ipc_object_resume_all(&(mutex->parent)); + rt_ipc_list_resume_all(&(mutex->parent.suspend_thread)); /* detach semaphore object */ rt_object_detach(&(mutex->parent.parent)); @@ -580,7 +581,7 @@ rt_err_t rt_mutex_delete (rt_mutex_t mutex) RT_ASSERT(mutex != RT_NULL); /* wakeup all suspend threads */ - rt_ipc_object_resume_all(&(mutex->parent)); + rt_ipc_list_resume_all(&(mutex->parent.suspend_thread)); /* delete semaphore object */ rt_object_delete(&(mutex->parent.parent)); @@ -671,7 +672,8 @@ rt_err_t rt_mutex_take (rt_mutex_t mutex, rt_int32_t time) } /* suspend current thread */ - rt_ipc_object_suspend(&(mutex->parent), thread); + rt_ipc_list_suspend(&(mutex->parent.suspend_thread), + thread, mutex->parent.parent.flag); /* has waiting time, start thread timer */ if (time > 0) @@ -784,7 +786,7 @@ rt_err_t rt_mutex_release(rt_mutex_t mutex) mutex->hold ++; /* resume thread */ - rt_ipc_object_resume(&(mutex->parent)); + rt_ipc_list_resume(&(mutex->parent.suspend_thread)); need_schedule = RT_TRUE; } @@ -868,7 +870,7 @@ rt_err_t rt_event_detach(rt_event_t event) RT_ASSERT(event != RT_NULL); /* resume all suspended thread */ - rt_ipc_object_resume_all(&(event->parent)); + rt_ipc_list_resume_all(&(event->parent.suspend_thread)); /* detach event object */ rt_object_detach(&(event->parent.parent)); @@ -918,7 +920,7 @@ rt_err_t rt_event_delete (rt_event_t event) RT_ASSERT(event != RT_NULL); /* resume all suspended thread */ - rt_ipc_object_resume_all(&(event->parent)); + rt_ipc_list_resume_all(&(event->parent.suspend_thread)); /* delete event object */ rt_object_delete(&(event->parent.parent)); @@ -1085,7 +1087,8 @@ rt_err_t rt_event_recv(rt_event_t event, rt_uint32_t set, rt_uint8_t option, rt_ thread->event_info = option; /* put thread to suspended thread list */ - rt_ipc_object_suspend(&(event->parent), thread); + rt_ipc_list_suspend(&(event->parent.suspend_thread), + thread, event->parent.parent.flag); /* if there is a waiting timeout, active thread timer */ if (timeout > 0) @@ -1144,7 +1147,7 @@ rt_err_t rt_event_control (rt_event_t event, rt_uint8_t cmd, void* arg) level = rt_hw_interrupt_disable(); /* resume all waiting thread */ - rt_ipc_object_resume_all(&event->parent); + rt_ipc_list_resume_all(&event->parent.suspend_thread); /* init event set */ event->set = 0; @@ -1194,6 +1197,8 @@ rt_err_t rt_mb_init(rt_mailbox_t mb, const char* name, void* msgpool, rt_size_t mb->in_offset = 0; mb->out_offset = 0; + rt_list_init(&(mb->suspend_sender_thread)); + return RT_EOK; } @@ -1210,7 +1215,9 @@ rt_err_t rt_mb_detach(rt_mailbox_t mb) RT_ASSERT(mb != RT_NULL); /* resume all suspended thread */ - rt_ipc_object_resume_all(&(mb->parent)); + rt_ipc_list_resume_all(&(mb->parent.suspend_thread)); + /* also resume all mailbox private suspended thread */ + rt_ipc_list_resume_all(&(mb->suspend_sender_thread)); /* detach mailbox object */ rt_object_detach(&(mb->parent.parent)); @@ -1272,11 +1279,13 @@ rt_err_t rt_mb_delete (rt_mailbox_t mb) RT_ASSERT(mb != RT_NULL); /* resume all suspended thread */ - rt_ipc_object_resume_all(&(mb->parent)); + rt_ipc_list_resume_all(&(mb->parent.suspend_thread)); + /* also resume all mailbox private suspended thread */ + rt_ipc_list_resume_all(&(mb->suspend_sender_thread)); #ifdef RT_USING_MODULE /* the mb object belongs to an application module */ - if(mb->parent.parent.flag & RT_OBJECT_FLAG_MODULE) + if(mb->parent.parent.flag & RT_OBJECT_FLAG_MODULE) rt_module_free(mb->parent.parent.module_id, mb->msg_pool); else #endif @@ -1292,17 +1301,20 @@ rt_err_t rt_mb_delete (rt_mailbox_t mb) #endif /** - * This function will send a mail to mailbox object, if there are threads suspended - * on mailbox object, it will be waked up. + * This function will send a mail to mailbox object. If the mailbox is full, + * current thread will be suspended until timeout. * * @param mb the mailbox object * @param value the mail + * @param timeout the waiting time * * @return the error code */ -rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value) +rt_err_t rt_mb_send_wait (rt_mailbox_t mb, rt_uint32_t value, rt_int32_t timeout) { + struct rt_thread *thread; register rt_ubase_t temp; + rt_uint32_t tick_delta; /* parameter check */ RT_ASSERT(mb != RT_NULL); @@ -1314,15 +1326,69 @@ rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value) /* disable interrupt */ temp = rt_hw_interrupt_disable(); + /* get current thread */ + thread = rt_thread_self(); + /* mailbox is full */ - if (mb->entry == mb->size) + while (mb->entry == mb->size) { + /* reset error number in thread */ + thread->error = RT_EOK; + + /* no waiting, return timeout */ + if (timeout == 0) + { + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + + thread->error = -RT_EFULL; + return -RT_EFULL; + } + + /* suspend current thread */ + rt_ipc_list_suspend(&(mb->suspend_sender_thread), + thread, mb->parent.parent.flag); + + /* has waiting time, start thread timer */ + if (timeout > 0) + { + /* get the start tick of timer */ + tick_delta = rt_tick_get(); + +#ifdef RT_IPC_DEBUG + rt_kprintf("mb_send_wait: start timer of thread:%s\n", thread->name); +#endif + /* reset the timeout of thread timer and start it */ + rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); + rt_timer_start(&(thread->thread_timer)); + } + /* enable interrupt */ rt_hw_interrupt_enable(temp); - return -RT_EFULL; + /* re-schedule */ + rt_schedule(); + + /* resume from suspend state */ + if (thread->error != RT_EOK) + { + /* return error */ + return thread->error; + } + + /* disable interrupt */ + temp = rt_hw_interrupt_disable(); + + /* re-calculate timeout tick */ + if (timeout > 0) + { + tick_delta = rt_tick_get() - tick_delta; + timeout -= tick_delta; + if (timeout < 0) timeout = 0; + } } + /* set ptr */ mb->msg_pool[mb->in_offset] = value; /* increase input offset */ @@ -1334,7 +1400,7 @@ rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value) /* resume suspended thread */ if( !rt_list_isempty(&mb->parent.suspend_thread) ) { - rt_ipc_object_resume(&(mb->parent)); + rt_ipc_list_resume(&(mb->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); @@ -1349,6 +1415,21 @@ rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value) return RT_EOK; } +/** + * This function will send a mail to mailbox object, if there are threads suspended + * on mailbox object, it will be waked up. This function will return immediately, if + * you want blocking send, use rt_mb_send_wait instead. + * + * @param mb the mailbox object + * @param value the mail + * + * @return the error code + */ +rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value) +{ + return rt_mb_send_wait(mb,value,0); +} + /** * This function will receive a mail from mailbox object, if there is no mail in * mailbox object, the thread shall wait for a specified time. @@ -1396,7 +1477,8 @@ rt_err_t rt_mb_recv (rt_mailbox_t mb, rt_uint32_t* value, rt_int32_t timeout) } /* suspend current thread */ - rt_ipc_object_suspend(&(mb->parent), thread); + rt_ipc_list_suspend(&(mb->parent.suspend_thread), + thread, mb->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) @@ -1446,6 +1528,22 @@ rt_err_t rt_mb_recv (rt_mailbox_t mb, rt_uint32_t* value, rt_int32_t timeout) /* decrease message entry */ mb->entry --; + /* resume suspended thread */ + if( !rt_list_isempty(&(mb->suspend_sender_thread)) ) + { + rt_ipc_list_resume(&(mb->suspend_sender_thread)); + + /* enable interrupt */ + rt_hw_interrupt_enable(temp); + +#ifdef RT_USING_HOOK + if (rt_object_take_hook != RT_NULL) rt_object_take_hook(&(mb->parent.parent)); +#endif + rt_schedule(); + + return RT_EOK; + } + /* enable interrupt */ rt_hw_interrupt_enable(temp); @@ -1476,13 +1574,13 @@ rt_err_t rt_mb_control(rt_mailbox_t mb, rt_uint8_t cmd, void* arg) level = rt_hw_interrupt_disable(); /* resume all waiting thread */ - rt_ipc_object_resume_all(&mb->parent); + rt_ipc_list_resume_all(&(mb->parent.suspend_thread)); /* re-init mailbox */ mb->entry = 0; mb->in_offset = 0; mb->out_offset = 0; - + /* enable interrupt */ rt_hw_interrupt_enable(level); @@ -1571,7 +1669,7 @@ rt_err_t rt_mq_detach(rt_mq_t mq) RT_ASSERT(mq != RT_NULL); /* resume all suspended thread */ - rt_ipc_object_resume_all((struct rt_ipc_object*)mq); + rt_ipc_list_resume_all(&mq->parent.suspend_thread); /* detach message queue object */ rt_object_detach(&(mq->parent.parent)); @@ -1653,11 +1751,11 @@ rt_err_t rt_mq_delete (rt_mq_t mq) RT_ASSERT(mq != RT_NULL); /* resume all suspended thread */ - rt_ipc_object_resume_all(&(mq->parent)); + rt_ipc_list_resume_all(&(mq->parent.suspend_thread)); #ifdef RT_USING_MODULE /* the mq object belongs to an application module */ - if(mq->parent.parent.flag & RT_OBJECT_FLAG_MODULE) + if(mq->parent.parent.flag & RT_OBJECT_FLAG_MODULE) rt_module_free(mq->parent.parent.module_id, mq->msg_pool); else #endif @@ -1727,6 +1825,7 @@ rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size) ((struct rt_mq_message*)mq->msg_queue_tail)->next = msg; } + /* set new tail */ mq->msg_queue_tail = msg; /* if the head is empty, set head */ @@ -1738,7 +1837,7 @@ rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size) /* resume suspended thread */ if( !rt_list_isempty(&mq->parent.suspend_thread) ) { - rt_ipc_object_resume(&(mq->parent)); + rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); @@ -1814,7 +1913,7 @@ rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size) /* resume suspended thread */ if( !rt_list_isempty(&mq->parent.suspend_thread) ) { - rt_ipc_object_resume(&(mq->parent)); + rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); @@ -1875,7 +1974,8 @@ rt_err_t rt_mq_recv (rt_mq_t mq, void* buffer, rt_size_t size, rt_int32_t timeou } /* suspend current thread */ - rt_ipc_object_suspend(&(mq->parent), thread); + rt_ipc_list_suspend(&(mq->parent.suspend_thread), + thread, mq->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) @@ -1971,7 +2071,7 @@ rt_err_t rt_mq_control(rt_mq_t mq, rt_uint8_t cmd, void* arg) level = rt_hw_interrupt_disable(); /* resume all waiting thread */ - rt_ipc_object_resume_all(&mq->parent); + rt_ipc_list_resume_all(&mq->parent.suspend_thread); /* release all message in the queue */ while (mq->msg_queue_head != RT_NULL) @@ -2002,5 +2102,4 @@ rt_err_t rt_mq_control(rt_mq_t mq, rt_uint8_t cmd, void* arg) } #endif /* end of RT_USING_MESSAGEQUEUE */ - /*@}*/