4
0
mirror of git://sourceware.org/git/newlib-cygwin.git synced 2025-01-19 04:49:25 +08:00

Cygwin: FIFO: allow multiple writers

Introduce a 'fifo_client_handler' structure that can be used by a
reader to communicate with a writer using an instance of the named
pipe.  An fhandler_fifo opened for reading creates a thread that does
the following:

 - maintains a list of fifo_client_handlers
 - listens for_clients trying to connect
 - creates new pipe instances as needed so that there's always at
   least one available for connecting.

The pipe instances are initially created in blocking mode, but they
are set to be non-blocking after a connection is made.

fhandler_fifo::raw_read now loops through the connected clients and
reads from the first one that has data available.

New fhandler_fifo methods: add_client, listen_client,
listen_client_thread, check_listen_client_thread.

Replace the create_pipe method by create_pipe_instance, which allows
unlimited pipe instances.

New helper functions: create_event, set_pipe_non_blocking.
This commit is contained in:
Ken Brown 2019-03-22 19:30:37 +00:00 committed by Corinna Vinschen
parent 5955da96e2
commit 48d4cce3be
2 changed files with 367 additions and 32 deletions

View File

@ -1235,20 +1235,49 @@ public:
};
#define CYGWIN_FIFO_PIPE_NAME_LEN 47
#define MAX_CLIENTS 64
enum fifo_client_connect_state
{
fc_unknown,
fc_connecting,
fc_connected,
fc_invalid
};
struct fifo_client_handler
{
fhandler_base *fh;
fifo_client_connect_state state;
HANDLE connect_evt;
HANDLE dummy_evt; /* Never signaled. */
fifo_client_handler () : fh (NULL), state (fc_unknown), connect_evt (NULL),
dummy_evt (NULL) {}
int connect ();
int close ();
};
class fhandler_fifo: public fhandler_base
{
HANDLE read_ready;
HANDLE write_ready;
HANDLE listen_client_thr;
HANDLE lct_termination_evt;
UNICODE_STRING pipe_name;
WCHAR pipe_name_buf[CYGWIN_FIFO_PIPE_NAME_LEN + 1];
fifo_client_handler client[MAX_CLIENTS];
int nclients, nconnected;
bool __reg2 wait (HANDLE);
NTSTATUS npfs_handle (HANDLE &);
HANDLE create_pipe ();
HANDLE create_pipe_instance (bool);
NTSTATUS open_pipe ();
int disconnect_and_reconnect (int);
int add_client ();
bool listen_client ();
public:
fhandler_fifo ();
PUNICODE_STRING get_pipe_name ();
DWORD listen_client_thread ();
int open (int, mode_t);
off_t lseek (off_t offset, int whence);
int close ();

View File

@ -31,8 +31,9 @@ STATUS_PIPE_EMPTY simply means there's no data to be read. */
|| _s == STATUS_PIPE_EMPTY; })
fhandler_fifo::fhandler_fifo ():
fhandler_base (),
read_ready (NULL), write_ready (NULL)
fhandler_base (), read_ready (NULL), write_ready (NULL),
listen_client_thr (NULL), lct_termination_evt (NULL), nclients (0),
nconnected (0)
{
pipe_name_buf[0] = L'\0';
need_fork_fixup (true);
@ -78,6 +79,94 @@ fhandler_fifo::arm (HANDLE h)
return res;
}
static HANDLE
create_event ()
{
NTSTATUS status;
OBJECT_ATTRIBUTES attr;
HANDLE evt = NULL;
InitializeObjectAttributes (&attr, NULL, 0, NULL, NULL);
status = NtCreateEvent (&evt, EVENT_ALL_ACCESS, &attr,
NotificationEvent, FALSE);
if (!NT_SUCCESS (status))
__seterrno_from_nt_status (status);
return evt;
}
static void
set_pipe_non_blocking (HANDLE ph, bool nonblocking)
{
NTSTATUS status;
IO_STATUS_BLOCK io;
FILE_PIPE_INFORMATION fpi;
fpi.ReadMode = FILE_PIPE_MESSAGE_MODE;
fpi.CompletionMode = nonblocking ? FILE_PIPE_COMPLETE_OPERATION
: FILE_PIPE_QUEUE_OPERATION;
status = NtSetInformationFile (ph, &io, &fpi, sizeof fpi,
FilePipeInformation);
if (!NT_SUCCESS (status))
debug_printf ("NtSetInformationFile(FilePipeInformation): %y", status);
}
/* The pipe instance is always in blocking mode when this is called. */
int
fifo_client_handler::connect ()
{
NTSTATUS status;
IO_STATUS_BLOCK io;
if (connect_evt)
ResetEvent (connect_evt);
else if (!(connect_evt = create_event ()))
return -1;
status = NtFsControlFile (fh->get_handle (), connect_evt, NULL, NULL, &io,
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
switch (status)
{
case STATUS_PENDING:
case STATUS_PIPE_LISTENING:
state = fc_connecting;
break;
case STATUS_PIPE_CONNECTED:
state = fc_connected;
set_pipe_non_blocking (fh->get_handle (), true);
break;
default:
__seterrno_from_nt_status (status);
return -1;
}
return 0;
}
int
fhandler_fifo::disconnect_and_reconnect (int i)
{
NTSTATUS status;
IO_STATUS_BLOCK io;
HANDLE ph = client[i].fh->get_handle ();
status = NtFsControlFile (ph, NULL, NULL, NULL, &io, FSCTL_PIPE_DISCONNECT,
NULL, 0, NULL, 0);
/* Short-lived. Don't use cygwait. We don't want to be interrupted. */
if (status == STATUS_PENDING
&& NtWaitForSingleObject (ph, FALSE, NULL) == WAIT_OBJECT_0)
status = io.Status;
if (!NT_SUCCESS (status))
{
__seterrno_from_nt_status (status);
return -1;
}
set_pipe_non_blocking (client[i].fh->get_handle (), false);
if (client[i].connect () < 0)
return -1;
if (client[i].state == fc_connected)
nconnected++;
return 0;
}
NTSTATUS
fhandler_fifo::npfs_handle (HANDLE &nph)
{
@ -108,9 +197,12 @@ fhandler_fifo::npfs_handle (HANDLE &nph)
return status;
}
/* Called when pipe is opened for reading. */
/* Called when a FIFO is first opened for reading and again each time
a new client is needed. Each pipe instance is created in blocking
mode so that we can easily wait for a connection. After it is
connected, it is put in nonblocking mode. */
HANDLE
fhandler_fifo::create_pipe ()
fhandler_fifo::create_pipe_instance (bool first)
{
NTSTATUS status;
HANDLE npfsh;
@ -121,7 +213,7 @@ fhandler_fifo::create_pipe ()
ULONG hattr;
ULONG sharing;
ULONG nonblocking = FILE_PIPE_QUEUE_OPERATION;
ULONG max_instances = 1;
ULONG max_instances = -1;
LARGE_INTEGER timeout;
status = npfs_handle (npfsh);
@ -133,12 +225,14 @@ fhandler_fifo::create_pipe ()
access = GENERIC_READ | FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES
| SYNCHRONIZE;
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
hattr = OBJ_INHERIT | OBJ_CASE_INSENSITIVE;
hattr = OBJ_INHERIT;
if (first)
hattr |= OBJ_CASE_INSENSITIVE;
InitializeObjectAttributes (&attr, get_pipe_name (),
hattr, npfsh, NULL);
timeout.QuadPart = -500000;
status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing,
FILE_CREATE, 0,
first ? FILE_CREATE : FILE_OPEN, 0,
FILE_PIPE_MESSAGE_TYPE,
FILE_PIPE_MESSAGE_MODE,
nonblocking, max_instances,
@ -149,7 +243,7 @@ fhandler_fifo::create_pipe ()
return ph;
}
/* Called when file is opened for writing. */
/* Called when a FIFO is opened for writing. */
NTSTATUS
fhandler_fifo::open_pipe ()
{
@ -174,6 +268,140 @@ fhandler_fifo::open_pipe ()
return status;
}
int
fhandler_fifo::add_client ()
{
fifo_client_handler fc;
fhandler_base *fh;
bool first = (nclients == 0);
if (nclients == MAX_CLIENTS)
{
set_errno (EMFILE);
return -1;
}
if (!(fc.dummy_evt = create_event ()))
return -1;
if (!(fh = build_fh_dev (dev ())))
{
set_errno (EMFILE);
return -1;
}
fc.fh = fh;
HANDLE ph = create_pipe_instance (first);
if (!ph)
goto errout;
fh->set_io_handle (ph);
fh->set_flags (get_flags ());
if (fc.connect () < 0)
{
fc.close ();
goto errout;
}
if (fc.state == fc_connected)
nconnected++;
client[nclients++] = fc;
return 0;
errout:
delete fh;
return -1;
}
/* Just hop to the listen_client_thread method. */
DWORD WINAPI
listen_client_func (LPVOID param)
{
fhandler_fifo *fh = (fhandler_fifo *) param;
return fh->listen_client_thread ();
}
/* Start a thread that listens for client connections. Whenever a new
client connects, it creates a new pipe_instance if necessary.
(There may already be an available instance if a client has
disconnected.) */
bool
fhandler_fifo::listen_client ()
{
if (!(lct_termination_evt = create_event ()))
return false;
listen_client_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE,
listen_client_func, (PVOID) this, 0, NULL);
if (!listen_client_thr)
{
__seterrno ();
HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL);
if (evt)
CloseHandle (evt);
return false;
}
return true;
}
DWORD
fhandler_fifo::listen_client_thread ()
{
while (1)
{
bool found;
HANDLE w[MAX_CLIENTS + 1];
int i;
DWORD wait_ret;
found = false;
for (i = 0; i < nclients; i++)
switch (client[i].state)
{
case fc_invalid:
if (disconnect_and_reconnect (i) < 0)
goto errout;
/* Fall through. */
case fc_connected:
w[i] = client[i].dummy_evt;
break;
case fc_connecting:
found = true;
w[i] = client[i].connect_evt;
break;
case fc_unknown: /* Shouldn't happen. */
default:
break;
}
w[nclients] = lct_termination_evt;
if (!found)
{
if (add_client () < 0)
goto errout;
else
continue;
}
if (!arm (read_ready))
{
__seterrno ();
goto errout;
}
/* Wait for a client to connect. */
wait_ret = WaitForMultipleObjects (nclients + 1, w, false, INFINITE);
i = wait_ret - WAIT_OBJECT_0;
if (i < 0 || i > nclients)
goto errout;
else if (i == nclients) /* Reader is closing. */
return 0;
else
{
client[i].state = fc_connected;
nconnected++;
set_pipe_non_blocking (client[i].fh->get_handle (), true);
yield ();
}
}
errout:
ResetEvent (read_ready);
return -1;
}
int
fhandler_fifo::open (int flags, mode_t)
{
@ -184,7 +412,6 @@ fhandler_fifo::open (int flags, mode_t)
error_set_errno
} res;
bool reader, writer, duplexer;
HANDLE ph = NULL;
/* Determine what we're doing with this fhandler: reading, writing, both */
switch (flags & O_ACCMODE)
@ -212,6 +439,9 @@ fhandler_fifo::open (int flags, mode_t)
debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, duplexer);
set_flags (flags);
if (reader)
nohandle (true);
/* Create control events for this named pipe */
char char_sa_buf[1024];
LPSECURITY_ATTRIBUTES sa_buf;
@ -234,24 +464,42 @@ fhandler_fifo::open (int flags, mode_t)
goto out;
}
/* If we're reading, create the pipe, signal that we're ready and wait for
a writer.
FIXME: Probably need to special case O_RDWR case. */
/* If we're reading, start the listen_client thread (which should
signal read_ready), and wait for a writer. */
if (reader)
{
ph = create_pipe ();
if (!ph)
if (!listen_client ())
{
debug_printf ("create of reader failed");
debug_printf ("create of listen_client thread failed");
res = error_errno_set;
goto out;
}
else if (!arm (read_ready))
/* Wait for the listen_client thread to create the pipe and
signal read_ready. This should be quick. */
HANDLE w[2] = { listen_client_thr, read_ready };
switch (WaitForMultipleObjects (2, w, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
debug_printf ("listen_client_thread exited unexpectedly");
DWORD err;
GetExitCodeThread (listen_client_thr, &err);
__seterrno_from_win_error (err);
res = error_errno_set;
goto out;
break;
case WAIT_OBJECT_0 + 1:
if (!arm (read_ready))
{
res = error_set_errno;
goto out;
}
break;
default:
res = error_set_errno;
goto out;
break;
}
else if (!duplexer && !wait (write_ready))
if (!duplexer && !wait (write_ready))
{
res = error_errno_set;
goto out;
@ -261,7 +509,8 @@ fhandler_fifo::open (int flags, mode_t)
}
/* If we're writing, wait for read_ready and then connect to the
pipe. Then signal write_ready. */
pipe. This should always succeed quickly if the reader's
listen_client thread is running. Then signal write_ready. */
if (writer)
{
if (!wait (read_ready))
@ -283,7 +532,10 @@ fhandler_fifo::open (int flags, mode_t)
goto out;
}
else
res = success;
{
set_pipe_non_blocking (get_handle (), true);
res = success;
}
}
out:
if (res == error_set_errno)
@ -302,6 +554,8 @@ out:
}
if (get_io_handle ())
CloseHandle (get_io_handle ());
if (listen_client_thr)
CloseHandle (listen_client_thr);
}
debug_printf ("res %d", res);
return res == success;
@ -396,19 +650,36 @@ void __reg3
fhandler_fifo::raw_read (void *in_ptr, size_t& len)
{
size_t orig_len = len;
/* Start the listen_client thread if necessary (e.g., after dup or fork). */
if (!listen_client_thr && !listen_client ())
goto errout;
while (1)
{
len = orig_len;
fhandler_base::raw_read (in_ptr, len);
ssize_t nread = (ssize_t) len;
if (nread > 0)
return;
else if (nread < 0 && GetLastError () != ERROR_NO_DATA)
goto errout;
else if (nread == 0) /* Writer has disconnected. */
if (nconnected == 0) /* EOF */
{
/* Not implemented yet. */
len = 0;
return;
}
/* Poll the connected clients for input. */
for (int i = 0; i < nclients; i++)
if (client[i].state == fc_connected)
{
len = orig_len;
client[i].fh->fhandler_base::raw_read (in_ptr, len);
ssize_t nread = (ssize_t) len;
if (nread > 0)
return;
else if (nread < 0 && GetLastError () != ERROR_NO_DATA)
goto errout;
else if (nread == 0) /* Client has disconnected. */
{
client[i].state = fc_invalid;
nconnected--;
}
}
if (is_nonblocking ())
{
set_errno (EAGAIN);
@ -441,12 +712,47 @@ fhandler_fifo::fstatvfs (struct statvfs *sfs)
return fh.fstatvfs (sfs);
}
int
fifo_client_handler::close ()
{
int res = 0;
if (fh)
res = fh->close ();
if (connect_evt)
CloseHandle (connect_evt);
if (dummy_evt)
CloseHandle (dummy_evt);
return res;
}
int
fhandler_fifo::close ()
{
CloseHandle (read_ready);
CloseHandle (write_ready);
return fhandler_base::close ();
int res = 0;
HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL);
HANDLE thr = InterlockedExchangePointer (&listen_client_thr, NULL);
if (thr)
{
if (evt)
SetEvent (evt);
WaitForSingleObject (thr, INFINITE);
DWORD err;
GetExitCodeThread (thr, &err);
if (err)
debug_printf ("listen_client_thread exited with code %d", err);
CloseHandle (thr);
}
if (evt)
CloseHandle (evt);
if (read_ready)
CloseHandle (read_ready);
if (write_ready)
CloseHandle (write_ready);
for (int i = 0; i < nclients; i++)
if (client[i].close () < 0)
res = -1;
return fhandler_base::close () || res;
}
int