[ipc] support of lockless rt_completion (#8887)

* [ipc] lockless rt_completion implementation

The new rt_completion implemented by lockless algorithm can improve timer resolution for up to ~12%, compare to sem IPC.

Signed-off-by: Shell <smokewood@qq.com>

* fixup: error

* remove useless changes

---------

Signed-off-by: Shell <smokewood@qq.com>
This commit is contained in:
Shell 2024-05-08 09:25:57 +08:00 committed by GitHub
parent 9ba6cec663
commit 48bd0e49f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 968 additions and 91 deletions

View File

@ -7,30 +7,7 @@ config RT_USING_DM
Enable device driver model with device tree (FDT). It will use more memory Enable device driver model with device tree (FDT). It will use more memory
to parse and support device tree feature. to parse and support device tree feature.
config RT_USING_DEVICE_IPC source "$RTT_DIR/components/drivers/ipc/Kconfig"
bool "Using device drivers IPC"
default y
config RT_UNAMED_PIPE_NUMBER
int "The number of unamed pipe"
depends on RT_USING_DEVICE_IPC
default 64
if RT_USING_DEVICE_IPC
config RT_USING_SYSTEM_WORKQUEUE
bool "Using system default workqueue"
default n
if RT_USING_SYSTEM_WORKQUEUE
config RT_SYSTEM_WORKQUEUE_STACKSIZE
int "The stack size for system workqueue thread"
default 2048
config RT_SYSTEM_WORKQUEUE_PRIORITY
int "The priority level of system workqueue thread"
default 23
endif
endif
menuconfig RT_USING_SERIAL menuconfig RT_USING_SERIAL
bool "USING Serial device drivers" bool "USING Serial device drivers"

View File

@ -5,6 +5,7 @@
* *
* Change Logs: * Change Logs:
* Date Author Notes * Date Author Notes
* 2024-04-28 Shell Add new wait_flags() & wakeup_by_errno() API
*/ */
#ifndef COMPLETION_H_ #ifndef COMPLETION_H_
#define COMPLETION_H_ #define COMPLETION_H_
@ -13,7 +14,7 @@
#include <rtconfig.h> #include <rtconfig.h>
/** /**
* Completion - A tiny IPC implementation for resource-constrained scenarios * Completion - A tiny & rapid IPC primitive for resource-constrained scenarios
* *
* It's an IPC using one CPU word with the encoding: * It's an IPC using one CPU word with the encoding:
* *
@ -24,7 +25,7 @@
struct rt_completion struct rt_completion
{ {
/* suspended thread, and completed flag */ /* suspended thread, and completed flag */
rt_base_t susp_thread_n_flag; rt_atomic_t susp_thread_n_flag;
}; };
#define RT_COMPLETION_INIT(comp) {0} #define RT_COMPLETION_INIT(comp) {0}
@ -32,7 +33,9 @@ struct rt_completion
void rt_completion_init(struct rt_completion *completion); void rt_completion_init(struct rt_completion *completion);
rt_err_t rt_completion_wait(struct rt_completion *completion, rt_err_t rt_completion_wait(struct rt_completion *completion,
rt_int32_t timeout); rt_int32_t timeout);
rt_err_t rt_completion_wait_flags(struct rt_completion *completion,
rt_int32_t timeout, int suspend_flag);
void rt_completion_done(struct rt_completion *completion); void rt_completion_done(struct rt_completion *completion);
rt_err_t rt_completion_wakeup(struct rt_completion *completion); rt_err_t rt_completion_wakeup(struct rt_completion *completion);
rt_err_t rt_completion_wakeup_by_errno(struct rt_completion *completion, rt_err_t error);
#endif #endif

View File

@ -0,0 +1,23 @@
# Device-driver IPC facilities (pipes, workqueues, completion, ...).
menuconfig RT_USING_DEVICE_IPC
bool "Using device drivers IPC"
default y
if RT_USING_DEVICE_IPC
# Number of anonymous (unnamed) pipes available system-wide.
config RT_UNAMED_PIPE_NUMBER
int "The number of unamed pipe"
default 64
# Shared default workqueue thread usable by any driver.
config RT_USING_SYSTEM_WORKQUEUE
bool "Using system default workqueue"
default n
if RT_USING_SYSTEM_WORKQUEUE
config RT_SYSTEM_WORKQUEUE_STACKSIZE
int "The stack size for system workqueue thread"
default 2048
config RT_SYSTEM_WORKQUEUE_PRIORITY
int "The priority level of system workqueue thread"
default 23
endif
endif

View File

@ -8,6 +8,11 @@ if not GetDepend('RT_USING_HEAP'):
SrcRemove(src, 'dataqueue.c') SrcRemove(src, 'dataqueue.c')
SrcRemove(src, 'pipe.c') SrcRemove(src, 'pipe.c')
if not GetDepend('RT_USING_SMP'):
SrcRemove(src, 'completion_mp.c')
else:
SrcRemove(src, 'completion_up.c')
group = DefineGroup('DeviceDrivers', src, depend = ['RT_USING_DEVICE_IPC'], CPPPATH = CPPPATH, LOCAL_CPPDEFINES=['__RT_IPC_SOURCE__']) group = DefineGroup('DeviceDrivers', src, depend = ['RT_USING_DEVICE_IPC'], CPPPATH = CPPPATH, LOCAL_CPPDEFINES=['__RT_IPC_SOURCE__'])
Return('group') Return('group')

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) 2006-2024, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
* Change Logs:
* Date Author Notes
* 2024-04-26 Shell lockless rt_completion
*/
#include <rtthread.h>
#include <rthw.h>
#include <rtdevice.h>
/**
* @brief This function indicates a completion has done.
*
* @param completion is a pointer to a completion object.
*/
void rt_completion_done(struct rt_completion *completion)
{
    /* fire-and-forget: a negative errno value leaves the waiter's errno untouched */
    (void)rt_completion_wakeup_by_errno(completion, -1);
}
RTM_EXPORT(rt_completion_done);
/**
* @brief This function indicates a completion has done and wakeup the thread
*
* @param completion is a pointer to a completion object.
* @return RT_EOK if wakeup succeed.
* RT_EEMPTY if wakeup failure and the completion is set to completed.
* RT_EBUSY if the completion is still in completed state
*/
rt_err_t rt_completion_wakeup(struct rt_completion *completion)
{
    /* -1 (negative) means: wake the waiter without overwriting its errno */
    rt_err_t rc = rt_completion_wakeup_by_errno(completion, -1);
    return rc;
}
/**
* @brief This function will wait for a completion, if the completion is unavailable, the thread shall wait for
* the completion up to a specified time.
*
* @param completion is a pointer to a completion object.
*
* @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
* the completion done up to the amount of time specified by the argument.
* NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
* completion is unavailable, the thread will be waitting forever.
*
* @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
* If the return value is any other values, it means that the completion wait failed.
*
* @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
*/
rt_err_t rt_completion_wait(struct rt_completion *completion,
                            rt_int32_t timeout)
{
    /* the plain wait is the flags variant with an uninterruptible suspend */
    rt_err_t rc = rt_completion_wait_flags(completion, timeout, RT_UNINTERRUPTIBLE);
    return rc;
}
RTM_EXPORT(rt_completion_wait);

