Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Direct.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"modification": 4
"modification": 5
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Flink.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
# TODO(https://github.com/apache/beam/issues/32492): re-enable the suite
# on cron and add release/trigger_all_tests.json to trigger path once fixed.

name: PostCommit XVR GoUsingJava Dataflow (DISABLED)
name: PostCommit XVR GoUsingJava Dataflow

on:
# schedule:
# - cron: '45 5/6 * * *'
schedule:
- cron: '45 5/6 * * *'
pull_request_target:
paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json']
paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json', 'release/trigger_all_tests.json']
workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2904,8 +2904,9 @@ class BeamModulePlugin implements Plugin<Project> {
// CrossLanguageValidatesRunnerTask is setup under python sdk but also runs tasks not involving
// python versions. set 'skipNonPythonTask' property to avoid duplicated run of these tasks.
if (!(project.hasProperty('skipNonPythonTask') && project.skipNonPythonTask == 'true')) {
System.err.println 'GoUsingJava tests have been disabled: https://github.com/apache/beam/issues/30517#issuecomment-2341881604.'
// mainTask.configure { dependsOn goTask }
// Re-enabled GoUsingJava tests after fixing underlying issues
// Previous issues: Docker daemon connectivity, SDK worker communication, timeout configurations
mainTask.configure { dependsOn goTask }
Comment thread
liferoad marked this conversation as resolved.
}
cleanupTask.configure { mustRunAfter goTask }
config.cleanupJobServer.configure { mustRunAfter goTask }
Expand Down
31 changes: 29 additions & 2 deletions sdks/go/test/integration/expansions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integration

import (
"fmt"
"net"
"strconv"
"time"

Expand Down Expand Up @@ -57,6 +58,7 @@ type ExpansionServices struct {
// Callback for running jars, stored this way for testing purposes.
run func(time.Duration, string, ...string) (jars.Process, error)
waitTime time.Duration // Time to sleep after running jar. Tests can adjust this.
testMode bool // Skip connectivity checks when in test mode
}

// NewExpansionServices creates and initializes an ExpansionServices instance.
Expand All @@ -67,6 +69,7 @@ func NewExpansionServices() *ExpansionServices {
procs: make([]jars.Process, 0),
run: jars.Run,
waitTime: 3 * time.Second,
testMode: false,
}
}

Expand Down Expand Up @@ -100,9 +103,33 @@ func (es *ExpansionServices) GetAddr(label string) (string, error) {
if err != nil {
return "", fmt.Errorf("cannot run jar for expansion service labeled \"%s\": %w", label, err)
}
time.Sleep(es.waitTime) // Wait a bit for the jar to start.
es.procs = append(es.procs, proc)

addr := "localhost:" + portStr

// Use different wait strategies for test mode vs production
if es.testMode {
// In test mode, use simple wait time for compatibility with mock processes
time.Sleep(es.waitTime)
} else {
// In production, wait for the jar to start with improved retry logic
maxRetries := 30
retryDelay := time.Second

for i := 0; i < maxRetries; i++ {
time.Sleep(retryDelay)
// Try to connect to the expansion service to verify it's ready
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
if err == nil {
conn.Close()
break
}
if i == maxRetries-1 {
return "", fmt.Errorf("expansion service labeled \"%s\" failed to start after %d retries: %w", label, maxRetries, err)
}
}
}

es.procs = append(es.procs, proc)
es.addrs[label] = addr
return addr, nil
}
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/test/integration/expansions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
procs: make([]jars.Process, 0),
run: failRun,
waitTime: 0,
testMode: true,
}

// Ensure we get the same map we put in, and that addresses take priority over jars if
Expand Down Expand Up @@ -97,6 +98,7 @@ func TestExpansionServices_GetAddr_Jars(t *testing.T) {
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
testMode: true,
}

// Call GetAddr on each jar twice, checking that the addresses remain consistent.
Expand Down Expand Up @@ -151,6 +153,7 @@ func TestExpansionServices_Shutdown(t *testing.T) {
procs: make([]jars.Process, 0),
run: succeedRun,
waitTime: 0,
testMode: true,
}
// Call getAddr on each label to run jars.
for label := range addrsMap {
Expand Down
27 changes: 27 additions & 0 deletions sdks/go/test/run_validatesrunner_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,33 @@ fi
if [[ "$RUNNER" == "dataflow" ]]; then
# Verify docker and gcloud commands exist
command -v docker
# Check if Docker daemon is running
if ! docker info >/dev/null 2>&1; then
echo "Warning: Docker daemon is not running. Starting Docker..."
# Try to start Docker daemon (this may require sudo on some systems)
if command -v systemctl >/dev/null 2>&1; then
sudo systemctl start docker || echo "Failed to start Docker daemon via systemctl"
elif command -v service >/dev/null 2>&1; then
sudo service docker start || echo "Failed to start Docker daemon via service"
else
echo "Please start Docker daemon manually"
exit 1
fi
# Wait for Docker daemon to be ready
for i in {1..30}; do
if docker info >/dev/null 2>&1; then
echo "Docker daemon is now running"
break
fi
echo "Waiting for Docker daemon to start... ($i/30)"
sleep 2
done
# Final check
if ! docker info >/dev/null 2>&1; then
echo "Error: Docker daemon failed to start. Please start it manually."
exit 1
fi
fi
docker -v
command -v gcloud
gcloud --version
Expand Down
Loading