/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2006-03-14 Bernard the first version * 2006-04-25 Bernard implement semaphore * 2006-05-03 Bernard add RT_IPC_DEBUG * modify the type of IPC waiting time to rt_int32_t * 2006-05-10 Bernard fix the semaphore take bug and add IPC object * 2006-05-12 Bernard implement mailbox and message queue * 2006-05-20 Bernard implement mutex * 2006-05-23 Bernard implement fast event * 2006-05-24 Bernard implement event * 2006-06-03 Bernard fix the thread timer init bug * 2006-06-05 Bernard fix the mutex release bug * 2006-06-07 Bernard fix the message queue send bug * 2006-08-04 Bernard add hook support * 2009-05-21 Yi.qiu fix the sem release bug * 2009-07-18 Bernard fix the event clear bug * 2009-09-09 Bernard remove fast event and fix ipc release bug * 2009-10-10 Bernard change semaphore and mutex value to unsigned value * 2009-10-25 Bernard change the mb/mq receive timeout to 0 if the * re-calculated delta tick is a negative number. * 2009-12-16 Bernard fix the rt_ipc_object_suspend issue when IPC flag * is RT_IPC_FLAG_PRIO * 2010-01-20 mbbill remove rt_ipc_object_decrease function. * 2010-04-20 Bernard move memcpy outside interrupt disable in mq * 2010-10-26 yi.qiu add module support in rt_mp_delete and rt_mq_delete * 2010-11-10 Bernard add IPC reset command implementation. * 2011-12-18 Bernard add more parameter checking in message queue * 2013-09-14 Grissiom add an option check in rt_event_recv * 2018-10-02 Bernard add 64bit support for mailbox * 2019-09-16 tyx add send wait support for message queue * 2020-07-29 Meco Man fix thread->event_set/event_info when received an event without pending */ #include #include #ifdef RT_USING_HOOK extern void (*rt_object_trytake_hook)(struct rt_object *object); extern void (*rt_object_take_hook)(struct rt_object *object); extern void (*rt_object_put_hook)(struct rt_object *object); #endif /** * @addtogroup IPC */ /**@{*/ /** * This function will initialize an IPC object * * @param ipc the IPC object * * @return the operation status, RT_EOK on successful */ rt_inline rt_err_t rt_ipc_object_init(struct rt_ipc_object *ipc) { /* initialize ipc object */ rt_list_init(&(ipc->suspend_thread)); return RT_EOK; } /** * This function will suspend a thread to a specified list. IPC object or some * double-queue object (mailbox etc.) contains this kind of list. * * @param list the IPC suspended thread list * @param thread the thread object to be suspended * @param flag the IPC object flag, * which shall be RT_IPC_FLAG_FIFO/RT_IPC_FLAG_PRIO. * * @return the operation status, RT_EOK on successful */ rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t *list, struct rt_thread *thread, rt_uint8_t flag) { /* suspend thread */ rt_thread_suspend(thread); switch (flag) { case RT_IPC_FLAG_FIFO: rt_list_insert_before(list, &(thread->tlist)); break; case RT_IPC_FLAG_PRIO: { struct rt_list_node *n; struct rt_thread *sthread; /* find a suitable position */ for (n = list->next; n != list; n = n->next) { sthread = rt_list_entry(n, struct rt_thread, tlist); /* find out */ if (thread->current_priority < sthread->current_priority) { /* insert this thread before the sthread */ rt_list_insert_before(&(sthread->tlist), &(thread->tlist)); break; } } /* * not found a suitable position, * append to the end of suspend_thread list */ if (n == list) rt_list_insert_before(list, &(thread->tlist)); } break; default: break; } return RT_EOK; } /** * This function will resume the first thread in the list of a IPC object: * - remove the thread from suspend queue of IPC object * - put the thread into system ready queue * * @param list the thread list * * @return the operation status, RT_EOK on successful */ rt_inline rt_err_t rt_ipc_list_resume(rt_list_t *list) { struct rt_thread *thread; /* get thread entry */ thread = rt_list_entry(list->next, struct rt_thread, tlist); RT_DEBUG_LOG(RT_DEBUG_IPC, ("resume thread:%s\n", thread->name)); /* resume it */ rt_thread_resume(thread); return RT_EOK; } /** * This function will resume all suspended threads in a list, including * suspend list of IPC object and private list of mailbox etc. * * @param list of the threads to resume * * @return the operation status, RT_EOK on successful */ rt_inline rt_err_t rt_ipc_list_resume_all(rt_list_t *list) { struct rt_thread *thread; register rt_ubase_t temp; /* wakeup all suspended threads */ while (!rt_list_isempty(list)) { /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get next suspended thread */ thread = rt_list_entry(list->next, struct rt_thread, tlist); /* set error code to RT_ERROR */ thread->error = -RT_ERROR; /* * resume thread * In rt_thread_resume function, it will remove current thread from * suspended list */ rt_thread_resume(thread); /* enable interrupt */ rt_hw_interrupt_enable(temp); } return RT_EOK; } #ifdef RT_USING_SEMAPHORE /** * This function will initialize a semaphore and put it under control of * resource management. * * @param sem the semaphore object * @param name the name of semaphore * @param value the initial value of semaphore * @param flag the flag of semaphore * * @return the operation status, RT_EOK on successful */ rt_err_t rt_sem_init(rt_sem_t sem, const char *name, rt_uint32_t value, rt_uint8_t flag) { RT_ASSERT(sem != RT_NULL); RT_ASSERT(value < 0x10000U); /* initialize object */ rt_object_init(&(sem->parent.parent), RT_Object_Class_Semaphore, name); /* initialize ipc object */ rt_ipc_object_init(&(sem->parent)); /* set initial value */ sem->value = (rt_uint16_t)value; /* set parent */ sem->parent.parent.flag = flag; return RT_EOK; } RTM_EXPORT(rt_sem_init); /** * This function will detach a semaphore from resource management * * @param sem the semaphore object * * @return the operation status, RT_EOK on successful * * @see rt_sem_delete */ rt_err_t rt_sem_detach(rt_sem_t sem) { /* parameter check */ RT_ASSERT(sem != RT_NULL); RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore); RT_ASSERT(rt_object_is_systemobject(&sem->parent.parent)); /* wakeup all suspended threads */ rt_ipc_list_resume_all(&(sem->parent.suspend_thread)); /* detach semaphore object */ rt_object_detach(&(sem->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_sem_detach); #ifdef RT_USING_HEAP /** * This function will create a semaphore from system resource * * @param name the name of semaphore * @param value the initial value of semaphore * @param flag the flag of semaphore * * @return the created semaphore, RT_NULL on error happen * * @see rt_sem_init */ rt_sem_t rt_sem_create(const char *name, rt_uint32_t value, rt_uint8_t flag) { rt_sem_t sem; RT_DEBUG_NOT_IN_INTERRUPT; RT_ASSERT(value < 0x10000U); /* allocate object */ sem = (rt_sem_t)rt_object_allocate(RT_Object_Class_Semaphore, name); if (sem == RT_NULL) return sem; /* initialize ipc object */ rt_ipc_object_init(&(sem->parent)); /* set initial value */ sem->value = value; /* set parent */ sem->parent.parent.flag = flag; return sem; } RTM_EXPORT(rt_sem_create); /** * This function will delete a semaphore object and release the memory * * @param sem the semaphore object * * @return the error code * * @see rt_sem_detach */ rt_err_t rt_sem_delete(rt_sem_t sem) { RT_DEBUG_NOT_IN_INTERRUPT; /* parameter check */ RT_ASSERT(sem != RT_NULL); RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore); RT_ASSERT(rt_object_is_systemobject(&sem->parent.parent) == RT_FALSE); /* wakeup all suspended threads */ rt_ipc_list_resume_all(&(sem->parent.suspend_thread)); /* delete semaphore object */ rt_object_delete(&(sem->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_sem_delete); #endif /** * This function will take a semaphore, if the semaphore is unavailable, the * thread shall wait for a specified time. * * @param sem the semaphore object * @param time the waiting time * * @return the error code */ rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time) { register rt_base_t temp; struct rt_thread *thread; /* parameter check */ RT_ASSERT(sem != RT_NULL); RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(sem->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s take sem:%s, which value is: %d\n", rt_thread_self()->name, ((struct rt_object *)sem)->name, sem->value)); if (sem->value > 0) { /* semaphore is available */ sem->value --; /* enable interrupt */ rt_hw_interrupt_enable(temp); } else { /* no waiting, return with timeout */ if (time == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } else { /* current context checking */ RT_DEBUG_IN_THREAD_CONTEXT; /* semaphore is unavailable, push to suspend list */ /* get current thread */ thread = rt_thread_self(); /* reset thread error number */ thread->error = RT_EOK; RT_DEBUG_LOG(RT_DEBUG_IPC, ("sem take: suspend thread - %s\n", thread->name)); /* suspend thread */ rt_ipc_list_suspend(&(sem->parent.suspend_thread), thread, sem->parent.parent.flag); /* has waiting time, start thread timer */ if (time > 0) { RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &time); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* do schedule */ rt_schedule(); if (thread->error != RT_EOK) { return thread->error; } } } RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(sem->parent.parent))); return RT_EOK; } RTM_EXPORT(rt_sem_take); /** * This function will try to take a semaphore and immediately return * * @param sem the semaphore object * * @return the error code */ rt_err_t rt_sem_trytake(rt_sem_t sem) { return rt_sem_take(sem, 0); } RTM_EXPORT(rt_sem_trytake); /** * This function will release a semaphore, if there are threads suspended on * semaphore, it will be waked up. * * @param sem the semaphore object * * @return the error code */ rt_err_t rt_sem_release(rt_sem_t sem) { register rt_base_t temp; register rt_bool_t need_schedule; /* parameter check */ RT_ASSERT(sem != RT_NULL); RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore); RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(sem->parent.parent))); need_schedule = RT_FALSE; /* disable interrupt */ temp = rt_hw_interrupt_disable(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s releases sem:%s, which value is: %d\n", rt_thread_self()->name, ((struct rt_object *)sem)->name, sem->value)); if (!rt_list_isempty(&sem->parent.suspend_thread)) { /* resume the suspended thread */ rt_ipc_list_resume(&(sem->parent.suspend_thread)); need_schedule = RT_TRUE; } else sem->value ++; /* increase value */ /* enable interrupt */ rt_hw_interrupt_enable(temp); /* resume a thread, re-schedule */ if (need_schedule == RT_TRUE) rt_schedule(); return RT_EOK; } RTM_EXPORT(rt_sem_release); /** * This function can get or set some extra attributions of a semaphore object. * * @param sem the semaphore object * @param cmd the execution command * @param arg the execution argument * * @return the error code */ rt_err_t rt_sem_control(rt_sem_t sem, int cmd, void *arg) { rt_ubase_t level; /* parameter check */ RT_ASSERT(sem != RT_NULL); RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore); if (cmd == RT_IPC_CMD_RESET) { rt_ubase_t value; /* get value */ value = (rt_ubase_t)arg; /* disable interrupt */ level = rt_hw_interrupt_disable(); /* resume all waiting thread */ rt_ipc_list_resume_all(&sem->parent.suspend_thread); /* set new value */ sem->value = (rt_uint16_t)value; /* enable interrupt */ rt_hw_interrupt_enable(level); rt_schedule(); return RT_EOK; } return -RT_ERROR; } RTM_EXPORT(rt_sem_control); #endif /* end of RT_USING_SEMAPHORE */ #ifdef RT_USING_MUTEX /** * This function will initialize a mutex and put it under control of resource * management. * * @param mutex the mutex object * @param name the name of mutex * @param flag the flag of mutex * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mutex_init(rt_mutex_t mutex, const char *name, rt_uint8_t flag) { /* parameter check */ RT_ASSERT(mutex != RT_NULL); /* initialize object */ rt_object_init(&(mutex->parent.parent), RT_Object_Class_Mutex, name); /* initialize ipc object */ rt_ipc_object_init(&(mutex->parent)); mutex->value = 1; mutex->owner = RT_NULL; mutex->original_priority = 0xFF; mutex->hold = 0; /* set flag */ mutex->parent.parent.flag = flag; return RT_EOK; } RTM_EXPORT(rt_mutex_init); /** * This function will detach a mutex from resource management * * @param mutex the mutex object * * @return the operation status, RT_EOK on successful * * @see rt_mutex_delete */ rt_err_t rt_mutex_detach(rt_mutex_t mutex) { /* parameter check */ RT_ASSERT(mutex != RT_NULL); RT_ASSERT(rt_object_get_type(&mutex->parent.parent) == RT_Object_Class_Mutex); RT_ASSERT(rt_object_is_systemobject(&mutex->parent.parent)); /* wakeup all suspended threads */ rt_ipc_list_resume_all(&(mutex->parent.suspend_thread)); /* detach semaphore object */ rt_object_detach(&(mutex->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mutex_detach); #ifdef RT_USING_HEAP /** * This function will create a mutex from system resource * * @param name the name of mutex * @param flag the flag of mutex * * @return the created mutex, RT_NULL on error happen * * @see rt_mutex_init */ rt_mutex_t rt_mutex_create(const char *name, rt_uint8_t flag) { struct rt_mutex *mutex; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mutex = (rt_mutex_t)rt_object_allocate(RT_Object_Class_Mutex, name); if (mutex == RT_NULL) return mutex; /* initialize ipc object */ rt_ipc_object_init(&(mutex->parent)); mutex->value = 1; mutex->owner = RT_NULL; mutex->original_priority = 0xFF; mutex->hold = 0; /* set flag */ mutex->parent.parent.flag = flag; return mutex; } RTM_EXPORT(rt_mutex_create); /** * This function will delete a mutex object and release the memory * * @param mutex the mutex object * * @return the error code * * @see rt_mutex_detach */ rt_err_t rt_mutex_delete(rt_mutex_t mutex) { RT_DEBUG_NOT_IN_INTERRUPT; /* parameter check */ RT_ASSERT(mutex != RT_NULL); RT_ASSERT(rt_object_get_type(&mutex->parent.parent) == RT_Object_Class_Mutex); RT_ASSERT(rt_object_is_systemobject(&mutex->parent.parent) == RT_FALSE); /* wakeup all suspended threads */ rt_ipc_list_resume_all(&(mutex->parent.suspend_thread)); /* delete mutex object */ rt_object_delete(&(mutex->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mutex_delete); #endif /** * This function will take a mutex, if the mutex is unavailable, the * thread shall wait for a specified time. * * @param mutex the mutex object * @param time the waiting time * * @return the error code */ rt_err_t rt_mutex_take(rt_mutex_t mutex, rt_int32_t time) { register rt_base_t temp; struct rt_thread *thread; /* this function must not be used in interrupt even if time = 0 */ RT_DEBUG_IN_THREAD_CONTEXT; /* parameter check */ RT_ASSERT(mutex != RT_NULL); RT_ASSERT(rt_object_get_type(&mutex->parent.parent) == RT_Object_Class_Mutex); /* get current thread */ thread = rt_thread_self(); /* disable interrupt */ temp = rt_hw_interrupt_disable(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mutex->parent.parent))); RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_take: current thread %s, mutex value: %d, hold: %d\n", thread->name, mutex->value, mutex->hold)); /* reset thread error */ thread->error = RT_EOK; if (mutex->owner == thread) { /* it's the same thread */ mutex->hold ++; } else { #ifdef RT_USING_SIGNALS __again: #endif /* end of RT_USING_SIGNALS */ /* The value of mutex is 1 in initial status. Therefore, if the * value is great than 0, it indicates the mutex is avaible. */ if (mutex->value > 0) { /* mutex is available */ mutex->value --; /* set mutex owner and original priority */ mutex->owner = thread; mutex->original_priority = thread->current_priority; mutex->hold ++; } else { /* no waiting, return with timeout */ if (time == 0) { /* set error as timeout */ thread->error = -RT_ETIMEOUT; /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } else { /* mutex is unavailable, push to suspend list */ RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_take: suspend thread: %s\n", thread->name)); /* change the owner thread priority of mutex */ if (thread->current_priority < mutex->owner->current_priority) { /* change the owner thread priority */ rt_thread_control(mutex->owner, RT_THREAD_CTRL_CHANGE_PRIORITY, &thread->current_priority); } /* suspend current thread */ rt_ipc_list_suspend(&(mutex->parent.suspend_thread), thread, mutex->parent.parent.flag); /* has waiting time, start thread timer */ if (time > 0) { RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_take: start the timer of thread:%s\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &time); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* do schedule */ rt_schedule(); if (thread->error != RT_EOK) { #ifdef RT_USING_SIGNALS /* interrupt by signal, try it again */ if (thread->error == -RT_EINTR) goto __again; #endif /* end of RT_USING_SIGNALS */ /* return error */ return thread->error; } else { /* the mutex is taken successfully. */ /* disable interrupt */ temp = rt_hw_interrupt_disable(); } } } } /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mutex->parent.parent))); return RT_EOK; } RTM_EXPORT(rt_mutex_take); /** * This function will release a mutex, if there are threads suspended on mutex, * it will be waked up. * * @param mutex the mutex object * * @return the error code */ rt_err_t rt_mutex_release(rt_mutex_t mutex) { register rt_base_t temp; struct rt_thread *thread; rt_bool_t need_schedule; /* parameter check */ RT_ASSERT(mutex != RT_NULL); RT_ASSERT(rt_object_get_type(&mutex->parent.parent) == RT_Object_Class_Mutex); need_schedule = RT_FALSE; /* only thread could release mutex because we need test the ownership */ RT_DEBUG_IN_THREAD_CONTEXT; /* get current thread */ thread = rt_thread_self(); /* disable interrupt */ temp = rt_hw_interrupt_disable(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_release:current thread %s, mutex value: %d, hold: %d\n", thread->name, mutex->value, mutex->hold)); RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mutex->parent.parent))); /* mutex only can be released by owner */ if (thread != mutex->owner) { thread->error = -RT_ERROR; /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_ERROR; } /* decrease hold */ mutex->hold --; /* if no hold */ if (mutex->hold == 0) { /* change the owner thread to original priority */ if (mutex->original_priority != mutex->owner->current_priority) { rt_thread_control(mutex->owner, RT_THREAD_CTRL_CHANGE_PRIORITY, &(mutex->original_priority)); } /* wakeup suspended thread */ if (!rt_list_isempty(&mutex->parent.suspend_thread)) { /* get suspended thread */ thread = rt_list_entry(mutex->parent.suspend_thread.next, struct rt_thread, tlist); RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_release: resume thread: %s\n", thread->name)); /* set new owner and priority */ mutex->owner = thread; mutex->original_priority = thread->current_priority; mutex->hold ++; /* resume thread */ rt_ipc_list_resume(&(mutex->parent.suspend_thread)); need_schedule = RT_TRUE; } else { /* increase value */ mutex->value ++; /* clear owner */ mutex->owner = RT_NULL; mutex->original_priority = 0xff; } } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* perform a schedule */ if (need_schedule == RT_TRUE) rt_schedule(); return RT_EOK; } RTM_EXPORT(rt_mutex_release); /** * This function can get or set some extra attributions of a mutex object. * * @param mutex the mutex object * @param cmd the execution command * @param arg the execution argument * * @return the error code */ rt_err_t rt_mutex_control(rt_mutex_t mutex, int cmd, void *arg) { /* parameter check */ RT_ASSERT(mutex != RT_NULL); RT_ASSERT(rt_object_get_type(&mutex->parent.parent) == RT_Object_Class_Mutex); return -RT_ERROR; } RTM_EXPORT(rt_mutex_control); #endif /* end of RT_USING_MUTEX */ #ifdef RT_USING_EVENT /** * This function will initialize an event and put it under control of resource * management. * * @param event the event object * @param name the name of event * @param flag the flag of event * * @return the operation status, RT_EOK on successful */ rt_err_t rt_event_init(rt_event_t event, const char *name, rt_uint8_t flag) { /* parameter check */ RT_ASSERT(event != RT_NULL); /* initialize object */ rt_object_init(&(event->parent.parent), RT_Object_Class_Event, name); /* set parent flag */ event->parent.parent.flag = flag; /* initialize ipc object */ rt_ipc_object_init(&(event->parent)); /* initialize event */ event->set = 0; return RT_EOK; } RTM_EXPORT(rt_event_init); /** * This function will detach an event object from resource management * * @param event the event object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_event_detach(rt_event_t event) { /* parameter check */ RT_ASSERT(event != RT_NULL); RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event); RT_ASSERT(rt_object_is_systemobject(&event->parent.parent)); /* resume all suspended thread */ rt_ipc_list_resume_all(&(event->parent.suspend_thread)); /* detach event object */ rt_object_detach(&(event->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_event_detach); #ifdef RT_USING_HEAP /** * This function will create an event object from system resource * * @param name the name of event * @param flag the flag of event * * @return the created event, RT_NULL on error happen */ rt_event_t rt_event_create(const char *name, rt_uint8_t flag) { rt_event_t event; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ event = (rt_event_t)rt_object_allocate(RT_Object_Class_Event, name); if (event == RT_NULL) return event; /* set parent */ event->parent.parent.flag = flag; /* initialize ipc object */ rt_ipc_object_init(&(event->parent)); /* initialize event */ event->set = 0; return event; } RTM_EXPORT(rt_event_create); /** * This function will delete an event object and release the memory * * @param event the event object * * @return the error code */ rt_err_t rt_event_delete(rt_event_t event) { /* parameter check */ RT_ASSERT(event != RT_NULL); RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event); RT_ASSERT(rt_object_is_systemobject(&event->parent.parent) == RT_FALSE); RT_DEBUG_NOT_IN_INTERRUPT; /* resume all suspended thread */ rt_ipc_list_resume_all(&(event->parent.suspend_thread)); /* delete event object */ rt_object_delete(&(event->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_event_delete); #endif /** * This function will send an event to the event object, if there are threads * suspended on event object, it will be waked up. * * @param event the event object * @param set the event set * * @return the error code */ rt_err_t rt_event_send(rt_event_t event, rt_uint32_t set) { struct rt_list_node *n; struct rt_thread *thread; register rt_ubase_t level; register rt_base_t status; rt_bool_t need_schedule; /* parameter check */ RT_ASSERT(event != RT_NULL); RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event); if (set == 0) return -RT_ERROR; need_schedule = RT_FALSE; /* disable interrupt */ level = rt_hw_interrupt_disable(); /* set event */ event->set |= set; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(event->parent.parent))); if (!rt_list_isempty(&event->parent.suspend_thread)) { /* search thread list to resume thread */ n = event->parent.suspend_thread.next; while (n != &(event->parent.suspend_thread)) { /* get thread */ thread = rt_list_entry(n, struct rt_thread, tlist); status = -RT_ERROR; if (thread->event_info & RT_EVENT_FLAG_AND) { if ((thread->event_set & event->set) == thread->event_set) { /* received an AND event */ status = RT_EOK; } } else if (thread->event_info & RT_EVENT_FLAG_OR) { if (thread->event_set & event->set) { /* save the received event set */ thread->event_set = thread->event_set & event->set; /* received an OR event */ status = RT_EOK; } } else { /* enable interrupt */ rt_hw_interrupt_enable(level); return -RT_EINVAL; } /* move node to the next */ n = n->next; /* condition is satisfied, resume thread */ if (status == RT_EOK) { /* clear event */ if (thread->event_info & RT_EVENT_FLAG_CLEAR) event->set &= ~thread->event_set; /* resume thread, and thread list breaks out */ rt_thread_resume(thread); /* need do a scheduling */ need_schedule = RT_TRUE; } } } /* enable interrupt */ rt_hw_interrupt_enable(level); /* do a schedule */ if (need_schedule == RT_TRUE) rt_schedule(); return RT_EOK; } RTM_EXPORT(rt_event_send); /** * This function will receive an event from event object, if the event is * unavailable, the thread shall wait for a specified time. * * @param event the fast event object * @param set the interested event set * @param option the receive option, either RT_EVENT_FLAG_AND or * RT_EVENT_FLAG_OR should be set. * @param timeout the waiting time * @param recved the received event, if you don't care, RT_NULL can be set. * * @return the error code */ rt_err_t rt_event_recv(rt_event_t event, rt_uint32_t set, rt_uint8_t option, rt_int32_t timeout, rt_uint32_t *recved) { struct rt_thread *thread; register rt_ubase_t level; register rt_base_t status; RT_DEBUG_IN_THREAD_CONTEXT; /* parameter check */ RT_ASSERT(event != RT_NULL); RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event); if (set == 0) return -RT_ERROR; /* initialize status */ status = -RT_ERROR; /* get current thread */ thread = rt_thread_self(); /* reset thread error */ thread->error = RT_EOK; RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(event->parent.parent))); /* disable interrupt */ level = rt_hw_interrupt_disable(); /* check event set */ if (option & RT_EVENT_FLAG_AND) { if ((event->set & set) == set) status = RT_EOK; } else if (option & RT_EVENT_FLAG_OR) { if (event->set & set) status = RT_EOK; } else { /* either RT_EVENT_FLAG_AND or RT_EVENT_FLAG_OR should be set */ RT_ASSERT(0); } if (status == RT_EOK) { /* set received event */ if (recved) *recved = (event->set & set); /* fill thread event info */ thread->event_set = (event->set & set); thread->event_info = option; /* received event */ if (option & RT_EVENT_FLAG_CLEAR) event->set &= ~set; } else if (timeout == 0) { /* no waiting */ thread->error = -RT_ETIMEOUT; /* enable interrupt */ rt_hw_interrupt_enable(level); return -RT_ETIMEOUT; } else { /* fill thread event info */ thread->event_set = set; thread->event_info = option; /* put thread to suspended thread list */ rt_ipc_list_suspend(&(event->parent.suspend_thread), thread, event->parent.parent.flag); /* if there is a waiting timeout, active thread timer */ if (timeout > 0) { /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(level); /* do a schedule */ rt_schedule(); if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* received an event, disable interrupt to protect */ level = rt_hw_interrupt_disable(); /* set received event */ if (recved) *recved = thread->event_set; } /* enable interrupt */ rt_hw_interrupt_enable(level); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(event->parent.parent))); return thread->error; } RTM_EXPORT(rt_event_recv); /** * This function can get or set some extra attributions of an event object. * * @param event the event object * @param cmd the execution command * @param arg the execution argument * * @return the error code */ rt_err_t rt_event_control(rt_event_t event, int cmd, void *arg) { rt_ubase_t level; /* parameter check */ RT_ASSERT(event != RT_NULL); RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event); if (cmd == RT_IPC_CMD_RESET) { /* disable interrupt */ level = rt_hw_interrupt_disable(); /* resume all waiting thread */ rt_ipc_list_resume_all(&event->parent.suspend_thread); /* initialize event set */ event->set = 0; /* enable interrupt */ rt_hw_interrupt_enable(level); rt_schedule(); return RT_EOK; } return -RT_ERROR; } RTM_EXPORT(rt_event_control); #endif /* end of RT_USING_EVENT */ #ifdef RT_USING_MAILBOX /** * This function will initialize a mailbox and put it under control of resource * management. * * @param mb the mailbox object * @param name the name of mailbox * @param msgpool the begin address of buffer to save received mail * @param size the size of mailbox * @param flag the flag of mailbox * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mb_init(rt_mailbox_t mb, const char *name, void *msgpool, rt_size_t size, rt_uint8_t flag) { RT_ASSERT(mb != RT_NULL); /* initialize object */ rt_object_init(&(mb->parent.parent), RT_Object_Class_MailBox, name); /* set parent flag */ mb->parent.parent.flag = flag; /* initialize ipc object */ rt_ipc_object_init(&(mb->parent)); /* initialize mailbox */ mb->msg_pool = (rt_ubase_t *)msgpool; mb->size = size; mb->entry = 0; mb->in_offset = 0; mb->out_offset = 0; /* initialize an additional list of sender suspend thread */ rt_list_init(&(mb->suspend_sender_thread)); return RT_EOK; } RTM_EXPORT(rt_mb_init); /** * This function will detach a mailbox from resource management * * @param mb the mailbox object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mb_detach(rt_mailbox_t mb) { /* parameter check */ RT_ASSERT(mb != RT_NULL); RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox); RT_ASSERT(rt_object_is_systemobject(&mb->parent.parent)); /* resume all suspended thread */ rt_ipc_list_resume_all(&(mb->parent.suspend_thread)); /* also resume all mailbox private suspended thread */ rt_ipc_list_resume_all(&(mb->suspend_sender_thread)); /* detach mailbox object */ rt_object_detach(&(mb->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mb_detach); #ifdef RT_USING_HEAP /** * This function will create a mailbox object from system resource * * @param name the name of mailbox * @param size the size of mailbox * @param flag the flag of mailbox * * @return the created mailbox, RT_NULL on error happen */ rt_mailbox_t rt_mb_create(const char *name, rt_size_t size, rt_uint8_t flag) { rt_mailbox_t mb; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mb = (rt_mailbox_t)rt_object_allocate(RT_Object_Class_MailBox, name); if (mb == RT_NULL) return mb; /* set parent */ mb->parent.parent.flag = flag; /* initialize ipc object */ rt_ipc_object_init(&(mb->parent)); /* initialize mailbox */ mb->size = size; mb->msg_pool = (rt_ubase_t *)RT_KERNEL_MALLOC(mb->size * sizeof(rt_ubase_t)); if (mb->msg_pool == RT_NULL) { /* delete mailbox object */ rt_object_delete(&(mb->parent.parent)); return RT_NULL; } mb->entry = 0; mb->in_offset = 0; mb->out_offset = 0; /* initialize an additional list of sender suspend thread */ rt_list_init(&(mb->suspend_sender_thread)); return mb; } RTM_EXPORT(rt_mb_create); /** * This function will delete a mailbox object and release the memory * * @param mb the mailbox object * * @return the error code */ rt_err_t rt_mb_delete(rt_mailbox_t mb) { RT_DEBUG_NOT_IN_INTERRUPT; /* parameter check */ RT_ASSERT(mb != RT_NULL); RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox); RT_ASSERT(rt_object_is_systemobject(&mb->parent.parent) == RT_FALSE); /* resume all suspended thread */ rt_ipc_list_resume_all(&(mb->parent.suspend_thread)); /* also resume all mailbox private suspended thread */ rt_ipc_list_resume_all(&(mb->suspend_sender_thread)); /* free mailbox pool */ RT_KERNEL_FREE(mb->msg_pool); /* delete mailbox object */ rt_object_delete(&(mb->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mb_delete); #endif /** * This function will send a mail to mailbox object. If the mailbox is full, * current thread will be suspended until timeout. * * @param mb the mailbox object * @param value the mail * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mb_send_wait(rt_mailbox_t mb, rt_ubase_t value, rt_int32_t timeout) { struct rt_thread *thread; register rt_ubase_t temp; rt_uint32_t tick_delta; /* parameter check */ RT_ASSERT(mb != RT_NULL); RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mb->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* for non-blocking call */ if (mb->entry == mb->size && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* mailbox is full */ while (mb->entry == mb->size) { /* reset error number in thread */ thread->error = RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } RT_DEBUG_IN_THREAD_CONTEXT; /* suspend current thread */ rt_ipc_list_suspend(&(mb->suspend_sender_thread), thread, mb->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_send_wait: start timer of thread:%s\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule */ rt_schedule(); /* resume from suspend state */ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } /* set ptr */ mb->msg_pool[mb->in_offset] = value; /* increase input offset */ ++ mb->in_offset; if (mb->in_offset >= mb->size) mb->in_offset = 0; /* increase message entry */ mb->entry ++; /* resume suspended thread */ if (!rt_list_isempty(&mb->parent.suspend_thread)) { rt_ipc_list_resume(&(mb->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mb_send_wait); /** * This function will send a mail to mailbox object, if there are threads * suspended on mailbox object, it will be waked up. This function will return * immediately, if you want blocking send, use rt_mb_send_wait instead. * * @param mb the mailbox object * @param value the mail * * @return the error code */ rt_err_t rt_mb_send(rt_mailbox_t mb, rt_ubase_t value) { return rt_mb_send_wait(mb, value, 0); } RTM_EXPORT(rt_mb_send); /** * This function will receive a mail from mailbox object, if there is no mail * in mailbox object, the thread shall wait for a specified time. * * @param mb the mailbox object * @param value the received mail will be saved in * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mb_recv(rt_mailbox_t mb, rt_ubase_t *value, rt_int32_t timeout) { struct rt_thread *thread; register rt_ubase_t temp; rt_uint32_t tick_delta; /* parameter check */ RT_ASSERT(mb != RT_NULL); RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mb->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* for non-blocking call */ if (mb->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } /* mailbox is empty */ while (mb->entry == 0) { /* reset error number in thread */ thread->error = RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); thread->error = -RT_ETIMEOUT; return -RT_ETIMEOUT; } RT_DEBUG_IN_THREAD_CONTEXT; /* suspend current thread */ rt_ipc_list_suspend(&(mb->parent.suspend_thread), thread, mb->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_recv: start timer of thread:%s\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule */ rt_schedule(); /* resume from suspend state */ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } /* fill ptr */ *value = mb->msg_pool[mb->out_offset]; /* increase output offset */ ++ mb->out_offset; if (mb->out_offset >= mb->size) mb->out_offset = 0; /* decrease message entry */ mb->entry --; /* resume suspended thread */ if (!rt_list_isempty(&(mb->suspend_sender_thread))) { rt_ipc_list_resume(&(mb->suspend_sender_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent))); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent))); return RT_EOK; } RTM_EXPORT(rt_mb_recv); /** * This function can get or set some extra attributions of a mailbox object. * * @param mb the mailbox object * @param cmd the execution command * @param arg the execution argument * * @return the error code */ rt_err_t rt_mb_control(rt_mailbox_t mb, int cmd, void *arg) { rt_ubase_t level; /* parameter check */ RT_ASSERT(mb != RT_NULL); RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox); if (cmd == RT_IPC_CMD_RESET) { /* disable interrupt */ level = rt_hw_interrupt_disable(); /* resume all waiting thread */ rt_ipc_list_resume_all(&(mb->parent.suspend_thread)); /* also resume all mailbox private suspended thread */ rt_ipc_list_resume_all(&(mb->suspend_sender_thread)); /* re-init mailbox */ mb->entry = 0; mb->in_offset = 0; mb->out_offset = 0; /* enable interrupt */ rt_hw_interrupt_enable(level); rt_schedule(); return RT_EOK; } return -RT_ERROR; } RTM_EXPORT(rt_mb_control); #endif /* end of RT_USING_MAILBOX */ #ifdef RT_USING_MESSAGEQUEUE struct rt_mq_message { struct rt_mq_message *next; }; /** * This function will initialize a message queue and put it under control of * resource management. * * @param mq the message object * @param name the name of message queue * @param msgpool the beginning address of buffer to save messages * @param msg_size the maximum size of message * @param pool_size the size of buffer to save messages * @param flag the flag of message queue * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_init(rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_size, rt_size_t pool_size, rt_uint8_t flag) { struct rt_mq_message *head; register rt_base_t temp; /* parameter check */ RT_ASSERT(mq != RT_NULL); /* initialize object */ rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name); /* set parent flag */ mq->parent.parent.flag = flag; /* initialize ipc object */ rt_ipc_object_init(&(mq->parent)); /* set message pool */ mq->msg_pool = msgpool; /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message)); /* initialize message list */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* initialize message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp ++) { head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = head; } /* the initial entry is zero */ mq->entry = 0; /* initialize an additional list of sender suspend thread */ rt_list_init(&(mq->suspend_sender_thread)); return RT_EOK; } RTM_EXPORT(rt_mq_init); /** * This function will detach a message queue object from resource management * * @param mq the message queue object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_detach(rt_mq_t mq) { /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent)); /* resume all suspended thread */ rt_ipc_list_resume_all(&mq->parent.suspend_thread); /* also resume all message queue private suspended thread */ rt_ipc_list_resume_all(&(mq->suspend_sender_thread)); /* detach message queue object */ rt_object_detach(&(mq->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mq_detach); #ifdef RT_USING_HEAP /** * This function will create a message queue object from system resource * * @param name the name of message queue * @param msg_size the size of message * @param max_msgs the maximum number of message in queue * @param flag the flag of message queue * * @return the created message queue, RT_NULL on error happen */ rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag) { struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq; /* set parent */ mq->parent.parent.flag = flag; /* initialize ipc object */ rt_ipc_object_init(&(mq->parent)); /* initialize message queue */ /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs; /* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_object_delete(&(mq->parent.parent)); return RT_NULL; } /* initialize message list */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* initialize message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp ++) { head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = head; } /* the initial entry is zero */ mq->entry = 0; /* initialize an additional list of sender suspend thread */ rt_list_init(&(mq->suspend_sender_thread)); return mq; } RTM_EXPORT(rt_mq_create); /** * This function will delete a message queue object and release the memory * * @param mq the message queue object * * @return the error code */ rt_err_t rt_mq_delete(rt_mq_t mq) { RT_DEBUG_NOT_IN_INTERRUPT; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE); /* resume all suspended thread */ rt_ipc_list_resume_all(&(mq->parent.suspend_thread)); /* also resume all message queue private suspended thread */ rt_ipc_list_resume_all(&(mq->suspend_sender_thread)); /* free message queue pool */ RT_KERNEL_FREE(mq->msg_pool); /* delete message queue object */ rt_object_delete(&(mq->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mq_delete); #endif /** * This function will send a message to message queue object. If the message queue is full, * current thread will be suspended until timeout. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mq_send_wait(rt_mq_t mq, const void *buffer, rt_size_t size, rt_int32_t timeout) { register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta; struct rt_thread *thread; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* for non-blocking call */ if (msg == RT_NULL && timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* message queue is full */ while ((msg = mq->msg_queue_free) == RT_NULL) { /* reset error number in thread */ thread->error = RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } RT_DEBUG_IN_THREAD_CONTEXT; /* suspend current thread */ rt_ipc_list_suspend(&(mq->suspend_sender_thread), thread, mq->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule */ rt_schedule(); /* resume from suspend state */ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } /* move free list pointer */ mq->msg_queue_free = msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* the msg is the new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer */ rt_memcpy(msg + 1, buffer, size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; } /* set new tail */ mq->msg_queue_tail = msg; /* if the head is empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg; /* increase message entry */ mq->entry ++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mq_send_wait) /** * This function will send a message to message queue object, if there are * threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size) { return rt_mq_send_wait(mq, buffer, size, 0); } RTM_EXPORT(rt_mq_send); /** * This function will send an urgent message to message queue object, which * means the message will be inserted to the head of message queue. If there * are threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size) { register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* move free list pointer */ mq->msg_queue_free = msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy buffer */ rt_memcpy(msg + 1, buffer, size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to the beginning of message queue */ msg->next = (struct rt_mq_message *)mq->msg_queue_head; mq->msg_queue_head = msg; /* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg; /* increase message entry */ mq->entry ++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mq_urgent); /** * This function will receive a message from message queue object, if there is * no message in message queue object, the thread shall wait for a specified * time. * * @param mq the message queue object * @param buffer the received message will be saved in * @param size the size of buffer * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout) { struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT; /* reset error number in thread */ thread->error = RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); thread->error = -RT_ETIMEOUT; return -RT_ETIMEOUT; } /* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag); /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule */ rt_schedule(); /* recv message */ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head; /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL; /* decrease message entry */ mq->entry --; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy message */ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* resume suspended thread */ if (!rt_list_isempty(&(mq->suspend_sender_thread))) { rt_ipc_list_resume(&(mq->suspend_sender_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); return RT_EOK; } RTM_EXPORT(rt_mq_recv); /** * This function can get or set some extra attributions of a message queue * object. * * @param mq the message queue object * @param cmd the execution command * @param arg the execution argument * * @return the error code */ rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg) { rt_ubase_t level; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); if (cmd == RT_IPC_CMD_RESET) { /* disable interrupt */ level = rt_hw_interrupt_disable(); /* resume all waiting thread */ rt_ipc_list_resume_all(&mq->parent.suspend_thread); /* also resume all message queue private suspended thread */ rt_ipc_list_resume_all(&(mq->suspend_sender_thread)); /* release all message in the queue */ while (mq->msg_queue_head != RT_NULL) { /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head; /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL; /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; } /* clean entry */ mq->entry = 0; /* enable interrupt */ rt_hw_interrupt_enable(level); rt_schedule(); return RT_EOK; } return -RT_ERROR; } RTM_EXPORT(rt_mq_control); #endif /* end of RT_USING_MESSAGEQUEUE */ /**@}*/