Cygwin: AF_UNIX: adapt connect and accept to mqueues

- Flesh out connect_mqueue, which replaces connect_pipe.

- A new method wait_mqueue replaces wait_pipe.

- A new method wait_mqueue_thread replaces wait_pipe_thread.

- New methods send_mqueue_name and recv_peer_mqueue_name are used for
  making connection requests and responding to them.

- A new method xchg_sock_info is called after a connecting socket
  successfully receives its peer's mqueue name.  This functionality
  was previously in open_pipe.

- Remove open_pipe, listen_pipe, and create_pipe_instance, which are
  no longer needed.
This commit is contained in:
Ken Brown 2021-05-09 18:15:56 -04:00
parent 8ce83c452b
commit 37226b2763
2 changed files with 219 additions and 319 deletions

View File

@ -1095,17 +1095,17 @@ class fhandler_socket_unix : public fhandler_socket
char get_type_char ();
void set_pipe_non_blocking (bool nonblocking);
int send_sock_info (bool from_bind);
void xchg_sock_info ();
int grab_admin_pkt (bool peek = true);
int recv_peer_info ();
static NTSTATUS npfs_handle (HANDLE &nph);
int create_mqueue (bool listener = false);
HANDLE create_pipe_instance ();
NTSTATUS open_pipe (PUNICODE_STRING pipe_name, bool xchg_sock_info);
mqd_t open_mqueue (const char *mqueue_name, bool nonblocking);
int wait_pipe (PUNICODE_STRING pipe_name);
int connect_mqueue (const char *mqueue_name);
int connect_pipe (PUNICODE_STRING pipe_name);
int listen_pipe ();
int wait_mqueue (mqd_t mqd);
int send_mqueue_name (mqd_t mqd, bool wait);
int recv_peer_mqueue_name (bool set = true, bool timeout = true,
mqd_t *mqd = NULL);
ssize_t peek_mqueue (char *buf, size_t buflen, bool nonblocking = true);
int disconnect_pipe (HANDLE ph);
/* The NULL pointer check is required for FS methods like fstat. When
@ -1143,7 +1143,7 @@ class fhandler_socket_unix : public fhandler_socket
int dup (fhandler_base *child, int);
DWORD wait_pipe_thread (PUNICODE_STRING pipe_name);
DWORD wait_mqueue_thread (mqd_t mqd);
int socket (int af, int type, int protocol, int flags);
int socketpair (int af, int type, int protocol, int flags,

View File

@ -166,7 +166,7 @@ AF_UNIX_PKT_DATA_APPEND (af_unix_pkt_hdr_t *phdr, void *data, uint16_t dlen)
|| _s == STATUS_MORE_PROCESSING_REQUIRED; })
/* Default timeout value of connect: 20 secs, as on Linux. */
#define AF_UNIX_CONNECT_TIMEOUT (-20 * NS100PERSEC)
#define AF_UNIX_CONNECT_TIMEOUT 20
/* Message queue priorities */
enum
@ -257,9 +257,6 @@ fhandler_socket_unix::reopen_shmem ()
return 0;
}
/* Character length of pipe name, excluding trailing NUL. */
#define CYGWIN_PIPE_SOCKET_NAME_LEN 47
/* Character length of mqueue name, excluding trailing NUL. */
#define CYGWIN_MQUEUE_SOCKET_NAME_LEN 27
@ -694,6 +691,15 @@ fhandler_socket_unix::send_sock_info (bool from_bind)
return ret;
}
/* Called from connect_mqueue and wait_mqueue after successfully
getting peer's mqueue name. */
void
fhandler_socket_unix::xchg_sock_info ()
{
send_sock_info (false);
recv_peer_info ();
}
/* Reads an administrative packet from the pipe and handles it. If
PEEK is true, checks first to see if the next packet in the pipe is
an administrative packet; otherwise the caller must set io_lock and
@ -754,10 +760,6 @@ fhandler_socket_unix::grab_admin_pkt (bool peek)
return 0;
}
/* FIXME: This is temporary until we no longer need the old
AF_UNIX_CONNECT_TIMEOUT. */
#define AF_UNIX_CONNECT_TIMEOUT_MQ 20
/* Returns an error code. Locking is not required when called from accept4,
user space doesn't know about this socket yet. */
int
@ -773,7 +775,7 @@ fhandler_socket_unix::recv_peer_info ()
packet = (af_unix_pkt_hdr_t *) alloca (len);
set_mqueue_non_blocking (get_mqd_in (), false);
clock_gettime (CLOCK_REALTIME, &timeout);
timeout.tv_sec += AF_UNIX_CONNECT_TIMEOUT_MQ;
timeout.tv_sec += AF_UNIX_CONNECT_TIMEOUT;
if (_mq_timedrecv (get_mqd_in (), (char *) packet, len, &timeout, 0) < 0)
ret = get_errno ();
if (ret == ETIMEDOUT)
@ -860,79 +862,6 @@ fhandler_socket_unix::create_mqueue (bool listener)
return 0;
}
HANDLE
fhandler_socket_unix::create_pipe_instance ()
{
NTSTATUS status;
HANDLE npfsh;
HANDLE ph;
ACCESS_MASK access;
OBJECT_ATTRIBUTES attr;
IO_STATUS_BLOCK io;
ULONG sharing;
ULONG nonblocking;
ULONG max_instances;
LARGE_INTEGER timeout;
status = npfs_handle (npfsh);
if (!NT_SUCCESS (status))
{
__seterrno_from_nt_status (status);
return NULL;
}
access = GENERIC_READ | FILE_READ_ATTRIBUTES
| GENERIC_WRITE | FILE_WRITE_ATTRIBUTES
| SYNCHRONIZE;
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
/* NPFS doesn't understand reopening by handle, unfortunately. */
InitializeObjectAttributes (&attr, pc.get_nt_native_path (), OBJ_INHERIT,
npfsh, NULL);
nonblocking = is_nonblocking () ? FILE_PIPE_COMPLETE_OPERATION
: FILE_PIPE_QUEUE_OPERATION;
max_instances = (get_socket_type () == SOCK_DGRAM) ? 1 : -1;
timeout.QuadPart = -500000;
status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing,
FILE_OPEN, 0,
FILE_PIPE_MESSAGE_TYPE,
FILE_PIPE_MESSAGE_MODE,
nonblocking, max_instances,
rmem (), wmem (), &timeout);
if (!NT_SUCCESS (status))
__seterrno_from_nt_status (status);
return ph;
}
NTSTATUS
fhandler_socket_unix::open_pipe (PUNICODE_STRING pipe_name, bool xchg_sock_info)
{
NTSTATUS status;
HANDLE npfsh;
ACCESS_MASK access;
OBJECT_ATTRIBUTES attr;
IO_STATUS_BLOCK io;
ULONG sharing;
HANDLE ph = NULL;
status = npfs_handle (npfsh);
if (!NT_SUCCESS (status))
return status;
access = GENERIC_READ | GENERIC_WRITE | SYNCHRONIZE;
InitializeObjectAttributes (&attr, pipe_name, OBJ_INHERIT, npfsh, NULL);
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
status = NtOpenFile (&ph, access, &attr, &io, sharing, 0);
if (NT_SUCCESS (status))
{
set_handle (ph);
if (xchg_sock_info)
{
/* FIXME: Should we check for errors? */
send_sock_info (false);
recv_peer_info ();
}
}
return status;
}
mqd_t
fhandler_socket_unix::open_mqueue (const char *mqueue_name, bool nonblocking)
{
@ -945,28 +874,27 @@ fhandler_socket_unix::open_mqueue (const char *mqueue_name, bool nonblocking)
struct conn_wait_info_t
{
fhandler_socket_unix *fh;
UNICODE_STRING pipe_name;
WCHAR pipe_name_buf[CYGWIN_PIPE_SOCKET_NAME_LEN + 1];
mqd_t mqd; /* Descriptor for listener's mqueue. */
};
/* Just hop to the wait_pipe_thread method. */
/* Just hop to the wait_mqueue_thread method. */
DWORD WINAPI
connect_wait_func (LPVOID param)
{
conn_wait_info_t *wait_info = (conn_wait_info_t *) param;
return wait_info->fh->wait_pipe_thread (&wait_info->pipe_name);
return wait_info->fh->wait_mqueue_thread (wait_info->mqd);
}
/* Start a waiter thread to wait for a pipe instance to become available.
in blocking mode, wait for the thread to finish. In nonblocking mode
/* Start a waiter thread to wait for the listener's mqueue to have space.
In blocking mode, wait for the thread to finish. In nonblocking mode
just return with errno set to EINPROGRESS. */
int
fhandler_socket_unix::wait_pipe (PUNICODE_STRING pipe_name)
fhandler_socket_unix::wait_mqueue (mqd_t mqd)
{
conn_wait_info_t *wait_info;
DWORD waitret, err;
int ret = -1;
HANDLE thr, evt;
HANDLE thr;
PVOID param;
if (!(cwt_termination_evt = create_event ()))
@ -975,10 +903,7 @@ fhandler_socket_unix::wait_pipe (PUNICODE_STRING pipe_name)
if (!wait_info)
return -1;
wait_info->fh = this;
RtlInitEmptyUnicodeString (&wait_info->pipe_name, wait_info->pipe_name_buf,
sizeof wait_info->pipe_name_buf);
RtlCopyUnicodeString (&wait_info->pipe_name, pipe_name);
wait_info->mqd = mqd;
cwt_param = (PVOID) wait_info;
connect_wait_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE,
connect_wait_func, cwt_param, 0, NULL);
@ -1027,71 +952,115 @@ fhandler_socket_unix::wait_pipe (PUNICODE_STRING pipe_name)
break;
}
out:
evt = InterlockedExchangePointer (&cwt_termination_evt, NULL);
if (evt)
NtClose (evt);
return ret;
}
int
fhandler_socket_unix::recv_peer_mqueue_name (bool set, bool timeout, mqd_t *mqd)
{
af_unix_pkt_hdr_t *packet;
size_t plen;
mqd_t peer_mqd;
int flags;
ssize_t nr;
timespec tm;
plen = sizeof *packet + CYGWIN_MQUEUE_SOCKET_NAME_LEN + 1;
packet = (af_unix_pkt_hdr_t *) alloca (plen);
if (timeout)
{
set_mqueue_non_blocking (get_mqd_in (), false);
clock_gettime (CLOCK_REALTIME, &tm);
tm.tv_sec += AF_UNIX_CONNECT_TIMEOUT;
nr = _mq_timedrecv (get_mqd_in (), (char *) packet, plen, &tm, 0);
set_mqueue_non_blocking (get_mqd_in (), is_nonblocking ());
}
else
nr = _mq_recv (get_mqd_in (), (char *) packet, plen, 0);
if (nr < 0)
{
if (get_errno () == ETIMEDOUT)
set_errno (ECONNABORTED);
return -1;
}
flags = O_WRONLY;
if (is_nonblocking ())
flags |= O_NONBLOCK;
peer_mqd = mq_open ((const char *) AF_UNIX_PKT_DATA (packet), flags);
if (peer_mqd == (mqd_t) -1)
return -1;
if (set)
set_mqd_out (peer_mqd);
if (mqd)
*mqd = peer_mqd;
return 0;
}
/* A socket makes a connection request by sending its own mqueue name
to the listener's mqueue, first creating its mqueue if necessary. */
int
fhandler_socket_unix::send_mqueue_name (mqd_t mqd, bool wait)
{
size_t plen;
size_t dlen = 0;
af_unix_pkt_hdr_t *packet;
const char *mqn = get_mqueue_name ();
if (!mqn || !*mqn)
{
gen_mqueue_name ();
mqn = get_mqueue_name ();
if (get_mqd_in () == (mqd_t) -1 && create_mqueue () < 0)
return -1;
}
dlen = CYGWIN_MQUEUE_SOCKET_NAME_LEN + 1;
plen = sizeof *packet + dlen;
packet = (af_unix_pkt_hdr_t *) alloca (plen);
packet->init (true, _SHUT_NONE, 0, 0, dlen);
memcpy (AF_UNIX_PKT_DATA (packet), mqn, dlen);
if (!wait)
return mq_send (mqd, (const char *) packet, packet->pckt_len,
af_un_prio_admin);
timespec timeout;
set_mqueue_non_blocking (mqd, false);
clock_gettime (CLOCK_REALTIME, &timeout);
timeout.tv_sec += AF_UNIX_CONNECT_TIMEOUT;
return mq_timedsend (mqd, (const char *) packet, packet->pckt_len,
af_un_prio_admin, &timeout);
}
int
fhandler_socket_unix::connect_mqueue (const char *mqueue_name)
{
return 0;
}
int
fhandler_socket_unix::connect_pipe (PUNICODE_STRING pipe_name)
{
NTSTATUS status;
/* Try connecting first. If it doesn't work, wait for the pipe
to become available. */
status = open_pipe (pipe_name, get_socket_type () != SOCK_DGRAM);
if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
return wait_pipe (pipe_name);
if (!NT_SUCCESS (status))
{
__seterrno_from_nt_status (status);
so_error (get_errno ());
return -1;
}
so_error (0);
return 0;
}
int
fhandler_socket_unix::listen_pipe ()
{
NTSTATUS status;
IO_STATUS_BLOCK io;
HANDLE evt = NULL;
DWORD waitret = WAIT_OBJECT_0;
int ret = -1;
int error = 0;
mqd_t mqd;
io.Status = STATUS_PENDING;
if (!is_nonblocking () && !(evt = create_event ()))
mqd = mq_open (mqueue_name, O_WRONLY | O_NONBLOCK);
if (mqd == (mqd_t) -1)
return -1;
status = NtFsControlFile (get_handle (), evt, NULL, NULL, &io,
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
if (status == STATUS_PENDING)
/* Try sending my mqueue name. If it doesn't work, wait for the
listener's mqueue to become available. */
ret = send_mqueue_name (mqd, false);
if (ret < 0 && get_errno () == EAGAIN)
return wait_mqueue (mqd);
if (ret < 0)
{
waitret = cygwait (evt ?: get_handle (), cw_infinite,
cw_cancel | cw_sig_eintr);
if (waitret == WAIT_OBJECT_0)
status = io.Status;
error = get_errno ();
goto out;
}
if (evt)
NtClose (evt);
if (waitret == WAIT_CANCELED)
pthread::static_cancel_self ();
else if (waitret == WAIT_SIGNALED)
set_errno (EINTR);
else if (status == STATUS_PIPE_LISTENING)
set_errno (EAGAIN);
else if (status == STATUS_SUCCESS || status == STATUS_PIPE_CONNECTED)
ret = 0;
else
__seterrno_from_nt_status (status);
/* Wait for response, which should be peer's mqueue name. */
ret = recv_peer_mqueue_name ();
if (ret < 0)
{
error =get_errno ();
goto out;
}
xchg_sock_info ();
out:
so_error (error);
mq_close (mqd);
return ret;
}
@ -1243,103 +1212,38 @@ fhandler_socket_unix::dup (fhandler_base *child, int flags)
return 0;
}
/* Waiter thread method. Here we wait for a pipe instance to become
available and connect to it, if so. This function is running
asynchronously if called on a non-blocking pipe. The important
things to do:
/* Waiter thread method. Here we wait for the listener's mqueue to
become available and send our mqueue name to it, if so. This
function is running asynchronously if called on a non-blocking
socket. The important things to do:
- Set the peer pipe handle if successful
- Set the output mqueue descriptor if successful
- Send own sun_path to peer if successful
- Set connect_state
- Set so_error for later call to select
*/
DWORD
fhandler_socket_unix::wait_pipe_thread (PUNICODE_STRING pipe_name)
fhandler_socket_unix::wait_mqueue_thread (mqd_t mqd)
{
HANDLE npfsh;
HANDLE evt;
LONG error = 0;
NTSTATUS status;
IO_STATUS_BLOCK io;
ULONG pwbuf_size;
PFILE_PIPE_WAIT_FOR_BUFFER pwbuf;
LONGLONG stamp;
status = npfs_handle (npfsh);
if (!NT_SUCCESS (status))
/* Try for up to AF_UNIX_CONNECT_TIMEOUT seconds to send my mqueue
name to listening socket. */
if (send_mqueue_name (mqd, true) < 0)
{
error = geterrno_from_nt_status (status);
error = get_errno ();
goto out;
}
if (!(evt = create_event ()))
goto out;
pwbuf_size = offsetof (FILE_PIPE_WAIT_FOR_BUFFER, Name) + pipe_name->Length;
pwbuf = (PFILE_PIPE_WAIT_FOR_BUFFER) alloca (pwbuf_size);
pwbuf->Timeout.QuadPart = AF_UNIX_CONNECT_TIMEOUT;
pwbuf->NameLength = pipe_name->Length;
pwbuf->TimeoutSpecified = TRUE;
memcpy (pwbuf->Name, pipe_name->Buffer, pipe_name->Length);
stamp = get_clock (CLOCK_MONOTONIC)->n100secs ();
do
/* Read response from peer, which should be its mqueue name. */
if (recv_peer_mqueue_name () < 0)
{
status = NtFsControlFile (npfsh, evt, NULL, NULL, &io, FSCTL_PIPE_WAIT,
pwbuf, pwbuf_size, NULL, 0);
if (status == STATUS_PENDING)
{
HANDLE w[2] = { evt, cwt_termination_evt };
switch (WaitForMultipleObjects (2, w, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
status = io.Status;
break;
case WAIT_OBJECT_0 + 1:
default:
status = STATUS_THREAD_IS_TERMINATING;
break;
}
}
switch (status)
{
case STATUS_SUCCESS:
{
status = open_pipe (pipe_name, get_socket_type () != SOCK_DGRAM);
if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
{
/* Another concurrent connect grabbed the pipe instance
under our nose. Fix the timeout value and go waiting
again, unless the timeout has passed. */
pwbuf->Timeout.QuadPart -=
stamp - get_clock (CLOCK_MONOTONIC)->n100secs ();
if (pwbuf->Timeout.QuadPart >= 0)
{
status = STATUS_IO_TIMEOUT;
error = ETIMEDOUT;
}
}
else if (!NT_SUCCESS (status))
error = geterrno_from_nt_status (status);
}
break;
case STATUS_OBJECT_NAME_NOT_FOUND:
error = EADDRNOTAVAIL;
break;
case STATUS_IO_TIMEOUT:
error = ETIMEDOUT;
break;
case STATUS_INSUFFICIENT_RESOURCES:
error = ENOBUFS;
break;
case STATUS_THREAD_IS_TERMINATING:
error = EINTR;
break;
case STATUS_INVALID_DEVICE_REQUEST:
default:
error = EIO;
break;
}
error = get_errno ();
goto out;
}
while (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status));
xchg_sock_info ();
/* FIXME: Translate mq errors to socket errors. */
out:
mq_close (mqd);
PVOID param = InterlockedExchangePointer (&cwt_param, NULL);
if (param)
cfree (param);
@ -1554,6 +1458,9 @@ fhandler_socket_unix::listen (int backlog)
int
fhandler_socket_unix::accept4 (struct sockaddr *peer, int *len, int flags)
{
mqd_t peer_mqd;
fhandler_socket_unix *sock;
if (get_socket_type () != SOCK_STREAM)
{
set_errno (EOPNOTSUPP);
@ -1565,91 +1472,84 @@ fhandler_socket_unix::accept4 (struct sockaddr *peer, int *len, int flags)
set_errno (EINVAL);
return -1;
}
if (listen_pipe () == 0)
if (recv_peer_mqueue_name (false, false, &peer_mqd) < 0)
return -1;
if (flags & SOCK_NONBLOCK)
set_mqueue_non_blocking (peer_mqd, true);
/* We now have an mqueue descriptor that the accepted socket can use
for output. Prepare new file descriptor. */
int error = ENOBUFS;
cygheap_fdnew fd;
if (fd < 0)
goto err;
sock = (fhandler_socket_unix *) build_fh_dev (dev ());
if (!sock)
goto err;
if (sock->create_shmem () < 0)
goto create_shmem_failed;
sock->set_addr_family (AF_UNIX);
sock->set_socket_type (get_socket_type ());
if (flags & SOCK_NONBLOCK)
sock->set_nonblocking (true);
if (flags & SOCK_CLOEXEC)
sock->set_close_on_exec (true);
sock->set_unique_id ();
sock->set_ino (sock->get_unique_id ());
sock->set_mqd_out (peer_mqd);
if (sock->send_mqueue_name (peer_mqd, false) < 0)
{
/* Our handle is now connected with a client. This handle is used
for the accepted socket. Our handle has to be replaced with a
new instance handle for the next accept. */
io_lock ();
HANDLE accepted = get_handle ();
HANDLE new_inst = create_pipe_instance ();
int error = ENOBUFS;
if (!new_inst)
io_unlock ();
else
{
/* Set new io handle. */
set_handle (new_inst);
io_unlock ();
/* Prepare new file descriptor. */
cygheap_fdnew fd;
if (fd >= 0)
{
fhandler_socket_unix *sock = (fhandler_socket_unix *)
build_fh_dev (dev ());
if (sock)
{
if (sock->create_shmem () < 0)
goto create_shmem_failed;
sock->set_addr_family (AF_UNIX);
sock->set_socket_type (get_socket_type ());
if (flags & SOCK_NONBLOCK)
sock->set_nonblocking (true);
if (flags & SOCK_CLOEXEC)
sock->set_close_on_exec (true);
sock->set_unique_id ();
sock->set_ino (sock->get_unique_id ());
sock->pc.set_nt_native_path (pc.get_nt_native_path ());
sock->connect_state (connected);
sock->binding_state (binding_state ());
sock->set_handle (accepted);
sock->sun_path (sun_path ());
sock->sock_cred (sock_cred ());
/* Send this socket info to connecting socket. */
sock->send_sock_info (false);
/* Fetch the packet sent by send_sock_info called by
connecting peer. */
error = sock->recv_peer_info ();
if (error == 0)
{
__try
{
if (peer)
{
sun_name_t *sun = sock->peer_sun_path ();
if (sun)
{
memcpy (peer, &sun->un,
MIN (*len, sun->un_len));
*len = sun->un_len;
}
else if (len)
*len = 0;
}
fd = sock;
if (fd <= 2)
set_std_handle (fd);
return fd;
}
__except (NO_ERROR)
{
error = EFAULT;
}
__endtry
}
create_shmem_failed:
delete sock;
}
}
}
/* Ouch! We can't handle the client if we couldn't
create a new instance to accept more connections.*/
disconnect_pipe (accepted);
set_errno (error);
error = get_errno ();
goto send_mqueue_name_failed;
}
sock->connect_state (connected);
sock->binding_state (binding_state ());
sock->sun_path (sun_path ());
sock->sock_cred (sock_cred ());
/* Send this socket info to connecting socket. */
sock->send_sock_info (false);
/* Fetch the packet sent by send_sock_info called by connecting
peer. */
error = sock->recv_peer_info ();
if (error == 0)
{
__try
{
if (peer)
{
sun_name_t *sun = sock->peer_sun_path ();
if (sun)
{
memcpy (peer, &sun->un, MIN (*len, sun->un_len));
*len = sun->un_len;
}
else if (len)
*len = 0;
}
fd = sock;
if (fd <= 2)
set_std_handle (fd);
return fd;
}
__except (NO_ERROR)
{
error = EFAULT;
}
__endtry
}
send_mqueue_name_failed:
if (sock->get_mqd_in () != (mqd_t) -1)
{
mq_close (sock->get_mqd_in ());
mq_unlink (sock->get_mqueue_name ());
}
NtUnmapViewOfSection (NtCurrentProcess (), sock->shmem);
NtClose (sock->shmem_handle);
create_shmem_failed:
delete sock;
err:
set_errno (error);
mq_close (peer_mqd);
return -1;
}