rewrite posix semaphore and mqueue implementation.

git-svn-id: https://rt-thread.googlecode.com/svn/trunk@1103 bbd45198-f89e-11dd-88c7-29a3b14d5316
This commit is contained in:
bernard.xiong@gmail.com 2010-11-24 00:04:16 +00:00
parent 558023a020
commit 1b014359b0
6 changed files with 524 additions and 67 deletions

View File

@ -5,6 +5,65 @@
#include <errno.h> #include <errno.h>
#include <sys/fcntl.h> #include <sys/fcntl.h>
static posix_mq_t* posix_mq_list = RT_NULL;
static struct rt_semaphore posix_mq_lock;
void posix_mq_system_init()
{
rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO);
}
rt_inline void posix_mq_insert(posix_mq_t *pmq)
{
pmq->next = posix_mq_list;
posix_mq_list = pmq;
}
static void posix_mq_delete(posix_mq_t *pmq)
{
posix_mq_t *iter;
if (posix_mq_list == pmq)
{
posix_mq_list = pmq->next;
rt_mq_delete(pmq->mq);
rt_free(pmq);
return;
}
for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next)
{
if (iter->next == pmq)
{
/* delete this mq */
if (pmq->next != RT_NULL)
iter->next = pmq->next;
else
iter->next = RT_NULL;
/* delete RT-Thread mqueue */
rt_mq_delete(pmq->mq);
rt_free(pmq);
return ;
}
}
}
static posix_mq_t *posix_mq_find(const char* name)
{
posix_mq_t *iter;
rt_object_t object;
for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next)
{
object = (rt_object_t)&(iter->mq);
if (strncmp(object->name, name, RT_NAME_MAX) == 0)
{
return iter;
}
}
}
int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
struct mq_attr *omqstat) struct mq_attr *omqstat)
{ {
@ -20,8 +79,8 @@ int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
return -1; return -1;
} }
mqstat->mq_maxmsg = mqdes->max_msgs; mqstat->mq_maxmsg = mqdes->mq->mq->max_msgs;
mqstat->mq_msgsize = mqdes->msg_size; mqstat->mq_msgsize = mqdes->mq->mq->msg_size;
mqstat->mq_curmsgs = 0; mqstat->mq_curmsgs = 0;
mqstat->mq_flags = 0; mqstat->mq_flags = 0;
@ -30,11 +89,15 @@ int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
mqd_t mq_open(const char *name, int oflag, ...) mqd_t mq_open(const char *name, int oflag, ...)
{ {
rt_mq_t mq; mqd_t mqdes;
va_list arg; va_list arg;
mode_t mode; mode_t mode;
struct mq_attr *attr = RT_NULL; struct mq_attr *attr = RT_NULL;
/* lock posix mqueue list */
rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
mqdes = RT_NULL;
if (oflag & O_CREAT) if (oflag & O_CREAT)
{ {
va_start(arg, oflag); va_start(arg, oflag);
@ -42,21 +105,82 @@ mqd_t mq_open(const char *name, int oflag, ...)
attr = (struct mq_attr *) va_arg(arg, struct mq_attr *); attr = (struct mq_attr *) va_arg(arg, struct mq_attr *);
va_end(arg); va_end(arg);
mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO); if (oflag & O_EXCL)
if (mq == RT_NULL) /* create failed */ {
if (posix_mq_find(name) != RT_NULL)
{
rt_set_errno(EEXIST);
goto __return;
}
}
mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
if (mqdes == RT_NULL)
{
rt_set_errno(ENFILE);
goto __return;
}
mqdes->flags = oflag;
mqdes->mq = (posix_mq_t*) rt_malloc (sizeof(posix_mq_t));
if (mqdes->mq == RT_NULL)
{
rt_set_errno(ENFILE);
goto __return;
}
/* create RT-Thread message queue */
mqdes->mq->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO);
if (mqdes->mq->mq == RT_NULL) /* create failed */
{ {
rt_set_errno(ENFILE); rt_set_errno(ENFILE);
return RT_NULL; goto __return;
}
/* initialize reference count */
mqdes->mq->refcount = 1;
mqdes->mq->unlinked = 0;
/* insert mq to posix mq list */
posix_mq_insert(mqdes->mq);
}
else
{
posix_mq_t *mq;
/* find mqueue */
mq = posix_mq_find(name);
if (mq != RT_NULL)
{
mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
mqdes->mq = mq;
mqdes->flags = oflag;
mq->refcount ++; /* increase reference count */
}
else
{
rt_set_errno(ENOENT);
goto __return;
} }
} }
rt_sem_release(&posix_mq_lock);
return mqdes;
if (oflag & O_EXCL) __return:
/* release lock */
rt_sem_release(&posix_mq_lock);
/* release allocated memory */
if (mqdes != RT_NULL)
{ {
mq = (rt_mq_t)rt_object_find(name, RT_Object_Class_MessageQueue); if (mqdes->mq != RT_NULL)
if (mq == RT_NULL) rt_set_errno(ENOSPC); {
/* delete RT-Thread message queue */
if (mqdes->mq->mq != RT_NULL)
rt_mq_delete(mqdes->mq->mq);
rt_free(mqdes->mq);
}
rt_free(mqdes);
} }
return RT_NULL;
return mq;
} }
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio) ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
@ -69,7 +193,14 @@ ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_pri
return -1; return -1;
} }
result = rt_mq_recv(mqdes, msg_ptr, msg_len, RT_WAITING_FOREVER); /* permission check */
if (!(mqdes->flags & O_RDONLY))
{
rt_set_errno(EBADF);
return -1;
}
result = rt_mq_recv(mqdes->mq->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
if (result == RT_EOK) if (result == RT_EOK)
return msg_len; return msg_len;
@ -87,7 +218,14 @@ int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
return -1; return -1;
} }
result = rt_mq_send(mqdes, msg_ptr, msg_len); /* permission check */
if (!(mqdes->flags & O_WRONLY))
{
rt_set_errno(EBADF);
return -1;
}
result = rt_mq_send(mqdes->mq->mq, (void*)msg_ptr, msg_len);
if (result == RT_EOK) if (result == RT_EOK)
return 0; return 0;
@ -101,14 +239,22 @@ ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
int tick; int tick;
rt_err_t result; rt_err_t result;
/* parameters check */
if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL)) if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
{ {
rt_set_errno(EINVAL); rt_set_errno(EINVAL);
return -1; return -1;
} }
/* permission check */
if (!(mqdes->flags & O_RDONLY))
{
rt_set_errno(EBADF);
return -1;
}
tick = libc_time_to_tick(abs_timeout); tick = libc_time_to_tick(abs_timeout);
result = rt_mq_recv(mqdes, msg_ptr, msg_len, tick); result = rt_mq_recv(mqdes->mq->mq, msg_ptr, msg_len, tick);
if (result == RT_EOK) return msg_len; if (result == RT_EOK) return msg_len;
if (result == -RT_ETIMEOUT) if (result == -RT_ETIMEOUT)
@ -134,21 +280,49 @@ int mq_notify(mqd_t mqdes, const struct sigevent *notification)
int mq_close(mqd_t mqdes) int mq_close(mqd_t mqdes)
{ {
return 0; if (mqdes == RT_NULL)
{
rt_set_errno(EINVAL);
return -1;
}
/* lock posix mqueue list */
rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
mqdes->mq->refcount --;
if (mqdes->mq->refcount == 0)
{
/* delete from posix mqueue list */
if (mqdes->mq->unlinked)
posix_mq_delete(mqdes->mq);
mqdes->mq = RT_NULL;
}
rt_sem_release(&posix_mq_lock);
rt_free(mqdes);
return 0;
} }
int mq_unlink(const char *name) int mq_unlink(const char *name)
{ {
rt_mq_t mq; posix_mq_t *pmq;
mq = (rt_mq_t)rt_object_find(name, RT_Object_Class_MessageQueue); /* lock posix mqueue list */
if (mq == RT_NULL) rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
{ pmq = posix_mq_find(name);
rt_set_errno(ENOENT); if (pmq != RT_NULL)
return -1; {
} pmq->unlinked = 1;
if (pmq->refcount == 0)
{
/* remove this mqueue */
posix_mq_delete(pmq);
}
rt_sem_release(&posix_mq_lock);
return 0;
}
rt_sem_release(&posix_mq_lock);
/* delete this message queue */ /* no this entry */
rt_mq_delete(mq); rt_set_errno(ENOENT);
return 0; return -1;
} }

