@@ -25,6 +25,16 @@ import (
2525 "github.com/grafana/alloy/internal/util"
2626)
2727
28+ type fakeRec struct {
29+ c chan loki.Entry
30+ }
31+
32+ func (f * fakeRec ) Chan () chan loki.Entry {
33+ return f .c
34+ }
35+
36+ var _ loki.LogsReceiver = (* fakeRec )(nil )
37+
2838func Test (t * testing.T ) {
2939 defer goleak .VerifyNone (t , goleak .IgnoreTopFunction ("go.opencensus.io/stats/view.(*worker).start" ))
3040
@@ -78,6 +88,76 @@ func Test(t *testing.T) {
7888 }
7989}
8090
91+ func TestUpdateRemoveFileWhileReading (t * testing.T ) {
92+ defer goleak .VerifyNone (t , goleak .IgnoreTopFunction ("go.opencensus.io/stats/view.(*worker).start" ))
93+
94+ ctx , cancel := context .WithCancel (componenttest .TestContext (t ))
95+ defer cancel ()
96+
97+ // Create file to log to.
98+ f , err := os .CreateTemp (t .TempDir (), "example" )
99+ require .NoError (t , err )
100+ defer f .Close ()
101+
102+ ctrl , err := componenttest .NewControllerFromID (util .TestLogger (t ), "loki.source.file" )
103+ require .NoError (t , err )
104+
105+ ch1 := loki .NewLogsReceiver ()
106+
107+ go func () {
108+ err := ctrl .Run (ctx , Arguments {
109+ Targets : []discovery.Target {discovery .NewTargetFromMap (map [string ]string {
110+ "__path__" : f .Name (),
111+ "foo" : "bar" ,
112+ })},
113+ ForwardTo : []loki.LogsReceiver {ch1 },
114+ })
115+ require .NoError (t , err )
116+ }()
117+
118+ ctrl .WaitRunning (time .Minute )
119+
120+ // Start a goroutine that reads from the channel until cancellation
121+ go func () {
122+ for {
123+ select {
124+ case <- ctx .Done ():
125+ return
126+ case <- ch1 .Chan ():
127+ // Just consume the messages
128+ }
129+ }
130+ }()
131+
132+ go func () {
133+ for {
134+ select {
135+ case <- ctx .Done ():
136+ return
137+ default :
138+ _ , err = f .Write ([]byte ("writing some text\n writing some text2\n " ))
139+ require .NoError (t , err )
140+ }
141+ }
142+ }()
143+
144+ time .Sleep (100 * time .Millisecond )
145+
146+ err = ctrl .Update (Arguments {
147+ Targets : []discovery.Target {},
148+ ForwardTo : []loki.LogsReceiver {ch1 },
149+ })
150+ require .NoError (t , err )
151+
152+ time .Sleep (100 * time .Millisecond )
153+
154+ err = ctrl .Update (Arguments {
155+ Targets : []discovery.Target {},
156+ ForwardTo : []loki.LogsReceiver {ch1 },
157+ })
158+ require .NoError (t , err )
159+ }
160+
81161func TestFileWatch (t * testing.T ) {
82162 defer goleak .VerifyNone (t , goleak .IgnoreTopFunction ("go.opencensus.io/stats/view.(*worker).start" ))
83163 ctx , cancel := context .WithCancel (componenttest .TestContext (t ))
0 commit comments