Cygwin: FIFO: fix hit_eof

According to Posix, a FIFO open for reading is at EOF if it is empty
and there are no writers open.

The only way to test this is to poll the fifo_client_handlers as in
raw_read and select.cc:peek_fifo.  The current hit_eof instead relies
on the value of nconnected, which can be out of date.  On the one
hand, it doesn't take into account writers that were connected but
have since closed.  On the other hand, it doesn't take into account
writers that are in the process of opening but haven't yet connected.

Fix this by introducing a maybe_eof method that tentatively assumes
EOF if there are no connected writers after polling.  Then check for
writers currently opening (via a new 'writer_opening' event), and wait
for the fifo_reader_thread to record any new connection that was made
while we were polling.

To handle the needs of peek_fifo, replace the get_fc_handle method
by a get_fc_handler method, and add a fifo_client_handler::get_state
method.

Remove the is_connected method, which was used only in peek_fifo and
is no longer needed.

Remove the nconnected data member, which was used only for the flawed
hit_eof.

Add some comments about events to fhandler.h.
This commit is contained in:
Ken Brown 2020-04-26 09:38:46 -04:00
parent 13c65c43c2
commit 301454f132
3 changed files with 98 additions and 49 deletions

View File

@ -1296,19 +1296,26 @@ struct fifo_client_handler
/* Returns FILE_PIPE_DISCONNECTED_STATE, FILE_PIPE_LISTENING_STATE,
FILE_PIPE_CONNECTED_STATE, FILE_PIPE_CLOSING_STATE,
FILE_PIPE_INPUT_AVAILABLE_STATE, or -1 on error. */
fifo_client_connect_state &get_state () { return state; }
int pipe_state ();
};
class fhandler_fifo: public fhandler_base
{
HANDLE read_ready;
HANDLE write_ready;
/* Handles to named events shared by all fhandlers for a given FIFO. */
HANDLE read_ready; /* A reader is open; OK for a writer to open. */
HANDLE write_ready; /* A writer is open; OK for a reader to open. */
HANDLE writer_opening; /* A writer is opening; no EOF. */
/* Non-shared handles needed for the listen_client_thread. */
HANDLE listen_client_thr;
HANDLE lct_termination_evt;
UNICODE_STRING pipe_name;
WCHAR pipe_name_buf[CYGWIN_FIFO_PIPE_NAME_LEN + 1];
bool _maybe_eof;
fifo_client_handler fc_handler[MAX_CLIENTS];
int nhandlers, nconnected;
int nhandlers;
af_unix_spinlock_t _fifo_client_lock;
bool reader, writer, duplexer;
size_t max_atomic_write;
@ -1326,10 +1333,10 @@ class fhandler_fifo: public fhandler_base
public:
fhandler_fifo ();
bool hit_eof ();
bool maybe_eof () const { return _maybe_eof; }
void maybe_eof (bool val) { _maybe_eof = val; }
int get_nhandlers () const { return nhandlers; }
HANDLE get_fc_handle (int i) const { return fc_handler[i].h; }
bool is_connected (int i) const
{ return fc_handler[i].state == fc_connected; }
fifo_client_handler get_fc_handler (int i) const { return fc_handler[i]; }
PUNICODE_STRING get_pipe_name ();
DWORD listen_client_thread ();
void fifo_client_lock () { _fifo_client_lock.lock (); }

View File

@ -66,9 +66,10 @@ STATUS_PIPE_EMPTY simply means there's no data to be read. */
|| _s == STATUS_PIPE_BUSY; })
fhandler_fifo::fhandler_fifo ():
fhandler_base (), read_ready (NULL), write_ready (NULL),
listen_client_thr (NULL), lct_termination_evt (NULL), nhandlers (0),
nconnected (0), reader (false), writer (false), duplexer (false),
fhandler_base (),
read_ready (NULL), write_ready (NULL), writer_opening (NULL),
listen_client_thr (NULL), lct_termination_evt (NULL), _maybe_eof (false), nhandlers (0),
reader (false), writer (false), duplexer (false),
max_atomic_write (DEFAULT_PIPEBUFSIZE)
{
pipe_name_buf[0] = L'\0';
@ -295,7 +296,8 @@ fhandler_fifo::record_connection (fifo_client_handler& fc,
{
SetEvent (write_ready);
fc.state = s;
nconnected++;
maybe_eof (false);
ResetEvent (writer_opening);
set_pipe_non_blocking (fc.h, true);
}
@ -465,6 +467,13 @@ fhandler_fifo::open (int flags, mode_t)
res = error_set_errno;
goto out;
}
npbuf[0] = 'o';
if (!(writer_opening = CreateEvent (sa_buf, true, false, npbuf)))
{
debug_printf ("CreateEvent for %s failed, %E", npbuf);
res = error_set_errno;
goto out;
}
/* If we're a duplexer, create the pipe and the first client handler. */
if (duplexer)
@ -518,10 +527,12 @@ fhandler_fifo::open (int flags, mode_t)
listen_client thread is running. Then signal write_ready. */
if (writer)
{
SetEvent (writer_opening);
while (1)
{
if (!wait (read_ready))
{
ResetEvent (writer_opening);
res = error_errno_set;
goto out;
}
@ -540,6 +551,7 @@ fhandler_fifo::open (int flags, mode_t)
debug_printf ("create of writer failed");
__seterrno_from_nt_status (status);
res = error_errno_set;
ResetEvent (writer_opening);
goto out;
}
}
@ -559,6 +571,11 @@ out:
NtClose (write_ready);
write_ready = NULL;
}
if (writer_opening)
{
NtClose (writer_opening);
writer_opening = NULL;
}
if (get_handle ())
NtClose (get_handle ());
if (listen_client_thr)
@ -717,28 +734,23 @@ fhandler_fifo::raw_write (const void *ptr, size_t len)
return ret;
}
/* A FIFO open for reading is at EOF if no process has it open for
writing. We test this by checking nconnected. But we must take
account of the possible delay from the time of connection to the
time the connection is recorded by the listen_client thread. */
/* A reader is at EOF if the pipe is empty and no writers are open.
hit_eof is called by raw_read and select.cc:peek_fifo if it appears
that we are at EOF after polling the fc_handlers. We recheck this
in case a writer opened while we were polling. */
bool
fhandler_fifo::hit_eof ()
{
bool eof;
bool retry = true;
repeat:
bool ret = maybe_eof () && !IsEventSignalled (writer_opening);
if (ret)
{
yield ();
/* Wait for the reader thread to finish recording any connection. */
fifo_client_lock ();
eof = (nconnected == 0);
fifo_client_unlock ();
if (eof && retry)
{
retry = false;
/* Give the listen_client thread time to catch up. */
Sleep (1);
goto repeat;
}
return eof;
ret = maybe_eof ();
}
return ret;
}
/* Is the lct running? */
@ -783,13 +795,8 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
while (1)
{
if (hit_eof ())
{
len = 0;
return;
}
/* Poll the connected clients for input. */
int nconnected = 0;
fifo_client_lock ();
for (int i = 0; i < nhandlers; i++)
if (fc_handler[i].state >= fc_connected)
@ -798,7 +805,8 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
IO_STATUS_BLOCK io;
size_t nbytes = 0;
status = NtReadFile (get_fc_handle (i), NULL, NULL, NULL,
nconnected++;
status = NtReadFile (fc_handler[i].h, NULL, NULL, NULL,
&io, in_ptr, len, NULL, NULL);
switch (status)
{
@ -826,7 +834,13 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
break;
}
}
maybe_eof (!nconnected && !IsEventSignalled (writer_opening));
fifo_client_unlock ();
if (maybe_eof () && hit_eof ())
{
len = 0;
return;
}
if (is_nonblocking ())
{
set_errno (EAGAIN);
@ -928,6 +942,8 @@ fhandler_fifo::close ()
NtClose (read_ready);
if (write_ready)
NtClose (write_ready);
if (writer_opening)
NtClose (writer_opening);
fifo_client_lock ();
for (int i = 0; i < nhandlers; i++)
fc_handler[i].close ();
@ -979,6 +995,13 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
__seterrno ();
goto err_close_read_ready;
}
if (!DuplicateHandle (GetCurrentProcess (), writer_opening,
GetCurrentProcess (), &fhf->writer_opening,
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
{
__seterrno ();
goto err_close_write_ready;
}
if (reader)
{
/* Make sure the child starts unlocked. */
@ -1009,6 +1032,9 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
err_close_handlers:
for (int j = 0; j < i; j++)
fhf->fc_handler[j].close ();
/* err_close_writer_opening: */
NtClose (fhf->writer_opening);
err_close_write_ready:
NtClose (fhf->write_ready);
err_close_read_ready:
NtClose (fhf->read_ready);
@ -1028,6 +1054,7 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
fhandler_base::fixup_after_fork (parent);
fork_fixup (parent, read_ready, "read_ready");
fork_fixup (parent, write_ready, "write_ready");
fork_fixup (parent, writer_opening, "writer_opening");
if (reader)
{
/* Make sure the child starts unlocked. */
@ -1062,6 +1089,7 @@ fhandler_fifo::set_close_on_exec (bool val)
fhandler_base::set_close_on_exec (val);
set_no_inheritance (read_ready, val);
set_no_inheritance (write_ready, val);
set_no_inheritance (writer_opening, val);
fifo_client_lock ();
for (int i = 0; i < nhandlers; i++)
set_no_inheritance (fc_handler[i].h, val);

View File

@ -866,31 +866,45 @@ peek_fifo (select_record *s, bool from_select)
goto out;
}
if (fh->hit_eof ())
{
select_printf ("read: %s, saw EOF", fh->get_name ());
gotone = s->read_ready = true;
if (s->except_selected)
gotone += s->except_ready = true;
goto out;
}
fh->fifo_client_lock ();
int nconnected = 0;
for (int i = 0; i < fh->get_nhandlers (); i++)
if (fh->is_connected (i))
if (fh->get_fc_handler (i).get_state () >= fc_connected)
{
int n = pipe_data_available (s->fd, fh, fh->get_fc_handle (i),
false);
if (n > 0)
nconnected++;
switch (fh->get_fc_handler (i).pipe_state ())
{
select_printf ("read: %s, ready for read: avail %d, client %d",
fh->get_name (), n, i);
case FILE_PIPE_CONNECTED_STATE:
fh->get_fc_handler (i).get_state () = fc_connected;
break;
case FILE_PIPE_DISCONNECTED_STATE:
fh->get_fc_handler (i).get_state () = fc_disconnected;
nconnected--;
break;
case FILE_PIPE_CLOSING_STATE:
fh->get_fc_handler (i).get_state () = fc_closing;
break;
case FILE_PIPE_INPUT_AVAILABLE_STATE:
fh->get_fc_handler (i).get_state () = fc_input_avail;
select_printf ("read: %s, ready for read", fh->get_name ());
fh->fifo_client_unlock ();
gotone += s->read_ready = true;
goto out;
default:
fh->get_fc_handler (i).get_state () = fc_error;
nconnected--;
break;
}
}
fh->maybe_eof (!nconnected);
fh->fifo_client_unlock ();
if (fh->maybe_eof () && fh->hit_eof ())
{
select_printf ("read: %s, saw EOF", fh->get_name ());
gotone += s->read_ready = true;
if (s->except_selected)
gotone += s->except_ready = true;
}
}
out:
if (s->write_selected)