Skip to content

Commit bff889d

Browse files
committed
Merge branch 'Nitro-fixing-dockerinput-plugin' into dev
2 parents ada17c4 + 68b8397 commit bff889d

File tree

5 files changed

+237
-113
lines changed

5 files changed

+237
-113
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ Bug Handling
3636
Features
3737
--------
3838

39+
* Added `fields_from_labels`, `container_expiry_days`, and
40+
`new_containers_replay_logs` options to DockerLogInput.
41+
3942
* Added `bind_queue` option to AMQPInput.
4043

4144
* Added time interval configurability to Unique Items filter.

docs/source/config/inputs/docker_log.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ Config:
5454

5555
.. versionadded:: 0.11
5656

57+
- fields_from_labels (array[string], optional):
58+
A list of labels to pull is as fields. These are pulled in last and will
59+
override any fields added from fields_from_env.
5760
- since_path (string, optional):
5861
Path to file where input will write a record of the "since" time for each
5962
container to be able to not miss log records while Heka is down (see
@@ -69,6 +72,18 @@ Config:
6972
to zero (e.g. "0s") then the file will only be written out when Heka
7073
cleanly shuts down, meaning that if Heka crashes all container logs written
7174
since Heka has started will be re-fetched.
75+
- container_expiry_days (int, optional):
76+
The number of days after which to remove unseen containers from the sinces
77+
file. Defaults to 30 days. This prevents containers from building up
78+
in the file forever. It has the effect of replaying logs from any container
79+
which was not seen for this interval but then re-appears. Containers are
80+
tracked by container ID.
81+
- new_containers_replay_logs (bool, optional):
82+
Will newly discovered containers replay all of the logs currently available
83+
via the Docker logs endpoint? Defaults to true. If you are upgrading from
84+
a previous version of heka, you may want to consider setting this to false
85+
when first upgrading to prevent the massive replay of logs from all of
86+
your existing containers.
7287

7388
Example:
7489

plugins/docker/attacher.go

Lines changed: 47 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,9 @@ package docker
2626
// SOFTWARE.
2727

2828
import (
29-
"encoding/json"
3029
"errors"
3130
"fmt"
3231
"io"
33-
"os"
34-
"sync"
3532
"time"
3633

3734
"github.com/fsouza/go-dockerclient"
@@ -40,60 +37,47 @@ import (
4037
"github.com/pborman/uuid"
4138
)
4239

43-
type sinces struct {
44-
Since int64
45-
Containers map[string]int64
46-
}
4740

4841
type AttachManager struct {
49-
hostname string
50-
client DockerClient
51-
events chan *docker.APIEvents
52-
ir InputRunner
53-
endpoint string
54-
certPath string
55-
nameFromEnv string
56-
fieldsFromEnv []string
57-
fieldsFromLabels []string
58-
sincePath string
59-
sinces *sinces
60-
sinceLock sync.Mutex
61-
sinceInterval time.Duration
42+
hostname string
43+
client DockerClient
44+
events chan *docker.APIEvents
45+
ir InputRunner
46+
endpoint string
47+
certPath string
48+
nameFromEnv string
49+
fieldsFromEnv []string
50+
fieldsFromLabels []string
51+
sinces *SinceTracker
52+
newContainersReplayLogs bool
6253
}
6354

6455
// Construct an AttachManager and set up the Docker Client
6556
func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
6657
fieldsFromEnv []string, fieldsFromLabels []string,
67-
sincePath string, sinceInterval time.Duration) (*AttachManager, error) {
58+
sincePath string, sinceInterval time.Duration, containerExpiryDays int,
59+
newContainersReplayLogs bool) (*AttachManager, error) {
6860

6961
client, err := newDockerClient(certPath, endpoint)
7062
if err != nil {
7163
return nil, err
7264
}
7365

74-
m := &AttachManager{
75-
client: client,
76-
events: make(chan *docker.APIEvents),
77-
nameFromEnv: nameFromEnv,
78-
fieldsFromEnv: fieldsFromEnv,
79-
fieldsFromLabels: fieldsFromLabels,
80-
sincePath: sincePath,
81-
sinces: &sinces{},
82-
sinceInterval: sinceInterval,
83-
}
84-
85-
// Initialize the sinces from the JSON since file.
86-
sinceFile, err := os.Open(sincePath)
66+
sinceTracker, err := NewSinceTracker(containerExpiryDays, sincePath, sinceInterval)
8767
if err != nil {
88-
return nil, fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
68+
return nil, err
8969
}
90-
jsonDecoder := json.NewDecoder(sinceFile)
91-
m.sinceLock.Lock()
92-
err = jsonDecoder.Decode(m.sinces)
93-
m.sinceLock.Unlock()
94-
if err != nil {
95-
return nil, fmt.Errorf("Can't decode \"since\" file '%s': %s", sincePath, err.Error())
70+
71+
m := &AttachManager{
72+
client: client,
73+
events: make(chan *docker.APIEvents),
74+
nameFromEnv: nameFromEnv,
75+
fieldsFromEnv: fieldsFromEnv,
76+
fieldsFromLabels: fieldsFromLabels,
77+
sinces: sinceTracker,
78+
newContainersReplayLogs: newContainersReplayLogs,
9679
}
80+
9781
return m, nil
9882
}
9983

@@ -123,6 +107,7 @@ func (m *AttachManager) attachAll() error {
123107
// Main body of work
124108
func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error) error {
125109
m.ir = ir
110+
m.sinces.ir = ir
126111
m.hostname = hostname
127112

128113
// Retry this sleeping in between tries During this time
@@ -132,49 +117,31 @@ func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error
132117
if err != nil {
133118
m.ir.LogError(err)
134119
return errors.New(
135-
"Failed to attach to Docker containers after retrying. Plugin giving up.")
120+
"Failed to attach to Docker containers after retrying. Plugin giving up.",
121+
)
136122
}
137123

138124
err = withRetries(func() error { return m.client.AddEventListener(m.events) })
139125
if err != nil {
140126
m.ir.LogError(err)
141127
return errors.New(
142-
"Failed to add Docker event listener after retrying. Plugin giving up.")
128+
"Failed to add Docker event listener after retrying. Plugin giving up.",
129+
)
143130
}
144131

145-
if m.sinceInterval > 0 {
132+
if m.sinces.Interval > 0 {
146133
go m.sinceWriteLoop(stopChan)
147134
}
148135
m.handleDockerEvents(stopChan) // Blocks until stopChan is closed.
149136
// Write to since file on the way out.
150-
m.writeSinceFile(time.Now())
137+
m.sinces.Write(time.Now())
151138
return nil
152139
}
153140

154-
func (m *AttachManager) writeSinceFile(t time.Time) {
155-
sinceFile, err := os.Create(m.sincePath)
156-
if err != nil {
157-
m.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", m.sincePath,
158-
err.Error()))
159-
return
160-
}
161-
jsonEncoder := json.NewEncoder(sinceFile)
162-
m.sinceLock.Lock()
163-
m.sinces.Since = t.Unix()
164-
if err = jsonEncoder.Encode(m.sinces); err != nil {
165-
m.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", m.sincePath,
166-
err.Error()))
167-
}
168-
m.sinceLock.Unlock()
169-
if err = sinceFile.Close(); err != nil {
170-
m.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", m.sincePath,
171-
err.Error()))
172-
}
173-
}
174141