View File

@ -7,7 +7,26 @@
#include <sys/signal.h> #include <sys/signal.h>
#include <pthread.h> #include <pthread.h>
typedef rt_mq_t mqd_t; struct posix_mq
{
/* reference count and unlinked */
rt_uint16_t refcount;
rt_uint16_t unlinked;
/* RT-Thread message queue */
rt_mq_t mq;
/* next posix mqueue */
struct posix_mq* next;
};
typedef struct posix_mq posix_mq_t;
struct mqdes
{
rt_uint32_t flags;
posix_mq_t* mq;
};
typedef struct mqdes* mqd_t;
struct mq_attr struct mq_attr
{ {
long mq_flags; /* Message queue flags. */ long mq_flags; /* Message queue flags. */

View File

@ -6,6 +6,10 @@ int pthread_system_init(void)
{ {
/* initialize key area */ /* initialize key area */
pthread_key_system_init(); pthread_key_system_init();
/* initialize posix mqueue */
posix_mq_system_init();
/* initialize posix semaphore */
posix_sem_system_init();
return 0; return 0;
} }

View File

@ -54,5 +54,7 @@ rt_inline _pthread_data_t* _pthread_get_data(pthread_t thread)
} }
extern int libc_time_to_tick(const struct timespec *time); extern int libc_time_to_tick(const struct timespec *time);
void posix_mq_system_init(void);
void posix_sem_system_init(void);
#endif #endif

