From 1b014359b07799cf77a3045b78b285ffbb9b12b6 Mon Sep 17 00:00:00 2001 From: "bernard.xiong@gmail.com" Date: Wed, 24 Nov 2010 00:04:16 +0000 Subject: [PATCH] rewrite posix semaphore and mqueue implementation. git-svn-id: https://rt-thread.googlecode.com/svn/trunk@1103 bbd45198-f89e-11dd-88c7-29a3b14d5316 --- components/pthreads/mqueue.c | 224 +++++++++++++++-- components/pthreads/mqueue.h | 21 +- components/pthreads/pthread.c | 4 + components/pthreads/pthread_internal.h | 2 + components/pthreads/semaphore.c | 318 +++++++++++++++++++++---- components/pthreads/semaphore.h | 22 +- 6 files changed, 524 insertions(+), 67 deletions(-) diff --git a/components/pthreads/mqueue.c b/components/pthreads/mqueue.c index a3cc2c7c73..bd31b4450a 100644 --- a/components/pthreads/mqueue.c +++ b/components/pthreads/mqueue.c @@ -5,6 +5,65 @@ #include #include +static posix_mq_t* posix_mq_list = RT_NULL; +static struct rt_semaphore posix_mq_lock; +void posix_mq_system_init() +{ + rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO); +} + +rt_inline void posix_mq_insert(posix_mq_t *pmq) +{ + pmq->next = posix_mq_list; + posix_mq_list = pmq; +} + +static void posix_mq_delete(posix_mq_t *pmq) +{ + posix_mq_t *iter; + if (posix_mq_list == pmq) + { + posix_mq_list = pmq->next; + + rt_mq_delete(pmq->mq); + rt_free(pmq); + + return; + } + for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next) + { + if (iter->next == pmq) + { + /* delete this mq */ + if (pmq->next != RT_NULL) + iter->next = pmq->next; + else + iter->next = RT_NULL; + + /* delete RT-Thread mqueue */ + rt_mq_delete(pmq->mq); + rt_free(pmq); + return ; + } + } +} + +static posix_mq_t *posix_mq_find(const char* name) +{ + posix_mq_t *iter; + rt_object_t object; + + for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next) + { + object = (rt_object_t)&(iter->mq); + + if (strncmp(object->name, name, RT_NAME_MAX) == 0) + { + return iter; + } + } +} + int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat) { @@ -20,8 +79,8 @@ int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) return -1; } - mqstat->mq_maxmsg = mqdes->max_msgs; - mqstat->mq_msgsize = mqdes->msg_size; + mqstat->mq_maxmsg = mqdes->mq->mq->max_msgs; + mqstat->mq_msgsize = mqdes->mq->mq->msg_size; mqstat->mq_curmsgs = 0; mqstat->mq_flags = 0; @@ -30,11 +89,15 @@ int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) mqd_t mq_open(const char *name, int oflag, ...) { - rt_mq_t mq; + mqd_t mqdes; va_list arg; mode_t mode; struct mq_attr *attr = RT_NULL; + /* lock posix mqueue list */ + rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER); + + mqdes = RT_NULL; if (oflag & O_CREAT) { va_start(arg, oflag); @@ -42,21 +105,82 @@ mqd_t mq_open(const char *name, int oflag, ...) attr = (struct mq_attr *) va_arg(arg, struct mq_attr *); va_end(arg); - mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO); - if (mq == RT_NULL) /* create failed */ + if (oflag & O_EXCL) + { + if (posix_mq_find(name) != RT_NULL) + { + rt_set_errno(EEXIST); + goto __return; + } + } + mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes)); + if (mqdes == RT_NULL) + { + rt_set_errno(ENFILE); + goto __return; + } + + mqdes->flags = oflag; + mqdes->mq = (posix_mq_t*) rt_malloc (sizeof(posix_mq_t)); + if (mqdes->mq == RT_NULL) + { + rt_set_errno(ENFILE); + goto __return; + } + + /* create RT-Thread message queue */ + mqdes->mq->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO); + if (mqdes->mq->mq == RT_NULL) /* create failed */ { rt_set_errno(ENFILE); - return RT_NULL; + goto __return; + } + /* initialize reference count */ + mqdes->mq->refcount = 1; + mqdes->mq->unlinked = 0; + + /* insert mq to posix mq list */ + posix_mq_insert(mqdes->mq); + } + else + { + posix_mq_t *mq; + + /* find mqueue */ + mq = posix_mq_find(name); + if (mq != RT_NULL) + { + mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes)); + mqdes->mq = mq; + mqdes->flags = oflag; + mq->refcount ++; /* increase reference count */ + } + else + { + rt_set_errno(ENOENT); + goto __return; } } + rt_sem_release(&posix_mq_lock); + return mqdes; - if (oflag & O_EXCL) +__return: + /* release lock */ + rt_sem_release(&posix_mq_lock); + + /* release allocated memory */ + if (mqdes != RT_NULL) { - mq = (rt_mq_t)rt_object_find(name, RT_Object_Class_MessageQueue); - if (mq == RT_NULL) rt_set_errno(ENOSPC); + if (mqdes->mq != RT_NULL) + { + /* delete RT-Thread message queue */ + if (mqdes->mq->mq != RT_NULL) + rt_mq_delete(mqdes->mq->mq); + rt_free(mqdes->mq); + } + rt_free(mqdes); } - - return mq; + return RT_NULL; } ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio) @@ -69,7 +193,14 @@ ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_pri return -1; } - result = rt_mq_recv(mqdes, msg_ptr, msg_len, RT_WAITING_FOREVER); + /* permission check */ + if (!(mqdes->flags & O_RDONLY)) + { + rt_set_errno(EBADF); + return -1; + } + + result = rt_mq_recv(mqdes->mq->mq, msg_ptr, msg_len, RT_WAITING_FOREVER); if (result == RT_EOK) return msg_len; @@ -87,7 +218,14 @@ int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio) return -1; } - result = rt_mq_send(mqdes, msg_ptr, msg_len); + /* permission check */ + if (!(mqdes->flags & O_WRONLY)) + { + rt_set_errno(EBADF); + return -1; + } + + result = rt_mq_send(mqdes->mq->mq, (void*)msg_ptr, msg_len); if (result == RT_EOK) return 0; @@ -101,14 +239,22 @@ ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, int tick; rt_err_t result; + /* parameters check */ if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL)) { rt_set_errno(EINVAL); return -1; } + /* permission check */ + if (!(mqdes->flags & O_RDONLY)) + { + rt_set_errno(EBADF); + return -1; + } + tick = libc_time_to_tick(abs_timeout); - result = rt_mq_recv(mqdes, msg_ptr, msg_len, tick); + result = rt_mq_recv(mqdes->mq->mq, msg_ptr, msg_len, tick); if (result == RT_EOK) return msg_len; if (result == -RT_ETIMEOUT) @@ -134,21 +280,49 @@ int mq_notify(mqd_t mqdes, const struct sigevent *notification) int mq_close(mqd_t mqdes) { - return 0; + if (mqdes == RT_NULL) + { + rt_set_errno(EINVAL); + return -1; + } + + /* lock posix mqueue list */ + rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER); + mqdes->mq->refcount --; + if (mqdes->mq->refcount == 0) + { + /* delete from posix mqueue list */ + if (mqdes->mq->unlinked) + posix_mq_delete(mqdes->mq); + mqdes->mq = RT_NULL; + } + rt_sem_release(&posix_mq_lock); + + rt_free(mqdes); + return 0; } int mq_unlink(const char *name) { - rt_mq_t mq; + posix_mq_t *pmq; - mq = (rt_mq_t)rt_object_find(name, RT_Object_Class_MessageQueue); - if (mq == RT_NULL) - { - rt_set_errno(ENOENT); - return -1; - } + /* lock posix mqueue list */ + rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER); + pmq = posix_mq_find(name); + if (pmq != RT_NULL) + { + pmq->unlinked = 1; + if (pmq->refcount == 0) + { + /* remove this mqueue */ + posix_mq_delete(pmq); + } + rt_sem_release(&posix_mq_lock); + return 0; + } + rt_sem_release(&posix_mq_lock); - /* delete this message queue */ - rt_mq_delete(mq); - return 0; + /* no this entry */ + rt_set_errno(ENOENT); + return -1; } diff --git a/components/pthreads/mqueue.h b/components/pthreads/mqueue.h index 80df241d85..fb99e58068 100644 --- a/components/pthreads/mqueue.h +++ b/components/pthreads/mqueue.h @@ -7,7 +7,26 @@ #include #include -typedef rt_mq_t mqd_t; +struct posix_mq +{ + /* reference count and unlinked */ + rt_uint16_t refcount; + rt_uint16_t unlinked; + + /* RT-Thread message queue */ + rt_mq_t mq; + /* next posix mqueue */ + struct posix_mq* next; +}; +typedef struct posix_mq posix_mq_t; + +struct mqdes +{ + rt_uint32_t flags; + posix_mq_t* mq; +}; +typedef struct mqdes* mqd_t; + struct mq_attr { long mq_flags; /* Message queue flags. */ diff --git a/components/pthreads/pthread.c b/components/pthreads/pthread.c index e3958a5e20..fc312dfd11 100644 --- a/components/pthreads/pthread.c +++ b/components/pthreads/pthread.c @@ -6,6 +6,10 @@ int pthread_system_init(void) { /* initialize key area */ pthread_key_system_init(); + /* initialize posix mqueue */ + posix_mq_system_init(); + /* initialize posix semaphore */ + posix_sem_system_init(); return 0; } diff --git a/components/pthreads/pthread_internal.h b/components/pthreads/pthread_internal.h index cc0b512ac0..5d00d15091 100644 --- a/components/pthreads/pthread_internal.h +++ b/components/pthreads/pthread_internal.h @@ -54,5 +54,7 @@ rt_inline _pthread_data_t* _pthread_get_data(pthread_t thread) } extern int libc_time_to_tick(const struct timespec *time); +void posix_mq_system_init(void); +void posix_sem_system_init(void); #endif diff --git a/components/pthreads/semaphore.c b/components/pthreads/semaphore.c index 214f66b42f..6231dc96af 100644 --- a/components/pthreads/semaphore.c +++ b/components/pthreads/semaphore.c @@ -4,38 +4,151 @@ #include #include "semaphore.h" +static posix_sem_t* posix_sem_list = RT_NULL; +static struct rt_semaphore posix_sem_lock; +void posix_sem_system_init() +{ + rt_sem_init(&posix_sem_lock, "psem", 1, RT_IPC_FLAG_FIFO); +} + +rt_inline void posix_sem_insert(posix_sem_t *psem) +{ + psem->next = posix_sem_list; + posix_sem_list = psem; +} + +static void posix_sem_delete(posix_sem_t *psem) +{ + posix_sem_t *iter; + if (posix_sem_list == psem) + { + posix_sem_list = psem->next; + + rt_sem_delete(psem->sem); + rt_free(psem); + + return; + } + for (iter = posix_sem_list; iter->next != RT_NULL; iter = iter->next) + { + if (iter->next == psem) + { + /* delete this mq */ + if (psem->next != RT_NULL) + iter->next = psem->next; + else + iter->next = RT_NULL; + + /* delete RT-Thread mqueue */ + rt_sem_delete(psem->sem); + rt_free(psem); + return ; + } + } +} + +static posix_sem_t *posix_sem_find(const char* name) +{ + posix_sem_t *iter; + rt_object_t object; + + for (iter = posix_sem_list; iter != RT_NULL; iter = iter->next) + { + object = (rt_object_t)&(iter->sem); + + if (strncmp(object->name, name, RT_NAME_MAX) == 0) + { + return iter; + } + } +} + int sem_close(sem_t *sem) { - if (!sem) return EINVAL; + if (sem == RT_NULL) + { + rt_set_errno(EINVAL); + return -1; + } - /* delete semaphore allocated in sem_open */ - rt_sem_delete(sem); - return 0; + /* lock posix semaphore list */ + rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER); + sem->sem->refcount --; + if (sem->sem->refcount == 0) + { + /* delete from posix semaphore list */ + if (sem->sem->unlinked) + posix_sem_delete(sem->sem); + sem->sem = RT_NULL; + } + rt_sem_release(&posix_sem_lock); + + rt_free(sem); + return 0; } int sem_destroy(sem_t *sem) { rt_err_t result; - if (!sem) return EINVAL; + if ((!sem) || !(sem->sem->unamed)) + { + rt_set_errno(EINVAL); + return -1; + } - /* check whether semaphore is busy or not */ - result = rt_sem_trytake(sem); - if (result != RT_EOK) return EBUSY; + /* lock posix semaphore list */ + rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER); + result = rt_sem_trytake(sem->sem->sem); + if (result != RT_EOK) + { + rt_sem_release(&posix_sem_lock); + rt_set_errno(EBUSY); + return -1; + } - rt_memset(sem, 0, sizeof(sem_t)); - return 0; + /* destroy an unamed posix semaphore */ + posix_sem_delete(sem->sem); + rt_sem_release(&posix_sem_lock); + + rt_free(sem); + return 0; } int sem_unlink(const char *name) { - return EACCES; + posix_sem_t *psem; + + /* lock posix semaphore list */ + rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER); + psem = posix_sem_find(name); + if (psem != RT_NULL) + { + psem->unlinked = 1; + if (psem->refcount == 0) + { + /* remove this semaphore */ + posix_sem_delete(psem); + } + rt_sem_release(&posix_sem_lock); + return 0; + } + rt_sem_release(&posix_sem_lock); + + /* no this entry */ + rt_set_errno(ENOENT); + return -1; } int sem_getvalue(sem_t *sem, int *sval) { - RT_ASSERT(sem != RT_NULL); - if (sval) *sval = sem->value; + if (!sem || !sval) + { + rt_set_errno(EINVAL); + return -1; + } + *sval = sem->sem->sem->value; + return 0; } int sem_init(sem_t *sem, int pshared, unsigned int value) @@ -47,42 +160,146 @@ int sem_init(sem_t *sem, int pshared, unsigned int value) RT_ASSERT(sem != RT_NULL); rt_snprintf(name, sizeof(name), "psem%02d", psem_number++); - result = rt_sem_init(sem, name, value, RT_IPC_FLAG_FIFO); - if (result == RT_EOK) + sem->sem = (struct posix_sem*) rt_malloc (sizeof(struct posix_sem)); + if (sem->sem == RT_NULL) { - /* detach kernel object */ - rt_object_detach(&(sem->parent.parent)); - return 0; + rt_set_errno(EINVAL); + return -1; + } + sem->sem->sem = rt_sem_create(name, value, RT_IPC_FLAG_FIFO); + if (sem->sem->sem == RT_NULL) + { + rt_free(sem->sem); + sem->sem = RT_NULL; + rt_set_errno(ENOMEM); + return -1; } - return ENOSPC; + /* initialize posix semaphore */ + sem->sem->refcount = 1; + sem->sem->unlinked = 0; + sem->sem->unamed = 1; + /* lock posix semaphore list */ + rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER); + posix_sem_insert(sem->sem); + rt_sem_release(&posix_sem_lock); + sem->flags = 0; + + return 0; } sem_t *sem_open(const char *name, int oflag, ...) { - rt_sem_t sem; + sem_t* sem; + va_list arg; + mode_t mode; + unsigned int value; - sem = RT_NULL; + sem = RT_NULL; + + /* lock posix semaphore list */ + rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER); if (oflag & O_CREAT) { - sem = rt_sem_create(name, 1, RT_IPC_FLAG_FIFO); - if (sem == RT_NULL) - rt_set_errno(ENOSPC); - } + va_start(arg, oflag); + mode = (mode_t) va_arg( arg, unsigned int); + value = va_arg( arg, unsigned int); + va_end(arg); - /* find semaphore */ - if (oflag & O_EXCL) + if (oflag & O_EXCL) + { + if (posix_sem_find(name) != RT_NULL) + { + rt_set_errno(EEXIST); + goto __return; + } + } + sem = (sem_t*) rt_malloc (sizeof(struct semdes)); + if (sem == RT_NULL) + { + rt_set_errno(ENFILE); + goto __return; + } + + sem->flags = oflag; + sem->sem = (posix_sem_t*) rt_malloc (sizeof(posix_sem_t)); + if (sem->sem == RT_NULL) + { + rt_set_errno(ENFILE); + goto __return; + } + + /* create RT-Thread semaphore */ + sem->sem->sem = rt_sem_create(name, value, RT_IPC_FLAG_FIFO); + if (sem->sem->sem == RT_NULL) /* create failed */ + { + rt_set_errno(ENFILE); + goto __return; + } + /* initialize reference count */ + sem->sem->refcount = 1; + sem->sem->unlinked = 0; + sem->sem->unamed = 0; + + /* insert semaphore to posix semaphore list */ + posix_sem_insert(sem->sem); + } + else { - sem = (rt_sem_t)rt_object_find(name, RT_Object_Class_Semaphore); - if (sem == RT_NULL) rt_set_errno(ENOSPC); - } + posix_sem_t *psem; + /* find semaphore */ + psem = posix_sem_find(name); + if (psem != RT_NULL) + { + sem = (sem_t*) rt_malloc (sizeof(struct semdes)); + sem->sem = psem; + sem->flags = oflag; + psem->refcount ++; /* increase reference count */ + } + else + { + rt_set_errno(ENOENT); + goto __return; + } + } + rt_sem_release(&posix_sem_lock); return sem; + +__return: + /* release lock */ + rt_sem_release(&posix_sem_lock); + + /* release allocated memory */ + if (sem != RT_NULL) + { + if (sem->sem != RT_NULL) + { + /* delete RT-Thread semaphore */ + if (sem->sem->sem != RT_NULL) + rt_sem_delete(sem->sem->sem); + rt_free(sem->sem); + } + rt_free(sem); + } + return RT_NULL; } int sem_post(sem_t *sem) { - rt_sem_release(sem); + rt_err_t result; + + if (!sem) + { + rt_set_errno(EINVAL); + return -1; + } + + result = rt_sem_release(sem->sem->sem); + if (result == RT_EOK) return 0; + + rt_set_errno(EINVAL); + return -1; } int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout) @@ -95,33 +312,54 @@ int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout) /* calculate os tick */ tick = libc_time_to_tick(abs_timeout); - result = rt_sem_take(sem, tick); - if (result == -RT_ETIMEOUT) return ETIMEDOUT; + result = rt_sem_take(sem->sem->sem, tick); + if (result == -RT_ETIMEOUT) + { + rt_set_errno(ETIMEDOUT); + return -1; + } if (result == RT_EOK) return 0; - return EINTR; + rt_set_errno(EINTR); + return -1; } int sem_trywait(sem_t *sem) { rt_err_t result; - if (!sem) return EINVAL; + if (!sem) + { + rt_set_errno(EINVAL); + return -1; + } - result = rt_sem_take(sem, RT_WAITING_FOREVER); - if (result == -RT_ETIMEOUT) return EAGAIN; + result = rt_sem_take(sem->sem->sem, RT_WAITING_FOREVER); + if (result == -RT_ETIMEOUT) + { + rt_set_errno(EAGAIN); + return -1; + } if (result == RT_EOK) return 0; - return EINTR; + rt_set_errno(EINTR); + return -1; } int sem_wait(sem_t *sem) { rt_err_t result; - result = rt_sem_take(sem, RT_WAITING_FOREVER); + if (!sem) + { + rt_set_errno(EINVAL); + return -1; + } + + result = rt_sem_take(sem->sem->sem, RT_WAITING_FOREVER); if (result == RT_EOK) return 0; - return EINTR; + rt_set_errno(EINTR); + return -1; } diff --git a/components/pthreads/semaphore.h b/components/pthreads/semaphore.h index 29e2911a54..09e4cab938 100644 --- a/components/pthreads/semaphore.h +++ b/components/pthreads/semaphore.h @@ -4,7 +4,27 @@ #include #include -typedef struct rt_semaphore sem_t; +struct posix_sem +{ + /* reference count and unlinked */ + rt_uint16_t refcount; + rt_uint8_t unlinked; + rt_uint8_t unamed; + + /* RT-Thread semaphore */ + rt_sem_t sem; + + /* next posix semaphore */ + struct posix_sem* next; +}; +typedef struct posix_sem posix_sem_t; + +struct semdes +{ + rt_uint32_t flags; + posix_sem_t* sem; +}; +typedef struct semdes sem_t; int sem_close(sem_t *sem); int sem_destroy(sem_t *sem);