Cygwin: POSIX msg queues: move all mq_* functionality into fhandler_mqueue

The POSIX entry points are just wrappers now, calling into
fhandler_mqueue.  While at it, eliminate mqi_flags, replace with
standard fhandler nonblocking flag.

Signed-off-by: Corinna Vinschen <corinna@vinschen.de>
This commit is contained in:
Corinna Vinschen 2021-05-25 20:15:16 +02:00
parent e0cdd462f1
commit 46f3b0ce85
4 changed files with 495 additions and 479 deletions

View File

@ -3115,6 +3115,11 @@ class fhandler_mqueue: public fhandler_disk_file
int _dup (HANDLE parent, fhandler_mqueue *child);
int mutex_lock (HANDLE mtx, bool eintr);
int mutex_unlock (HANDLE mtx);
int cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime);
void cond_signal (HANDLE evt);
public:
fhandler_mqueue ();
fhandler_mqueue (void *) {}
@ -3126,6 +3131,13 @@ public:
int open (int, mode_t);
int mq_open (int, mode_t, struct mq_attr *);
int mq_getattr (struct mq_attr *);
int mq_setattr (const struct mq_attr *, struct mq_attr *);
int mq_notify (const struct sigevent *);
int mq_timedsend (const char *, size_t, unsigned int,
const struct timespec *);
ssize_t mq_timedrecv (char *, size_t, unsigned int *,
const struct timespec *);
struct mq_info *mqinfo () { return &mqi; }

View File

