From 37226b27637404600525e4230933043d8105879a Mon Sep 17 00:00:00 2001 From: Ken Brown Date: Sun, 9 May 2021 18:15:56 -0400 Subject: [PATCH] Cygwin: AF_UNIX: adapt connect and accept to mqueues - Flesh out connect_mqueue, which replaces connect_pipe. - A new method wait_mqueue replaces wait_pipe. - A new method wait_mqueue_thread replaces wait_pipe_thread. - New methods send_mqueue_name and recv_peer_mqueue_name are used for making connection requests and responding to them. - A new method xchg_sock_info is called after a connecting socket successfully receives its peer's mqueue name. This functionality was previously in open_pipe. - Remove open_pipe, listen_pipe, and create_pipe_instance, which are no longer needed. --- winsup/cygwin/fhandler.h | 12 +- winsup/cygwin/fhandler_socket_unix.cc | 526 +++++++++++--------------- 2 files changed, 219 insertions(+), 319 deletions(-) diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h index 6555e9e82..e608551b3 100644 --- a/winsup/cygwin/fhandler.h +++ b/winsup/cygwin/fhandler.h @@ -1095,17 +1095,17 @@ class fhandler_socket_unix : public fhandler_socket char get_type_char (); void set_pipe_non_blocking (bool nonblocking); int send_sock_info (bool from_bind); + void xchg_sock_info (); int grab_admin_pkt (bool peek = true); int recv_peer_info (); static NTSTATUS npfs_handle (HANDLE &nph); int create_mqueue (bool listener = false); - HANDLE create_pipe_instance (); - NTSTATUS open_pipe (PUNICODE_STRING pipe_name, bool xchg_sock_info); mqd_t open_mqueue (const char *mqueue_name, bool nonblocking); - int wait_pipe (PUNICODE_STRING pipe_name); int connect_mqueue (const char *mqueue_name); - int connect_pipe (PUNICODE_STRING pipe_name); - int listen_pipe (); + int wait_mqueue (mqd_t mqd); + int send_mqueue_name (mqd_t mqd, bool wait); + int recv_peer_mqueue_name (bool set = true, bool timeout = true, + mqd_t *mqd = NULL); ssize_t peek_mqueue (char *buf, size_t buflen, bool nonblocking = true); int disconnect_pipe (HANDLE ph); /* The NULL pointer check is required for FS methods like fstat. When @@ -1143,7 +1143,7 @@ class fhandler_socket_unix : public fhandler_socket int dup (fhandler_base *child, int); - DWORD wait_pipe_thread (PUNICODE_STRING pipe_name); + DWORD wait_mqueue_thread (mqd_t mqd); int socket (int af, int type, int protocol, int flags); int socketpair (int af, int type, int protocol, int flags, diff --git a/winsup/cygwin/fhandler_socket_unix.cc b/winsup/cygwin/fhandler_socket_unix.cc index 19483b168..54bb0d9b9 100644 --- a/winsup/cygwin/fhandler_socket_unix.cc +++ b/winsup/cygwin/fhandler_socket_unix.cc @@ -166,7 +166,7 @@ AF_UNIX_PKT_DATA_APPEND (af_unix_pkt_hdr_t *phdr, void *data, uint16_t dlen) || _s == STATUS_MORE_PROCESSING_REQUIRED; }) /* Default timeout value of connect: 20 secs, as on Linux. */ -#define AF_UNIX_CONNECT_TIMEOUT (-20 * NS100PERSEC) +#define AF_UNIX_CONNECT_TIMEOUT 20 /* Message queue priorities */ enum @@ -257,9 +257,6 @@ fhandler_socket_unix::reopen_shmem () return 0; } -/* Character length of pipe name, excluding trailing NUL. */ -#define CYGWIN_PIPE_SOCKET_NAME_LEN 47 - /* Character length of mqueue name, excluding trailing NUL. */ #define CYGWIN_MQUEUE_SOCKET_NAME_LEN 27 @@ -694,6 +691,15 @@ fhandler_socket_unix::send_sock_info (bool from_bind) return ret; } +/* Called from connect_mqueue and wait_mqueue after successfully + getting peer's mqueue name. */ +void +fhandler_socket_unix::xchg_sock_info () +{ + send_sock_info (false); + recv_peer_info (); +} + /* Reads an administrative packet from the pipe and handles it. If PEEK is true, checks first to see if the next packet in the pipe is an administrative packet; otherwise the caller must set io_lock and @@ -754,10 +760,6 @@ fhandler_socket_unix::grab_admin_pkt (bool peek) return 0; } -/* FIXME: This is temporary until we no longer need the old - AF_UNIX_CONNECT_TIMEOUT. */ -#define AF_UNIX_CONNECT_TIMEOUT_MQ 20 - /* Returns an error code. Locking is not required when called from accept4, user space doesn't know about this socket yet. */ int @@ -773,7 +775,7 @@ fhandler_socket_unix::recv_peer_info () packet = (af_unix_pkt_hdr_t *) alloca (len); set_mqueue_non_blocking (get_mqd_in (), false); clock_gettime (CLOCK_REALTIME, &timeout); - timeout.tv_sec += AF_UNIX_CONNECT_TIMEOUT_MQ; + timeout.tv_sec += AF_UNIX_CONNECT_TIMEOUT; if (_mq_timedrecv (get_mqd_in (), (char *) packet, len, &timeout, 0) < 0) ret = get_errno (); if (ret == ETIMEDOUT) @@ -860,79 +862,6 @@ fhandler_socket_unix::create_mqueue (bool listener) return 0; } -HANDLE -fhandler_socket_unix::create_pipe_instance () -{ - NTSTATUS status; - HANDLE npfsh; - HANDLE ph; - ACCESS_MASK access; - OBJECT_ATTRIBUTES attr; - IO_STATUS_BLOCK io; - ULONG sharing; - ULONG nonblocking; - ULONG max_instances; - LARGE_INTEGER timeout; - - status = npfs_handle (npfsh); - if (!NT_SUCCESS (status)) - { - __seterrno_from_nt_status (status); - return NULL; - } - access = GENERIC_READ | FILE_READ_ATTRIBUTES - | GENERIC_WRITE | FILE_WRITE_ATTRIBUTES - | SYNCHRONIZE; - sharing = FILE_SHARE_READ | FILE_SHARE_WRITE; - /* NPFS doesn't understand reopening by handle, unfortunately. */ - InitializeObjectAttributes (&attr, pc.get_nt_native_path (), OBJ_INHERIT, - npfsh, NULL); - nonblocking = is_nonblocking () ? FILE_PIPE_COMPLETE_OPERATION - : FILE_PIPE_QUEUE_OPERATION; - max_instances = (get_socket_type () == SOCK_DGRAM) ? 1 : -1; - timeout.QuadPart = -500000; - status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing, - FILE_OPEN, 0, - FILE_PIPE_MESSAGE_TYPE, - FILE_PIPE_MESSAGE_MODE, - nonblocking, max_instances, - rmem (), wmem (), &timeout); - if (!NT_SUCCESS (status)) - __seterrno_from_nt_status (status); - return ph; -} - -NTSTATUS -fhandler_socket_unix::open_pipe (PUNICODE_STRING pipe_name, bool xchg_sock_info) -{ - NTSTATUS status; - HANDLE npfsh; - ACCESS_MASK access; - OBJECT_ATTRIBUTES attr; - IO_STATUS_BLOCK io; - ULONG sharing; - HANDLE ph = NULL; - - status = npfs_handle (npfsh); - if (!NT_SUCCESS (status)) - return status; - access = GENERIC_READ | GENERIC_WRITE | SYNCHRONIZE; - InitializeObjectAttributes (&attr, pipe_name, OBJ_INHERIT, npfsh, NULL); - sharing = FILE_SHARE_READ | FILE_SHARE_WRITE; - status = NtOpenFile (&ph, access, &attr, &io, sharing, 0); - if (NT_SUCCESS (status)) - { - set_handle (ph); - if (xchg_sock_info) - { - /* FIXME: Should we check for errors? */ - send_sock_info (false); - recv_peer_info (); - } - } - return status; -} - mqd_t fhandler_socket_unix::open_mqueue (const char *mqueue_name, bool nonblocking) { @@ -945,28 +874,27 @@ fhandler_socket_unix::open_mqueue (const char *mqueue_name, bool nonblocking) struct conn_wait_info_t { fhandler_socket_unix *fh; - UNICODE_STRING pipe_name; - WCHAR pipe_name_buf[CYGWIN_PIPE_SOCKET_NAME_LEN + 1]; + mqd_t mqd; /* Descriptor for listener's mqueue. */ }; -/* Just hop to the wait_pipe_thread method. */ +/* Just hop to the wait_mqueue_thread method. */ DWORD WINAPI connect_wait_func (LPVOID param) { conn_wait_info_t *wait_info = (conn_wait_info_t *) param; - return wait_info->fh->wait_pipe_thread (&wait_info->pipe_name); + return wait_info->fh->wait_mqueue_thread (wait_info->mqd); } -/* Start a waiter thread to wait for a pipe instance to become available. - in blocking mode, wait for the thread to finish. In nonblocking mode +/* Start a waiter thread to wait for the listener's mqueue to have space. + In blocking mode, wait for the thread to finish. In nonblocking mode just return with errno set to EINPROGRESS. */ int -fhandler_socket_unix::wait_pipe (PUNICODE_STRING pipe_name) +fhandler_socket_unix::wait_mqueue (mqd_t mqd) { conn_wait_info_t *wait_info; DWORD waitret, err; int ret = -1; - HANDLE thr, evt; + HANDLE thr; PVOID param; if (!(cwt_termination_evt = create_event ())) @@ -975,10 +903,7 @@ fhandler_socket_unix::wait_pipe (PUNICODE_STRING pipe_name) if (!wait_info) return -1; wait_info->fh = this; - RtlInitEmptyUnicodeString (&wait_info->pipe_name, wait_info->pipe_name_buf, - sizeof wait_info->pipe_name_buf); - RtlCopyUnicodeString (&wait_info->pipe_name, pipe_name); - + wait_info->mqd = mqd; cwt_param = (PVOID) wait_info; connect_wait_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE, connect_wait_func, cwt_param, 0, NULL); @@ -1027,71 +952,115 @@ fhandler_socket_unix::wait_pipe (PUNICODE_STRING pipe_name) break; } out: - evt = InterlockedExchangePointer (&cwt_termination_evt, NULL); - if (evt) - NtClose (evt); return ret; } +int +fhandler_socket_unix::recv_peer_mqueue_name (bool set, bool timeout, mqd_t *mqd) +{ + af_unix_pkt_hdr_t *packet; + size_t plen; + mqd_t peer_mqd; + int flags; + ssize_t nr; + timespec tm; + + plen = sizeof *packet + CYGWIN_MQUEUE_SOCKET_NAME_LEN + 1; + packet = (af_unix_pkt_hdr_t *) alloca (plen); + if (timeout) + { + set_mqueue_non_blocking (get_mqd_in (), false); + clock_gettime (CLOCK_REALTIME, &tm); + tm.tv_sec += AF_UNIX_CONNECT_TIMEOUT; + nr = _mq_timedrecv (get_mqd_in (), (char *) packet, plen, &tm, 0); + set_mqueue_non_blocking (get_mqd_in (), is_nonblocking ()); + } + else + nr = _mq_recv (get_mqd_in (), (char *) packet, plen, 0); + if (nr < 0) + { + if (get_errno () == ETIMEDOUT) + set_errno (ECONNABORTED); + return -1; + } + flags = O_WRONLY; + if (is_nonblocking ()) + flags |= O_NONBLOCK; + peer_mqd = mq_open ((const char *) AF_UNIX_PKT_DATA (packet), flags); + if (peer_mqd == (mqd_t) -1) + return -1; + if (set) + set_mqd_out (peer_mqd); + if (mqd) + *mqd = peer_mqd; + return 0; +} + +/* A socket makes a connection request by sending its own mqueue name + to the listener's mqueue, first creating its mqueue if necessary. */ +int +fhandler_socket_unix::send_mqueue_name (mqd_t mqd, bool wait) +{ + size_t plen; + size_t dlen = 0; + af_unix_pkt_hdr_t *packet; + const char *mqn = get_mqueue_name (); + + if (!mqn || !*mqn) + { + gen_mqueue_name (); + mqn = get_mqueue_name (); + if (get_mqd_in () == (mqd_t) -1 && create_mqueue () < 0) + return -1; + } + dlen = CYGWIN_MQUEUE_SOCKET_NAME_LEN + 1; + plen = sizeof *packet + dlen; + packet = (af_unix_pkt_hdr_t *) alloca (plen); + packet->init (true, _SHUT_NONE, 0, 0, dlen); + memcpy (AF_UNIX_PKT_DATA (packet), mqn, dlen); + if (!wait) + return mq_send (mqd, (const char *) packet, packet->pckt_len, + af_un_prio_admin); + timespec timeout; + set_mqueue_non_blocking (mqd, false); + clock_gettime (CLOCK_REALTIME, &timeout); + timeout.tv_sec += AF_UNIX_CONNECT_TIMEOUT; + return mq_timedsend (mqd, (const char *) packet, packet->pckt_len, + af_un_prio_admin, &timeout); +} + int fhandler_socket_unix::connect_mqueue (const char *mqueue_name) { - return 0; -} - -int -fhandler_socket_unix::connect_pipe (PUNICODE_STRING pipe_name) -{ - NTSTATUS status; - - /* Try connecting first. If it doesn't work, wait for the pipe - to become available. */ - status = open_pipe (pipe_name, get_socket_type () != SOCK_DGRAM); - if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status)) - return wait_pipe (pipe_name); - if (!NT_SUCCESS (status)) - { - __seterrno_from_nt_status (status); - so_error (get_errno ()); - return -1; - } - so_error (0); - return 0; -} - -int -fhandler_socket_unix::listen_pipe () -{ - NTSTATUS status; - IO_STATUS_BLOCK io; - HANDLE evt = NULL; - DWORD waitret = WAIT_OBJECT_0; int ret = -1; + int error = 0; + mqd_t mqd; - io.Status = STATUS_PENDING; - if (!is_nonblocking () && !(evt = create_event ())) + mqd = mq_open (mqueue_name, O_WRONLY | O_NONBLOCK); + + if (mqd == (mqd_t) -1) return -1; - status = NtFsControlFile (get_handle (), evt, NULL, NULL, &io, - FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); - if (status == STATUS_PENDING) + /* Try sending my mqueue name. If it doesn't work, wait for the + listener's mqueue to become available. */ + ret = send_mqueue_name (mqd, false); + if (ret < 0 && get_errno () == EAGAIN) + return wait_mqueue (mqd); + if (ret < 0) { - waitret = cygwait (evt ?: get_handle (), cw_infinite, - cw_cancel | cw_sig_eintr); - if (waitret == WAIT_OBJECT_0) - status = io.Status; + error = get_errno (); + goto out; } - if (evt) - NtClose (evt); - if (waitret == WAIT_CANCELED) - pthread::static_cancel_self (); - else if (waitret == WAIT_SIGNALED) - set_errno (EINTR); - else if (status == STATUS_PIPE_LISTENING) - set_errno (EAGAIN); - else if (status == STATUS_SUCCESS || status == STATUS_PIPE_CONNECTED) - ret = 0; - else - __seterrno_from_nt_status (status); + /* Wait for response, which should be peer's mqueue name. */ + ret = recv_peer_mqueue_name (); + if (ret < 0) + { + error =get_errno (); + goto out; + } + xchg_sock_info (); +out: + so_error (error); + mq_close (mqd); return ret; } @@ -1243,103 +1212,38 @@ fhandler_socket_unix::dup (fhandler_base *child, int flags) return 0; } -/* Waiter thread method. Here we wait for a pipe instance to become - available and connect to it, if so. This function is running - asynchronously if called on a non-blocking pipe. The important - things to do: +/* Waiter thread method. Here we wait for the listener's mqueue to + become available and send our mqueue name to it, if so. This + function is running asynchronously if called on a non-blocking + socket. The important things to do: - - Set the peer pipe handle if successful + - Set the output mqueue descriptor if successful - Send own sun_path to peer if successful - Set connect_state - Set so_error for later call to select */ DWORD -fhandler_socket_unix::wait_pipe_thread (PUNICODE_STRING pipe_name) +fhandler_socket_unix::wait_mqueue_thread (mqd_t mqd) { - HANDLE npfsh; - HANDLE evt; LONG error = 0; - NTSTATUS status; - IO_STATUS_BLOCK io; - ULONG pwbuf_size; - PFILE_PIPE_WAIT_FOR_BUFFER pwbuf; - LONGLONG stamp; - status = npfs_handle (npfsh); - if (!NT_SUCCESS (status)) + /* Try for up to AF_UNIX_CONNECT_TIMEOUT seconds to send my mqueue + name to listening socket. */ + if (send_mqueue_name (mqd, true) < 0) { - error = geterrno_from_nt_status (status); + error = get_errno (); goto out; } - if (!(evt = create_event ())) - goto out; - pwbuf_size = offsetof (FILE_PIPE_WAIT_FOR_BUFFER, Name) + pipe_name->Length; - pwbuf = (PFILE_PIPE_WAIT_FOR_BUFFER) alloca (pwbuf_size); - pwbuf->Timeout.QuadPart = AF_UNIX_CONNECT_TIMEOUT; - pwbuf->NameLength = pipe_name->Length; - pwbuf->TimeoutSpecified = TRUE; - memcpy (pwbuf->Name, pipe_name->Buffer, pipe_name->Length); - stamp = get_clock (CLOCK_MONOTONIC)->n100secs (); - do + /* Read response from peer, which should be its mqueue name. */ + if (recv_peer_mqueue_name () < 0) { - status = NtFsControlFile (npfsh, evt, NULL, NULL, &io, FSCTL_PIPE_WAIT, - pwbuf, pwbuf_size, NULL, 0); - if (status == STATUS_PENDING) - { - HANDLE w[2] = { evt, cwt_termination_evt }; - switch (WaitForMultipleObjects (2, w, FALSE, INFINITE)) - { - case WAIT_OBJECT_0: - status = io.Status; - break; - case WAIT_OBJECT_0 + 1: - default: - status = STATUS_THREAD_IS_TERMINATING; - break; - } - } - switch (status) - { - case STATUS_SUCCESS: - { - status = open_pipe (pipe_name, get_socket_type () != SOCK_DGRAM); - if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status)) - { - /* Another concurrent connect grabbed the pipe instance - under our nose. Fix the timeout value and go waiting - again, unless the timeout has passed. */ - pwbuf->Timeout.QuadPart -= - stamp - get_clock (CLOCK_MONOTONIC)->n100secs (); - if (pwbuf->Timeout.QuadPart >= 0) - { - status = STATUS_IO_TIMEOUT; - error = ETIMEDOUT; - } - } - else if (!NT_SUCCESS (status)) - error = geterrno_from_nt_status (status); - } - break; - case STATUS_OBJECT_NAME_NOT_FOUND: - error = EADDRNOTAVAIL; - break; - case STATUS_IO_TIMEOUT: - error = ETIMEDOUT; - break; - case STATUS_INSUFFICIENT_RESOURCES: - error = ENOBUFS; - break; - case STATUS_THREAD_IS_TERMINATING: - error = EINTR; - break; - case STATUS_INVALID_DEVICE_REQUEST: - default: - error = EIO; - break; - } + error = get_errno (); + goto out; } - while (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status)); + xchg_sock_info (); + /* FIXME: Translate mq errors to socket errors. */ out: + mq_close (mqd); PVOID param = InterlockedExchangePointer (&cwt_param, NULL); if (param) cfree (param); @@ -1554,6 +1458,9 @@ fhandler_socket_unix::listen (int backlog) int fhandler_socket_unix::accept4 (struct sockaddr *peer, int *len, int flags) { + mqd_t peer_mqd; + fhandler_socket_unix *sock; + if (get_socket_type () != SOCK_STREAM) { set_errno (EOPNOTSUPP); @@ -1565,91 +1472,84 @@ fhandler_socket_unix::accept4 (struct sockaddr *peer, int *len, int flags) set_errno (EINVAL); return -1; } - if (listen_pipe () == 0) + + if (recv_peer_mqueue_name (false, false, &peer_mqd) < 0) + return -1; + if (flags & SOCK_NONBLOCK) + set_mqueue_non_blocking (peer_mqd, true); + /* We now have an mqueue descriptor that the accepted socket can use + for output. Prepare new file descriptor. */ + int error = ENOBUFS; + cygheap_fdnew fd; + if (fd < 0) + goto err; + sock = (fhandler_socket_unix *) build_fh_dev (dev ()); + if (!sock) + goto err; + if (sock->create_shmem () < 0) + goto create_shmem_failed; + sock->set_addr_family (AF_UNIX); + sock->set_socket_type (get_socket_type ()); + if (flags & SOCK_NONBLOCK) + sock->set_nonblocking (true); + if (flags & SOCK_CLOEXEC) + sock->set_close_on_exec (true); + sock->set_unique_id (); + sock->set_ino (sock->get_unique_id ()); + sock->set_mqd_out (peer_mqd); + if (sock->send_mqueue_name (peer_mqd, false) < 0) { - /* Our handle is now connected with a client. This handle is used - for the accepted socket. Our handle has to be replaced with a - new instance handle for the next accept. */ - io_lock (); - HANDLE accepted = get_handle (); - HANDLE new_inst = create_pipe_instance (); - int error = ENOBUFS; - if (!new_inst) - io_unlock (); - else - { - /* Set new io handle. */ - set_handle (new_inst); - io_unlock (); - /* Prepare new file descriptor. */ - cygheap_fdnew fd; - - if (fd >= 0) - { - fhandler_socket_unix *sock = (fhandler_socket_unix *) - build_fh_dev (dev ()); - if (sock) - { - if (sock->create_shmem () < 0) - goto create_shmem_failed; - - sock->set_addr_family (AF_UNIX); - sock->set_socket_type (get_socket_type ()); - if (flags & SOCK_NONBLOCK) - sock->set_nonblocking (true); - if (flags & SOCK_CLOEXEC) - sock->set_close_on_exec (true); - sock->set_unique_id (); - sock->set_ino (sock->get_unique_id ()); - sock->pc.set_nt_native_path (pc.get_nt_native_path ()); - sock->connect_state (connected); - sock->binding_state (binding_state ()); - sock->set_handle (accepted); - - sock->sun_path (sun_path ()); - sock->sock_cred (sock_cred ()); - /* Send this socket info to connecting socket. */ - sock->send_sock_info (false); - /* Fetch the packet sent by send_sock_info called by - connecting peer. */ - error = sock->recv_peer_info (); - if (error == 0) - { - __try - { - if (peer) - { - sun_name_t *sun = sock->peer_sun_path (); - if (sun) - { - memcpy (peer, &sun->un, - MIN (*len, sun->un_len)); - *len = sun->un_len; - } - else if (len) - *len = 0; - } - fd = sock; - if (fd <= 2) - set_std_handle (fd); - return fd; - } - __except (NO_ERROR) - { - error = EFAULT; - } - __endtry - } -create_shmem_failed: - delete sock; - } - } - } - /* Ouch! We can't handle the client if we couldn't - create a new instance to accept more connections.*/ - disconnect_pipe (accepted); - set_errno (error); + error = get_errno (); + goto send_mqueue_name_failed; } + sock->connect_state (connected); + sock->binding_state (binding_state ()); + sock->sun_path (sun_path ()); + sock->sock_cred (sock_cred ()); + /* Send this socket info to connecting socket. */ + sock->send_sock_info (false); + /* Fetch the packet sent by send_sock_info called by connecting + peer. */ + error = sock->recv_peer_info (); + if (error == 0) + { + __try + { + if (peer) + { + sun_name_t *sun = sock->peer_sun_path (); + if (sun) + { + memcpy (peer, &sun->un, MIN (*len, sun->un_len)); + *len = sun->un_len; + } + else if (len) + *len = 0; + } + fd = sock; + if (fd <= 2) + set_std_handle (fd); + return fd; + } + __except (NO_ERROR) + { + error = EFAULT; + } + __endtry + } +send_mqueue_name_failed: + if (sock->get_mqd_in () != (mqd_t) -1) + { + mq_close (sock->get_mqd_in ()); + mq_unlink (sock->get_mqueue_name ()); + } + NtUnmapViewOfSection (NtCurrentProcess (), sock->shmem); + NtClose (sock->shmem_handle); +create_shmem_failed: + delete sock; +err: + set_errno (error); + mq_close (peer_mqd); return -1; }