View File

@ -0,0 +1,346 @@
/*
* Copyright (c) 2006-2024, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
* Change Logs:
* Date Author Notes
* 2024-04-26 Shell lockless rt_completion for MP system
*/
#define DBG_TAG "drivers.ipc"
#define DBG_LVL DBG_INFO
#include <rtdbg.h>
#include <rtthread.h>
#include <rthw.h>
#include <rtdevice.h>
#define RT_COMPLETED 1
#define RT_UNCOMPLETED 0
#define RT_WAKING (-1)
#define RT_OCCUPIED (-2)
#define RT_COMPLETION_NEW_STAT(thread, flag) (((flag) & 1) | (((rt_base_t)thread) & ~1))
/**
* The C11 atomic can be ~5% and even faster in testing on the arm64 platform
* compared to rt_atomic. So the C11 way is always preferred.
*/
#ifdef RT_USING_STDC_ATOMIC
#include <stdatomic.h>
#define IPC_STORE(dst, val, morder) atomic_store_explicit(dst, val, morder)
#define IPC_LOAD(dst, morder) atomic_load_explicit(dst, morder)
#define IPC_BARRIER(morder) atomic_thread_fence(morder)
#define IPC_CAS(dst, exp, desired, succ, fail) \
atomic_compare_exchange_strong_explicit(dst, exp, desired, succ, fail)
#else /* !RT_USING_STDC_ATOMIC */
#include <rtatomic.h>
#define IPC_STORE(dst, val, morder) rt_atomic_store(dst, val)
#define IPC_LOAD(dst, morder) rt_atomic_load(dst)
#define IPC_BARRIER(morder)
#define IPC_CAS(dst, exp, desired, succ, fail) \
rt_atomic_compare_exchange_strong(dst, exp, desired)
#endif /* RT_USING_STDC_ATOMIC */
static rt_err_t _comp_susp_thread(struct rt_completion *completion,
rt_thread_t thread, rt_int32_t timeout,
int suspend_flag);
/**
* @brief This function will initialize a completion object.
*
* @param completion is a pointer to a completion object.
*/
void rt_completion_init(struct rt_completion *completion)
{
    RT_ASSERT(completion != RT_NULL);

    /* publish the initial state; relaxed ordering suffices before any waiter exists */
    IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED, memory_order_relaxed);
}
RTM_EXPORT(rt_completion_init);
/**
* @brief This function will wait for a completion, if the completion is unavailable, the thread shall wait for
* the completion up to a specified time.
*
* @param completion is a pointer to a completion object.
*
* @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
* the completion done up to the amount of time specified by the argument.
* NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
* completion is unavailable, the thread will be waitting forever.
* @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
*
* @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
* If the return value is any other values, it means that the completion wait failed.
*
* @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
*/
rt_err_t rt_completion_wait_flags(struct rt_completion *completion,
                                  rt_int32_t timeout, int suspend_flag)
{
    rt_err_t result = -RT_ERROR;
    rt_thread_t thread;
    rt_bool_t exchange_succ;
    rt_base_t expected_value;

    RT_ASSERT(completion != RT_NULL);

    /* current context checking */
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);

    thread = rt_thread_self();

    /*
     * Lock-free consumer state machine over susp_thread_n_flag:
     *   RT_COMPLETED   -> consume it and return RT_EOK
     *   RT_WAKING      -> a producer is mid-wakeup; yield and retry
     *   RT_UNCOMPLETED -> claim the word (RT_OCCUPIED) and suspend,
     *                     or fail with -RT_ETIMEOUT when timeout == 0
     */
    do
    {
        /* try to consume one completion (acquire pairs with producer's release) */
        expected_value = RT_COMPLETED;
        exchange_succ =
            IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
                    RT_UNCOMPLETED, memory_order_acquire, memory_order_relaxed);

        if (exchange_succ)
        {
            /* consume succeed, now return EOK */
            result = RT_EOK;
            break;
        }
        else if (expected_value == RT_WAKING)
        {
            /* previous wake is not done yet, yield thread & try again */
            rt_thread_yield();
        }
        else
        {
            /**
             * API rules say: only one thread can suspend on complete.
             * So we assert if debug.
             */
            RT_ASSERT(expected_value == RT_UNCOMPLETED);

            if (timeout != 0)
            {
                /**
                 * try to occupy completion, noted that we are assuming that
                 * `expected_value == RT_UNCOMPLETED`
                 */
                exchange_succ = IPC_CAS(
                    &completion->susp_thread_n_flag, &expected_value,
                    RT_OCCUPIED, memory_order_relaxed, memory_order_relaxed);
                if (exchange_succ)
                {
                    /* complete waiting business and return result */
                    result = _comp_susp_thread(completion, thread, timeout,
                                               suspend_flag);
                    /* _comp_susp_thread must have resolved the OCCUPIED state */
                    RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
                              RT_OCCUPIED);
                    break;
                }
                else
                {
                    /* try again: a producer raced us and changed the word */
                }
            }
            else
            {
                /* non-blocking probe and nothing completed yet */
                result = -RT_ETIMEOUT;
                break;
            }
        }
    } while (1);

    return result;
}
/* Spin until the completion word moves away from `expected`,
 * returning the newly observed value. Used to wait out the short
 * RT_WAKING window published by a racing producer. */
