rt-thread-official/components/drivers/ipc/condvar.c

174 lines
4.7 KiB
C

/*
* Copyright (c) 2006-2023, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
* Change Logs:
* Date Author Notes
* 2023-11-20 Shell Support of condition variable
*/
#define DBG_TAG "ipc.condvar"
#define DBG_LVL DBG_INFO
#include <rtdbg.h>
#include <rtdevice.h>
#include <rtatomic.h>
#include <rtthread.h>
static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;
#define CV_ASSERT_LOCKED(cv) \
RT_ASSERT(!(cv)->waiting_mtx || \
rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) == \
rt_thread_self())
void rt_condvar_init(rt_condvar_t cv, char *name)
{
#ifdef USING_RT_OBJECT
/* TODO: support rt object */
rt_object_init();
#endif
rt_wqueue_init(&cv->event);
rt_atomic_store(&cv->waiters_cnt, 0);
rt_atomic_store(&cv->waiting_mtx, 0);
}
static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
rt_tick_t timeout, int suspend_flag)
{
rt_thread_t tcb = node->polling_thread;
rt_timer_t timer = &(tcb->thread_timer);
rt_err_t ret;
if (queue->flag != RT_WQ_FLAG_WAKEUP)
{
ret = rt_thread_suspend_with_flag(tcb, suspend_flag);
if (ret == RT_EOK)
{
rt_wqueue_add(queue, node);
if (timeout != RT_WAITING_FOREVER)
{
rt_timer_control(timer, RT_TIMER_CTRL_SET_TIME, &timeout);
rt_timer_start(timer);
}
}
}
else
{
ret = RT_EOK;
}
return ret;
}
#define INIT_WAITQ_NODE(node) \
{ \
.polling_thread = rt_thread_self(), .key = 0, \
.wakeup = __wqueue_default_wake, .wqueue = &cv->event, \
.list = RT_LIST_OBJECT_INIT(node.list) \
}
int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
rt_tick_t timeout)
{
rt_err_t acq_mtx_succ, rc;
rt_atomic_t waiting_mtx;
struct rt_wqueue_node node = INIT_WAITQ_NODE(node);
/* not allowed in IRQ & critical section */
RT_DEBUG_SCHEDULER_AVAILABLE(1);
CV_ASSERT_LOCKED(cv);
/**
* for the worst case, this is racy with the following works to reset field
* before mutex is taken. The spinlock then comes to rescue.
*/
rt_spin_lock(&_local_cv_queue_lock);
waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
if (!waiting_mtx)
acq_mtx_succ = rt_atomic_compare_exchange_strong(
&cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
else
acq_mtx_succ = 0;
rt_spin_unlock(&_local_cv_queue_lock);
if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
{
rt_atomic_add(&cv->waiters_cnt, 1);
rt_enter_critical();
if (suspend_flag == RT_INTERRUPTIBLE)
rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);
acq_mtx_succ = rt_mutex_release(mtx);
RT_ASSERT(acq_mtx_succ == 0);
rt_exit_critical();
if (rc == RT_EOK)
{
rt_schedule();
rc = rt_get_errno();
rc = rc > 0 ? -rc : rc;
}
else
{
LOG_D("%s() failed to suspend", __func__);
}
rt_wqueue_remove(&node);
rt_spin_lock(&_local_cv_queue_lock);
if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
{
waiting_mtx = (size_t)mtx;
acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
&waiting_mtx, 0);
RT_ASSERT(acq_mtx_succ == 1);
}
rt_spin_unlock(&_local_cv_queue_lock);
acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
RT_ASSERT(acq_mtx_succ == 0);
}
else
{
LOG_D("%s: conflict waiting mutex", __func__);
rc = -EBUSY;
}
return rc;
}
/** Keep in mind that we always operating when cv.waiting_mtx is taken */
int rt_condvar_signal(rt_condvar_t cv)
{
CV_ASSERT_LOCKED(cv);
/* to avoid spurious wakeups */
if (rt_atomic_load(&cv->waiters_cnt) > 0)
rt_wqueue_wakeup(&cv->event, 0);
cv->event.flag = 0;
return 0;
}
int rt_condvar_broadcast(rt_condvar_t cv)
{
CV_ASSERT_LOCKED(cv);
/* to avoid spurious wakeups */
if (rt_atomic_load(&cv->waiters_cnt) > 0)
rt_wqueue_wakeup_all(&cv->event, 0);
cv->event.flag = 0;
return 0;
}