Skip to content

Commit f79ec5b

Browse files
committed
Cleanup cio.FIFOSet interface
Remove duplication with cio.Config unexport newFIFOSetTempDir() since it includes hardcoded paths Expose os.RemoveAll() as part of FIFOSet instead of a Dir Signed-off-by: Daniel Nephin <dnephin@gmail.com>
1 parent 6393165 commit f79ec5b

File tree

4 files changed

+89
-105
lines changed

4 files changed

+89
-105
lines changed

cio/io.go

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -75,68 +75,70 @@ type Creation func(id string) (IO, error)
7575
// will be sent only to the first reads
7676
type Attach func(*FIFOSet) (IO, error)
7777

78+
// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
79+
type FIFOSet struct {
80+
Config
81+
close func() error
82+
}
83+
84+
func (f *FIFOSet) Close() error {
85+
if f.close != nil {
86+
return f.close()
87+
}
88+
return nil
89+
}
90+
91+
// NewFIFOSet returns a new FIFOSet from a Config and a close function
92+
func NewFIFOSet(config Config, close func() error) *FIFOSet {
93+
return &FIFOSet{Config: config, close: close}
94+
}
95+
7896
// NewIO returns an Creation that will provide IO sets without a terminal
7997
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
8098
return NewIOWithTerminal(stdin, stdout, stderr, false)
8199
}
82100

83-
// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal
101+
// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal
84102
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
85103
return func(id string) (_ IO, err error) {
86-
paths, err := NewFifos(id)
104+
fifos, err := newFIFOSetTempDir(id)
87105
if err != nil {
88106
return nil, err
89107
}
90108
defer func() {
91-
if err != nil && paths.Dir != "" {
92-
os.RemoveAll(paths.Dir)
109+
if err != nil {
110+
fifos.Close()
93111
}
94112
}()
95-
cfg := Config{
96-
Terminal: terminal,
97-
Stdout: paths.Out,
98-
Stderr: paths.Err,
99-
Stdin: paths.In,
100-
}
113+
cfg := fifos.Config
114+
cfg.Terminal = terminal
101115
i := &cio{config: cfg}
102116
set := &ioSet{
103117
in: stdin,
104118
out: stdout,
105119
err: stderr,
106120
}
107-
closer, err := copyIO(paths, set, cfg.Terminal)
108-
if err != nil {
109-
return nil, err
110-
}
121+
closer, err := copyIO(fifos, set, cfg.Terminal)
111122
i.closer = closer
112-
return i, nil
123+
return i, err
113124
}
114125
}
115126

116127
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
117128
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
118-
return func(paths *FIFOSet) (IO, error) {
119-
if paths == nil {
129+
return func(fifos *FIFOSet) (IO, error) {
130+
if fifos == nil {
120131
return nil, fmt.Errorf("cannot attach to existing fifos")
121132
}
122-
cfg := Config{
123-
Terminal: paths.Terminal,
124-
Stdout: paths.Out,
125-
Stderr: paths.Err,
126-
Stdin: paths.In,
127-
}
128-
i := &cio{config: cfg}
133+
i := &cio{config: fifos.Config}
129134
set := &ioSet{
130135
in: stdin,
131136
out: stdout,
132137
err: stderr,
133138
}
134-
closer, err := copyIO(paths, set, cfg.Terminal)
135-
if err != nil {
136-
return nil, err
137-
}
139+
closer, err := copyIO(fifos, set, fifos.Terminal)
138140
i.closer = closer
139-
return i, nil
141+
return i, err
140142
}
141143
}
142144

@@ -156,24 +158,13 @@ func NullIO(id string) (IO, error) {
156158
return &cio{}, nil
157159
}
158160

159-
// FIFOSet is a set of fifos for use with tasks
160-
type FIFOSet struct {
161-
// Dir is the directory holding the task fifos
162-
Dir string
163-
// In, Out, and Err fifo paths
164-
In, Out, Err string
165-
// Terminal returns true if a terminal is being used for the task
166-
Terminal bool
167-
}
168-
169161
type ioSet struct {
170162
in io.Reader
171163
out, err io.Writer
172164
}
173165

174166
type wgCloser struct {
175167
wg *sync.WaitGroup
176-
dir string
177168
set []io.Closer
178169
cancel context.CancelFunc
179170
}
@@ -183,12 +174,10 @@ func (g *wgCloser) Wait() {
183174
}
184175

185176
func (g *wgCloser) Close() error {
177+
// TODO: this should return all errors, not mask them
186178
for _, f := range g.set {
187179
f.Close()
188180
}
189-
if g.dir != "" {
190-
return os.RemoveAll(g.dir)
191-
}
192181
return nil
193182
}
194183

cio/io_unix.go

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/containerd/fifo"
1515
)
1616