@ -11,6 +11,7 @@ details. */
#include "path.h"
#include "fhandler.h"
#include "dtable.h"
#include "clock.h"
#include <mqueue.h>
#include <sys/param.h>
@ -137,7 +138,7 @@ fhandler_mqueue::_mqinfo (SIZE_T filesize, mode_t mode, int flags,
get pagesize aligned, which breaks the next NtMapViewOfSection in fork. */
mqinfo ()->mqi_sectsize = filesize;
mqinfo ()->mqi_mode = mode;
mqinfo ()->mqi_flags = flags;
set_nonblocking (flags & O_NONBLOCK);
__small_swprintf (buf, L"mqueue/mtx%s", get_name ());
RtlInitUnicodeString (&uname, buf);
@ -426,3 +427,436 @@ fhandler_mqueue::close ()
__endtry
return 0;
}
int
fhandler_mqueue::mutex_lock (HANDLE mtx, bool eintr)
{
switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
| (eintr ? cw_sig_eintr : cw_sig_restart)))
{
case WAIT_OBJECT_0:
case WAIT_ABANDONED_0:
return 0;
case WAIT_SIGNALED:
set_errno (EINTR);
return 1;
default:
break;
}
return geterrno_from_win_error ();
}
int
fhandler_mqueue::mutex_unlock (HANDLE mtx)
{
return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
}
int
fhandler_mqueue::cond_timedwait (HANDLE evt, HANDLE mtx,
const struct timespec *abstime)
{
HANDLE w4[4] = { evt, };
DWORD cnt = 2;
DWORD timer_idx = 0;
int ret = 0;
wait_signal_arrived here (w4[1]);
if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
++cnt;
if (abstime)
{
if (!valid_timespec (*abstime))
return EINVAL;
/* If a timeout is set, we create a waitable timer to wait for.
This is the easiest way to handle the absolute timeout value, given
that NtSetTimer also takes absolute times and given the double
dependency on evt *and* mtx, which requires to call WFMO twice. */
NTSTATUS status;
LARGE_INTEGER duetime;
timer_idx = cnt++;
status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
NotificationTimer);
if (!NT_SUCCESS (status))
return geterrno_from_nt_status (status);
timespec_to_filetime (abstime, &duetime);
status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
if (!NT_SUCCESS (status))
{
NtClose (w4[timer_idx]);
return geterrno_from_nt_status (status);
}
}
ResetEvent (evt);
if ((ret = mutex_unlock (mtx)) != 0)
return ret;
/* Everything's set up, so now wait for the event to be signalled. */
restart1:
switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_OBJECT_0 + 1:
if (_my_tls.call_signal_handler ())
goto restart1;
ret = EINTR;
break;
case WAIT_OBJECT_0 + 2:
if (timer_idx != 2)
pthread::static_cancel_self ();
fallthrough;
case WAIT_OBJECT_0 + 3:
ret = ETIMEDOUT;
break;
default:
ret = geterrno_from_win_error ();
break;
}
if (ret == 0)
{
/* At this point we need to lock the mutex. The wait is practically
the same as before, just that we now wait on the mutex instead of the
event. */
restart2:
w4[0] = mtx;
switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
case WAIT_ABANDONED_0:
break;
case WAIT_OBJECT_0 + 1:
if (_my_tls.call_signal_handler ())
goto restart2;
ret = EINTR;
break;
case WAIT_OBJECT_0 + 2:
if (timer_idx != 2)
pthread_testcancel ();
fallthrough;
case WAIT_OBJECT_0 + 3:
ret = ETIMEDOUT;
break;
default:
ret = geterrno_from_win_error ();
break;
}
}
if (timer_idx)
{
if (ret != ETIMEDOUT)
NtCancelTimer (w4[timer_idx], NULL);
NtClose (w4[timer_idx]);
}
return ret;
}
void
fhandler_mqueue::cond_signal (HANDLE evt)
{
SetEvent (evt);
}
int
fhandler_mqueue::mq_getattr (struct mq_attr *mqstat)
{
int n;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
__try
{
mqhdr = mqinfo ()->mqi_hdr;
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
mqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; /* per-open */
mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
mqstat->mq_msgsize = attr->mq_msgsize;
mqstat->mq_curmsgs = attr->mq_curmsgs;
mutex_unlock (mqinfo ()->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
int
fhandler_mqueue::mq_setattr (const struct mq_attr *mqstat,
struct mq_attr *omqstat)
{
int n;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
__try
{
mqhdr = mqinfo ()->mqi_hdr;
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
if (omqstat != NULL)
{
omqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0;
omqstat->mq_maxmsg = attr->mq_maxmsg;
omqstat->mq_msgsize = attr->mq_msgsize;
omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
}
set_nonblocking (mqstat->mq_flags & O_NONBLOCK);
mutex_unlock (mqinfo ()->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
int
fhandler_mqueue::mq_notify (const struct sigevent *notification)
{
int n;
pid_t pid;
struct mq_hdr *mqhdr;
__try
{
mqhdr = mqinfo ()->mqi_hdr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
pid = myself->pid;
if (!notification)
{
if (mqhdr->mqh_pid == pid)
mqhdr->mqh_pid = 0; /* unregister calling process */
}
else
{
if (mqhdr->mqh_pid != 0)
{
if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
{
set_errno (EBUSY);
mutex_unlock (mqinfo ()->mqi_lock);
__leave;
}
}
mqhdr->mqh_pid = pid;
mqhdr->mqh_event = *notification;
}
mutex_unlock (mqinfo ()->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
int
fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio,
const struct timespec *abstime)
{
int n;
long index, freeindex;
int8_t *mptr;
struct sigevent *sigev;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
bool mutex_locked = false;
int ret = -1;
pthread_testcancel ();
__try
{
if (prio >= MQ_PRIO_MAX)
{
set_errno (EINVAL);
__leave;
}
mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */
mptr = (int8_t *) mqhdr; /* byte pointer */
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
{
errno = n;
__leave;
}
mutex_locked = true;
if (len > (size_t) attr->mq_msgsize)
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0)
{
if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
{
sigev = &mqhdr->mqh_event;
if (sigev->sigev_notify == SIGEV_SIGNAL)
sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
sigev->sigev_value);
mqhdr->mqh_pid = 0; /* unregister */
}
}
else if (attr->mq_curmsgs >= attr->mq_maxmsg)
{
/* Queue is full */
if (is_nonblocking ())
{
set_errno (EAGAIN);
__leave;
}
/* Wait for room for one message on the queue */
while (attr->mq_curmsgs >= attr->mq_maxmsg)
{
int ret = cond_timedwait (mqinfo ()->mqi_waitsend,
mqinfo ()->mqi_lock, abstime);
if (ret != 0)
{
set_errno (ret);
__leave;
}
}
}
/* nmsghdr will point to new message */
if ((freeindex = mqhdr->mqh_free) == 0)
api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
nmsghdr = (struct msg_hdr *) &mptr[freeindex];
nmsghdr->msg_prio = prio;
nmsghdr->msg_len = len;
memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
/* Find right place for message in linked list */
index = mqhdr->mqh_head;
pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
while (index)
{
msghdr = (struct msg_hdr *) &mptr[index];
if (prio > msghdr->msg_prio)
{
nmsghdr->msg_next = index;
pmsghdr->msg_next = freeindex;
break;
}
index = msghdr->msg_next;
pmsghdr = msghdr;
}
if (index == 0)
{
/* Queue was empty or new goes at end of list */
pmsghdr->msg_next = freeindex;
nmsghdr->msg_next = 0;
}
/* Wake up anyone blocked in mq_receive waiting for a message */
if (attr->mq_curmsgs == 0)
cond_signal (mqinfo ()->mqi_waitrecv);
attr->mq_curmsgs++;
ret = 0;
}
__except (EBADF) {}
__endtry
if (mutex_locked)
mutex_unlock (mqinfo ()->mqi_lock);
return ret;
}
ssize_t
fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
const struct timespec *abstime)
{
int n;
long index;
int8_t *mptr;
ssize_t len = -1;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct msg_hdr *msghdr;
bool mutex_locked = false;
pthread_testcancel ();
__try
{
mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */
mptr = (int8_t *) mqhdr; /* byte pointer */
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
{
errno = n;
__leave;
}
mutex_locked = true;
if (maxlen < (size_t) attr->mq_msgsize)
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0) /* queue is empty */
{
if (is_nonblocking ())
{
set_errno (EAGAIN);
__leave;
}
/* Wait for a message to be placed onto queue */
mqhdr->mqh_nwait++;
while (attr->mq_curmsgs == 0)
{
int ret = cond_timedwait (mqinfo ()->mqi_waitrecv,
mqinfo ()->mqi_lock, abstime);
if (ret != 0)
{
set_errno (ret);
__leave;
}
}
mqhdr->mqh_nwait--;
}
if ((index = mqhdr->mqh_head) == 0)
api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
msghdr = (struct msg_hdr *) &mptr[index];
mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
len = msghdr->msg_len;
memcpy(ptr, msghdr + 1, len); /* copy the message itself */
if (priop != NULL)
*priop = msghdr->msg_prio;
/* Just-read message goes to front of free list */
msghdr->msg_next = mqhdr->mqh_free;
mqhdr->mqh_free = index;
/* Wake up anyone blocked in mq_send waiting for room */
if (attr->mq_curmsgs == attr->mq_maxmsg)
cond_signal (mqinfo ()->mqi_waitsend);
attr->mq_curmsgs--;
}
__except (EBADF) {}
__endtry
if (mutex_locked)
mutex_unlock (mqinfo ()->mqi_lock);
return len;
}

View File

@ -58,7 +58,6 @@ struct mq_info
HANDLE mqi_waitsend; /* and condition variable for full queue */
HANDLE mqi_waitrecv; /* and condition variable for empty queue */
uint32_t mqi_magic; /* magic number if open */
int mqi_flags; /* flags for this process */
};

View File

@ -104,135 +104,6 @@ check_path (char *res_name, ipc_type_t type, const char *name, size_t len)
return true;
}
static int
ipc_mutex_lock (HANDLE mtx, bool eintr)
{
switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
| (eintr ? cw_sig_eintr : cw_sig_restart)))
{
case WAIT_OBJECT_0:
case WAIT_ABANDONED_0:
return 0;
case WAIT_SIGNALED:
set_errno (EINTR);
return 1;
default:
break;
}
return geterrno_from_win_error ();
}
static inline int
ipc_mutex_unlock (HANDLE mtx)
{
return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
}
static int
ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime)
{
HANDLE w4[4] = { evt, };
DWORD cnt = 2;
DWORD timer_idx = 0;
int ret = 0;
wait_signal_arrived here (w4[1]);
if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
++cnt;
if (abstime)
{
if (!valid_timespec (*abstime))
return EINVAL;
/* If a timeout is set, we create a waitable timer to wait for.
This is the easiest way to handle the absolute timeout value, given
that NtSetTimer also takes absolute times and given the double
dependency on evt *and* mtx, which requires to call WFMO twice. */
NTSTATUS status;
LARGE_INTEGER duetime;
timer_idx = cnt++;
status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
NotificationTimer);
if (!NT_SUCCESS (status))
return geterrno_from_nt_status (status);
timespec_to_filetime (abstime, &duetime);
status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
if (!NT_SUCCESS (status))
{
NtClose (w4[timer_idx]);
return geterrno_from_nt_status (status);
}
}
ResetEvent (evt);
if ((ret = ipc_mutex_unlock (mtx)) != 0)
return ret;
/* Everything's set up, so now wait for the event to be signalled. */
restart1:
switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_OBJECT_0 + 1:
if (_my_tls.call_signal_handler ())
goto restart1;
ret = EINTR;
break;
case WAIT_OBJECT_0 + 2:
if (timer_idx != 2)
pthread::static_cancel_self ();
fallthrough;
case WAIT_OBJECT_0 + 3:
ret = ETIMEDOUT;
break;
default:
ret = geterrno_from_win_error ();
break;
}
if (ret == 0)
{
/* At this point we need to lock the mutex. The wait is practically
the same as before, just that we now wait on the mutex instead of the
event. */
restart2:
w4[0] = mtx;
switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
case WAIT_ABANDONED_0:
break;
case WAIT_OBJECT_0 + 1:
if (_my_tls.call_signal_handler ())
goto restart2;
ret = EINTR;
break;
case WAIT_OBJECT_0 + 2:
if (timer_idx != 2)
pthread_testcancel ();
fallthrough;
case WAIT_OBJECT_0 + 3:
ret = ETIMEDOUT;
break;
default:
ret = geterrno_from_win_error ();
break;
}
}
if (timer_idx)
{
if (ret != ETIMEDOUT)
NtCancelTimer (w4[timer_idx], NULL);
NtClose (w4[timer_idx]);
}
return ret;
}
static inline void
ipc_cond_signal (HANDLE evt)
{
SetEvent (evt);
}
class ipc_flock
{
struct flock fl;
@ -348,388 +219,88 @@ mq_open (const char *name, int oflag, ...)
return (mqd_t) -1;
}
static struct mq_info *
get_mqinfo (cygheap_fdget &fd)
{
if (fd >= 0)
{
fhandler_mqueue *fh = fd->is_mqueue ();
if (fh)
return fh->mqinfo ();
set_errno (EINVAL);
}
return NULL;
}
extern "C" int
mq_getattr (mqd_t mqd, struct mq_attr *mqstat)
{
int n;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct mq_info *mqinfo;
int ret = -1;
__try
{
cygheap_fdget fd ((int) mqd, true);
mqinfo = get_mqinfo (fd);
if (mqinfo->mqi_magic != MQI_MAGIC)
{
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
__leave;
}
mqhdr = mqinfo->mqi_hdr;
attr = &mqhdr->mqh_attr;
if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */
mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
mqstat->mq_msgsize = attr->mq_msgsize;
mqstat->mq_curmsgs = attr->mq_curmsgs;
ipc_mutex_unlock (mqinfo->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
else
ret = fh->mq_getattr (mqstat);
return ret;
}
extern "C" int
mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
int n;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct mq_info *mqinfo;
int ret = -1;
__try
{
cygheap_fdget fd ((int) mqd, true);
mqinfo = get_mqinfo (fd);
if (mqinfo->mqi_magic != MQI_MAGIC)
{
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
__leave;
}
mqhdr = mqinfo->mqi_hdr;
attr = &mqhdr->mqh_attr;
if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
if (omqstat != NULL)
{
omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */
omqstat->mq_maxmsg = attr->mq_maxmsg;
omqstat->mq_msgsize = attr->mq_msgsize;
omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
}
if (mqstat->mq_flags & O_NONBLOCK)
mqinfo->mqi_flags |= O_NONBLOCK;
else
mqinfo->mqi_flags &= ~O_NONBLOCK;
ipc_mutex_unlock (mqinfo->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
ret = fh->mq_setattr (mqstat, omqstat);
return ret;
}
extern "C" int
mq_notify (mqd_t mqd, const struct sigevent *notification)
{
int n;
pid_t pid;
struct mq_hdr *mqhdr;
struct mq_info *mqinfo;
__try
{
cygheap_fdget fd ((int) mqd, true);
mqinfo = get_mqinfo (fd);
if (mqinfo->mqi_magic != MQI_MAGIC)
{
set_errno (EBADF);
__leave;
}
mqhdr = mqinfo->mqi_hdr;
if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
pid = getpid ();
if (!notification)
{
if (mqhdr->mqh_pid == pid)
mqhdr->mqh_pid = 0; /* unregister calling process */
}
else
{
if (mqhdr->mqh_pid != 0)
{
if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
{
set_errno (EBUSY);
ipc_mutex_unlock (mqinfo->mqi_lock);
__leave;
}
}
mqhdr->mqh_pid = pid;
mqhdr->mqh_event = *notification;
}
ipc_mutex_unlock (mqinfo->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
static int
_mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
const struct timespec *abstime)
{
int n;
long index, freeindex;
int8_t *mptr;
struct sigevent *sigev;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
struct mq_info *mqinfo = NULL;
bool ipc_mutex_locked = false;
int ret = -1;
pthread_testcancel ();
__try
{
cygheap_fdget fd ((int) mqd);
mqinfo = get_mqinfo (fd);
if (mqinfo->mqi_magic != MQI_MAGIC)
{
cygheap_fdget fd ((int) mqd, true);
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
__leave;
}
if (prio >= MQ_PRIO_MAX)
{
set_errno (EINVAL);
__leave;
}
mqhdr = mqinfo->mqi_hdr; /* struct pointer */
mptr = (int8_t *) mqhdr; /* byte pointer */
attr = &mqhdr->mqh_attr;
if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
{
errno = n;
__leave;
}
ipc_mutex_locked = true;
if (len > (size_t) attr->mq_msgsize)
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0)
{
if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
{
sigev = &mqhdr->mqh_event;
if (sigev->sigev_notify == SIGEV_SIGNAL)
sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
sigev->sigev_value);
mqhdr->mqh_pid = 0; /* unregister */
}
}
else if (attr->mq_curmsgs >= attr->mq_maxmsg)
{
/* Queue is full */
if (mqinfo->mqi_flags & O_NONBLOCK)
{
set_errno (EAGAIN);
__leave;
}
/* Wait for room for one message on the queue */
while (attr->mq_curmsgs >= attr->mq_maxmsg)
{
int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend,
mqinfo->mqi_lock, abstime);
if (ret != 0)
{
set_errno (ret);
__leave;
}
}
}
/* nmsghdr will point to new message */
if ((freeindex = mqhdr->mqh_free) == 0)
api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
nmsghdr = (struct msg_hdr *) &mptr[freeindex];
nmsghdr->msg_prio = prio;
nmsghdr->msg_len = len;
memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
/* Find right place for message in linked list */
index = mqhdr->mqh_head;
pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
while (index)
{
msghdr = (struct msg_hdr *) &mptr[index];
if (prio > msghdr->msg_prio)
{
nmsghdr->msg_next = index;
pmsghdr->msg_next = freeindex;
break;
}
index = msghdr->msg_next;
pmsghdr = msghdr;
}
if (index == 0)
{
/* Queue was empty or new goes at end of list */
pmsghdr->msg_next = freeindex;
nmsghdr->msg_next = 0;
}
/* Wake up anyone blocked in mq_receive waiting for a message */
if (attr->mq_curmsgs == 0)
ipc_cond_signal (mqinfo->mqi_waitrecv);
attr->mq_curmsgs++;
ret = 0;
}
__except (EBADF) {}
__endtry
if (ipc_mutex_locked)
ipc_mutex_unlock (mqinfo->mqi_lock);
else
ret = fh->mq_notify (notification);
return ret;
}
extern "C" int
mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
{
return _mq_send (mqd, ptr, len, prio, NULL);
}
extern "C" int
mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio,
const struct timespec *abstime)
{
return _mq_send (mqd, ptr, len, prio, abstime);
}
int ret = -1;
static ssize_t
_mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
const struct timespec *abstime)
{
int n;
long index;
int8_t *mptr;
ssize_t len = -1;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct msg_hdr *msghdr;
struct mq_info *mqinfo;
bool ipc_mutex_locked = false;
pthread_testcancel ();
__try
{
cygheap_fdget fd ((int) mqd);
mqinfo = get_mqinfo (fd);
if (mqinfo->mqi_magic != MQI_MAGIC)
{
cygheap_fdget fd ((int) mqd, true);
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
__leave;
}
mqhdr = mqinfo->mqi_hdr; /* struct pointer */
mptr = (int8_t *) mqhdr; /* byte pointer */
attr = &mqhdr->mqh_attr;
if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0)
{
errno = n;
__leave;
}
ipc_mutex_locked = true;
if (maxlen < (size_t) attr->mq_msgsize)
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0) /* queue is empty */
{
if (mqinfo->mqi_flags & O_NONBLOCK)
{
set_errno (EAGAIN);
__leave;
}
/* Wait for a message to be placed onto queue */
mqhdr->mqh_nwait++;
while (attr->mq_curmsgs == 0)
{
int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv,
mqinfo->mqi_lock, abstime);
if (ret != 0)
{
set_errno (ret);
__leave;
}
}
mqhdr->mqh_nwait--;
}
if ((index = mqhdr->mqh_head) == 0)
api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
msghdr = (struct msg_hdr *) &mptr[index];
mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
len = msghdr->msg_len;
memcpy(ptr, msghdr + 1, len); /* copy the message itself */
if (priop != NULL)
*priop = msghdr->msg_prio;
/* Just-read message goes to front of free list */
msghdr->msg_next = mqhdr->mqh_free;
mqhdr->mqh_free = index;
/* Wake up anyone blocked in mq_send waiting for room */
if (attr->mq_curmsgs == attr->mq_maxmsg)
ipc_cond_signal (mqinfo->mqi_waitsend);
attr->mq_curmsgs--;
}
__except (EBADF) {}
__endtry
if (ipc_mutex_locked)
ipc_mutex_unlock (mqinfo->mqi_lock);
return len;
else
ret = fh->mq_timedsend (ptr, len, prio, abstime);
return ret;
}
extern "C" ssize_t
mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
extern "C" int
mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
{
return _mq_receive (mqd, ptr, maxlen, priop, NULL);
return mq_timedsend (mqd, ptr, len, prio, NULL);
}
extern "C" ssize_t
mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop,
const struct timespec *abstime)
{
return _mq_receive (mqd, ptr, maxlen, priop, abstime);
int ret = -1;
cygheap_fdget fd ((int) mqd, true);
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
else
ret = fh->mq_timedrecv (ptr, maxlen, priop, abstime);
return ret;
}
extern "C" ssize_t
mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
{
return mq_timedreceive (mqd, ptr, maxlen, priop, NULL);
}
extern "C" int