diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index dac41f2220e..f253382a0cc 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -61,7 +61,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_IN_LIB Yes) set(FLB_IN_RANDOM Yes) set(FLB_IN_SERIAL No) - set(FLB_IN_STDIN No) + set(FLB_IN_STDIN Yes) set(FLB_IN_SYSLOG Yes) set(FLB_IN_TAIL Yes) set(FLB_IN_TCP Yes) diff --git a/plugins/in_stdin/in_stdin.c b/plugins/in_stdin/in_stdin.c index 3f4e2786745..04193a4f788 100644 --- a/plugins/in_stdin/in_stdin.c +++ b/plugins/in_stdin/in_stdin.c @@ -34,6 +34,11 @@ #include #include +#ifdef FLB_SYSTEM_WINDOWS +#include +#include +#endif + #include "in_stdin.h" static inline void consume_bytes(char *buf, int bytes, int length) @@ -179,6 +184,63 @@ static inline int pack_regex(struct flb_in_stdin_config *ctx, return ret; } +#ifdef FLB_SYSTEM_WINDOWS +static int stdin_win32_available(struct flb_in_stdin_config *ctx) +{ + BOOL ret; + DWORD stdin_type; + DWORD available; + DWORD err; + + stdin_type = ctx->stdin_type & ~FILE_TYPE_REMOTE; + + if (stdin_type == FILE_TYPE_PIPE) { + ret = PeekNamedPipe(ctx->stdin_handle, NULL, 0, NULL, &available, NULL); + if (ret == FALSE) { + err = GetLastError(); + if (err == ERROR_BROKEN_PIPE || err == ERROR_HANDLE_EOF) { + return 0; + } + + flb_plg_debug(ctx->ins, "could not query stdin pipe: win32 error=%lu", err); + return 0; + } + + if (available == 0) { + return 0; + } + + return 1; + } + else if (stdin_type == FILE_TYPE_CHAR) { + if (_kbhit() == 0) { + return 0; + } + + return 1; + } + + return 1; +} + +static int stdin_read(struct flb_in_stdin_config *ctx, char *buf, size_t size) +{ + int ret; + + ret = stdin_win32_available(ctx); + if (ret <= 0) { + return ret; + } + + return _read(ctx->fd, buf, (unsigned int) size); +} +#else +static int stdin_read(struct flb_in_stdin_config *ctx, char *buf, size_t size) +{ + return read(ctx->fd, buf, size); +} +#endif + static int in_stdin_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { @@ -191,11 +253,17 @@ static int in_stdin_collect(struct flb_input_instance *ins, struct flb_time out_time; struct flb_in_stdin_config *ctx = in_context; - bytes = read(ctx->fd, - ctx->buf + ctx->buf_len, - ctx->buf_size - ctx->buf_len - 1); + bytes = stdin_read(ctx, + ctx->buf + ctx->buf_len, + ctx->buf_size - ctx->buf_len - 1); flb_plg_trace(ctx->ins, "stdin read() = %i", bytes); +#ifdef FLB_SYSTEM_WINDOWS + if (bytes == 0 && (ctx->stdin_type & ~FILE_TYPE_REMOTE) == FILE_TYPE_PIPE) { + return 0; + } +#endif + if (bytes == 0) { flb_plg_warn(ctx->ins, "end of file (stdin closed by remote end)"); } @@ -310,6 +378,7 @@ static int in_stdin_config_init(struct flb_in_stdin_config *ctx, ctx->buf = NULL; ctx->buf_len = 0; ctx->ins = in; + ctx->fd = -1; ret = flb_input_config_map_set(in, (void *)ctx); if (ret == -1) { @@ -391,8 +460,26 @@ static int in_stdin_init(struct flb_input_instance *in, goto init_error; } +#ifdef FLB_SYSTEM_WINDOWS + ctx->stdin_handle = GetStdHandle(STD_INPUT_HANDLE); + if (ctx->stdin_handle == INVALID_HANDLE_VALUE || ctx->stdin_handle == NULL) { + flb_plg_error(ctx->ins, "could not open standard input handle"); + goto init_error; + } + SetLastError(NO_ERROR); + ctx->stdin_type = GetFileType(ctx->stdin_handle); + if (ctx->stdin_type == FILE_TYPE_UNKNOWN && GetLastError() != NO_ERROR) { + flb_plg_error(ctx->ins, "could not detect standard input handle type"); + goto init_error; + } +#endif + /* Clone the standard input file descriptor */ +#ifdef FLB_SYSTEM_WINDOWS + fd = _dup(_fileno(stdin)); +#else fd = dup(STDIN_FILENO); +#endif if (fd == -1) { flb_errno(); flb_plg_error(ctx->ins, "Could not open standard input!"); @@ -407,11 +494,19 @@ static int in_stdin_init(struct flb_input_instance *in, /* Set the context */ flb_input_set_context(in, ctx); +#ifdef FLB_SYSTEM_WINDOWS + ret = flb_input_set_collector_time(in, + in_stdin_collect, + 0, + FLB_STDIN_WIN32_COLLECT_NSEC, + config); +#else /* Collect upon data available on the standard input */ ret = flb_input_set_collector_event(in, in_stdin_collect, ctx->fd, config); +#endif if (ret == -1) { flb_plg_error(ctx->ins, "Could not set collector for STDIN input plugin"); goto init_error; @@ -421,6 +516,14 @@ static int in_stdin_init(struct flb_input_instance *in, return 0; init_error: + if (ctx->fd >= 0) { +#ifdef FLB_SYSTEM_WINDOWS + _close(ctx->fd); +#else + close(ctx->fd); +#endif + ctx->fd = -1; + } in_stdin_config_destroy(ctx); return -1; @@ -436,7 +539,11 @@ static int in_stdin_exit(void *in_context, struct flb_config *config) } if (ctx->fd >= 0) { +#ifdef FLB_SYSTEM_WINDOWS + _close(ctx->fd); +#else close(ctx->fd); +#endif } flb_pack_state_reset(&ctx->pack_state); in_stdin_config_destroy(ctx); diff --git a/plugins/in_stdin/in_stdin.h b/plugins/in_stdin/in_stdin.h index d18d052b74a..35d440d93ba 100644 --- a/plugins/in_stdin/in_stdin.h +++ b/plugins/in_stdin/in_stdin.h @@ -25,7 +25,12 @@ #include #include +#ifdef FLB_SYSTEM_WINDOWS +#include +#endif + #define DEFAULT_BUF_SIZE 16000 +#define FLB_STDIN_WIN32_COLLECT_NSEC 100000000L /* STDIN Input configuration & context */ struct flb_in_stdin_config { @@ -41,6 +46,11 @@ struct flb_in_stdin_config { struct flb_pack_state pack_state; struct flb_input_instance *ins; struct flb_log_event_encoder *log_encoder; + +#ifdef FLB_SYSTEM_WINDOWS + HANDLE stdin_handle; + DWORD stdin_type; +#endif }; extern struct flb_input_plugin in_stdin_plugin;