Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion lib/test/CThreadDataReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,22 @@ void CThreadDataReader::run() {

static const std::streamsize BUF_SIZE{512};
char buffer[BUF_SIZE];
while (strm.good()) {

// For regular files the reader can open the file while the writer
// is still flushing, hit a premature EOF, and stop too early.
// After hitting EOF we clear the stream state and retry a limited
// number of times, sleeping between each attempt. Any successful
// read resets the counter so we only give up after the writer has
// truly finished.
//
// 50 retries * 40ms sleep = 2 seconds total patience after the last
// successful read. The previous value of 10 (~400ms) was too low
// for loaded CI agents where kernel buffer flushing can be delayed.
// See https://github.com/elastic/ml-cpp/issues/2890.
static const std::size_t MAX_EOF_RETRIES{50};
std::size_t eofRetries{0};

for (;;) {
if (m_Shutdown) {
return;
}
Expand All @@ -75,6 +90,7 @@ void CThreadDataReader::run() {
return;
}
if (strm.gcount() > 0) {
eofRetries = 0;
core::CScopedLock lock(m_Mutex);
// This code deals with the test character we write to
// detect the short-lived connection problem on Windows
Expand All @@ -88,6 +104,16 @@ void CThreadDataReader::run() {
m_Data.append(copyFrom, copyLen);
}
}
if (strm.eof()) {
if (strm.gcount() == 0) {
++eofRetries;
if (eofRetries > MAX_EOF_RETRIES) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(m_SleepTimeMs));
}
strm.clear();
}
}
}

Expand Down