55#include <fluent-bit/flb_error.h>
66#include <fluent-bit/flb_socket.h>
77#include <fluent-bit/flb_task.h>
8+ #include <fluent-bit/flb_output.h>
9+ #include <fluent-bit/flb_scheduler.h>
10+ #include <string.h>
811
912#include "flb_tests_internal.h"
1013
1114#define TASK_COUNT_LIMIT (FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT + 1)
1215
1316struct test_ctx {
1417 struct flb_config * config ;
18+ struct mk_event_loop * evl ;
1519};
1620
1721struct test_ctx * test_ctx_create ()
@@ -32,6 +36,22 @@ struct test_ctx* test_ctx_create()
3236 return NULL ;
3337 }
3438
39+ ret_ctx -> evl = mk_event_loop_create (8 );
40+ if (!TEST_CHECK (ret_ctx -> evl != NULL )) {
41+ flb_config_exit (ret_ctx -> config );
42+ flb_free (ret_ctx );
43+ return NULL ;
44+ }
45+
46+ ret_ctx -> config -> evl = ret_ctx -> evl ;
47+ ret_ctx -> config -> sched = flb_sched_create (ret_ctx -> config , ret_ctx -> evl );
48+ if (!TEST_CHECK (ret_ctx -> config -> sched != NULL )) {
49+ mk_event_loop_destroy (ret_ctx -> evl );
50+ flb_config_exit (ret_ctx -> config );
51+ flb_free (ret_ctx );
52+ return NULL ;
53+ }
54+
3555 return ret_ctx ;
3656}
3757
@@ -88,7 +108,91 @@ void test_task_map_limit()
88108 test_ctx_destroy (ctx );
89109}
90110
111+ void test_task_route_data_preserved_across_retry ()
112+ {
113+ int ret ;
114+ int records ;
115+ size_t bytes ;
116+ struct test_ctx * ctx ;
117+ struct flb_task * task ;
118+ struct flb_output_instance out_a ;
119+ struct flb_output_instance out_b ;
120+ struct flb_task_route * route_a ;
121+ struct flb_task_route * route_b ;
122+ struct flb_task_retry * retry ;
123+
124+ ctx = test_ctx_create ();
125+ if (!TEST_CHECK (ctx != NULL )) {
126+ return ;
127+ }
128+
129+ task = task_alloc (ctx -> config );
130+ if (!TEST_CHECK (task != NULL )) {
131+ test_ctx_destroy (ctx );
132+ return ;
133+ }
134+
135+ /* Avoid input chunk up/down side effects in retry creation. */
136+ task -> users = 2 ;
137+
138+ memset (& out_a , 0 , sizeof (out_a ));
139+ memset (& out_b , 0 , sizeof (out_b ));
140+ out_a .retry_limit = 5 ;
141+ out_b .retry_limit = 5 ;
142+
143+ route_a = flb_calloc (1 , sizeof (struct flb_task_route ));
144+ route_b = flb_calloc (1 , sizeof (struct flb_task_route ));
145+ TEST_CHECK (route_a != NULL );
146+ TEST_CHECK (route_b != NULL );
147+ if (!route_a || !route_b ) {
148+ flb_free (route_a );
149+ flb_free (route_b );
150+ flb_task_destroy (task , FLB_TRUE );
151+ test_ctx_destroy (ctx );
152+ return ;
153+ }
154+
155+ route_a -> status = FLB_TASK_ROUTE_ACTIVE ;
156+ route_a -> out = & out_a ;
157+ route_a -> records = 3 ;
158+ route_a -> bytes = 300 ;
159+ route_b -> status = FLB_TASK_ROUTE_ACTIVE ;
160+ route_b -> out = & out_b ;
161+ route_b -> records = 7 ;
162+ route_b -> bytes = 700 ;
163+ mk_list_add (& route_a -> _head , & task -> routes );
164+ mk_list_add (& route_b -> _head , & task -> routes );
165+
166+ flb_task_set_route_data (task , & out_a , 1 , 111 );
167+ ret = flb_task_get_route_data (task , & out_a , & records , & bytes );
168+ TEST_CHECK (ret == 0 );
169+ TEST_CHECK (records == 1 );
170+ TEST_CHECK (bytes == 111 );
171+
172+ retry = flb_task_retry_create (task , & out_a );
173+ TEST_CHECK (retry != NULL );
174+ if (retry != NULL ) {
175+ TEST_CHECK (retry -> attempts == 1 );
176+ }
177+
178+ retry = flb_task_retry_create (task , & out_a );
179+ TEST_CHECK (retry != NULL );
180+ if (retry != NULL ) {
181+ TEST_CHECK (retry -> attempts == 2 );
182+ }
183+
184+ ret = flb_task_get_route_data (task , & out_a , & records , & bytes );
185+ TEST_CHECK (ret == 0 );
186+ TEST_CHECK (records == 1 );
187+ TEST_CHECK (bytes == 111 );
188+
189+ flb_task_retry_clean (task , & out_a );
190+ flb_task_destroy (task , FLB_TRUE );
191+ test_ctx_destroy (ctx );
192+ }
193+
91194TEST_LIST = {
92195 { "task_map_limit" , test_task_map_limit },
196+ { "task_route_data_preserved_across_retry" , test_task_route_data_preserved_across_retry },
93197 { 0 }
94198};
0 commit comments