newlib-cygwin/winsup/cygwin/fhandler_mqueue.cc

1010 lines
25 KiB
C++

/* fhandler_mqueue.cc: fhandler for POSIX message queue
This file is part of Cygwin.
This software is a copyrighted work licensed under the terms of the
Cygwin license. Please consult the file "CYGWIN_LICENSE" for
details. */
#include "winsup.h"
#include "shared_info.h"
#include "path.h"
#include "fhandler.h"
#include "dtable.h"
#include "clock.h"
#include <stdio.h>
#include <mqueue.h>
#include <sys/param.h>
#define MSGSIZE(i) roundup((i), sizeof(long))
#define FILESIZE 80
struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
fhandler_mqueue::fhandler_mqueue () :
fhandler_disk_file ()
{
filebuf = (char *) ccalloc_abort (HEAP_BUF, 1, FILESIZE);
}
fhandler_mqueue::~fhandler_mqueue ()
{
cfree (filebuf);
}
bool
fhandler_mqueue::valid_path ()
{
const char *posix_basename = get_name () + MQ_LEN;
size_t len = strlen (posix_basename);
if (len > 0 && len <= NAME_MAX && !strpbrk (posix_basename, "/\\"))
return true;
return false;
}
int
fhandler_mqueue::open (int flags, mode_t mode)
{
if (!valid_path ())
{
set_errno (EINVAL);
return 0;
}
/* FIXME: reopen by handle semantics missing yet */
flags &= ~(O_NOCTTY | O_PATH | O_BINARY | O_TEXT);
return mq_open (flags, mode, NULL);
}
int
fhandler_mqueue::mq_open (int oflags, mode_t mode, struct mq_attr *attr)
{
NTSTATUS status;
IO_STATUS_BLOCK io;
PUNICODE_STRING mqstream;
OBJECT_ATTRIBUTES oa;
struct mq_info *mqinfo = NULL;
bool created = false;
if ((oflags & ~(O_ACCMODE | O_CLOEXEC | O_CREAT | O_EXCL | O_NONBLOCK))
|| (oflags & O_ACCMODE) == O_ACCMODE)
{
set_errno (EINVAL);
return 0;
}
/* attach a stream suffix to the NT filename, thus creating a stream. */
mqstream = pc.get_nt_native_path (&ro_u_mq_suffix);
pc.get_object_attr (oa, sec_none_nih);
again:
if (oflags & O_CREAT)
{
/* Create and disallow sharing */
status = NtCreateFile (&get_handle (),
GENERIC_READ | GENERIC_WRITE | DELETE
| SYNCHRONIZE, &oa, &io, NULL,
FILE_ATTRIBUTE_NORMAL, FILE_SHARE_DELETE,
FILE_CREATE,
FILE_OPEN_FOR_BACKUP_INTENT
| FILE_SYNCHRONOUS_IO_NONALERT,
NULL, 0);
if (!NT_SUCCESS (status))
{
if (status == STATUS_OBJECT_NAME_COLLISION && (oflags & O_EXCL) == 0)
goto exists;
__seterrno_from_nt_status (status);
return 0;
}
if (pc.has_acls ())
set_created_file_access (get_handle (), pc, mode);
created = true;
goto out;
}
exists:
/* Open the file, and loop while detecting a sharing violation. */
while (true)
{
status = NtOpenFile (&get_handle (),
GENERIC_READ | GENERIC_WRITE | SYNCHRONIZE,
&oa, &io, FILE_SHARE_VALID_FLAGS,
FILE_OPEN_FOR_BACKUP_INTENT
| FILE_SYNCHRONOUS_IO_NONALERT);
if (NT_SUCCESS (status))
break;
if (status == STATUS_OBJECT_NAME_NOT_FOUND && (oflags & O_CREAT))
goto again;
if (status != STATUS_SHARING_VIOLATION)
{
__seterrno_from_nt_status (status);
return 0;
}
Sleep (100L);
}
out:
/* We need the filename without STREAM_SUFFIX later on */
mqstream->Length -= ro_u_mq_suffix.Length;
mqstream->Buffer[mqstream->Length / sizeof (WCHAR)] = L'\0';
if (created)
{
if (attr == NULL)
attr = &defattr;
/* Check minimum and maximum values. The max values are pretty much
arbitrary, taken from the linux mq_overview man page, up to Linux
3.4. These max values make sure that the internal mq_fattr
structure can use 32 bit types. */
if (attr->mq_maxmsg <= 0 || attr->mq_maxmsg > 32768
|| attr->mq_msgsize <= 0 || attr->mq_msgsize > 1048576)
set_errno (EINVAL);
else
mqinfo = mqinfo_create (attr, mode, oflags & O_NONBLOCK);
}
else
mqinfo = mqinfo_open (oflags & O_NONBLOCK);
mq_open_finish (mqinfo != NULL, created);
/* Set fhandler open flags */
if (mqinfo)
{
set_access (GENERIC_READ | SYNCHRONIZE);
close_on_exec (true);
set_flags (oflags | O_CLOEXEC, O_BINARY);
set_open_status ();
}
return mqinfo ? 1 : 0;
}
struct mq_info *
fhandler_mqueue::_mqinfo (SIZE_T filesize, mode_t mode, int flags,
bool just_open)
{
WCHAR buf[NAME_MAX + sizeof ("mqueue/XXX")];
UNICODE_STRING uname;
OBJECT_ATTRIBUTES oa;
NTSTATUS status;
LARGE_INTEGER fsiz = { QuadPart: (LONGLONG) filesize };
PVOID mptr = NULL;
/* Set sectsize prior to using filesize in NtMapViewOfSection. It will
get pagesize aligned, which breaks the next NtMapViewOfSection in fork. */
mqinfo ()->mqi_sectsize = filesize;
mqinfo ()->mqi_mode = mode;
set_nonblocking (flags & O_NONBLOCK);
__small_swprintf (buf, L"mqueue/mtx%s", get_name ());
RtlInitUnicodeString (&uname, buf);
InitializeObjectAttributes (&oa, &uname, OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
get_shared_parent_dir (),
everyone_sd (CYG_MUTANT_ACCESS));
status = NtCreateMutant (&mqinfo ()->mqi_lock, CYG_MUTANT_ACCESS, &oa,
FALSE);
if (!NT_SUCCESS (status))
goto err;
wcsncpy (buf + 7, L"snd", 3);
/* same length, no RtlInitUnicodeString required */
InitializeObjectAttributes (&oa, &uname, OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
get_shared_parent_dir (),
everyone_sd (CYG_EVENT_ACCESS));
status = NtCreateEvent (&mqinfo ()->mqi_waitsend, CYG_EVENT_ACCESS, &oa,
NotificationEvent, FALSE);
if (!NT_SUCCESS (status))
goto err;
wcsncpy (buf + 7, L"rcv", 3);
/* same length, same attributes, no more init required */
status = NtCreateEvent (&mqinfo ()->mqi_waitrecv, CYG_EVENT_ACCESS, &oa,
NotificationEvent, FALSE);
if (!NT_SUCCESS (status))
goto err;
InitializeObjectAttributes (&oa, NULL, 0, NULL, NULL);
status = NtCreateSection (&mqinfo ()->mqi_sect, SECTION_ALL_ACCESS, &oa,
&fsiz, PAGE_READWRITE, SEC_COMMIT, get_handle ());
if (!NT_SUCCESS (status))
goto err;
status = NtMapViewOfSection (mqinfo ()->mqi_sect, NtCurrentProcess (),
&mptr, 0, filesize, NULL, &filesize,
ViewShare, MEM_TOP_DOWN, PAGE_READWRITE);
if (!NT_SUCCESS (status))
goto err;
mqinfo ()->mqi_hdr = (struct mq_hdr *) mptr;
/* Special problem on Cygwin. /dev/mqueue is just a simple dir,
so there's a chance normal files are created in there. */
if (just_open && mqinfo ()->mqi_hdr->mqh_magic != MQI_MAGIC)
{
status = STATUS_ACCESS_DENIED;
goto err;
}
mqinfo ()->mqi_magic = MQI_MAGIC;
return mqinfo ();
err:
if (mqinfo ()->mqi_sect)
NtClose (mqinfo ()->mqi_sect);
if (mqinfo ()->mqi_waitrecv)
NtClose (mqinfo ()->mqi_waitrecv);
if (mqinfo ()->mqi_waitsend)
NtClose (mqinfo ()->mqi_waitsend);
if (mqinfo ()->mqi_lock)
NtClose (mqinfo ()->mqi_lock);
__seterrno_from_nt_status (status);
return NULL;
}
struct mq_info *
fhandler_mqueue::mqinfo_open (int flags)
{
FILE_STANDARD_INFORMATION fsi;
IO_STATUS_BLOCK io;
NTSTATUS status;
mode_t mode;
fsi.EndOfFile.QuadPart = 0;
status = NtQueryInformationFile (get_handle (), &io, &fsi, sizeof fsi,
FileStandardInformation);
if (!NT_SUCCESS (status))
{
__seterrno_from_nt_status (status);
return NULL;
}
if (get_file_attribute (get_handle (), pc, &mode, NULL, NULL))
mode = STD_RBITS | STD_WBITS;
return _mqinfo (fsi.EndOfFile.QuadPart, mode, flags, true);
}
struct mq_info *
fhandler_mqueue::mqinfo_create (struct mq_attr *attr, mode_t mode, int flags)
{
long msgsize;
off_t filesize = 0;
FILE_END_OF_FILE_INFORMATION feofi;
IO_STATUS_BLOCK io;
NTSTATUS status;
struct mq_info *mqinfo = NULL;
msgsize = MSGSIZE (attr->mq_msgsize);
filesize = sizeof (struct mq_hdr)
+ (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
feofi.EndOfFile.QuadPart = filesize;
status = NtSetInformationFile (get_handle (), &io, &feofi, sizeof feofi,
FileEndOfFileInformation);
if (!NT_SUCCESS (status))
{
__seterrno_from_nt_status (status);
return NULL;
}
mqinfo = _mqinfo (filesize, mode, flags, false);
if (mqinfo)
{
/* Initialize header at beginning of file */
/* Create free list with all messages on it */
int8_t *mptr;
struct mq_hdr *mqhdr;
struct msg_hdr *msghdr;
mptr = (int8_t *) mqinfo->mqi_hdr;
mqhdr = mqinfo->mqi_hdr;
mqhdr->mqh_attr.mq_flags = 0;
mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
mqhdr->mqh_attr.mq_curmsgs = 0;
mqhdr->mqh_nwait = 0;
mqhdr->mqh_pid = 0;
mqhdr->mqh_head = 0;
mqhdr->mqh_magic = MQI_MAGIC;
long index = sizeof (struct mq_hdr);
mqhdr->mqh_free = index;
for (int i = 0; i < attr->mq_maxmsg - 1; i++)
{
msghdr = (struct msg_hdr *) &mptr[index];
index += sizeof (struct msg_hdr) + msgsize;
msghdr->msg_next = index;
}
msghdr = (struct msg_hdr *) &mptr[index];
msghdr->msg_next = 0; /* end of free list */
}
return mqinfo;
}
void
fhandler_mqueue::mq_open_finish (bool success, bool created)
{
NTSTATUS status;
HANDLE def_stream;
OBJECT_ATTRIBUTES oa;
IO_STATUS_BLOCK io;
if (get_handle ())
{
/* If we have an open queue stream handle, close it and set it to NULL */
HANDLE queue_stream = get_handle ();
set_handle (NULL);
if (success)
{
/* In case of success, open the default stream for reading. This
can be used to implement various IO functions without exposing
the actual message queue. */
pc.get_object_attr (oa, sec_none_nih);
status = NtOpenFile (&def_stream, GENERIC_READ | SYNCHRONIZE,
&oa, &io, FILE_SHARE_VALID_FLAGS,
FILE_OPEN_FOR_BACKUP_INTENT
| FILE_SYNCHRONOUS_IO_NONALERT);
if (NT_SUCCESS (status))
set_handle (def_stream);
else /* Note that we don't treat this as an error! */
{
debug_printf ("Opening default stream failed: status %y", status);
nohandle (true);
}
}
else if (created)
{
/* In case of error at creation time, delete the file */
FILE_DISPOSITION_INFORMATION disp = { TRUE };
NtSetInformationFile (queue_stream, &io, &disp, sizeof disp,
FileDispositionInformation);
/* We also have to set the delete disposition on the default stream,
otherwise only the queue stream will get deleted */
pc.get_object_attr (oa, sec_none_nih);
status = NtOpenFile (&def_stream, DELETE, &oa, &io,
FILE_SHARE_VALID_FLAGS,
FILE_OPEN_FOR_BACKUP_INTENT);
if (NT_SUCCESS (status))
{
NtSetInformationFile (def_stream, &io, &disp, sizeof disp,
FileDispositionInformation);
NtClose (def_stream);
}
}
NtClose (queue_stream);
}
}
char *
fhandler_mqueue::get_proc_fd_name (char *buf)
{
return strcpy (buf, strrchr (get_name (), '/'));
}
int
fhandler_mqueue::fcntl (int cmd, intptr_t arg)
{
int res;
switch (cmd)
{
case F_GETFD:
res = close_on_exec () ? FD_CLOEXEC : 0;
break;
case F_GETFL:
res = get_flags ();
debug_printf ("GETFL: %y", res);
break;
default:
set_errno (EINVAL);
res = -1;
break;
}
return res;
}
/* Do what fhandler_virtual does for read/lseek */
bool
fhandler_mqueue::fill_filebuf ()
{
unsigned long qsize = 0;
int notify = 0;
int signo = 0;
int notify_pid = 0;
if (mutex_lock (mqinfo ()->mqi_lock, true) == 0)
{
struct mq_hdr *mqhdr = mqinfo ()->mqi_hdr;
int8_t *mptr = (int8_t *) mqhdr;
struct msg_hdr *msghdr;
for (long index = mqhdr->mqh_head; index; index = msghdr->msg_next)
{
msghdr = (struct msg_hdr *) &mptr[index];
qsize += msghdr->msg_len;
}
if (mqhdr->mqh_pid)
{
notify = mqhdr->mqh_event.sigev_notify;
if (notify == SIGEV_SIGNAL)
signo = mqhdr->mqh_event.sigev_signo;
notify_pid = mqhdr->mqh_pid;
}
mutex_unlock (mqinfo ()->mqi_lock);
}
/* QSIZE: bytes of all current msgs
NOTIFY: sigev_notify if there's a notifier
SIGNO: signal number if NOTIFY && sigev_notify == SIGEV_SIGNAL
NOTIFY_PID: if NOTIFY pid */
snprintf (filebuf, FILESIZE,
"QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
qsize, notify, signo, notify_pid);
filesize = strlen (filebuf);
return true;
}
void __reg3
fhandler_mqueue::read (void *in_ptr, size_t& len)
{
if (len == 0)
return;
if (!filebuf[0] && !fill_filebuf ())
{
len = (size_t) -1;
return;
}
if ((ssize_t) len > filesize - position)
len = (size_t) (filesize - position);
if ((ssize_t) len < 0)
len = 0;
else
memcpy (in_ptr, filebuf + position, len);
position += len;
}
off_t
fhandler_mqueue::lseek (off_t offset, int whence)
{
if (!fill_filebuf ())
return (off_t) -1;
switch (whence)
{
case SEEK_SET:
position = offset;
break;
case SEEK_CUR:
position += offset;
break;
case SEEK_END:
position = filesize + offset;
break;
default:
set_errno (EINVAL);
return (off_t) -1;
}
return position;
}
int __reg2
fhandler_mqueue::fstat (struct stat *buf)
{
int ret = fhandler_disk_file::fstat (buf);
if (!ret)
{
buf->st_size = FILESIZE;
buf->st_dev = FH_MQUEUE;
}
return ret;
}
int
fhandler_mqueue::_dup (HANDLE parent, fhandler_mqueue *fhc)
{
__try
{
PVOID mptr = NULL;
SIZE_T filesize = mqinfo ()->mqi_sectsize;
NTSTATUS status;
if (!DuplicateHandle (parent, mqinfo ()->mqi_sect,
GetCurrentProcess (), &fhc->mqinfo ()->mqi_sect,
0, FALSE, DUPLICATE_SAME_ACCESS))
__leave;
status = NtMapViewOfSection (mqinfo ()->mqi_sect, NtCurrentProcess (),
&mptr, 0, filesize, NULL, &filesize,
ViewShare, MEM_TOP_DOWN, PAGE_READWRITE);
if (!NT_SUCCESS (status))
api_fatal ("Mapping message queue failed in fork, status 0x%x\n",
status);
fhc->mqinfo ()->mqi_hdr = (struct mq_hdr *) mptr;
if (!DuplicateHandle (parent, mqinfo ()->mqi_waitsend,
GetCurrentProcess (), &fhc->mqinfo ()->mqi_waitsend,
0, FALSE, DUPLICATE_SAME_ACCESS))
__leave;
if (!DuplicateHandle (parent, mqinfo ()->mqi_waitrecv,
GetCurrentProcess (), &fhc->mqinfo ()->mqi_waitrecv,
0, FALSE, DUPLICATE_SAME_ACCESS))
__leave;
if (!DuplicateHandle (parent, mqinfo ()->mqi_lock,
GetCurrentProcess (), &fhc->mqinfo ()->mqi_lock,
0, FALSE, DUPLICATE_SAME_ACCESS))
__leave;
return 0;
}
__except (EFAULT) {}
__endtry
return -1;
}
int
fhandler_mqueue::dup (fhandler_base *child, int flags)
{
fhandler_mqueue *fhc = (fhandler_mqueue *) child;
int ret = fhandler_disk_file::dup (child, flags);
if (!ret)
{
memcpy (fhc->filebuf, filebuf, FILESIZE);
ret = _dup (GetCurrentProcess (), fhc);
}
return ret;
}
void
fhandler_mqueue::fixup_after_fork (HANDLE parent)
{
if (_dup (parent, this))
api_fatal ("Creating IPC object failed in fork, %E");
}
int
fhandler_mqueue::ioctl (unsigned int cmd, void *buf)
{
return fhandler_base::ioctl (cmd, buf);
}
int
fhandler_mqueue::close ()
{
__try
{
mqinfo ()->mqi_magic = 0; /* just in case */
NtUnmapViewOfSection (NtCurrentProcess (), mqinfo ()->mqi_hdr);
NtClose (mqinfo ()->mqi_sect);
NtClose (mqinfo ()->mqi_waitsend);
NtClose (mqinfo ()->mqi_waitrecv);
NtClose (mqinfo ()->mqi_lock);
}
__except (0) {}
__endtry
return 0;
}
int
fhandler_mqueue::mutex_lock (HANDLE mtx, bool eintr)
{
switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
| (eintr ? cw_sig_eintr : cw_sig_restart)))
{
case WAIT_OBJECT_0:
case WAIT_ABANDONED_0:
return 0;
case WAIT_SIGNALED:
set_errno (EINTR);
return 1;
default:
break;
}
return geterrno_from_win_error ();
}
int
fhandler_mqueue::mutex_unlock (HANDLE mtx)
{
return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
}
int
fhandler_mqueue::cond_timedwait (HANDLE evt, HANDLE mtx,
const struct timespec *abstime)
{
HANDLE w4[4] = { evt, };
DWORD cnt = 2;
DWORD timer_idx = 0;
int ret = 0;
wait_signal_arrived here (w4[1]);
if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
++cnt;
if (abstime)
{
if (!valid_timespec (*abstime))
return EINVAL;
/* If a timeout is set, we create a waitable timer to wait for.
This is the easiest way to handle the absolute timeout value, given
that NtSetTimer also takes absolute times and given the double
dependency on evt *and* mtx, which requires to call WFMO twice. */
NTSTATUS status;
LARGE_INTEGER duetime;
timer_idx = cnt++;
status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
NotificationTimer);
if (!NT_SUCCESS (status))
return geterrno_from_nt_status (status);
timespec_to_filetime (abstime, &duetime);
status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
if (!NT_SUCCESS (status))
{
NtClose (w4[timer_idx]);
return geterrno_from_nt_status (status);
}
}
ResetEvent (evt);
if ((ret = mutex_unlock (mtx)) != 0)
return ret;
/* Everything's set up, so now wait for the event to be signalled. */
restart1:
switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_OBJECT_0 + 1:
if (_my_tls.call_signal_handler ())
goto restart1;
ret = EINTR;
break;
case WAIT_OBJECT_0 + 2:
if (timer_idx != 2)
pthread::static_cancel_self ();
fallthrough;
case WAIT_OBJECT_0 + 3:
ret = ETIMEDOUT;
break;
default:
ret = geterrno_from_win_error ();
break;
}
if (ret == 0)
{
/* At this point we need to lock the mutex. The wait is practically
the same as before, just that we now wait on the mutex instead of the
event. */
restart2:
w4[0] = mtx;
switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
case WAIT_ABANDONED_0:
break;
case WAIT_OBJECT_0 + 1:
if (_my_tls.call_signal_handler ())
goto restart2;
ret = EINTR;
break;
case WAIT_OBJECT_0 + 2:
if (timer_idx != 2)
pthread_testcancel ();
fallthrough;
case WAIT_OBJECT_0 + 3:
ret = ETIMEDOUT;
break;
default:
ret = geterrno_from_win_error ();
break;
}
}
if (timer_idx)
{
if (ret != ETIMEDOUT)
NtCancelTimer (w4[timer_idx], NULL);
NtClose (w4[timer_idx]);
}
return ret;
}
void
fhandler_mqueue::cond_signal (HANDLE evt)
{
SetEvent (evt);
}
int
fhandler_mqueue::mq_getattr (struct mq_attr *mqstat)
{
int n;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
__try
{
mqhdr = mqinfo ()->mqi_hdr;
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
mqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; /* per-open */
mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
mqstat->mq_msgsize = attr->mq_msgsize;
mqstat->mq_curmsgs = attr->mq_curmsgs;
mutex_unlock (mqinfo ()->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
int
fhandler_mqueue::mq_setattr (const struct mq_attr *mqstat,
struct mq_attr *omqstat)
{
int n;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
__try
{
mqhdr = mqinfo ()->mqi_hdr;
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
if (omqstat != NULL)
{
omqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0;
omqstat->mq_maxmsg = attr->mq_maxmsg;
omqstat->mq_msgsize = attr->mq_msgsize;
omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
}
set_nonblocking (mqstat->mq_flags & O_NONBLOCK);
mutex_unlock (mqinfo ()->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
int
fhandler_mqueue::mq_notify (const struct sigevent *notification)
{
int n;
pid_t pid;
struct mq_hdr *mqhdr;
__try
{
mqhdr = mqinfo ()->mqi_hdr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
{
errno = n;
__leave;
}
pid = myself->pid;
if (!notification)
{
if (mqhdr->mqh_pid == pid)
mqhdr->mqh_pid = 0; /* unregister calling process */
}
else
{
if (mqhdr->mqh_pid != 0)
{
if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
{
set_errno (EBUSY);
mutex_unlock (mqinfo ()->mqi_lock);
__leave;
}
}
mqhdr->mqh_pid = pid;
mqhdr->mqh_event = *notification;
}
mutex_unlock (mqinfo ()->mqi_lock);
return 0;
}
__except (EBADF) {}
__endtry
return -1;
}
int
fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio,
const struct timespec *abstime)
{
int n;
long index, freeindex;
int8_t *mptr;
struct sigevent *sigev;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
bool mutex_locked = false;
int ret = -1;
pthread_testcancel ();
__try
{
if (prio >= MQ_PRIO_MAX)
{
set_errno (EINVAL);
__leave;
}
mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */
mptr = (int8_t *) mqhdr; /* byte pointer */
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
{
errno = n;
__leave;
}
mutex_locked = true;
if (len > (size_t) attr->mq_msgsize)
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0)
{
if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
{
sigev = &mqhdr->mqh_event;
if (sigev->sigev_notify == SIGEV_SIGNAL)
sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
sigev->sigev_value);
mqhdr->mqh_pid = 0; /* unregister */
}
}
else if (attr->mq_curmsgs >= attr->mq_maxmsg)
{
/* Queue is full */
if (is_nonblocking ())
{
set_errno (EAGAIN);
__leave;
}
/* Wait for room for one message on the queue */
while (attr->mq_curmsgs >= attr->mq_maxmsg)
{
int ret = cond_timedwait (mqinfo ()->mqi_waitsend,
mqinfo ()->mqi_lock, abstime);
if (ret != 0)
{
set_errno (ret);
__leave;
}
}
}
/* nmsghdr will point to new message */
if ((freeindex = mqhdr->mqh_free) == 0)
api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
nmsghdr = (struct msg_hdr *) &mptr[freeindex];
nmsghdr->msg_prio = prio;
nmsghdr->msg_len = len;
memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
/* Find right place for message in linked list */
index = mqhdr->mqh_head;
pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
while (index)
{
msghdr = (struct msg_hdr *) &mptr[index];
if (prio > msghdr->msg_prio)
{
nmsghdr->msg_next = index;
pmsghdr->msg_next = freeindex;
break;
}
index = msghdr->msg_next;
pmsghdr = msghdr;
}
if (index == 0)
{
/* Queue was empty or new goes at end of list */
pmsghdr->msg_next = freeindex;
nmsghdr->msg_next = 0;
}
/* Wake up anyone blocked in mq_receive waiting for a message */
if (attr->mq_curmsgs == 0)
cond_signal (mqinfo ()->mqi_waitrecv);
attr->mq_curmsgs++;
ret = 0;
}
__except (EBADF) {}
__endtry
if (mutex_locked)
mutex_unlock (mqinfo ()->mqi_lock);
return ret;
}
ssize_t
fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
const struct timespec *abstime)
{
int n;
long index;
int8_t *mptr;
ssize_t len = -1;
struct mq_hdr *mqhdr;
struct mq_fattr *attr;
struct msg_hdr *msghdr;
bool mutex_locked = false;
pthread_testcancel ();
__try
{
mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */
mptr = (int8_t *) mqhdr; /* byte pointer */
attr = &mqhdr->mqh_attr;
if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
{
errno = n;
__leave;
}
mutex_locked = true;
if (maxlen < (size_t) attr->mq_msgsize)
{
set_errno (EMSGSIZE);
__leave;
}
if (attr->mq_curmsgs == 0) /* queue is empty */
{
if (is_nonblocking ())
{
set_errno (EAGAIN);
__leave;
}
/* Wait for a message to be placed onto queue */
mqhdr->mqh_nwait++;
while (attr->mq_curmsgs == 0)
{
int ret = cond_timedwait (mqinfo ()->mqi_waitrecv,
mqinfo ()->mqi_lock, abstime);
if (ret != 0)
{
set_errno (ret);
__leave;
}
}
mqhdr->mqh_nwait--;
}
if ((index = mqhdr->mqh_head) == 0)
api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
msghdr = (struct msg_hdr *) &mptr[index];
mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
len = msghdr->msg_len;
memcpy(ptr, msghdr + 1, len); /* copy the message itself */
if (priop != NULL)
*priop = msghdr->msg_prio;
/* Just-read message goes to front of free list */
msghdr->msg_next = mqhdr->mqh_free;
mqhdr->mqh_free = index;
/* Wake up anyone blocked in mq_send waiting for room */
if (attr->mq_curmsgs == attr->mq_maxmsg)
cond_signal (mqinfo ()->mqi_waitsend);
attr->mq_curmsgs--;
}
__except (EBADF) {}
__endtry
if (mutex_locked)
mutex_unlock (mqinfo ()->mqi_lock);
return len;
}