static rt_base_t _wait_until_update(struct rt_completion *completion, rt_base_t expected)
{
    rt_base_t observed = expected;

    /* busy-wait: the producer's store is expected imminently */
    while (observed == expected)
    {
        rt_hw_isb();
        observed =
            IPC_LOAD(&completion->susp_thread_n_flag, memory_order_relaxed);
    }

    return observed;
}
/**
* Try to suspend thread and update completion
*/
static rt_err_t _comp_susp_thread(struct rt_completion *completion,
                                  rt_thread_t thread, rt_int32_t timeout,
                                  int suspend_flag)
{
    rt_err_t error = -RT_ERROR;
    rt_base_t clevel;
    rt_base_t comp_waiting;

    /* suspend thread */
    clevel = rt_enter_critical();

    /* reset thread error number */
    thread->error = RT_EOK;

    error = rt_thread_suspend_with_flag(thread, suspend_flag);
    if (error)
    {
        /* suspend refused; roll the OCCUPIED claim back to UNCOMPLETED */
        rt_exit_critical_safe(clevel);
        RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
                  RT_OCCUPIED);
        IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
                  memory_order_relaxed);
    }
    else
    {
        /* set to waiting: publish our thread pointer (flag bit clear)
         * so a producer can find and resume this thread */
        comp_waiting = RT_COMPLETION_NEW_STAT(thread, RT_UNCOMPLETED);
        RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
                  RT_OCCUPIED);
        IPC_STORE(&completion->susp_thread_n_flag, comp_waiting,
                  memory_order_relaxed);

        /* current context checking */
        RT_DEBUG_NOT_IN_INTERRUPT;

        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* do schedule */
        rt_schedule();

        rt_exit_critical_safe(clevel);

        /* thread is woken up */
        error = thread->error;
        /* normalize: positive errno values become negative rt_err_t codes */
        error = error > 0 ? -error : error;

        /* clean completed flag & remove susp_thread on the case of waking by timeout */
        if (!error)
        {
            /* completion done successfully */
            RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
                      comp_waiting);
            /* the necessary barrier is done during thread sched */
        }
        else
        {
            /* try to cancel waiting if woken up expectedly or timeout */
            if (!IPC_CAS(&completion->susp_thread_n_flag, &comp_waiting,
                         RT_UNCOMPLETED, memory_order_relaxed,
                         memory_order_relaxed))
            {
                /* cancel failed, producer had woken us in the past, fix error */
                if (comp_waiting == RT_WAKING)
                {
                    /* wait out the producer's short WAKING window */
                    _wait_until_update(completion, RT_WAKING);
                }
                IPC_BARRIER(memory_order_acquire);
                error = RT_EOK;
            }
        }
    }

    return error;
}
/**
* @brief This function indicates a completion has done and wakeup the thread
* and update its errno. No update is applied if it's a negative value.
*
* @param completion is a pointer to a completion object.
* @param thread_errno is the errno set to waking thread.
* @return RT_EOK if wakeup succeed.
* RT_EEMPTY if wakeup failure and the completion is set to completed.
* RT_EBUSY if the completion is still in completed state
*/
rt_err_t rt_completion_wakeup_by_errno(struct rt_completion *completion,
                                       rt_err_t thread_errno)
{
    rt_err_t error = -RT_ERROR;
    rt_thread_t suspend_thread;
    rt_bool_t exchange_succ;
    rt_base_t expected_value;

    RT_ASSERT(completion != RT_NULL);

    do
    {
        /* try to transform from uncompleted to completed */
        expected_value = RT_UNCOMPLETED;
        exchange_succ =
            IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
                    RT_COMPLETED, memory_order_release, memory_order_relaxed);

        if (exchange_succ)
        {
            /* no waiter present; completion is now latched as completed */
            error = -RT_EEMPTY;
            break;
        }
        else
        {
            if (expected_value == RT_COMPLETED)
            {
                /* completion still in completed state */
                error = -RT_EBUSY;
                break;
            }
            else if (expected_value == RT_OCCUPIED ||
                     expected_value == RT_WAKING)
            {
                /* a consumer (or another producer) is mid-transition; retry */
                continue;
            }
            else
            {
                /* the word holds a suspended thread pointer:
                 * try to resume the thread and set uncompleted */
                exchange_succ = IPC_CAS(
                    &completion->susp_thread_n_flag, &expected_value,
                    RT_WAKING, memory_order_relaxed, memory_order_relaxed);
                if (exchange_succ)
                {
/* strip the low flag bit to recover the thread pointer */
#define GET_THREAD(val) ((rt_thread_t)((val) & ~1))
                    suspend_thread = GET_THREAD(expected_value);
                    if (thread_errno >= 0)
                    {
                        /* negative values intentionally leave the waiter's errno alone */
                        suspend_thread->error = thread_errno;
                    }

                    /* safe to assume publication done even on resume failure */
                    rt_thread_resume(suspend_thread);

                    RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
                              RT_WAKING);
                    IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
                              memory_order_release);
                    error = RT_EOK;
                    break;
                }
                else
                {
                    /* failed in racing to resume thread, try again */
                }
            }
        }
    } while (1);

    return error;
}

View File