17-
// NewFifos returns a new set of fifos for the task
18-
func NewFifos(id string) (*FIFOSet, error) {
17+
// newFIFOSetTempDir returns a new set of fifos for the task
18+
func newFIFOSetTempDir(id string) (*FIFOSet, error) {
1919
root := "/run/containerd/fifo"
2020
if err := os.MkdirAll(root, 0700); err != nil {
2121
return nil, err
@@ -24,21 +24,23 @@ func NewFifos(id string) (*FIFOSet, error) {
2424
if err != nil {
2525
return nil, err
2626
}
27-
return &FIFOSet{
28-
Dir: dir,
29-
In: filepath.Join(dir, id+"-stdin"),
30-
Out: filepath.Join(dir, id+"-stdout"),
31-
Err: filepath.Join(dir, id+"-stderr"),
32-
}, nil
27+
closer := func() error {
28+
return os.RemoveAll(dir)
29+
}
30+
return NewFIFOSet(Config{
31+
Stdin: filepath.Join(dir, id+"-stdin"),
32+
Stdout: filepath.Join(dir, id+"-stdout"),
33+
Stderr: filepath.Join(dir, id+"-stderr"),
34+
}, closer), nil
3335
}
3436

3537
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
3638
var (
3739
f io.ReadWriteCloser
38-
set []io.Closer
3940
ctx, cancel = context.WithCancel(context.Background())
4041
wg = &sync.WaitGroup{}
4142
)
43+
set := []io.Closer{fifos}
4244
defer func() {
4345
if err != nil {
4446
for _, f := range set {
@@ -48,7 +50,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
4850
}
4951
}()
5052

51-
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
53+
if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
5254
return nil, err
5355
}
5456
set = append(set, f)
@@ -57,7 +59,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
5759
w.Close()
5860
}(f)
5961

60-
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
62+
if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
6163
return nil, err
6264
}
6365
set = append(set, f)
@@ -68,7 +70,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
6870
wg.Done()
6971
}(f)
7072

71-
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
73+
if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
7274
return nil, err
7375
}
7476
set = append(set, f)
@@ -83,35 +85,33 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
8385
}
8486
return &wgCloser{
8587
wg: wg,
86-
dir: fifos.Dir,
8788
set: set,
8889
cancel: cancel,
8990
}, nil
9091
}
9192

9293
// NewDirectIO returns an IO implementation that exposes the pipes directly
9394
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
94-
set, err := NewFifos("")
95+
set, err := newFIFOSetTempDir("")
9596
if err != nil {
9697
return nil, err
9798
}
98-
f := &DirectIO{
99-
set: set,
100-
terminal: terminal,
101-
}
99+
set.Terminal = terminal
100+
f := &DirectIO{set: set}
101+
102102
defer func() {
103103
if err != nil {
104104
f.Delete()
105105
}
106106
}()
107-
if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
107+
if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
108108
return nil, err
109109
}
110-
if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
110+
if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
111111
f.Stdin.Close()
112112
return nil, err
113113
}
114-
if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
114+
if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
115115
f.Stdin.Close()
116116
f.Stdout.Close()
117117
return nil, err
@@ -125,8 +125,7 @@ type DirectIO struct {
125125
Stdout io.ReadCloser
126126
Stderr io.ReadCloser
127127

128-
set *FIFOSet
129-
terminal bool
128+
set *FIFOSet
130129
}
131130

