@@ -2,6 +2,7 @@ package collector
22
33import (
44 "database/sql"
5+ "errors"
56 "fmt"
67 "reflect"
78 "strings"
@@ -19,6 +20,8 @@ import (
1920 "github.com/grafana/alloy/internal/util/syncbuffer"
2021)
2122
23+ var errMockQuerySamplesFailed = errors .New ("test-error" )
24+
2225func TestQuerySamples_FetchQuerySamples (t * testing.T ) {
2326 defer goleak .VerifyNone (t , goleak .IgnoreTopFunction ("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1" ))
2427
@@ -41,7 +44,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
4144 name string
4245 setupMock func (mock sqlmock.Sqlmock )
4346 disableQueryRedaction bool
44- expectedErrorLine string
4547 expectedLabels []model.LabelSet
4648 expectedLines []string
4749 }{
@@ -60,7 +62,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
6062 mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
6163 WillReturnRows (sqlmock .NewRows (columns ))
6264 },
63-
6465 expectedLabels : []model.LabelSet {
6566 {"op" : OP_QUERY_SAMPLE },
6667 },
@@ -83,12 +84,11 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
8384 mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
8485 WillReturnRows (sqlmock .NewRows (columns ))
8586 },
86-
8787 expectedLabels : []model.LabelSet {
8888 {"op" : OP_QUERY_SAMPLE },
8989 },
9090 expectedLines : []string {
91- `level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" state="active" xid="0" xmin="0" xact_time="0s" query_time="0s" queryid="123" cpu_time="0s"` , // time.Duration(0).String(),
91+ `level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" state="active" xid="0" xmin="0" xact_time="0s" query_time="0s" queryid="123" cpu_time="0s"` ,
9292 },
9393 },
9494 {
@@ -106,7 +106,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
106106 mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
107107 WillReturnRows (sqlmock .NewRows (columns ))
108108 },
109-
110109 expectedLabels : []model.LabelSet {
111110 {"op" : OP_QUERY_SAMPLE },
112111 {"op" : OP_WAIT_EVENT },
@@ -116,46 +115,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
116115 `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124"` ,
117116 },
118117 },
119- {
120- name : "insufficient privilege query - no loki entries expected" ,
121- setupMock : func (mock sqlmock.Sqlmock ) {
122- mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).RowsWillBeClosed ().
123- WillReturnRows (sqlmock .NewRows (append (columns , "query" )).AddRow (
124- now , "testdb" , 103 , sql.NullInt64 {},
125- "testuser" , "testapp" , "127.0.0.1" , 5432 ,
126- "client backend" , now , sql.NullInt32 {}, sql.NullInt32 {},
127- now , "active" , now , sql.NullString {},
128- sql.NullString {}, nil , now , sql.NullInt64 {Int64 : 125 , Valid : true },
129- "<insufficient privilege>" ,
130- ))
131- // Second scrape: empty to complete cycle
132- mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).RowsWillBeClosed ().
133- WillReturnRows (sqlmock .NewRows (append (columns , "query" )))
134- },
135- disableQueryRedaction : true ,
136- expectedErrorLine : `err="insufficient privilege to access query` ,
137- expectedLabels : []model.LabelSet {}, // No Loki entries expected
138- expectedLines : []string {}, // No Loki entries expected
139- },
140- {
141- name : "null database name - no loki entries expected" ,
142- setupMock : func (mock sqlmock.Sqlmock ) {
143- mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
144- WillReturnRows (sqlmock .NewRows (columns ).AddRow (
145- now , sql.NullString {Valid : false }, 104 , sql.NullInt64 {},
146- "testuser" , "testapp" , "127.0.0.1" , 5432 ,
147- "client backend" , now , sql.NullInt32 {}, sql.NullInt32 {},
148- now , "active" , now , sql.NullString {},
149- sql.NullString {}, nil , now , sql.NullInt64 {Int64 : 126 , Valid : true },
150- ))
151- // Second scrape: empty to complete cycle
152- mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
153- WillReturnRows (sqlmock .NewRows (columns ))
154- },
155- expectedErrorLine : `err="database name is not valid` ,
156- expectedLabels : []model.LabelSet {}, // No Loki entries expected
157- expectedLines : []string {}, // No Loki entries expected
158- },
159118 {
160119 name : "query with redaction disabled" ,
161120 setupMock : func (mock sqlmock.Sqlmock ) {
@@ -209,13 +168,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
209168
210169 require .NoError (t , sampleCollector .Start (t .Context ()))
211170
212- // For error cases, wait for error message in logs
213- if tc .expectedErrorLine != "" {
214- require .Eventually (t , func () bool {
215- return strings .Contains (logBuffer .String (), tc .expectedErrorLine )
216- }, 5 * time .Second , 100 * time .Millisecond )
217- }
218-
219171 require .Eventually (t , func () bool {
220172 return len (lokiClient .Received ()) == len (tc .expectedLines )
221173 }, 5 * time .Second , 100 * time .Millisecond )
@@ -243,6 +195,123 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
243195 }
244196}
245197
198+ // TestQuerySamples_FetchQuerySamples_ErrorCases tests scenarios where errors occur
199+ // and no Loki entries are produced.
200+ func TestQuerySamples_FetchQuerySamples_ErrorCases (t * testing.T ) {
201+ defer goleak .VerifyNone (t , goleak .IgnoreTopFunction ("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1" ))
202+
203+ now := time .Now ()
204+
205+ columns := []string {
206+ "now" , "datname" , "pid" , "leader_pid" ,
207+ "usename" , "application_name" , "client_addr" , "client_port" ,
208+ "backend_type" , "backend_start" , "backend_xid" , "backend_xmin" ,
209+ "xact_start" , "state" , "state_change" , "wait_event_type" ,
210+ "wait_event" , "blocked_by_pids" , "query_start" , "query_id" ,
211+ }
212+
213+ testCases := []struct {
214+ name string
215+ setupMock func (mock sqlmock.Sqlmock )
216+ disableQueryRedaction bool
217+ expectedErrorLine string
218+ }{
219+ {
220+ name : "insufficient privilege query" ,
221+ setupMock : func (mock sqlmock.Sqlmock ) {
222+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).RowsWillBeClosed ().
223+ WillReturnRows (sqlmock .NewRows (append (columns , "query" )).AddRow (
224+ now , "testdb" , 103 , sql.NullInt64 {},
225+ "testuser" , "testapp" , "127.0.0.1" , 5432 ,
226+ "client backend" , now , sql.NullInt32 {}, sql.NullInt32 {},
227+ now , "active" , now , sql.NullString {},
228+ sql.NullString {}, nil , now , sql.NullInt64 {Int64 : 125 , Valid : true },
229+ "<insufficient privilege>" ,
230+ ))
231+ // Second scrape: empty to complete cycle
232+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).RowsWillBeClosed ().
233+ WillReturnRows (sqlmock .NewRows (append (columns , "query" )))
234+ // Return error to trigger finalization
235+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).
236+ WillReturnError (errMockQuerySamplesFailed )
237+ },
238+ disableQueryRedaction : true ,
239+ expectedErrorLine : `err="insufficient privilege to access query` ,
240+ },
241+ {
242+ name : "null database name" ,
243+ setupMock : func (mock sqlmock.Sqlmock ) {
244+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
245+ WillReturnRows (sqlmock .NewRows (columns ).AddRow (
246+ now , sql.NullString {Valid : false }, 104 , sql.NullInt64 {},
247+ "testuser" , "testapp" , "127.0.0.1" , 5432 ,
248+ "client backend" , now , sql.NullInt32 {}, sql.NullInt32 {},
249+ now , "active" , now , sql.NullString {},
250+ sql.NullString {}, nil , now , sql.NullInt64 {Int64 : 126 , Valid : true },
251+ ))
252+ // Second scrape: empty to complete cycle
253+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).RowsWillBeClosed ().
254+ WillReturnRows (sqlmock .NewRows (columns ))
255+ // Return error to trigger finalization
256+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , "" , excludeCurrentUserClause )).
257+ WillReturnError (errMockQuerySamplesFailed )
258+ },
259+ expectedErrorLine : `err="database name is not valid` ,
260+ },
261+ }
262+
263+ for _ , tc := range testCases {
264+ t .Run (tc .name , func (t * testing.T ) {
265+ t .Parallel ()
266+
267+ db , mock , err := sqlmock .New (sqlmock .QueryMatcherOption (sqlmock .QueryMatcherEqual ))
268+ require .NoError (t , err )
269+ defer db .Close ()
270+
271+ logBuffer := syncbuffer.Buffer {}
272+ lokiClient := loki .NewCollectingHandler ()
273+ defer lokiClient .Stop ()
274+
275+ sampleCollector , err := NewQuerySamples (QuerySamplesArguments {
276+ DB : db ,
277+ CollectInterval : time .Millisecond ,
278+ EntryHandler : lokiClient ,
279+ Logger : log .NewLogfmtLogger (log .NewSyncWriter (& logBuffer )),
280+ DisableQueryRedaction : tc .disableQueryRedaction ,
281+ ExcludeCurrentUser : true ,
282+ })
283+ require .NoError (t , err )
284+ require .NotNil (t , sampleCollector )
285+
286+ tc .setupMock (mock )
287+
288+ require .NoError (t , sampleCollector .Start (t .Context ()))
289+
290+ // Wait for the expected error to be logged
291+ require .Eventually (t , func () bool {
292+ return strings .Contains (logBuffer .String (), tc .expectedErrorLine )
293+ }, 5 * time .Second , 100 * time .Millisecond )
294+
295+ // Wait for the error to be returned, indicating all expected queries completed
296+ require .Eventually (t , func () bool {
297+ return strings .Contains (logBuffer .String (), errMockQuerySamplesFailed .Error ())
298+ }, 5 * time .Second , 100 * time .Millisecond )
299+
300+ // Verify no Loki entries were produced
301+ require .Empty (t , lokiClient .Received ())
302+
303+ sampleCollector .Stop ()
304+ require .Eventually (t , func () bool {
305+ return sampleCollector .Stopped ()
306+ }, 5 * time .Second , 100 * time .Millisecond )
307+
308+ require .Eventually (t , func () bool {
309+ return mock .ExpectationsWereMet () == nil
310+ }, 5 * time .Second , 100 * time .Millisecond )
311+ })
312+ }
313+ }
314+
246315func TestQuerySamples_FinalizationScenarios (t * testing.T ) {
247316 defer goleak .VerifyNone (t , goleak .IgnoreTopFunction ("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1" ))
248317
@@ -718,9 +787,17 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
718787 sql.NullString {}, nil , queryStartTime , sql.NullInt64 {Int64 : 20003 , Valid : true },
719788 "SELECT * FROM users" ,
720789 ))
790+ // Return error to trigger finalization
791+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).
792+ WillReturnError (errMockQuerySamplesFailed )
721793
722794 require .NoError (t , sampleCollector .Start (t .Context ()))
723795
796+ // Wait for all mock queries to complete
797+ require .Eventually (t , func () bool {
798+ return strings .Contains (logBuffer .String (), errMockQuerySamplesFailed .Error ())
799+ }, 5 * time .Second , 100 * time .Millisecond )
800+
724801 require .Eventually (t , func () bool {
725802 return len (lokiClient .Received ()) == 1
726803 }, 5 * time .Second , 100 * time .Millisecond )
@@ -783,9 +860,17 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
783860 sql.NullString {}, nil , queryStartTime , sql.NullInt64 {Int64 : 21002 , Valid : true },
784861 "SELECT 1" ,
785862 ))
863+ // Return error to trigger finalization
864+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).
865+ WillReturnError (errMockQuerySamplesFailed )
786866
787867 require .NoError (t , sampleCollector .Start (t .Context ()))
788868
869+ // Wait for all mock queries to complete
870+ require .Eventually (t , func () bool {
871+ return strings .Contains (logBuffer .String (), errMockQuerySamplesFailed .Error ())
872+ }, 5 * time .Second , 100 * time .Millisecond )
873+
789874 require .Eventually (t , func () bool {
790875 return len (lokiClient .Received ()) == 1
791876 }, 5 * time .Second , 100 * time .Millisecond )
@@ -866,9 +951,17 @@ func TestQuerySamples_IdleScenarios(t *testing.T) {
866951 sql.NullString {}, nil , queryStartTime , sql.NullInt64 {Int64 : 23002 , Valid : true },
867952 "SELECT * FROM b" ,
868953 ))
954+ // Return error to trigger finalization
955+ mock .ExpectQuery (fmt .Sprintf (selectPgStatActivity , queryTextClause , excludeCurrentUserClause )).
956+ WillReturnError (errMockQuerySamplesFailed )
869957
870958 require .NoError (t , sampleCollector .Start (t .Context ()))
871959
960+ // Wait for all mock queries to complete
961+ require .Eventually (t , func () bool {
962+ return strings .Contains (logBuffer .String (), errMockQuerySamplesFailed .Error ())
963+ }, 5 * time .Second , 100 * time .Millisecond )
964+
872965 require .Eventually (t , func () bool {
873966 return len (lokiClient .Received ()) == 2
874967 }, 5 * time .Second , 100 * time .Millisecond )
0 commit comments