Skip to content

Commit 8b2525b

Browse files
authored
Add buffered logger to the Python bootloader (#28317)
* [WIP] Add buffered logging to the Python Bootloader * Take pip out of quiet 1 * Reroute Execute fns to new ExecuteEnvWithIO
1 parent d559cfe commit 8b2525b

File tree

4 files changed

+60
-28
lines changed

4 files changed

+60
-28
lines changed

sdks/go/container/tools/buffered_logging.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"strings"
2222
)
2323

24-
const INITIAL_LOG_SIZE int = 255
24+
const initialLogSize int = 255
2525

2626
// BufferedLogger is a wrapper around the FnAPI logging client meant to be used
2727
// in place of stdout and stderr in bootloader subprocesses. Not intended for
@@ -46,7 +46,7 @@ func (b *BufferedLogger) Write(p []byte) (int, error) {
4646
}
4747
n, err := b.builder.Write(p)
4848
if b.logs == nil {
49-
b.logs = make([]string, 0, INITIAL_LOG_SIZE)
49+
b.logs = make([]string, 0, initialLogSize)
5050
}
5151
b.logs = append(b.logs, b.builder.String())
5252
b.builder.Reset()

sdks/go/pkg/beam/util/execx/exec.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,30 @@
1717
package execx
1818

1919
import (
20+
"io"
2021
"os"
2122
"os/exec"
2223
)
2324

2425
// Execute runs the program with the given arguments. It attaches stdio to the
2526
// child process.
2627
func Execute(prog string, args ...string) error {
27-
return ExecuteEnv(nil, prog, args...)
28+
return ExecuteEnvWithIO(nil, os.Stdin, os.Stdout, os.Stderr, prog, args...)
2829
}
2930

