Cygwin: pipe: Use read pipe handle for select() on write pipe.
- Usually WriteQuotaAvailable retrieved by NtQueryInformationFile() on the write side reflects the space available in the inbound buffer on the read side. However, if a pipe read is currently pending, WriteQuotaAvailable on the write side is decremented by the number of bytes the read side is requesting. So it's possible (even likely) that WriteQuotaAvailable is 0, even if the inbound buffer on the read side is not full. This can lead to a deadlock situation: The reader is waiting for data, but select on the writer side assumes that no space is available in the read side inbound buffer. Currently, to avoid this stuation, read() does not request larger block than pipe size - 1. However, this mechanism does not take effect if the reader side is non-cygwin app. The only reliable information is available on the read side, so fetch info from the read side via the pipe-specific query handle (query_hdl) introduced. If the query_hdl (read handle) is kept in write side, writer can not detect closure of read pipe. Therefore, raw_write() counts write handle and query_hdl. If they are equal, only the pairs of write handle and query_hdl are alive. In this case, raw_write() returns EPIPE and raises SIGPIPE. - Nonblocking pipes (PIPE_NOWAIT) are not well handled by non-Cygwin tools, so convert pipe handles to PIPE_WAIT handles when spawning a non-Cygwin process.
This commit is contained in:
parent
0d12015670
commit
f79a46112e
|
@ -1176,10 +1176,22 @@ class fhandler_pipe_fifo: public fhandler_base
|
|||
{
|
||||
protected:
|
||||
size_t pipe_buf_size;
|
||||
HANDLE query_hdl;
|
||||
|
||||
public:
|
||||
fhandler_pipe_fifo ();
|
||||
|
||||
HANDLE get_query_handle () const { return query_hdl; }
|
||||
void close_query_handle ()
|
||||
{
|
||||
if (query_hdl)
|
||||
{
|
||||
CloseHandle (query_hdl);
|
||||
query_hdl = NULL;
|
||||
}
|
||||
}
|
||||
bool reader_closed ();
|
||||
|
||||
ssize_t __reg3 raw_write (const void *ptr, size_t len);
|
||||
|
||||
};
|
||||
|
@ -1189,7 +1201,6 @@ class fhandler_pipe: public fhandler_pipe_fifo
|
|||
private:
|
||||
HANDLE read_mtx;
|
||||
pid_t popen_pid;
|
||||
void set_pipe_non_blocking (bool nonblocking);
|
||||
public:
|
||||
fhandler_pipe ();
|
||||
|
||||
|
@ -1237,6 +1248,7 @@ public:
|
|||
fh->copy_from (this);
|
||||
return fh;
|
||||
}
|
||||
void set_pipe_non_blocking (bool nonblocking);
|
||||
};
|
||||
|
||||
#define CYGWIN_FIFO_PIPE_NAME_LEN 47
|
||||
|
|
|
@ -202,6 +202,8 @@ fhandler_pipe::open_setup (int flags)
|
|||
if (!read_mtx)
|
||||
debug_printf ("CreateMutex failed: %E");
|
||||
}
|
||||
if (get_dev () == FH_PIPEW && !query_hdl)
|
||||
set_pipe_non_blocking (is_nonblocking ());
|
||||
}
|
||||
|
||||
off_t
|
||||
|
@ -268,39 +270,22 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
|
|||
while (nbytes < len)
|
||||
{
|
||||
ULONG_PTR nbytes_now = 0;
|
||||
size_t left = len - nbytes;
|
||||
ULONG len1 = (ULONG) left;
|
||||
ULONG len1 = (ULONG) (len - nbytes);
|
||||
waitret = WAIT_OBJECT_0;
|
||||
|
||||
if (evt)
|
||||
ResetEvent (evt);
|
||||
if (!is_nonblocking ())
|
||||
FILE_PIPE_LOCAL_INFORMATION fpli;
|
||||
status = NtQueryInformationFile (get_handle (), &io,
|
||||
&fpli, sizeof (fpli),
|
||||
FilePipeLocalInformation);
|
||||
if (NT_SUCCESS (status))
|
||||
{
|
||||
FILE_PIPE_LOCAL_INFORMATION fpli;
|
||||
|
||||
/* If the pipe is empty, don't request more bytes than pipe
|
||||
buffer size - 1. Pending read lowers WriteQuotaAvailable on
|
||||
the write side and thus affects select's ability to return
|
||||
more or less reliable info whether a write succeeds or not. */
|
||||
ULONG chunk = pipe_buf_size - 1;
|
||||
status = NtQueryInformationFile (get_handle (), &io,
|
||||
&fpli, sizeof (fpli),
|
||||
FilePipeLocalInformation);
|
||||
if (NT_SUCCESS (status))
|
||||
{
|
||||
if (fpli.ReadDataAvailable > 0)
|
||||
chunk = left;
|
||||
else if (nbytes != 0)
|
||||
break;
|
||||
else
|
||||
chunk = fpli.InboundQuota - 1;
|
||||
}
|
||||
else if (nbytes != 0)
|
||||
break;
|
||||
|
||||
if (len1 > chunk)
|
||||
len1 = chunk;
|
||||
if (fpli.ReadDataAvailable == 0 && nbytes != 0)
|
||||
break;
|
||||
}
|
||||
else if (nbytes != 0)
|
||||
break;
|
||||
status = NtReadFile (get_handle (), evt, NULL, NULL, &io, ptr,
|
||||
len1, NULL, NULL);
|
||||
if (evt && status == STATUS_PENDING)
|
||||
|
@ -385,6 +370,16 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
|
|||
len = nbytes;
|
||||
}
|
||||
|
||||
bool
|
||||
fhandler_pipe_fifo::reader_closed ()
|
||||
{
|
||||
if (!query_hdl)
|
||||
return false;
|
||||
int n_reader = get_obj_handle_count (query_hdl);
|
||||
int n_writer = get_obj_handle_count (get_handle ());
|
||||
return n_reader == n_writer;
|
||||
}
|
||||
|
||||
ssize_t __reg3
|
||||
fhandler_pipe_fifo::raw_write (const void *ptr, size_t len)
|
||||
{
|
||||
|
@ -457,7 +452,19 @@ fhandler_pipe_fifo::raw_write (const void *ptr, size_t len)
|
|||
}
|
||||
if (evt && status == STATUS_PENDING)
|
||||
{
|
||||
waitret = cygwait (evt, INFINITE, cw_cancel | cw_sig);
|
||||
while (WAIT_TIMEOUT ==
|
||||
(waitret = cygwait (evt, (DWORD) 0, cw_cancel | cw_sig)))
|
||||
{
|
||||
if (reader_closed ())
|
||||
{
|
||||
CancelIo (get_handle ());
|
||||
set_errno (EPIPE);
|
||||
raise (SIGPIPE);
|
||||
break;
|
||||
}
|
||||
else
|
||||
cygwait (select_sem, 10);
|
||||
}
|
||||
/* If io.Status is STATUS_CANCELLED after CancelIo, IO has actually
|
||||
been cancelled and io.Information contains the number of bytes
|
||||
processed so far.
|
||||
|
@ -523,6 +530,8 @@ fhandler_pipe::set_close_on_exec (bool val)
|
|||
set_no_inheritance (read_mtx, val);
|
||||
if (select_sem)
|
||||
set_no_inheritance (select_sem, val);
|
||||
if (query_hdl)
|
||||
set_no_inheritance (query_hdl, val);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -532,6 +541,9 @@ fhandler_pipe::fixup_after_fork (HANDLE parent)
|
|||
fork_fixup (parent, read_mtx, "read_mtx");
|
||||
if (select_sem)
|
||||
fork_fixup (parent, select_sem, "select_sem");
|
||||
if (query_hdl)
|
||||
fork_fixup (parent, query_hdl, "query_hdl");
|
||||
|
||||
fhandler_base::fixup_after_fork (parent);
|
||||
}
|
||||
|
||||
|
@ -562,6 +574,15 @@ fhandler_pipe::dup (fhandler_base *child, int flags)
|
|||
ftp->close ();
|
||||
res = -1;
|
||||
}
|
||||
else if (query_hdl &&
|
||||
!DuplicateHandle (GetCurrentProcess (), query_hdl,
|
||||
GetCurrentProcess (), &ftp->query_hdl,
|
||||
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
|
||||
{
|
||||
__seterrno ();
|
||||
ftp->close ();
|
||||
res = -1;
|
||||
}
|
||||
|
||||
debug_printf ("res %d", res);
|
||||
return res;
|
||||
|
@ -577,6 +598,8 @@ fhandler_pipe::close ()
|
|||
ReleaseSemaphore (select_sem, get_obj_handle_count (select_sem), NULL);
|
||||
CloseHandle (select_sem);
|
||||
}
|
||||
if (query_hdl)
|
||||
CloseHandle (query_hdl);
|
||||
return fhandler_base::close ();
|
||||
}
|
||||
|
||||
|
@ -797,6 +820,19 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode)
|
|||
DuplicateHandle (GetCurrentProcess (), fhs[0]->select_sem,
|
||||
GetCurrentProcess (), &fhs[1]->select_sem,
|
||||
0, sa->bInheritHandle, DUPLICATE_SAME_ACCESS);
|
||||
if (!DuplicateHandle (GetCurrentProcess (), r,
|
||||
GetCurrentProcess (), &fhs[1]->query_hdl,
|
||||
FILE_READ_DATA, sa->bInheritHandle, 0))
|
||||
{
|
||||
CloseHandle (fhs[0]->select_sem);
|
||||
delete fhs[0];
|
||||
CloseHandle (r);
|
||||
CloseHandle (fhs[1]->select_sem);
|
||||
delete fhs[1];
|
||||
CloseHandle (w);
|
||||
}
|
||||
else
|
||||
res = 0;
|
||||
}
|
||||
|
||||
debug_printf ("%R = pipe([%p, %p], %d, %y)", res, fhs[0], fhs[1], psize, mode);
|
||||
|
|
|
@ -608,16 +608,47 @@ pipe_data_available (int fd, fhandler_base *fh, HANDLE h, bool writing)
|
|||
}
|
||||
if (writing)
|
||||
{
|
||||
/* WriteQuotaAvailable is decremented by the number of bytes requested
|
||||
by a blocking reader on the other side of the pipe. Cygwin readers
|
||||
are serialized and never request a number of bytes equivalent to the
|
||||
full buffer size. So WriteQuotaAvailable is 0 only if either the
|
||||
read buffer on the other side is really full, or if we have non-Cygwin
|
||||
readers. */
|
||||
/* If there is anything available in the pipe buffer then signal
|
||||
that. This means that a pipe could still block since you could
|
||||
be trying to write more to the pipe than is available in the
|
||||
buffer but that is the hazard of select().
|
||||
|
||||
Note that WriteQuotaAvailable is unreliable.
|
||||
|
||||
Usually WriteQuotaAvailable on the write side reflects the space
|
||||
available in the inbound buffer on the read side. However, if a
|
||||
pipe read is currently pending, WriteQuotaAvailable on the write side
|
||||
is decremented by the number of bytes the read side is requesting.
|
||||
So it's possible (even likely) that WriteQuotaAvailable is 0, even
|
||||
if the inbound buffer on the read side is not full. This can lead to
|
||||
a deadlock situation: The reader is waiting for data, but select
|
||||
on the writer side assumes that no space is available in the read
|
||||
side inbound buffer.
|
||||
|
||||
Consequentially, the only reliable information is available on the
|
||||
read side, so fetch info from the read side via the pipe-specific
|
||||
query handle. Use fpli.WriteQuotaAvailable as storage for the actual
|
||||
interesting value, which is the InboundQuote on the write side,
|
||||
decremented by the number of bytes of data in that buffer. */
|
||||
/* Note: Do not use NtQueryInformationFile() for query_hdl because
|
||||
NtQueryInformationFile() seems to interfere with reading pipes
|
||||
in non-cygwin apps. Instead, use PeekNamedPipe() here. */
|
||||
if (fh->get_device () == FH_PIPEW)
|
||||
{
|
||||
HANDLE query_hdl = ((fhandler_pipe *) fh)->get_query_handle ();
|
||||
if (query_hdl)
|
||||
{
|
||||
DWORD nbytes_in_pipe;
|
||||
PeekNamedPipe (query_hdl, NULL, 0, NULL, &nbytes_in_pipe, NULL);
|
||||
fpli.WriteQuotaAvailable = fpli.InboundQuota - nbytes_in_pipe;
|
||||
}
|
||||
else
|
||||
return 1;
|
||||
}
|
||||
if (fpli.WriteQuotaAvailable > 0)
|
||||
{
|
||||
paranoid_printf ("fd %d, %s, write: size %u, avail %u", fd,
|
||||
fh->get_name (), fpli.OutboundQuota,
|
||||
fh->get_name (), fpli.InboundQuota,
|
||||
fpli.WriteQuotaAvailable);
|
||||
return 1;
|
||||
}
|
||||
|
@ -712,6 +743,13 @@ out:
|
|||
h = fh->get_output_handle ();
|
||||
if (s->write_selected && dev != FH_PIPER)
|
||||
{
|
||||
if (dev == FH_PIPEW && ((fhandler_pipe *) fh)->reader_closed ())
|
||||
{
|
||||
gotone += s->write_ready = true;
|
||||
if (s->except_selected)
|
||||
gotone += s->except_ready = true;
|
||||
return gotone;
|
||||
}
|
||||
gotone += s->write_ready = pipe_data_available (s->fd, fh, h, true);
|
||||
select_printf ("write: %s, gotone %d", fh->get_name (), gotone);
|
||||
}
|
||||
|
|
|
@ -657,6 +657,17 @@ child_info_spawn::worker (const char *prog_arg, const char *const *argv,
|
|||
ptys->create_invisible_console ();
|
||||
ptys->setup_locale ();
|
||||
}
|
||||
else if (cfd->get_dev () == FH_PIPEW)
|
||||
{
|
||||
fhandler_pipe *pipe = (fhandler_pipe *)(fhandler_base *) cfd;
|
||||
pipe->close_query_handle ();
|
||||
pipe->set_pipe_non_blocking (false);
|
||||
}
|
||||
else if (cfd->get_dev () == FH_PIPER)
|
||||
{
|
||||
fhandler_pipe *pipe = (fhandler_pipe *)(fhandler_base *) cfd;
|
||||
pipe->set_pipe_non_blocking (false);
|
||||
}
|
||||
}
|
||||
|
||||
bool enable_pcon = false;
|
||||
|
|
Loading…
Reference in New Issue