175142
// Periodically writes out a new since file, until stopped.
176143
func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
177-
ticker := time.Tick(m.sinceInterval)
144+
ticker := time.Tick(m.sinces.Interval)
178145
ok := true
179146
var now time.Time
180147
for ok {
@@ -183,7 +150,7 @@ func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
183150
if !ok {
184151
break
185152
}
186-
m.writeSinceFile(now)
153+
m.sinces.Write(now)
187154
case <-stopChan:
188155
ok = false
189156
break
@@ -255,7 +222,7 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
255222

256223
// Spin up one of these for each container we're watching.
257224
go func() {
258-
m.sinceLock.Lock()
225+
m.sinces.Lock()
259226
since, ok := m.sinces.Containers[id]
260227
if ok {
261228
// We've seen this container before, need to use a since value.
@@ -269,8 +236,14 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
269236
} else {
270237
// We haven't seen it, add it to our sinces.
271238
m.sinces.Containers[id] = 0
239+
240+
// And set the since appropriately from our settings.
241+
if !m.newContainersReplayLogs {
242+
// Use the last global since time when we connect to it
243+
since = m.sinces.Since
244+
}
272245
}
273-
m.sinceLock.Unlock()
246+
m.sinces.Unlock()
274247

275248
// This will block until the container exits.
276249
err := client.Logs(docker.LogsOptions{
@@ -282,17 +255,17 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
282255
Stderr: true,
283256
Since: since,
284257
Timestamps: false,
285-
Tail: "all",
258+
Tail: "all", // This is scoped by "Since" above
286259
RawTerminal: false,
287260
})
288261

289-
// Once it has exited, close our pipes, remove from the sinces, and (if
290-
// necessary) log the error.
262+
// Once it has exited, close our pipes, set the since time to now, and
263+
// (if necessary) log the error.
291264
outwr.Close()
292265
errwr.Close()
293-
m.sinceLock.Lock()
266+
m.sinces.Lock()
294267
m.sinces.Containers[id] = time.Now().Unix()
295-
m.sinceLock.Unlock()
268+
m.sinces.Unlock()
296269
if err != nil {
297270
err = fmt.Errorf("streaming container %s logs: %s", id, err.Error())
298271
m.ir.LogError(err)

plugins/docker/docker_log_input.go

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
package docker
1818

1919
import (
20-
"encoding/json"
2120
"fmt"
22-
"os"
2321
"path/filepath"
2422
"time"
2523

@@ -28,13 +26,15 @@ import (
2826

2927
type DockerLogInputConfig struct {
3028
// A Docker endpoint.
31-
Endpoint string `toml:"endpoint"`
32-
CertPath string `toml:"cert_path"`
33-
SincePath string `toml:"since_path"`
34-
SinceInterval string `toml:"since_interval"`
35-
NameFromEnv string `toml:"name_from_env_var"`
36-
FieldsFromEnv []string `toml:"fields_from_env"`
37-
FieldsFromLabels []string `toml:"fields_from_labels"`
29+
Endpoint string `toml:"endpoint"`
30+
CertPath string `toml:"cert_path"`
31+
SincePath string `toml:"since_path"`
32+
SinceInterval string `toml:"since_interval"`
33+
NameFromEnv string `toml:"name_from_env_var"`
34+
FieldsFromEnv []string `toml:"fields_from_env"`
35+
FieldsFromLabels []string `toml:"fields_from_labels"`
36+
ContainerExpiryDays int `toml:"container_expiry_days"`
37+
NewContainersReplayLogs bool `toml:"new_containers_replay_logs"`
3838
}
3939

4040
type DockerLogInput struct {
@@ -50,16 +50,19 @@ func (di *DockerLogInput) SetPipelineConfig(pConfig *pipeline.PipelineConfig) {
5050

5151
func (di *DockerLogInput) ConfigStruct() interface{} {
5252
return &DockerLogInputConfig{
53-
Endpoint: "unix:///var/run/docker.sock",
54-
CertPath: "",
55-
SincePath: filepath.Join("docker", "logs_since.txt"),
56-
SinceInterval: "5s",
53+
Endpoint: "unix:///var/run/docker.sock",
54+
CertPath: "",
55+
SincePath: filepath.Join("docker", "logs_since.txt"),
56+
SinceInterval: "5s",
57+
ContainerExpiryDays: 30,
58+
NewContainersReplayLogs: true,
5759
}
5860
}
5961

6062
func (di *DockerLogInput) Init(config interface{}) error {
6163
conf := config.(*DockerLogInputConfig)
6264
globals := di.pConfig.Globals
65+
sincePath := globals.PrependBaseDir(conf.SincePath)
6366

6467
// Make sure since interval is valid.
6568
sinceInterval, err := time.ParseDuration(conf.SinceInterval)
@@ -68,32 +71,10 @@ func (di *DockerLogInput) Init(config interface{}) error {
6871
err.Error())
6972
}
7073

71-
// Make sure the since file exists.
72-
sincePath := globals.PrependBaseDir(conf.SincePath)
73-
_, err = os.Stat(sincePath)
74-
if os.IsNotExist(err) {
75-
sinceDir := filepath.Dir(sincePath)
76-
if err = os.MkdirAll(sinceDir, 0700); err != nil {
77-
return fmt.Errorf("Can't create storage directory '%s': %s", sinceDir,
78-
err.Error())
79-
}
80-
81-
sinceFile, err := os.Create(sincePath)
82-
if err != nil {
83-
return fmt.Errorf("Can't create \"since\" file '%s': %s", sincePath,
84-
err.Error())
85-
}
86-
jsonEncoder := json.NewEncoder(sinceFile)
87-
if err = jsonEncoder.Encode(&sinces{Containers: make(map[string]int64)}); err != nil {
88-
return fmt.Errorf("Can't write to \"since\" file '%s': %s", sincePath,
89-
err.Error())
90-
}
91-
if err = sinceFile.Close(); err != nil {
92-
return fmt.Errorf("Can't close \"since\" file '%s': %s", sincePath,
93-
err.Error())
94-
}
95-
} else if err != nil {
96-
return fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
74+
// Make sure we have a sinces File.
75+
err = EnsureSincesFile(conf, sincePath)
76+
if err != nil {
77+
return err
9778
}
9879

9980
di.stopChan = make(chan error)
@@ -107,6 +88,8 @@ func (di *DockerLogInput) Init(config interface{}) error {
10788
conf.FieldsFromLabels,
10889
sincePath,
10990
sinceInterval,
91+
conf.ContainerExpiryDays,
92+
conf.NewContainersReplayLogs,
11093
)
11194
if err != nil {
11295
return fmt.Errorf("DockerLogInput: failed to attach: %s", err.Error())

0 commit comments

Comments
 (0)