3031
// ExecuteEnv runs the program with the given arguments with additional environment
3132
// variables. It attaches stdio to the child process.
3233
func ExecuteEnv(env map[string]string, prog string, args ...string) error {
34+
return ExecuteEnvWithIO(env, os.Stdin, os.Stdout, os.Stderr, prog, args...)
35+
}
36+
37+
// ExecuteEnvWithIO runs the program with the given arguments with additional environment
38+
// variables. It attaches custom IO to the child process.
39+
func ExecuteEnvWithIO(env map[string]string, stdin io.Reader, stdout, stderr io.Writer, prog string, args ...string) error {
3340
cmd := exec.Command(prog, args...)
34-
cmd.Stdin = os.Stdin
35-
cmd.Stdout = os.Stdout
36-
cmd.Stderr = os.Stderr
41+
cmd.Stdin = stdin
42+
cmd.Stdout = stdout
43+
cmd.Stderr = stderr
3744
if env != nil {
3845
cmd.Env = os.Environ()
3946
for k, v := range env {

sdks/python/container/boot.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func launchSDKProcess() error {
205205
}
206206
}
207207

208-
if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
208+
if setupErr := installSetupPackages(ctx, logger, fileNames, dir, requirementsFiles); setupErr != nil {
209209
fmtErr := fmt.Errorf("failed to install required packages: %v", setupErr)
210210
// Send error message to logging service before returning up the call stack
211211
logger.Errorf(ctx, fmtErr.Error())
@@ -379,7 +379,7 @@ func setupAcceptableWheelSpecs() error {
379379
}
380380

381381
// installSetupPackages installs Beam SDK and user dependencies.
382-
func installSetupPackages(files []string, workDir string, requirementsFiles []string) error {
382+
func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error {
383383
log.Printf("Installing setup packages ...")
384384

385385
if err := setupAcceptableWheelSpecs(); err != nil {
@@ -389,25 +389,25 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
389389
pkgName := "apache-beam"
390390
isSdkInstalled := isPackageInstalled(pkgName)
391391
if !isSdkInstalled {
392-
return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/.")
392+
return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/")
393393
}
394394
// Install the Dataflow Python SDK and worker packages.
395395
// We install the extra requirements in case of using the beam sdk. These are ignored by pip
396396
// if the user is using an SDK that does not provide these.
397-
if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
397+
if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
398398
return fmt.Errorf("failed to install SDK: %v", err)
399399
}
400400
// The staged files will not disappear due to restarts because workDir is a
401401
// folder that is mapped to the host (and therefore survives restarts).
402402
for _, f := range requirementsFiles {
403-
if err := pipInstallRequirements(files, workDir, f); err != nil {
403+
if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil {
404404
return fmt.Errorf("failed to install requirements: %v", err)
405405
}
406406
}
407-
if err := installExtraPackages(files, extraPackagesFile, workDir); err != nil {
407+
if err := installExtraPackages(ctx, logger, files, extraPackagesFile, workDir); err != nil {
408408
return fmt.Errorf("failed to install extra packages: %v", err)
409409
}
410-
if err := pipInstallPackage(files, workDir, workflowFile, false, true, nil); err != nil {
410+
if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil {
411411
return fmt.Errorf("failed to install workflow: %v", err)
412412
}
413413

@@ -450,7 +450,7 @@ func processArtifactsInSetupOnlyMode() {
450450
}
451451
files[i] = filePayload.GetPath()
452452
}
453-
if setupErr := installSetupPackages(files, workDir, []string{requirementsFile}); setupErr != nil {
453+
if setupErr := installSetupPackages(context.Background(), nil, files, workDir, []string{requirementsFile}); setupErr != nil {
454454
log.Fatalf("Failed to install required packages: %v", setupErr)
455455
}
456456
}

sdks/python/container/piputil.go

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818
import (
1919
"bufio"
2020
"bytes"
21+
"context"
2122
"errors"
2223
"fmt"
2324
"log"
@@ -26,16 +27,18 @@ import (
2627
"path/filepath"
2728
"strings"
2829

30+
"github.com/apache/beam/sdks/v2/go/container/tools"
2931
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
3032
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
3133
)
3234

3335
// pipInstallRequirements installs the given requirement, if present.
34-
func pipInstallRequirements(files []string, dir, name string) error {
36+
func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []string, dir, name string) error {
3537
pythonVersion, err := expansionx.GetPythonVersion()
3638
if err != nil {
3739
return err
3840
}
41+
bufLogger := tools.NewBufferedLogger(logger)
3942
for _, file := range files {
4043
if file == name {
4144
// We run the install process in two rounds in order to avoid as much
@@ -50,7 +53,13 @@ func pipInstallRequirements(files []string, dir, name string) error {
5053
// also installs dependencies. The key is that if all the packages have
5154
// been installed in the first round then this command will be a no-op.
5255
args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir}
53-
return execx.Execute(pythonVersion, args...)
56+
err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
57+
if err != nil {
58+
bufLogger.FlushAtError(ctx)
59+
return err
60+
}
61+
bufLogger.FlushAtDebug(ctx)
62+
return nil
5463
}
5564
}
5665
return nil
@@ -69,11 +78,12 @@ func isPackageInstalled(pkgName string) bool {
6978
}
7079

7180
// pipInstallPackage installs the given package, if present.
72-
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
81+
func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string, dir, name string, force, optional bool, extras []string) error {
7382
pythonVersion, err := expansionx.GetPythonVersion()
7483
if err != nil {
7584
return err
7685
}
86+
bufLogger := tools.NewBufferedLogger(logger)
7787
for _, file := range files {
7888
if file == name {
7989
var packageSpec = name
@@ -97,19 +107,34 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
97107
// installed version will match the package specified, the package itself
98108
// will not be reinstalled, but its dependencies will now be resolved and
99109
// installed if necessary. This achieves our goal outlined above.
100-
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
110+
args := []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
101111
filepath.Join(dir, packageSpec)}
102-
err := execx.Execute(pythonVersion, args...)
112+
err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
103113
if err != nil {
114+
bufLogger.FlushAtError(ctx)
104115
return err
116+
} else {
117+
bufLogger.FlushAtDebug(ctx)
105118
}
106-
args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
107-
return execx.Execute(pythonVersion, args...)
119+
args = []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
120+
err = execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
121+
if err != nil {
122+
bufLogger.FlushAtError(ctx)
123+
return err
124+
}
125+
bufLogger.FlushAtDebug(ctx)
126+
return nil
108127
}
109128

110129
// Case when we do not perform a forced reinstall.
111-
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
112-
return execx.Execute(pythonVersion, args...)
130+
args := []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
131+
err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
132+
if err != nil {
133+
bufLogger.FlushAtError(ctx)
134+
return err
135+
}
136+
bufLogger.FlushAtDebug(ctx)
137+
return nil
113138
}
114139
}
115140
if optional {
@@ -120,7 +145,7 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
120145

121146
// installExtraPackages installs all the packages declared in the extra
122147
// packages manifest file.
123-
func installExtraPackages(files []string, extraPackagesFile, dir string) error {
148+
func installExtraPackages(ctx context.Context, logger *tools.Logger, files []string, extraPackagesFile, dir string) error {
124149
// First check that extra packages manifest file is present.
125150
for _, file := range files {
126151
if file != extraPackagesFile {
@@ -139,7 +164,7 @@ func installExtraPackages(files []string, extraPackagesFile, dir string) error {
139164
for s.Scan() {
140165
extraPackage := s.Text()
141166
log.Printf("Installing extra package: %s", extraPackage)
142-
if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
167+
if err = pipInstallPackage(ctx, logger, files, dir, extraPackage, true, false, nil); err != nil {
143168
return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
144169
}
145170
}
@@ -167,13 +192,13 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
167192
// assume that the pipeline was started with the Beam SDK found in the wheel
168193
// file, and we try to install it. If not successful, we fall back to installing
169194
// SDK from source tarball provided in sdkSrcFile.
170-
func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
195+
func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
171196
sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
172197

173198
if sdkWhlFile != "" {
174199
// by default, pip refuses to install a wheel if the same version is already installed
175200
isDev := strings.Contains(sdkWhlFile, ".dev")
176-
err := pipInstallPackage(files, workDir, sdkWhlFile, isDev, false, []string{"gcp"})
201+
err := pipInstallPackage(ctx, logger, files, workDir, sdkWhlFile, isDev, false, []string{"gcp"})
177202
if err == nil {
178203
return nil
179204
}
@@ -185,6 +210,6 @@ func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhl
185210
return nil
186211
}
187212
}
188-
err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
213+
err := pipInstallPackage(ctx, logger, files, workDir, sdkSrcFile, false, false, []string{"gcp"})
189214
return err
190215
}

0 commit comments

Comments
 (0)