Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 84 additions & 43 deletions drivers/pipes/pipe_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ ssize_t pipecommon_write(FAR struct file *filep, FAR const char *buffer,
FAR struct inode *inode = filep->f_inode;
FAR struct pipe_dev_s *dev = inode->i_private;
ssize_t nwritten = 0;
ssize_t last;
int ret;

DEBUGASSERT(dev);
Expand Down Expand Up @@ -545,70 +546,110 @@ ssize_t pipecommon_write(FAR struct file *filep, FAR const char *buffer,
return ret;
}

/* REVISIT: "If all file descriptors referring to the read end of a
* pipe have been closed, then a write will cause a SIGPIPE signal to
* be generated for the calling process. If the calling process is
* ignoring this signal, then write(2) fails with the error EPIPE."
*/
/* Loop until all of the bytes have been written */

if (dev->d_nreaders <= 0 && PIPE_IS_POLICY_0(dev->d_flags))
last = 0;
for (; ; )
{
nxrmutex_unlock(&dev->d_bflock);
return -EPIPE;
}

/* Would the next write overflow the circular buffer? */
/* REVISIT: "If all file descriptors referring to the read end of a
* pipe have been closed, then a write will cause a SIGPIPE signal to
* be generated for the calling process. If the calling process is
* ignoring this signal, then write(2) fails with the error EPIPE."
*/

while (circbuf_is_full(&dev->d_buffer))
{
if (filep->f_oflags & O_NONBLOCK)
if (dev->d_nreaders <= 0 && PIPE_IS_POLICY_0(dev->d_flags))
{
/* If O_NONBLOCK was set, then return EGAIN. */

nxrmutex_unlock(&dev->d_bflock);
return -EAGAIN;
return nwritten == 0 ? -EPIPE : nwritten;
}
else

/* Would the next write overflow the circular buffer? */

if (!circbuf_is_full(&dev->d_buffer))
{
/* Wait for data to be removed from the pipe. */
/* Loop until all of the bytes have been written */

nxrmutex_unlock(&dev->d_bflock);
ret = nxsem_wait(&dev->d_wrsem);
if (ret < 0 || (ret = nxrmutex_lock(&dev->d_bflock)) < 0)
nwritten += circbuf_write(&dev->d_buffer,
buffer + nwritten, len - nwritten);

if ((size_t)nwritten == len)
{
/* Either call nxsem_wait may fail because a signal was
* received or if the task was canceled.
/* Notify all poll/select waiters that they can read from the
* FIFO when buffer used exceeds poll threshold.
*/

return (ssize_t)ret;
if (circbuf_used(&dev->d_buffer) > dev->d_pollinthrd)
{
poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS,
POLLIN);
}

/* Yes.. Notify all of the waiting readers that more data is
* available.
*/

pipecommon_wakeup(&dev->d_rdsem);

/* Return the number of bytes written */

nxrmutex_unlock(&dev->d_bflock);
return len;
}
}
}
else
{
/* There is not enough room for the next byte. Was anything
* written in this pass?
*/

/* Write data to buffer. */
if (last < nwritten)
{
/* Notify all poll/select waiters that they can read from the
* FIFO.
*/

nwritten = circbuf_write(&dev->d_buffer, buffer, len);
poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS, POLLIN);

/* Notify all poll/select waiters that they can read from the
* FIFO when buffer used exceeds poll threshold.
*/
/* Yes.. Notify all of the waiting readers that more data is
* available.
*/

if (circbuf_used(&dev->d_buffer) > dev->d_pollinthrd)
{
poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS,
POLLIN);
}
pipecommon_wakeup(&dev->d_rdsem);
}

/* Yes.. Notify all of the waiting readers that more data is
* available.
*/
last = nwritten;

pipecommon_wakeup(&dev->d_rdsem);
/* If O_NONBLOCK was set, then return partial bytes written or
* EGAIN.
*/

/* Return the number of bytes written */
if (filep->f_oflags & O_NONBLOCK)
{
if (nwritten == 0)
{
nwritten = -EAGAIN;
}

nxrmutex_unlock(&dev->d_bflock);
return nwritten;
nxrmutex_unlock(&dev->d_bflock);
return nwritten;
}

/* There is more to be written.. wait for data to be removed from
* the pipe
*/

nxrmutex_unlock(&dev->d_bflock);
ret = nxsem_wait(&dev->d_wrsem);
if (ret < 0 || (ret = nxrmutex_lock(&dev->d_bflock)) < 0)
{
/* Either call nxsem_wait may fail because a signal was
* received or if the task was canceled.
*/

return nwritten == 0 ? (ssize_t)ret : nwritten;
}
}
}
}

/****************************************************************************
Expand Down