132131
// IOCreate returns IO avaliable for use with task creation
@@ -141,12 +140,7 @@ func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
141140

142141
// Config returns the Config
143142
func (f *DirectIO) Config() Config {
144-
return Config{
145-
Terminal: f.terminal,
146-
Stdin: f.set.In,
147-
Stdout: f.set.Out,
148-
Stderr: f.set.Err,
149-
}
143+
return f.set.Config
150144
}
151145

152146
// Cancel stops any IO copy operations
@@ -177,8 +171,5 @@ func (f *DirectIO) Close() error {
177171

178172
// Delete removes the underlying directory containing fifos
179173
func (f *DirectIO) Delete() error {
180-
if f.set.Dir == "" {
181-
return nil
182-
}
183-
return os.RemoveAll(f.set.Dir)
174+
return f.set.Close()
184175
}

cio/io_windows.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ import (
1313

1414
const pipeRoot = `\\.\pipe`
1515

16-
// NewFifos returns a new set of fifos for the task
17-
func NewFifos(id string) (*FIFOSet, error) {
16+
// newFIFOSetTempDir returns a new set of fifos for the task
17+
func newFIFOSetTempDir(id string) (*FIFOSet, error) {
1818
return &FIFOSet{
19-
In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
20-
Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
21-
Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
19+
StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
20+
StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
21+
StdErr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
2222
}, nil
2323
}
2424

@@ -28,10 +28,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
2828
set []io.Closer
2929
)
3030

31-
if fifos.In != "" {
32-
l, err := winio.ListenPipe(fifos.In, nil)
31+
if fifos.StdIn != "" {
32+
l, err := winio.ListenPipe(fifos.StdIn, nil)
3333
if err != nil {
34-
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In)
34+
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdIn)
3535
}
3636
defer func(l net.Listener) {
3737
if err != nil {
@@ -43,7 +43,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
4343
go func() {
4444
c, err := l.Accept()
4545
if err != nil {
46-
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In)
46+
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.StdIn)
4747
return
4848
}
4949
io.Copy(c, ioset.in)
@@ -52,10 +52,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
5252
}()
5353
}
5454

55-
if fifos.Out != "" {
56-
l, err := winio.ListenPipe(fifos.Out, nil)
55+
if fifos.StdOut != "" {
56+
l, err := winio.ListenPipe(fifos.StdOut, nil)
5757
if err != nil {
58-
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out)
58+
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdOut)
5959
}
6060
defer func(l net.Listener) {
6161
if err != nil {
@@ -69,7 +69,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
6969
defer wg.Done()
7070
c, err := l.Accept()
7171
if err != nil {
72-
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out)
72+
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.StdOut)
7373
return
7474
}
7575
io.Copy(ioset.out, c)
@@ -78,10 +78,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
7878
}()
7979
}
8080

81-
if !tty && fifos.Err != "" {
82-
l, err := winio.ListenPipe(fifos.Err, nil)
81+
if !tty && fifos.StdErr != "" {
82+
l, err := winio.ListenPipe(fifos.StdErr, nil)
8383
if err != nil {
84-
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err)
84+
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr)
8585
}
8686
defer func(l net.Listener) {
8787
if err != nil {
@@ -95,7 +95,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
9595
defer wg.Done()
9696
c, err := l.Accept()
9797
if err != nil {
98-
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err)
98+
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.StdErr)
9999
return
100100
}
101101
io.Copy(ioset.err, c)

0 commit comments

Comments
 (0)