@ -19,6 +19,11 @@
#include <rthw.h> #include <rthw.h>
#include <rtdevice.h> #include <rtdevice.h>
/**
* This is an implementation of completion core on UP system.
* Noted that spinlock is (preempt_lock + irq_mask) on UP scheduler.
*/
#define RT_COMPLETED 1 #define RT_COMPLETED 1
#define RT_UNCOMPLETED 0 #define RT_UNCOMPLETED 0
#define RT_COMPLETION_FLAG(comp) ((comp)->susp_thread_n_flag & 1) #define RT_COMPLETION_FLAG(comp) ((comp)->susp_thread_n_flag & 1)
@ -50,14 +55,15 @@ RTM_EXPORT(rt_completion_init);
* the completion done up to the amount of time specified by the argument. * the completion done up to the amount of time specified by the argument.
* NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the * NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
* completion is unavailable, the thread will be waitting forever. * completion is unavailable, the thread will be waitting forever.
* @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
* *
* @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful. * @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
* If the return value is any other values, it means that the completion wait failed. * If the return value is any other values, it means that the completion wait failed.
* *
* @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context. * @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
*/ */
rt_err_t rt_completion_wait(struct rt_completion *completion, rt_err_t rt_completion_wait_flags(struct rt_completion *completion,
rt_int32_t timeout) rt_int32_t timeout, int suspend_flag)
{ {
rt_err_t result; rt_err_t result;
rt_base_t level; rt_base_t level;
@ -71,6 +77,8 @@ rt_err_t rt_completion_wait(struct rt_completion *completion,
thread = rt_thread_self(); thread = rt_thread_self();
level = rt_spin_lock_irqsave(&_completion_lock); level = rt_spin_lock_irqsave(&_completion_lock);
__try_again:
if (RT_COMPLETION_FLAG(completion) != RT_COMPLETED) if (RT_COMPLETION_FLAG(completion) != RT_COMPLETED)
{ {
/* only one thread can suspend on complete */ /* only one thread can suspend on complete */
@ -87,11 +95,12 @@ rt_err_t rt_completion_wait(struct rt_completion *completion,
thread->error = RT_EOK; thread->error = RT_EOK;
/* suspend thread */ /* suspend thread */
result = rt_thread_suspend_with_flag(thread, RT_UNINTERRUPTIBLE); result = rt_thread_suspend_with_flag(thread, suspend_flag);
if (result == RT_EOK) if (result == RT_EOK)
{ {
/* add to suspended thread */ /* add to suspended thread */
completion->susp_thread_n_flag = RT_COMPLETION_NEW_STAT(thread, RT_UNCOMPLETED); rt_base_t waiting_stat = RT_COMPLETION_NEW_STAT(thread, RT_UNCOMPLETED);
completion->susp_thread_n_flag = waiting_stat;
/* current context checking */ /* current context checking */
RT_DEBUG_NOT_IN_INTERRUPT; RT_DEBUG_NOT_IN_INTERRUPT;
@ -111,10 +120,21 @@ rt_err_t rt_completion_wait(struct rt_completion *completion,
/* do schedule */ /* do schedule */
rt_schedule(); rt_schedule();
/* thread is waked up */
result = thread->error;
level = rt_spin_lock_irqsave(&_completion_lock); level = rt_spin_lock_irqsave(&_completion_lock);
if (completion->susp_thread_n_flag != waiting_stat)
{
/* completion may be completed after we suspend */
timeout = 0;
goto __try_again;
}
else
{
/* no changes, waiting failed */
result = thread->error;
result = result > 0 ? -result : result;
RT_ASSERT(result != RT_EOK);
}
} }
} }
} }
@ -130,11 +150,17 @@ __exit:
RTM_EXPORT(rt_completion_wait); RTM_EXPORT(rt_completion_wait);
/** /**
* @brief This function indicates a completion has done. * @brief This function indicates a completion has done and wakeup the thread
* and update its errno. No update is applied if it's a negative value.
* *
* @param completion is a pointer to a completion object. * @param completion is a pointer to a completion object.
* @param thread_errno is the errno set to waking thread.
* @return RT_EOK if wakeup succeed.
* RT_EEMPTY if wakeup failure and the completion is set to completed.
* RT_EBUSY if the completion is still in completed state
*/ */
static int _completion_done(struct rt_completion *completion) rt_err_t rt_completion_wakeup_by_errno(struct rt_completion *completion,
rt_err_t thread_errno)
{ {
rt_base_t level; rt_base_t level;
rt_err_t error; rt_err_t error;
@ -153,11 +179,16 @@ static int _completion_done(struct rt_completion *completion)
{ {
/* there is one thread in suspended list */ /* there is one thread in suspended list */
/* resume it */ if (thread_errno >= 0)
{
suspend_thread->error = thread_errno;
}
error = rt_thread_resume(suspend_thread); error = rt_thread_resume(suspend_thread);
if (error) if (error)
{ {
LOG_D("%s: failed to resume thread", __func__); LOG_D("%s: failed to resume thread with %d", __func__, error);
error = -RT_EEMPTY;
} }
} }
else else
@ -172,26 +203,3 @@ static int _completion_done(struct rt_completion *completion)
return error; return error;
} }
/**
* @brief This function indicates a completion has done.
*
* @param completion is a pointer to a completion object.
*/
void rt_completion_done(struct rt_completion *completion)
{
_completion_done(completion);
}
RTM_EXPORT(rt_completion_done);
/**
* @brief This function indicates a completion has done and wakeup the thread
*
* @param completion is a pointer to a completion object.
*/
rt_err_t rt_completion_wakeup(struct rt_completion *completion)
{
return _completion_done(completion);
}
RTM_EXPORT(rt_completion_wakeup);

View File

@ -6,6 +6,7 @@
* Change Logs: * Change Logs:
* Date Author Notes * Date Author Notes
* 2023-07-10 xqyjlj The first version. * 2023-07-10 xqyjlj The first version.
* 2024-04-26 Shell Improve ipc performance
*/ */
#ifndef __KTIME_H__ #ifndef __KTIME_H__
@ -13,6 +14,7 @@
#include <stdint.h> #include <stdint.h>
#include <sys/time.h> #include <sys/time.h>
#include <ipc/completion.h>
#include "rtthread.h" #include "rtthread.h"
@ -20,13 +22,13 @@
struct rt_ktime_hrtimer struct rt_ktime_hrtimer
{ {
struct rt_object parent; /**< inherit from rt_object */ struct rt_object parent; /**< inherit from rt_object */
rt_list_t row; rt_list_t row;
void *parameter; void *parameter;
unsigned long init_cnt; unsigned long init_cnt;
unsigned long timeout_cnt; unsigned long timeout_cnt;
rt_err_t error; rt_err_t error;
struct rt_semaphore sem; struct rt_completion completion;
void (*timeout_func)(void *parameter); void (*timeout_func)(void *parameter);
}; };
typedef struct rt_ktime_hrtimer *rt_ktime_hrtimer_t; typedef struct rt_ktime_hrtimer *rt_ktime_hrtimer_t;

View File

@ -107,9 +107,8 @@ static unsigned long _cnt_convert(unsigned long cnt)
static void _sleep_timeout(void *parameter) static void _sleep_timeout(void *parameter)
{ {
struct rt_semaphore *sem; struct rt_ktime_hrtimer *timer = parameter;
sem = (struct rt_semaphore *)parameter; rt_completion_done(&timer->completion);
rt_sem_release(sem);
} }
static void _set_next_timeout(void); static void _set_next_timeout(void);
@ -119,18 +118,15 @@ static void _timeout_callback(void *parameter)
timer = (rt_ktime_hrtimer_t)parameter; timer = (rt_ktime_hrtimer_t)parameter;
rt_base_t level; rt_base_t level;
if (timer->parent.flag & RT_TIMER_FLAG_ACTIVATED)
{
timer->timeout_func(timer->parameter);
}
level = rt_spin_lock_irqsave(&_spinlock); level = rt_spin_lock_irqsave(&_spinlock);
_nowtimer = RT_NULL; _nowtimer = RT_NULL;
rt_list_remove(&(timer->row)); rt_list_remove(&(timer->row));
if (timer->parent.flag & RT_TIMER_FLAG_ACTIVATED) rt_spin_unlock_irqrestore(&_spinlock, level);
{
rt_spin_unlock_irqrestore(&_spinlock, level);
timer->timeout_func(timer->parameter);
}
else
{
rt_spin_unlock_irqrestore(&_spinlock, level);
}
_set_next_timeout(); _set_next_timeout();
} }
@ -195,7 +191,7 @@ void rt_ktime_hrtimer_init(rt_ktime_hrtimer_t timer,
timer->init_cnt = cnt; timer->init_cnt = cnt;
rt_list_init(&(timer->row)); rt_list_init(&(timer->row));
rt_sem_init(&(timer->sem), "hrtimer", 0, RT_IPC_FLAG_PRIO); rt_completion_init(&timer->completion);
} }
rt_err_t rt_ktime_hrtimer_start(rt_ktime_hrtimer_t timer) rt_err_t rt_ktime_hrtimer_start(rt_ktime_hrtimer_t timer)
@ -206,6 +202,9 @@ rt_err_t rt_ktime_hrtimer_start(rt_ktime_hrtimer_t timer)
/* parameter check */ /* parameter check */
RT_ASSERT(timer != RT_NULL); RT_ASSERT(timer != RT_NULL);
/* notify the timer stop event */
rt_completion_wakeup_by_errno(&timer->completion, RT_ERROR);
level = rt_spin_lock_irqsave(&_spinlock); level = rt_spin_lock_irqsave(&_spinlock);
rt_list_remove(&timer->row); /* remove timer from list */ rt_list_remove(&timer->row); /* remove timer from list */
/* change status of timer */ /* change status of timer */
@ -333,6 +332,9 @@ rt_err_t rt_ktime_hrtimer_detach(rt_ktime_hrtimer_t timer)
/* parameter check */ /* parameter check */
RT_ASSERT(timer != RT_NULL); RT_ASSERT(timer != RT_NULL);
/* notify the timer stop event */
rt_completion_wakeup_by_errno(&timer->completion, RT_ERROR);
level = rt_spin_lock_irqsave(&_spinlock); level = rt_spin_lock_irqsave(&_spinlock);
/* stop timer */ /* stop timer */
@ -349,7 +351,6 @@ rt_err_t rt_ktime_hrtimer_detach(rt_ktime_hrtimer_t timer)
{ {
rt_spin_unlock_irqrestore(&_spinlock, level); rt_spin_unlock_irqrestore(&_spinlock, level);
} }
rt_sem_detach(&(timer->sem));
return RT_EOK; return RT_EOK;
} }
@ -359,7 +360,7 @@ rt_err_t rt_ktime_hrtimer_detach(rt_ktime_hrtimer_t timer)
void rt_ktime_hrtimer_delay_init(struct rt_ktime_hrtimer *timer) void rt_ktime_hrtimer_delay_init(struct rt_ktime_hrtimer *timer)
{ {
rt_ktime_hrtimer_init(timer, "hrtimer_sleep", 0, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_HARD_TIMER, rt_ktime_hrtimer_init(timer, "hrtimer_sleep", 0, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_HARD_TIMER,
_sleep_timeout, &(timer->sem)); _sleep_timeout, timer);
} }
void rt_ktime_hrtimer_delay_detach(struct rt_ktime_hrtimer *timer) void rt_ktime_hrtimer_delay_detach(struct rt_ktime_hrtimer *timer)
@ -378,7 +379,8 @@ rt_err_t rt_ktime_hrtimer_sleep(struct rt_ktime_hrtimer *timer, unsigned long cn
timer->init_cnt = cnt; timer->init_cnt = cnt;
rt_ktime_hrtimer_start(timer); /* reset the timeout of thread timer and start it */ rt_ktime_hrtimer_start(timer); /* reset the timeout of thread timer and start it */
err = rt_sem_take_interruptible(&(timer->sem), RT_WAITING_FOREVER); err = rt_completion_wait_flags(&(timer->completion), RT_WAITING_FOREVER,
RT_INTERRUPTIBLE);
rt_ktime_hrtimer_keep_errno(timer, err); rt_ktime_hrtimer_keep_errno(timer, err);
return RT_EOK; return RT_EOK;

View File

@ -11,6 +11,7 @@ source "$RTT_DIR/examples/utest/testcases/utest/Kconfig"
source "$RTT_DIR/examples/utest/testcases/kernel/Kconfig" source "$RTT_DIR/examples/utest/testcases/kernel/Kconfig"
source "$RTT_DIR/examples/utest/testcases/cpp11/Kconfig" source "$RTT_DIR/examples/utest/testcases/cpp11/Kconfig"
source "$RTT_DIR/examples/utest/testcases/drivers/serial_v2/Kconfig" source "$RTT_DIR/examples/utest/testcases/drivers/serial_v2/Kconfig"
source "$RTT_DIR/examples/utest/testcases/drivers/ipc/Kconfig"
source "$RTT_DIR/examples/utest/testcases/posix/Kconfig" source "$RTT_DIR/examples/utest/testcases/posix/Kconfig"
source "$RTT_DIR/examples/utest/testcases/mm/Kconfig" source "$RTT_DIR/examples/utest/testcases/mm/Kconfig"

View File

@ -0,0 +1,7 @@
menu "Utest IPC Testcase"
# Enables the rt_completion test cases (completion_tc.c / completion_timeout_tc.c).
config UTEST_COMPLETION_TC
bool "rt_completion testcase"
default n
endmenu

View File

@ -0,0 +1,13 @@
# Build script for the IPC utest cases: the sources are compiled only
# when both RT_USING_UTESTCASES and UTEST_COMPLETION_TC are enabled.
Import('rtconfig')
from building import *

cwd = GetCurrentDir()
src = []
CPPPATH = [cwd]

if GetDepend(['UTEST_COMPLETION_TC']):
    src += ['completion_tc.c', 'completion_timeout_tc.c']

group = DefineGroup('utestcases', src, depend = ['RT_USING_UTESTCASES'], CPPPATH = CPPPATH)

Return('group')

View File

@ -0,0 +1,199 @@
/*
* Copyright (c) 2006-2024, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
* Change Logs:
* Date Author Notes
* 2024-04-30 Shell init ver.
*/
/**
* Test Case for rt_completion API
*
* The test simulates a producer-consumer interaction where a producer thread
* generates data, and a consumer thread consumes the data after waiting for its
* availability using rt_completion synchronization primitives.
*
* Test Criteria:
* - The producer should correctly increment the test data and signal
* completion.
* - The consumer should correctly wait for data update, consume it, and signal
* completion.
* - Data integrity should be maintained between producer and consumer.
* - Synchronization is properly done so both can see consistent data.
* - Random latency is introduced to simulate racing scenarios.
*/
#define TEST_LATENCY_TICK (1)
#define TEST_LOOP_TIMES (60 * RT_TICK_PER_SECOND)
#define TEST_PROGRESS_ON (RT_TICK_PER_SECOND)
#include "utest.h"
#include <ipc/completion.h>
#include <rtthread.h>
#include <stdlib.h>
static struct rt_completion _prod_completion;
static struct rt_completion _cons_completion;
static int _test_data = 0;
static rt_atomic_t _progress_counter;
static struct rt_semaphore _thr_exit_sem;
/* Signal `completion`, tolerating the case where the peer is not waiting yet. */
static void done_safely(struct rt_completion *completion)
{
    rt_err_t error;

    /* Signal completion */
    error = rt_completion_wakeup(completion);

    /* try again if failed to produce */
    if (error == -RT_EEMPTY)
    {
        /* peer was not waiting yet; yield so it can catch up */
        rt_thread_yield();
    }
    else if (error)
    {
        /* any other error is a test failure: flag it and kill this thread */
        uassert_false(0);
        rt_thread_delete(rt_thread_self());
    }
}
/* Block on `completion` until it is done, retrying on interrupt. */
static void wait_safely(struct rt_completion *completion)
{
    rt_err_t error;
    do
    {
        /* interruptible wait: may return -RT_EINTR on asynchronous wakeup */
        error = rt_completion_wait_flags(completion, RT_WAITING_FOREVER,
                                         RT_INTERRUPTIBLE);
        if (error)
        {
            /* only an interrupt is acceptable here; retry after yielding */
            uassert_true(error == -RT_EINTR);
            rt_thread_yield();
        }
        else
        {
            break;
        }
    } while (1);
}
/* Producer: bump the shared counter, signal the consumer, then wait for its ack. */
static void producer_thread_entry(void *parameter)
{
    for (size_t i = 0; i < TEST_LOOP_TIMES; i++)
    {
        /* Produce data */
        _test_data++;

        /* notify consumer */
        done_safely(&_prod_completion);

        /* Delay before producing next data */
        rt_thread_delay(TEST_LATENCY_TICK);

        /* sync with consumer */
        wait_safely(&_cons_completion);
    }

    /* tell the test case this worker has finished */
    rt_sem_release(&_thr_exit_sem);
}
/* Busy-wait until the system tick advances, then poll for a random
 * number of iterations (bounded by the next tick) to randomize the
 * thread's phase relative to the scheduler. */
static void _wait_until_edge(void)
{
    rt_tick_t start_tick, now;
    rt_base_t extra_polls;

    /* spin to the next tick boundary */
    start_tick = rt_tick_get();
    while ((now = rt_tick_get()) == start_tick)
    {
    }

    /* give a random latency for test */
    extra_polls = rand();
    start_tick = now;
    for (size_t poll = 0; poll < extra_polls; poll++)
    {
        if (rt_tick_get() != start_tick)
        {
            break;
        }
    }
}
/* Consumer: starts the producer (passed as `parameter`), then verifies the
 * shared counter advances exactly once per round. */
static void consumer_thread_entry(void *parameter)
{
    int local_test_data = 0;

    /* parameter is the producer thread; starting it here guarantees the
     * consumer is ready before the first production */
    rt_thread_startup(parameter);

    for (size_t i = 0; i < TEST_LOOP_TIMES; i++)
    {
        /* add more random case for test */
        _wait_until_edge();

        /* Wait for data update */
        wait_safely(&_prod_completion);

        /* Consume data: the producer must have advanced it by exactly 1 */
        if (local_test_data + 1 != _test_data)
        {
            LOG_I("local test data is %d, shared test data is %d",
                  local_test_data, _test_data);
            uassert_true(0);
        }
        else if (rt_atomic_add(&_progress_counter, 1) % TEST_PROGRESS_ON == 0)
        {
            /* periodic heartbeat so long runs show progress */
            uassert_true(1);
        }
        local_test_data = _test_data;

        /* acknowledge so the producer can continue */
        done_safely(&_cons_completion);
    }

    rt_sem_release(&_thr_exit_sem);
}
/* Entry point: spin up the producer/consumer pair and wait for both to exit. */
static void testcase(void)
{
    /* Initialize completion object */
    rt_completion_init(&_prod_completion);
    rt_completion_init(&_cons_completion);

    /* Create producer and consumer threads */
    rt_thread_t producer_thread =
        rt_thread_create("producer", producer_thread_entry, RT_NULL,
                         UTEST_THR_STACK_SIZE, UTEST_THR_PRIORITY, 100);
    rt_thread_t consumer_thread =
        rt_thread_create("consumer", consumer_thread_entry, producer_thread,
                         UTEST_THR_STACK_SIZE, UTEST_THR_PRIORITY, 100);
    uassert_true(producer_thread != RT_NULL);
    uassert_true(consumer_thread != RT_NULL);

    LOG_I("Summary:\n"
          "\tTest times: %ds(%d)",
          TEST_LOOP_TIMES / RT_TICK_PER_SECOND, TEST_LOOP_TIMES);

    /* only the consumer is started here: it starts the producer itself,
     * which it receives as its thread parameter */
    rt_thread_startup(consumer_thread);

    /* wait until both worker threads have signalled completion */
    for (size_t i = 0; i < 2; i++)
    {
        rt_sem_take(&_thr_exit_sem, RT_WAITING_FOREVER);
    }
}
static rt_err_t utest_tc_init(void)
{
    /* reset shared counters so consecutive runs start from a clean state */
    _progress_counter = 0;
    _test_data = 0;
    rt_sem_init(&_thr_exit_sem, "test", 0, RT_IPC_FLAG_PRIO);
    return RT_EOK;
}
static rt_err_t utest_tc_cleanup(void)
{
    /* tear down the exit-synchronization semaphore */
    rt_sem_detach(&_thr_exit_sem);

    return RT_EOK;
}
UTEST_TC_EXPORT(testcase, "testcases.drivers.ipc.rt_completion.basic",
utest_tc_init, utest_tc_cleanup, 10);

View File

@ -0,0 +1,213 @@
/*
* Copyright (c) 2006-2024, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
* Change Logs:
* Date Author Notes
* 2024-04-30 Shell init ver.
*/
/**
* Test Case for rt_completion API
*
* The test simulates a producer-consumer interaction where a producer thread
* generates data, and a consumer thread consumes the data after waiting for its
* availability using rt_completion synchronization primitives.
*
* Test Criteria:
* - The producer produces data correctly and notifies the consumer thread.
* - The consumer receives data correctly and acknowledges receipt to the
* producer.
* - The producer and consumer threads synchronize their operations effectively.
* - Verify the correctness of data production and consumption between producer
* and consumer threads.
* - The asynchronous woken of consumer thread was handled properly so the
* consumer don't lose woken from producer.
*
* Test APIs:
* - rt_completion_init()
* - rt_completion_wakeup()
* - rt_completion_wait_flags()
*/
/* producer delay between produced items, in ticks */
#define TEST_LATENCY_TICK (1)
/* total produce/consume iterations (one minute's worth of ticks) */
#define TEST_LOOP_TIMES (60 * RT_TICK_PER_SECOND)
/* emit a progress mark once per this many completed iterations */
#define TEST_PROGRESS_ON (RT_TICK_PER_SECOND)
#include "utest.h"
#include <ipc/completion.h>
#include <rtthread.h>
#include <stdlib.h>
/* producer -> consumer "data ready" notification */
static struct rt_completion _prod_completion;
/* consumer -> producer "data consumed" acknowledgement */
static struct rt_completion _cons_completion;
/* shared payload: written by producer, read by consumer */
static int _test_data;
/* count of timed-out/interrupted waits observed by wait_safely();
 * NOTE(review): incremented from both threads without atomics — looks
 * benign for a statistic, but confirm increments cannot race */
static int _async_intr_count;
/* completed-iteration counter used for periodic progress reporting */
static rt_atomic_t _progress_counter;
/* released once by each worker so testcase() can wait for both exits */
static struct rt_semaphore _thr_exit_sem;
/**
 * Abort the calling test thread after a fatal check failure.
 *
 * Logs the reason, releases the exit semaphore so testcase() does not
 * block forever waiting on this thread, then deletes the current thread.
 *
 * Fix: parameter const-qualified — every call site passes a string
 * literal and the message is never modified.
 */
static void _test_thread_exit_failure(const char *string)
{
    LOG_E("\t[TEST failed] %s", string);

    /* let testcase() observe this (failed) termination */
    rt_sem_release(&_thr_exit_sem);
    rt_thread_delete(rt_thread_self());
}
static void done_safely(struct rt_completion *completion)
{
rt_err_t error;
/* Signal completion */
error = rt_completion_wakeup(completion);
/* try again if failed to produce */
if (error == -RT_EEMPTY)
{
rt_thread_yield();
}
else if (error)
{
uassert_true(error == RT_EOK);
_test_thread_exit_failure("unexpected error");
}
}
static void wait_safely(struct rt_completion *completion)
{
int try_times = 3;
rt_err_t error;
do
{
/* wait for one tick, to add more random */
error = rt_completion_wait_flags(completion, 1, RT_INTERRUPTIBLE);
if (error)
{
if (error == -RT_ETIMEOUT || error == -RT_EINTR)
{
_async_intr_count++;
}
else
{
LOG_I("Async event %d\n", error);
uassert_true(0);
}
rt_thread_yield();
}
else
{
break;
}
} while (try_times--);
if (error != RT_EOK)
{
uassert_true(error == RT_EOK);
_test_thread_exit_failure("wait failed");
}
}
/**
 * Producer loop: bump the shared datum, pace itself by one tick,
 * notify the consumer, then block until the consumer acknowledges.
 */
static void producer_thread_entry(void *parameter)
{
    (void)parameter; /* unused */

    for (size_t round = 0; round < TEST_LOOP_TIMES; round++)
    {
        _test_data++;                       /* produce the next value */
        rt_thread_delay(TEST_LATENCY_TICK); /* pace before notifying */
        done_safely(&_prod_completion);     /* data-ready notification */
        wait_safely(&_cons_completion);     /* wait for consumer ack */
    }

    rt_sem_release(&_thr_exit_sem); /* report termination to testcase() */
}
static void consumer_thread_entry(void *parameter)
{
int local_test_data = 0;
rt_thread_startup(parameter);
for (size_t i = 0; i < TEST_LOOP_TIMES; i++)
{
/* Wait for data update */
wait_safely(&_prod_completion);
/* Consume data */
if (local_test_data + 1 != _test_data)
{
LOG_I("local test data is %d, shared test data is %d",
local_test_data, _test_data);
uassert_true(0);
}
else if (rt_atomic_add(&_progress_counter, 1) % TEST_PROGRESS_ON == 0)
{
uassert_true(1);
}
local_test_data = _test_data;
done_safely(&_cons_completion);
}
rt_sem_release(&_thr_exit_sem);
}
/* Thread handles kept at file scope for external inspection.
 * NOTE(review): assigned in testcase() but never read in this file —
 * presumably consumed by a debugger or watchdog; confirm before removal. */
rt_thread_t _watching_thread1;
rt_thread_t _watching_thread2;
static void testcase(void)
{
/* Initialize completion object */
rt_completion_init(&_prod_completion);
rt_completion_init(&_cons_completion);
/* Create producer and consumer threads */
rt_thread_t producer_thread =
rt_thread_create("producer", producer_thread_entry, RT_NULL,
UTEST_THR_STACK_SIZE, UTEST_THR_PRIORITY, 100);
rt_thread_t consumer_thread =
rt_thread_create("consumer", consumer_thread_entry, producer_thread,
UTEST_THR_STACK_SIZE, UTEST_THR_PRIORITY, 100);
uassert_true(producer_thread != RT_NULL);
uassert_true(consumer_thread != RT_NULL);
_watching_thread1 = consumer_thread;
_watching_thread2 = producer_thread;
rt_thread_startup(consumer_thread);
for (size_t i = 0; i < 2; i++)
{
rt_sem_take(&_thr_exit_sem, RT_WAITING_FOREVER);
}
LOG_I("Summary:\n"
"\tTest times: %ds(%d times)\n"
"\tAsync interruption count: %d\n",
TEST_LOOP_TIMES / RT_TICK_PER_SECOND, TEST_LOOP_TIMES,
_async_intr_count);
}
/* Per-run setup: clear all shared state and create the join semaphore. */
static rt_err_t utest_tc_init(void)
{
    /* reset counters so repeated runs start from a clean slate */
    _async_intr_count = 0;
    _progress_counter = 0;
    _test_data        = 0;

    /* worker threads release this once each so testcase() can join them */
    rt_sem_init(&_thr_exit_sem, "test", 0, RT_IPC_FLAG_PRIO);

    return RT_EOK;
}
/* Per-run teardown: release the semaphore created in utest_tc_init(). */
static rt_err_t utest_tc_cleanup(void)
{
    rt_sem_detach(&_thr_exit_sem);

    return RT_EOK;
}
/* Register the timeout-variant completion test (1000s utest timeout). */
UTEST_TC_EXPORT(testcase, "testcases.drivers.ipc.rt_completion.timeout",
                utest_tc_init, utest_tc_cleanup, 1000);

View File

@ -227,7 +227,8 @@ void rt_hw_secondary_cpu_up(void);
* secondary cpu idle function * secondary cpu idle function
*/ */
void rt_hw_secondary_cpu_idle_exec(void); void rt_hw_secondary_cpu_idle_exec(void);
#else
#else /* !RT_USING_SMP */
#define RT_DEFINE_HW_SPINLOCK(x) rt_ubase_t x #define RT_DEFINE_HW_SPINLOCK(x) rt_ubase_t x
@ -235,13 +236,13 @@ void rt_hw_secondary_cpu_idle_exec(void);
#define rt_hw_spin_unlock(lock) rt_hw_interrupt_enable(*(lock)) #define rt_hw_spin_unlock(lock) rt_hw_interrupt_enable(*(lock))
#endif #endif /* RT_USING_SMP */
#ifndef RT_USING_CACHE #ifndef RT_USING_CACHE
#define rt_hw_isb() #define rt_hw_isb()
#define rt_hw_dmb() #define rt_hw_dmb()
#define rt_hw_dsb() #define rt_hw_dsb()
#endif #endif /* RT_USING_CACHE */
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -0,0 +1,17 @@
/*
* Copyright (c) 2006-2024, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
* Change Logs:
* Date Author Notes
* 2024-04-26 Shell lockless rt_completion
*/
#include <rtthread.h>
/* drop any macro definition so the weak symbol below can be linked */
#undef rt_hw_isb
/**
 * Weak default instruction-synchronization barrier: a no-op, meant to
 * be overridden by architectures that provide a real ISB.
 */
rt_weak void rt_hw_isb(void)
{
}

View File

@ -35,6 +35,7 @@ config RT_USING_SMART
select RT_USING_POSIX_TERMIOS select RT_USING_POSIX_TERMIOS
select RT_USING_KTIME select RT_USING_KTIME
select RT_USING_STDC_ATOMIC select RT_USING_STDC_ATOMIC
select RT_USING_SYSTEM_WORKQUEUE
depends on ARCH_ARM_CORTEX_M || ARCH_ARM_ARM9 || ARCH_ARM_CORTEX_A || ARCH_ARMV8 || ARCH_RISCV64 depends on ARCH_ARM_CORTEX_M || ARCH_ARM_ARM9 || ARCH_ARM_CORTEX_A || ARCH_ARMV8 || ARCH_RISCV64
depends on !RT_USING_NANO depends on !RT_USING_NANO
help help