Cygwin: AF_UNIX: first cut of mqueue modifications

This commit is contained in:
Ken Brown 2021-06-04 12:19:02 -04:00
parent fdeedfb979
commit 19a7e4723c
5 changed files with 117 additions and 17 deletions

View File

@ -3149,7 +3149,8 @@ public:
int mq_timedsend (const char *, size_t, unsigned int,
const struct timespec *);
ssize_t mq_timedrecv (char *, size_t, unsigned int *,
const struct timespec *);
const struct timespec *, uint32_t = 0);
void mq_unlock ();
struct mq_info *mqinfo () { return &mqi; }

View File

@ -931,7 +931,7 @@ fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio,
ssize_t
fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
const struct timespec *abstime)
const struct timespec *abstime, uint32_t flags)
{
int n;
long index;
@ -941,6 +941,7 @@ fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
struct mq_fattr *attr;
struct msg_hdr *msghdr;
bool mutex_locked = false;
bool keep_packet = flags & (_MQ_PEEK | _MQ_PEEK_NONBLOCK);
pthread_testcancel ();
@ -955,14 +956,17 @@ fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
__leave;
}
mutex_locked = true;
if (maxlen < (size_t) attr->mq_msgsize)
/* Check if maxlen is too small. When called from _mq_recv, that's ok,
but for actual user space message queues, that's an error condition. */
if (maxlen < (size_t) attr->mq_msgsize && !(flags & _MQ_ALLOW_PARTIAL))
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0) /* queue is empty */
{
if (is_nonblocking ())
if (is_nonblocking () || (flags & _MQ_PEEK_NONBLOCK))
{
set_errno (EAGAIN);
__leave;
@ -986,24 +990,37 @@ fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
msghdr = (struct msg_hdr *) &mptr[index];
mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
len = msghdr->msg_len;
memcpy(ptr, msghdr + 1, len); /* copy the message itself */
/* Partial read? */
if (maxlen < msghdr->msg_len)
len = maxlen;
else
len = msghdr->msg_len;
memcpy(ptr, msghdr + 1, len); /* copy the message itself */
if (priop != NULL)
*priop = msghdr->msg_prio;
/* Just-read message goes to front of free list */
msghdr->msg_next = mqhdr->mqh_free;
mqhdr->mqh_free = index;
/* Wake up anyone blocked in mq_send waiting for room */
if (attr->mq_curmsgs == attr->mq_maxmsg)
cond_signal (mqinfo ()->mqi_waitsend);
attr->mq_curmsgs--;
if (!keep_packet)
{
mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
/* Just-read message goes to front of free list */
msghdr->msg_next = mqhdr->mqh_free;
mqhdr->mqh_free = index;
/* Wake up anyone blocked in mq_send waiting for room */
if (attr->mq_curmsgs == attr->mq_maxmsg)
cond_signal (mqinfo ()->mqi_waitsend);
attr->mq_curmsgs--;
}
}
__except (EBADF) {}
__endtry
if (mutex_locked)
if (mutex_locked && !(flags & _MQ_HOLD_LOCK))
mutex_unlock (mqinfo ()->mqi_lock);
return len;
}
void
fhandler_mqueue::mq_unlock ()
{
mutex_unlock (mqinfo ()->mqi_lock);
}

View File

@ -10,6 +10,7 @@
#include <sys/types.h>
#include <sys/signal.h>
#include <sys/cdefs.h>
#include <cygwin/bits.h>
#ifndef _MQUEUE_H
#define _MQUEUE_H
@ -38,6 +39,25 @@ int mq_timedsend (mqd_t, const char *, size_t, unsigned int,
const struct timespec *);
int mq_unlink (const char *name);
#ifdef __INSIDE_CYGWIN__
enum
{
_MQ_ALLOW_PARTIAL = _BIT ( 0), /* allow partial reads */
_MQ_PEEK = _BIT ( 1), /* Peek into the packet, return data,
but don't touch the packet at all
(MSG_PEEK) */
_MQ_PEEK_NONBLOCK = _BIT ( 2), /* Peek into the packet, return data,
but don't touch the packet at all,
and don't block (grab_admin_pkt) */
_MQ_HOLD_LOCK = _BIT ( 3), /* Don't unlock mutex after reading. */
};
ssize_t _mq_recv (mqd_t, char *, size_t, int);
ssize_t _mq_timedrecv (mqd_t, char *, size_t, const struct timespec *, int);
ssize_t _mq_peek (mqd_t, char *, size_t, bool);
int _mq_unlock (mqd_t);
#endif
__END_DECLS
#endif /* _MQUEUE_H */

View File

@ -46,7 +46,7 @@ struct mq_hdr
struct msg_hdr
{
int32_t msg_next; /* index of next on linked list */
int32_t msg_len; /* actual length */
uint32_t msg_len; /* actual length */
unsigned int msg_prio; /* priority */
};
#pragma pack (pop)

View File

@ -303,6 +303,68 @@ mq_receive (mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
return mq_timedreceive (mqd, ptr, maxlen, priop, NULL);
}
/* Internal function to allow reading partial message packets.
Used from AF_UNIX code. */
extern "C" ssize_t
_mq_recv (mqd_t mqd, char *ptr, size_t maxlen, int flags)
{
return _mq_timedrecv (mqd, ptr, maxlen, NULL, flags);
}
/* Internal function to allow reading partial message packets with timeout.
Used from AF_UNIX code. */
extern "C" ssize_t
_mq_timedrecv (mqd_t mqd, char *ptr, size_t maxlen,
const struct timespec *abstime, int flags)
{
int ret = -1;
cygheap_fdget fd ((int) mqd, true);
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
else
ret = fh->mq_timedrecv (ptr, maxlen, NULL, abstime,
_MQ_ALLOW_PARTIAL | flags);
return ret;
}
/* Internal function to allow peeking into message packets.
Used from AF_UNIX code. */
extern "C" ssize_t
_mq_peek (mqd_t mqd, char *ptr, size_t maxlen, bool nonblocking)
{
int ret = -1;
int flags = _MQ_ALLOW_PARTIAL | (nonblocking ? _MQ_PEEK_NONBLOCK : _MQ_PEEK);
cygheap_fdget fd ((int) mqd, true);
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
else
ret = fh->mq_timedrecv (ptr, maxlen, NULL, NULL, flags);
return ret;
}
/* Internal function to unlock a queue after mq_receive was called
with _MQ_HOLD_LOCK. Used from AF_UNIX code. */
extern "C" int
_mq_unlock (mqd_t mqd)
{
int ret = -1;
cygheap_fdget fd ((int) mqd, true);
fhandler_mqueue *fh = fd->is_mqueue ();
if (!fh)
set_errno (EBADF);
else
{
fh->mq_unlock ();
ret = 0;
}
return ret;
}
extern "C" int
mq_close (mqd_t mqd)
{