View File

@ -4,38 +4,151 @@
#include <rtthread.h> #include <rtthread.h>
#include "semaphore.h" #include "semaphore.h"
static posix_sem_t* posix_sem_list = RT_NULL;
static struct rt_semaphore posix_sem_lock;
void posix_sem_system_init()
{
rt_sem_init(&posix_sem_lock, "psem", 1, RT_IPC_FLAG_FIFO);
}
rt_inline void posix_sem_insert(posix_sem_t *psem)
{
psem->next = posix_sem_list;
posix_sem_list = psem;
}
static void posix_sem_delete(posix_sem_t *psem)
{
posix_sem_t *iter;
if (posix_sem_list == psem)
{
posix_sem_list = psem->next;
rt_sem_delete(psem->sem);
rt_free(psem);
return;
}
for (iter = posix_sem_list; iter->next != RT_NULL; iter = iter->next)
{
if (iter->next == psem)
{
/* delete this mq */
if (psem->next != RT_NULL)
iter->next = psem->next;
else
iter->next = RT_NULL;
/* delete RT-Thread mqueue */
rt_sem_delete(psem->sem);
rt_free(psem);
return ;
}
}
}
static posix_sem_t *posix_sem_find(const char* name)
{
posix_sem_t *iter;
rt_object_t object;
for (iter = posix_sem_list; iter != RT_NULL; iter = iter->next)
{
object = (rt_object_t)&(iter->sem);
if (strncmp(object->name, name, RT_NAME_MAX) == 0)
{
return iter;
}
}
}
int sem_close(sem_t *sem) int sem_close(sem_t *sem)
{ {
if (!sem) return EINVAL; if (sem == RT_NULL)
{
rt_set_errno(EINVAL);
return -1;
}
/* delete semaphore allocated in sem_open */ /* lock posix semaphore list */
rt_sem_delete(sem); rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER);
return 0; sem->sem->refcount --;
if (sem->sem->refcount == 0)
{
/* delete from posix semaphore list */
if (sem->sem->unlinked)
posix_sem_delete(sem->sem);
sem->sem = RT_NULL;
}
rt_sem_release(&posix_sem_lock);
rt_free(sem);
return 0;
} }
int sem_destroy(sem_t *sem) int sem_destroy(sem_t *sem)
{ {
rt_err_t result; rt_err_t result;
if (!sem) return EINVAL; if ((!sem) || !(sem->sem->unamed))
{
rt_set_errno(EINVAL);
return -1;
}
/* check whether semaphore is busy or not */ /* lock posix semaphore list */
result = rt_sem_trytake(sem); rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER);
if (result != RT_EOK) return EBUSY; result = rt_sem_trytake(sem->sem->sem);
if (result != RT_EOK)
{
rt_sem_release(&posix_sem_lock);
rt_set_errno(EBUSY);
return -1;
}
rt_memset(sem, 0, sizeof(sem_t)); /* destroy an unamed posix semaphore */
return 0; posix_sem_delete(sem->sem);
rt_sem_release(&posix_sem_lock);
rt_free(sem);
return 0;
} }
int sem_unlink(const char *name) int sem_unlink(const char *name)
{ {
return EACCES; posix_sem_t *psem;
/* lock posix semaphore list */
rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER);
psem = posix_sem_find(name);
if (psem != RT_NULL)
{
psem->unlinked = 1;
if (psem->refcount == 0)
{
/* remove this semaphore */
posix_sem_delete(psem);
}
rt_sem_release(&posix_sem_lock);
return 0;
}
rt_sem_release(&posix_sem_lock);
/* no this entry */
rt_set_errno(ENOENT);
return -1;
} }
int sem_getvalue(sem_t *sem, int *sval) int sem_getvalue(sem_t *sem, int *sval)
{ {
RT_ASSERT(sem != RT_NULL); if (!sem || !sval)
if (sval) *sval = sem->value; {
rt_set_errno(EINVAL);
return -1;
}
*sval = sem->sem->sem->value;
return 0;
} }
int sem_init(sem_t *sem, int pshared, unsigned int value) int sem_init(sem_t *sem, int pshared, unsigned int value)
@ -47,42 +160,146 @@ int sem_init(sem_t *sem, int pshared, unsigned int value)
RT_ASSERT(sem != RT_NULL); RT_ASSERT(sem != RT_NULL);
rt_snprintf(name, sizeof(name), "psem%02d", psem_number++); rt_snprintf(name, sizeof(name), "psem%02d", psem_number++);
result = rt_sem_init(sem, name, value, RT_IPC_FLAG_FIFO); sem->sem = (struct posix_sem*) rt_malloc (sizeof(struct posix_sem));
if (result == RT_EOK) if (sem->sem == RT_NULL)
{ {
/* detach kernel object */ rt_set_errno(EINVAL);
rt_object_detach(&(sem->parent.parent)); return -1;
return 0; }
sem->sem->sem = rt_sem_create(name, value, RT_IPC_FLAG_FIFO);
if (sem->sem->sem == RT_NULL)
{
rt_free(sem->sem);
sem->sem = RT_NULL;
rt_set_errno(ENOMEM);
return -1;
} }
return ENOSPC; /* initialize posix semaphore */
sem->sem->refcount = 1;
sem->sem->unlinked = 0;
sem->sem->unamed = 1;
/* lock posix semaphore list */
rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER);
posix_sem_insert(sem->sem);
rt_sem_release(&posix_sem_lock);
sem->flags = 0;
return 0;
} }
sem_t *sem_open(const char *name, int oflag, ...) sem_t *sem_open(const char *name, int oflag, ...)
{ {
rt_sem_t sem; sem_t* sem;
va_list arg;
mode_t mode;
unsigned int value;
sem = RT_NULL; sem = RT_NULL;
/* lock posix semaphore list */
rt_sem_take(&posix_sem_lock, RT_WAITING_FOREVER);
if (oflag & O_CREAT) if (oflag & O_CREAT)
{ {
sem = rt_sem_create(name, 1, RT_IPC_FLAG_FIFO); va_start(arg, oflag);
if (sem == RT_NULL) mode = (mode_t) va_arg( arg, unsigned int);
rt_set_errno(ENOSPC); value = va_arg( arg, unsigned int);
} va_end(arg);
/* find semaphore */ if (oflag & O_EXCL)
if (oflag & O_EXCL) {
if (posix_sem_find(name) != RT_NULL)
{
rt_set_errno(EEXIST);
goto __return;
}
}
sem = (sem_t*) rt_malloc (sizeof(struct semdes));
if (sem == RT_NULL)
{
rt_set_errno(ENFILE);
goto __return;
}
sem->flags = oflag;
sem->sem = (posix_sem_t*) rt_malloc (sizeof(posix_sem_t));
if (sem->sem == RT_NULL)
{
rt_set_errno(ENFILE);
goto __return;
}
/* create RT-Thread semaphore */
sem->sem->sem = rt_sem_create(name, value, RT_IPC_FLAG_FIFO);
if (sem->sem->sem == RT_NULL) /* create failed */
{
rt_set_errno(ENFILE);
goto __return;
}
/* initialize reference count */
sem->sem->refcount = 1;
sem->sem->unlinked = 0;
sem->sem->unamed = 0;
/* insert semaphore to posix semaphore list */
posix_sem_insert(sem->sem);
}
else
{ {
sem = (rt_sem_t)rt_object_find(name, RT_Object_Class_Semaphore); posix_sem_t *psem;
if (sem == RT_NULL) rt_set_errno(ENOSPC);
}
/* find semaphore */
psem = posix_sem_find(name);
if (psem != RT_NULL)
{
sem = (sem_t*) rt_malloc (sizeof(struct semdes));
sem->sem = psem;
sem->flags = oflag;
psem->refcount ++; /* increase reference count */
}
else
{
rt_set_errno(ENOENT);
goto __return;
}
}
rt_sem_release(&posix_sem_lock);
return sem; return sem;
__return:
/* release lock */
rt_sem_release(&posix_sem_lock);
/* release allocated memory */
if (sem != RT_NULL)
{
if (sem->sem != RT_NULL)
{
/* delete RT-Thread semaphore */
if (sem->sem->sem != RT_NULL)
rt_sem_delete(sem->sem->sem);
rt_free(sem->sem);
}
rt_free(sem);
}
return RT_NULL;
} }
int sem_post(sem_t *sem) int sem_post(sem_t *sem)
{ {
rt_sem_release(sem); rt_err_t result;
if (!sem)
{
rt_set_errno(EINVAL);
return -1;
}
result = rt_sem_release(sem->sem->sem);
if (result == RT_EOK) return 0;
rt_set_errno(EINVAL);
return -1;
} }
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout) int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout)
@ -95,33 +312,54 @@ int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout)
/* calculate os tick */ /* calculate os tick */
tick = libc_time_to_tick(abs_timeout); tick = libc_time_to_tick(abs_timeout);
result = rt_sem_take(sem, tick); result = rt_sem_take(sem->sem->sem, tick);
if (result == -RT_ETIMEOUT) return ETIMEDOUT; if (result == -RT_ETIMEOUT)
{
rt_set_errno(ETIMEDOUT);
return -1;
}
if (result == RT_EOK) return 0; if (result == RT_EOK) return 0;
return EINTR; rt_set_errno(EINTR);
return -1;
} }
int sem_trywait(sem_t *sem) int sem_trywait(sem_t *sem)
{ {
rt_err_t result; rt_err_t result;
if (!sem) return EINVAL; if (!sem)
{
rt_set_errno(EINVAL);
return -1;
}
result = rt_sem_take(sem, RT_WAITING_FOREVER); result = rt_sem_take(sem->sem->sem, RT_WAITING_FOREVER);
if (result == -RT_ETIMEOUT) return EAGAIN; if (result == -RT_ETIMEOUT)
{
rt_set_errno(EAGAIN);
return -1;
}
if (result == RT_EOK) return 0; if (result == RT_EOK) return 0;
return EINTR; rt_set_errno(EINTR);
return -1;
} }
int sem_wait(sem_t *sem) int sem_wait(sem_t *sem)
{ {
rt_err_t result; rt_err_t result;
result = rt_sem_take(sem, RT_WAITING_FOREVER); if (!sem)
{
rt_set_errno(EINVAL);
return -1;
}
result = rt_sem_take(sem->sem->sem, RT_WAITING_FOREVER);
if (result == RT_EOK) return 0; if (result == RT_EOK) return 0;
return EINTR; rt_set_errno(EINTR);
return -1;
} }

View File

@ -4,7 +4,27 @@
#include <rtthread.h> #include <rtthread.h>
#include <sys/time.h> #include <sys/time.h>
typedef struct rt_semaphore sem_t; struct posix_sem
{
/* reference count and unlinked */
rt_uint16_t refcount;
rt_uint8_t unlinked;
rt_uint8_t unamed;
/* RT-Thread semaphore */
rt_sem_t sem;
/* next posix semaphore */
struct posix_sem* next;
};
typedef struct posix_sem posix_sem_t;
struct semdes
{
rt_uint32_t flags;
posix_sem_t* sem;
};
typedef struct semdes sem_t;
int sem_close(sem_t *sem); int sem_close(sem_t *sem);
int sem_destroy(sem_t *sem); int sem_destroy(sem_t *sem);