diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h index ff51d29a5..abb13b0e2 100644 --- a/winsup/cygwin/fhandler.h +++ b/winsup/cygwin/fhandler.h @@ -3115,6 +3115,11 @@ class fhandler_mqueue: public fhandler_disk_file int _dup (HANDLE parent, fhandler_mqueue *child); + int mutex_lock (HANDLE mtx, bool eintr); + int mutex_unlock (HANDLE mtx); + int cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime); + void cond_signal (HANDLE evt); + public: fhandler_mqueue (); fhandler_mqueue (void *) {} @@ -3126,6 +3131,13 @@ public: int open (int, mode_t); int mq_open (int, mode_t, struct mq_attr *); + int mq_getattr (struct mq_attr *); + int mq_setattr (const struct mq_attr *, struct mq_attr *); + int mq_notify (const struct sigevent *); + int mq_timedsend (const char *, size_t, unsigned int, + const struct timespec *); + ssize_t mq_timedrecv (char *, size_t, unsigned int *, + const struct timespec *); struct mq_info *mqinfo () { return &mqi; } diff --git a/winsup/cygwin/fhandler_mqueue.cc b/winsup/cygwin/fhandler_mqueue.cc index c450c0337..28aae314e 100644 --- a/winsup/cygwin/fhandler_mqueue.cc +++ b/winsup/cygwin/fhandler_mqueue.cc @@ -11,6 +11,7 @@ details. */ #include "path.h" #include "fhandler.h" #include "dtable.h" +#include "clock.h" #include #include @@ -137,7 +138,7 @@ fhandler_mqueue::_mqinfo (SIZE_T filesize, mode_t mode, int flags, get pagesize aligned, which breaks the next NtMapViewOfSection in fork. */ mqinfo ()->mqi_sectsize = filesize; mqinfo ()->mqi_mode = mode; - mqinfo ()->mqi_flags = flags; + set_nonblocking (flags & O_NONBLOCK); __small_swprintf (buf, L"mqueue/mtx%s", get_name ()); RtlInitUnicodeString (&uname, buf); @@ -426,3 +427,436 @@ fhandler_mqueue::close () __endtry return 0; } + +int +fhandler_mqueue::mutex_lock (HANDLE mtx, bool eintr) +{ + switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self + | (eintr ? cw_sig_eintr : cw_sig_restart))) + { + case WAIT_OBJECT_0: + case WAIT_ABANDONED_0: + return 0; + case WAIT_SIGNALED: + set_errno (EINTR); + return 1; + default: + break; + } + return geterrno_from_win_error (); +} + +int +fhandler_mqueue::mutex_unlock (HANDLE mtx) +{ + return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error (); +} + +int +fhandler_mqueue::cond_timedwait (HANDLE evt, HANDLE mtx, + const struct timespec *abstime) +{ + HANDLE w4[4] = { evt, }; + DWORD cnt = 2; + DWORD timer_idx = 0; + int ret = 0; + + wait_signal_arrived here (w4[1]); + if ((w4[cnt] = pthread::get_cancel_event ()) != NULL) + ++cnt; + if (abstime) + { + if (!valid_timespec (*abstime)) + return EINVAL; + + /* If a timeout is set, we create a waitable timer to wait for. + This is the easiest way to handle the absolute timeout value, given + that NtSetTimer also takes absolute times and given the double + dependency on evt *and* mtx, which requires to call WFMO twice. */ + NTSTATUS status; + LARGE_INTEGER duetime; + + timer_idx = cnt++; + status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL, + NotificationTimer); + if (!NT_SUCCESS (status)) + return geterrno_from_nt_status (status); + timespec_to_filetime (abstime, &duetime); + status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL); + if (!NT_SUCCESS (status)) + { + NtClose (w4[timer_idx]); + return geterrno_from_nt_status (status); + } + } + ResetEvent (evt); + if ((ret = mutex_unlock (mtx)) != 0) + return ret; + /* Everything's set up, so now wait for the event to be signalled. */ +restart1: + switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE)) + { + case WAIT_OBJECT_0: + break; + case WAIT_OBJECT_0 + 1: + if (_my_tls.call_signal_handler ()) + goto restart1; + ret = EINTR; + break; + case WAIT_OBJECT_0 + 2: + if (timer_idx != 2) + pthread::static_cancel_self (); + fallthrough; + case WAIT_OBJECT_0 + 3: + ret = ETIMEDOUT; + break; + default: + ret = geterrno_from_win_error (); + break; + } + if (ret == 0) + { + /* At this point we need to lock the mutex. The wait is practically + the same as before, just that we now wait on the mutex instead of the + event. */ + restart2: + w4[0] = mtx; + switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE)) + { + case WAIT_OBJECT_0: + case WAIT_ABANDONED_0: + break; + case WAIT_OBJECT_0 + 1: + if (_my_tls.call_signal_handler ()) + goto restart2; + ret = EINTR; + break; + case WAIT_OBJECT_0 + 2: + if (timer_idx != 2) + pthread_testcancel (); + fallthrough; + case WAIT_OBJECT_0 + 3: + ret = ETIMEDOUT; + break; + default: + ret = geterrno_from_win_error (); + break; + } + } + if (timer_idx) + { + if (ret != ETIMEDOUT) + NtCancelTimer (w4[timer_idx], NULL); + NtClose (w4[timer_idx]); + } + return ret; +} + +void +fhandler_mqueue::cond_signal (HANDLE evt) +{ + SetEvent (evt); +} + +int +fhandler_mqueue::mq_getattr (struct mq_attr *mqstat) +{ + int n; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + + __try + { + mqhdr = mqinfo ()->mqi_hdr; + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0) + { + errno = n; + __leave; + } + mqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; /* per-open */ + mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */ + mqstat->mq_msgsize = attr->mq_msgsize; + mqstat->mq_curmsgs = attr->mq_curmsgs; + + mutex_unlock (mqinfo ()->mqi_lock); + return 0; + } + __except (EBADF) {} + __endtry + return -1; +} + +int +fhandler_mqueue::mq_setattr (const struct mq_attr *mqstat, + struct mq_attr *omqstat) +{ + int n; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + + __try + { + mqhdr = mqinfo ()->mqi_hdr; + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0) + { + errno = n; + __leave; + } + + if (omqstat != NULL) + { + omqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; + omqstat->mq_maxmsg = attr->mq_maxmsg; + omqstat->mq_msgsize = attr->mq_msgsize; + omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */ + } + + set_nonblocking (mqstat->mq_flags & O_NONBLOCK); + + mutex_unlock (mqinfo ()->mqi_lock); + return 0; + } + __except (EBADF) {} + __endtry + return -1; +} + +int +fhandler_mqueue::mq_notify (const struct sigevent *notification) +{ + int n; + pid_t pid; + struct mq_hdr *mqhdr; + + __try + { + mqhdr = mqinfo ()->mqi_hdr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0) + { + errno = n; + __leave; + } + + pid = myself->pid; + if (!notification) + { + if (mqhdr->mqh_pid == pid) + mqhdr->mqh_pid = 0; /* unregister calling process */ + } + else + { + if (mqhdr->mqh_pid != 0) + { + if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH) + { + set_errno (EBUSY); + mutex_unlock (mqinfo ()->mqi_lock); + __leave; + } + } + mqhdr->mqh_pid = pid; + mqhdr->mqh_event = *notification; + } + mutex_unlock (mqinfo ()->mqi_lock); + return 0; + } + __except (EBADF) {} + __endtry + return -1; +} + +int +fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio, + const struct timespec *abstime) +{ + int n; + long index, freeindex; + int8_t *mptr; + struct sigevent *sigev; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + struct msg_hdr *msghdr, *nmsghdr, *pmsghdr; + bool mutex_locked = false; + int ret = -1; + + pthread_testcancel (); + + __try + { + if (prio >= MQ_PRIO_MAX) + { + set_errno (EINVAL); + __leave; + } + + mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */ + mptr = (int8_t *) mqhdr; /* byte pointer */ + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0) + { + errno = n; + __leave; + } + mutex_locked = true; + if (len > (size_t) attr->mq_msgsize) + { + set_errno (EMSGSIZE); + __leave; + } + if (attr->mq_curmsgs == 0) + { + if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) + { + sigev = &mqhdr->mqh_event; + if (sigev->sigev_notify == SIGEV_SIGNAL) + sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, + sigev->sigev_value); + mqhdr->mqh_pid = 0; /* unregister */ + } + } + else if (attr->mq_curmsgs >= attr->mq_maxmsg) + { + /* Queue is full */ + if (is_nonblocking ()) + { + set_errno (EAGAIN); + __leave; + } + /* Wait for room for one message on the queue */ + while (attr->mq_curmsgs >= attr->mq_maxmsg) + { + int ret = cond_timedwait (mqinfo ()->mqi_waitsend, + mqinfo ()->mqi_lock, abstime); + if (ret != 0) + { + set_errno (ret); + __leave; + } + } + } + + /* nmsghdr will point to new message */ + if ((freeindex = mqhdr->mqh_free) == 0) + api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs); + + nmsghdr = (struct msg_hdr *) &mptr[freeindex]; + nmsghdr->msg_prio = prio; + nmsghdr->msg_len = len; + memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */ + mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */ + + /* Find right place for message in linked list */ + index = mqhdr->mqh_head; + pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head); + while (index) + { + msghdr = (struct msg_hdr *) &mptr[index]; + if (prio > msghdr->msg_prio) + { + nmsghdr->msg_next = index; + pmsghdr->msg_next = freeindex; + break; + } + index = msghdr->msg_next; + pmsghdr = msghdr; + } + if (index == 0) + { + /* Queue was empty or new goes at end of list */ + pmsghdr->msg_next = freeindex; + nmsghdr->msg_next = 0; + } + /* Wake up anyone blocked in mq_receive waiting for a message */ + if (attr->mq_curmsgs == 0) + cond_signal (mqinfo ()->mqi_waitrecv); + attr->mq_curmsgs++; + + ret = 0; + } + __except (EBADF) {} + __endtry + if (mutex_locked) + mutex_unlock (mqinfo ()->mqi_lock); + return ret; +} + +ssize_t +fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop, + const struct timespec *abstime) +{ + int n; + long index; + int8_t *mptr; + ssize_t len = -1; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + struct msg_hdr *msghdr; + bool mutex_locked = false; + + pthread_testcancel (); + + __try + { + mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */ + mptr = (int8_t *) mqhdr; /* byte pointer */ + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0) + { + errno = n; + __leave; + } + mutex_locked = true; + if (maxlen < (size_t) attr->mq_msgsize) + { + set_errno (EMSGSIZE); + __leave; + } + if (attr->mq_curmsgs == 0) /* queue is empty */ + { + if (is_nonblocking ()) + { + set_errno (EAGAIN); + __leave; + } + /* Wait for a message to be placed onto queue */ + mqhdr->mqh_nwait++; + while (attr->mq_curmsgs == 0) + { + int ret = cond_timedwait (mqinfo ()->mqi_waitrecv, + mqinfo ()->mqi_lock, abstime); + if (ret != 0) + { + set_errno (ret); + __leave; + } + } + mqhdr->mqh_nwait--; + } + + if ((index = mqhdr->mqh_head) == 0) + api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs); + + msghdr = (struct msg_hdr *) &mptr[index]; + mqhdr->mqh_head = msghdr->msg_next; /* new head of list */ + len = msghdr->msg_len; + memcpy(ptr, msghdr + 1, len); /* copy the message itself */ + if (priop != NULL) + *priop = msghdr->msg_prio; + + /* Just-read message goes to front of free list */ + msghdr->msg_next = mqhdr->mqh_free; + mqhdr->mqh_free = index; + + /* Wake up anyone blocked in mq_send waiting for room */ + if (attr->mq_curmsgs == attr->mq_maxmsg) + cond_signal (mqinfo ()->mqi_waitsend); + attr->mq_curmsgs--; + } + __except (EBADF) {} + __endtry + if (mutex_locked) + mutex_unlock (mqinfo ()->mqi_lock); + return len; +} diff --git a/winsup/cygwin/mqueue_types.h b/winsup/cygwin/mqueue_types.h index 3a4b127ca..4d0d910e4 100644 --- a/winsup/cygwin/mqueue_types.h +++ b/winsup/cygwin/mqueue_types.h @@ -58,7 +58,6 @@ struct mq_info HANDLE mqi_waitsend; /* and condition variable for full queue */ HANDLE mqi_waitrecv; /* and condition variable for empty queue */ uint32_t mqi_magic; /* magic number if open */ - int mqi_flags; /* flags for this process */ }; diff --git a/winsup/cygwin/posix_ipc.cc b/winsup/cygwin/posix_ipc.cc index 772072d93..1932ac8db 100644 --- a/winsup/cygwin/posix_ipc.cc +++ b/winsup/cygwin/posix_ipc.cc @@ -104,135 +104,6 @@ check_path (char *res_name, ipc_type_t type, const char *name, size_t len) return true; } -static int -ipc_mutex_lock (HANDLE mtx, bool eintr) -{ - switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self - | (eintr ? cw_sig_eintr : cw_sig_restart))) - { - case WAIT_OBJECT_0: - case WAIT_ABANDONED_0: - return 0; - case WAIT_SIGNALED: - set_errno (EINTR); - return 1; - default: - break; - } - return geterrno_from_win_error (); -} - -static inline int -ipc_mutex_unlock (HANDLE mtx) -{ - return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error (); -} - -static int -ipc_cond_timedwait (HANDLE evt, HANDLE mtx, const struct timespec *abstime) -{ - HANDLE w4[4] = { evt, }; - DWORD cnt = 2; - DWORD timer_idx = 0; - int ret = 0; - - wait_signal_arrived here (w4[1]); - if ((w4[cnt] = pthread::get_cancel_event ()) != NULL) - ++cnt; - if (abstime) - { - if (!valid_timespec (*abstime)) - return EINVAL; - - /* If a timeout is set, we create a waitable timer to wait for. - This is the easiest way to handle the absolute timeout value, given - that NtSetTimer also takes absolute times and given the double - dependency on evt *and* mtx, which requires to call WFMO twice. */ - NTSTATUS status; - LARGE_INTEGER duetime; - - timer_idx = cnt++; - status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL, - NotificationTimer); - if (!NT_SUCCESS (status)) - return geterrno_from_nt_status (status); - timespec_to_filetime (abstime, &duetime); - status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL); - if (!NT_SUCCESS (status)) - { - NtClose (w4[timer_idx]); - return geterrno_from_nt_status (status); - } - } - ResetEvent (evt); - if ((ret = ipc_mutex_unlock (mtx)) != 0) - return ret; - /* Everything's set up, so now wait for the event to be signalled. */ -restart1: - switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE)) - { - case WAIT_OBJECT_0: - break; - case WAIT_OBJECT_0 + 1: - if (_my_tls.call_signal_handler ()) - goto restart1; - ret = EINTR; - break; - case WAIT_OBJECT_0 + 2: - if (timer_idx != 2) - pthread::static_cancel_self (); - fallthrough; - case WAIT_OBJECT_0 + 3: - ret = ETIMEDOUT; - break; - default: - ret = geterrno_from_win_error (); - break; - } - if (ret == 0) - { - /* At this point we need to lock the mutex. The wait is practically - the same as before, just that we now wait on the mutex instead of the - event. */ - restart2: - w4[0] = mtx; - switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE)) - { - case WAIT_OBJECT_0: - case WAIT_ABANDONED_0: - break; - case WAIT_OBJECT_0 + 1: - if (_my_tls.call_signal_handler ()) - goto restart2; - ret = EINTR; - break; - case WAIT_OBJECT_0 + 2: - if (timer_idx != 2) - pthread_testcancel (); - fallthrough; - case WAIT_OBJECT_0 + 3: - ret = ETIMEDOUT; - break; - default: - ret = geterrno_from_win_error (); - break; - } - } - if (timer_idx) - { - if (ret != ETIMEDOUT) - NtCancelTimer (w4[timer_idx], NULL); - NtClose (w4[timer_idx]); - } - return ret; -} - -static inline void -ipc_cond_signal (HANDLE evt) -{ - SetEvent (evt); -} - class ipc_flock { struct flock fl; @@ -348,388 +219,88 @@ mq_open (const char *name, int oflag, ...) return (mqd_t) -1; } -static struct mq_info * -get_mqinfo (cygheap_fdget &fd) -{ - if (fd >= 0) - { - fhandler_mqueue *fh = fd->is_mqueue (); - if (fh) - return fh->mqinfo (); - set_errno (EINVAL); - } - return NULL; -} - extern "C" int mq_getattr (mqd_t mqd, struct mq_attr *mqstat) { - int n; - struct mq_hdr *mqhdr; - struct mq_fattr *attr; - struct mq_info *mqinfo; + int ret = -1; - __try - { - cygheap_fdget fd ((int) mqd, true); - mqinfo = get_mqinfo (fd); - if (mqinfo->mqi_magic != MQI_MAGIC) - { - set_errno (EBADF); - __leave; - } - mqhdr = mqinfo->mqi_hdr; - attr = &mqhdr->mqh_attr; - if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0) - { - errno = n; - __leave; - } - mqstat->mq_flags = mqinfo->mqi_flags; /* per-open */ - mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */ - mqstat->mq_msgsize = attr->mq_msgsize; - mqstat->mq_curmsgs = attr->mq_curmsgs; - - ipc_mutex_unlock (mqinfo->mqi_lock); - return 0; - } - __except (EBADF) {} - __endtry - return -1; + cygheap_fdget fd ((int) mqd, true); + fhandler_mqueue *fh = fd->is_mqueue (); + if (!fh) + set_errno (EBADF); + else + ret = fh->mq_getattr (mqstat); + return ret; } extern "C" int mq_setattr (mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat) { - int n; - struct mq_hdr *mqhdr; - struct mq_fattr *attr; - struct mq_info *mqinfo; + int ret = -1; - __try - { - cygheap_fdget fd ((int) mqd, true); - mqinfo = get_mqinfo (fd); - if (mqinfo->mqi_magic != MQI_MAGIC) - { - set_errno (EBADF); - __leave; - } - mqhdr = mqinfo->mqi_hdr; - attr = &mqhdr->mqh_attr; - if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0) - { - errno = n; - __leave; - } - - if (omqstat != NULL) - { - omqstat->mq_flags = mqinfo->mqi_flags; /* previous attributes */ - omqstat->mq_maxmsg = attr->mq_maxmsg; - omqstat->mq_msgsize = attr->mq_msgsize; - omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */ - } - - if (mqstat->mq_flags & O_NONBLOCK) - mqinfo->mqi_flags |= O_NONBLOCK; - else - mqinfo->mqi_flags &= ~O_NONBLOCK; - - ipc_mutex_unlock (mqinfo->mqi_lock); - return 0; - } - __except (EBADF) {} - __endtry - return -1; + cygheap_fdget fd ((int) mqd, true); + fhandler_mqueue *fh = fd->is_mqueue (); + if (!fh) + set_errno (EBADF); + else + ret = fh->mq_setattr (mqstat, omqstat); + return ret; } extern "C" int mq_notify (mqd_t mqd, const struct sigevent *notification) { - int n; - pid_t pid; - struct mq_hdr *mqhdr; - struct mq_info *mqinfo; - - __try - { - cygheap_fdget fd ((int) mqd, true); - mqinfo = get_mqinfo (fd); - if (mqinfo->mqi_magic != MQI_MAGIC) - { - set_errno (EBADF); - __leave; - } - mqhdr = mqinfo->mqi_hdr; - if ((n = ipc_mutex_lock (mqinfo->mqi_lock, false)) != 0) - { - errno = n; - __leave; - } - - pid = getpid (); - if (!notification) - { - if (mqhdr->mqh_pid == pid) - mqhdr->mqh_pid = 0; /* unregister calling process */ - } - else - { - if (mqhdr->mqh_pid != 0) - { - if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH) - { - set_errno (EBUSY); - ipc_mutex_unlock (mqinfo->mqi_lock); - __leave; - } - } - mqhdr->mqh_pid = pid; - mqhdr->mqh_event = *notification; - } - ipc_mutex_unlock (mqinfo->mqi_lock); - return 0; - } - __except (EBADF) {} - __endtry - return -1; -} - -static int -_mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio, - const struct timespec *abstime) -{ - int n; - long index, freeindex; - int8_t *mptr; - struct sigevent *sigev; - struct mq_hdr *mqhdr; - struct mq_fattr *attr; - struct msg_hdr *msghdr, *nmsghdr, *pmsghdr; - struct mq_info *mqinfo = NULL; - bool ipc_mutex_locked = false; int ret = -1; - pthread_testcancel (); - - __try - { - cygheap_fdget fd ((int) mqd); - mqinfo = get_mqinfo (fd); - if (mqinfo->mqi_magic != MQI_MAGIC) - { - set_errno (EBADF); - __leave; - } - if (prio >= MQ_PRIO_MAX) - { - set_errno (EINVAL); - __leave; - } - - mqhdr = mqinfo->mqi_hdr; /* struct pointer */ - mptr = (int8_t *) mqhdr; /* byte pointer */ - attr = &mqhdr->mqh_attr; - if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0) - { - errno = n; - __leave; - } - ipc_mutex_locked = true; - if (len > (size_t) attr->mq_msgsize) - { - set_errno (EMSGSIZE); - __leave; - } - if (attr->mq_curmsgs == 0) - { - if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) - { - sigev = &mqhdr->mqh_event; - if (sigev->sigev_notify == SIGEV_SIGNAL) - sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, - sigev->sigev_value); - mqhdr->mqh_pid = 0; /* unregister */ - } - } - else if (attr->mq_curmsgs >= attr->mq_maxmsg) - { - /* Queue is full */ - if (mqinfo->mqi_flags & O_NONBLOCK) - { - set_errno (EAGAIN); - __leave; - } - /* Wait for room for one message on the queue */ - while (attr->mq_curmsgs >= attr->mq_maxmsg) - { - int ret = ipc_cond_timedwait (mqinfo->mqi_waitsend, - mqinfo->mqi_lock, abstime); - if (ret != 0) - { - set_errno (ret); - __leave; - } - } - } - - /* nmsghdr will point to new message */ - if ((freeindex = mqhdr->mqh_free) == 0) - api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs); - - nmsghdr = (struct msg_hdr *) &mptr[freeindex]; - nmsghdr->msg_prio = prio; - nmsghdr->msg_len = len; - memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */ - mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */ - - /* Find right place for message in linked list */ - index = mqhdr->mqh_head; - pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head); - while (index) - { - msghdr = (struct msg_hdr *) &mptr[index]; - if (prio > msghdr->msg_prio) - { - nmsghdr->msg_next = index; - pmsghdr->msg_next = freeindex; - break; - } - index = msghdr->msg_next; - pmsghdr = msghdr; - } - if (index == 0) - { - /* Queue was empty or new goes at end of list */ - pmsghdr->msg_next = freeindex; - nmsghdr->msg_next = 0; - } - /* Wake up anyone blocked in mq_receive waiting for a message */ - if (attr->mq_curmsgs == 0) - ipc_cond_signal (mqinfo->mqi_waitrecv); - attr->mq_curmsgs++; - - ret = 0; - } - __except (EBADF) {} - __endtry - if (ipc_mutex_locked) - ipc_mutex_unlock (mqinfo->mqi_lock); + cygheap_fdget fd ((int) mqd, true); + fhandler_mqueue *fh = fd->is_mqueue (); + if (!fh) + set_errno (EBADF); + else + ret = fh->mq_notify (notification); return ret; } -extern "C" int -mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio) -{ - return _mq_send (mqd, ptr, len, prio, NULL); -} - extern "C" int mq_timedsend (mqd_t mqd, const char *ptr, size_t len, unsigned int prio, const struct timespec *abstime) { - return _mq_send (mqd, ptr, len, prio, abstime); + int ret = -1; + + cygheap_fdget fd ((int) mqd, true); + fhandler_mqueue *fh = fd->is_mqueue (); + if (!fh) + set_errno (EBADF); + else + ret = fh->mq_timedsend (ptr, len, prio, abstime); + return ret; } -static ssize_t -_mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop, +extern "C" int +mq_send (mqd_t mqd, const char *ptr, size_t len, unsigned int prio) +{ + return mq_timedsend (mqd, ptr, len, prio, NULL); +} + +extern "C" ssize_t +mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop, const struct timespec *abstime) { - int n; - long index; - int8_t *mptr; - ssize_t len = -1; - struct mq_hdr *mqhdr; - struct mq_fattr *attr; - struct msg_hdr *msghdr; - struct mq_info *mqinfo; - bool ipc_mutex_locked = false; + int ret = -1; - pthread_testcancel (); - - __try - { - cygheap_fdget fd ((int) mqd); - mqinfo = get_mqinfo (fd); - if (mqinfo->mqi_magic != MQI_MAGIC) - { - set_errno (EBADF); - __leave; - } - mqhdr = mqinfo->mqi_hdr; /* struct pointer */ - mptr = (int8_t *) mqhdr; /* byte pointer */ - attr = &mqhdr->mqh_attr; - if ((n = ipc_mutex_lock (mqinfo->mqi_lock, true)) != 0) - { - errno = n; - __leave; - } - ipc_mutex_locked = true; - if (maxlen < (size_t) attr->mq_msgsize) - { - set_errno (EMSGSIZE); - __leave; - } - if (attr->mq_curmsgs == 0) /* queue is empty */ - { - if (mqinfo->mqi_flags & O_NONBLOCK) - { - set_errno (EAGAIN); - __leave; - } - /* Wait for a message to be placed onto queue */ - mqhdr->mqh_nwait++; - while (attr->mq_curmsgs == 0) - { - int ret = ipc_cond_timedwait (mqinfo->mqi_waitrecv, - mqinfo->mqi_lock, abstime); - if (ret != 0) - { - set_errno (ret); - __leave; - } - } - mqhdr->mqh_nwait--; - } - - if ((index = mqhdr->mqh_head) == 0) - api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs); - - msghdr = (struct msg_hdr *) &mptr[index]; - mqhdr->mqh_head = msghdr->msg_next; /* new head of list */ - len = msghdr->msg_len; - memcpy(ptr, msghdr + 1, len); /* copy the message itself */ - if (priop != NULL) - *priop = msghdr->msg_prio; - - /* Just-read message goes to front of free list */ - msghdr->msg_next = mqhdr->mqh_free; - mqhdr->mqh_free = index; - - /* Wake up anyone blocked in mq_send waiting for room */ - if (attr->mq_curmsgs == attr->mq_maxmsg) - ipc_cond_signal (mqinfo->mqi_waitsend); - attr->mq_curmsgs--; - } - __except (EBADF) {} - __endtry - if (ipc_mutex_locked) - ipc_mutex_unlock (mqinfo->mqi_lock); - return len; + cygheap_fdget fd ((int) mqd, true); + fhandler_mqueue *fh = fd->is_mqueue (); + if (!fh) + set_errno (EBADF); + else + ret = fh->mq_timedrecv (ptr, maxlen, priop, abstime); + return ret; } extern "C" ssize_t mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop) { - return _mq_receive (mqd, ptr, maxlen, priop, NULL); -} - -extern "C" ssize_t -mq_timedreceive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop, - const struct timespec *abstime) -{ - return _mq_receive (mqd, ptr, maxlen, priop, abstime); + return mq_timedreceive (mqd, ptr, maxlen, priop, NULL); } extern "C" int