Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
}
93 changes: 93 additions & 0 deletions sdks/python/apache_beam/transforms/validate_runner_xlang_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,26 @@

import logging
import os
import random
import string
import typing
import unittest

import pytest

import apache_beam as beam
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
from apache_beam.transforms.util import GcpSecret
from apache_beam.transforms.util import Secret

try:
from google.cloud import secretmanager
except ImportError:
secretmanager = None # type: ignore[assignment]

TEST_PREFIX_URN = "beam:transforms:xlang:test:prefix"
TEST_MULTI_URN = "beam:transforms:xlang:test:multi"
Expand Down Expand Up @@ -140,6 +150,24 @@ def run_group_by_key(self, pipeline):
| beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1])))))
assert_that(res, equal_to(['0:1,2', '1:3']))

def run_group_by_key_no_assert(self, pipeline):
  """
  Target transform - GroupByKey, applied without asserting on the output so
  that callers can check for errors raised while running the pipeline
  (https://beam.apache.org/documentation/programming-guide/#groupbykey)
  Test scenario - Grouping a collection of KV<K,V> to a collection of
  KV<K, Iterable<V>> by key
  Boundary conditions checked -
  - PCollection<KV<?, ?>> to external transforms
  - PCollection<KV<?, Iterable<?>>> from external transforms
  """
  with pipeline as p:
    # Typed (int, str) pairs fed to the external GroupByKey.
    keyed_input = p | beam.Create(
        [(0, "1"), (0, "2"), (1, "3")],
        reshuffle=False).with_output_types(typing.Tuple[int, str])
    # The grouped result is intentionally discarded: nothing is asserted.
    _ = keyed_input | beam.ExternalTransform(
        TEST_GBK_URN, None, self.expansion_service)

def run_cogroup_by_key(self, pipeline):
"""
Target transform - CoGroupByKey
Expand Down Expand Up @@ -298,6 +326,71 @@ def test_partition(self, test_pipeline=None):
test_pipeline or self.create_pipeline())


@unittest.skipUnless(
    os.environ.get('EXPANSION_PORT'),
    "EXPANSION_PORT environment var is not provided.")
@unittest.skipIf(secretmanager is None, 'secretmanager not installed')
class ValidateRunnerGBEKTest(unittest.TestCase):
  """Cross-language GroupByKey tests that set the ``gbek`` pipeline option.

  Every test gets its own Secret Manager secret (created in ``setUp``) and
  refers to it through ``SetupOptions.gbek``. The suite is skipped unless
  the ``EXPANSION_PORT`` environment variable is set and the
  ``google.cloud.secretmanager`` client library can be imported.
  """
  def setUp(self):
    """Create a randomly named secret with one version for the test.

    Sets ``project_id``, ``secret_id``, ``client``, ``project_path``,
    ``secret_path``, ``gcp_secret`` and ``secret_option`` on the instance.
    Deletion of the secret is registered with ``addCleanup`` so it happens
    even when a later step of ``setUp`` fails.
    """
    if secretmanager is None:
      return

    self.project_id = 'apache-beam-testing'
    # A random numeric suffix keeps concurrently running suites from sharing
    # (and deleting) each other's secret.
    secret_postfix = ''.join(random.choice(string.digits) for _ in range(6))
    self.secret_id = 'gbek_secret_tests_' + secret_postfix
    self.client = secretmanager.SecretManagerServiceClient()
    self.project_path = f'projects/{self.project_id}'
    self.secret_path = f'{self.project_path}/secrets/{self.secret_id}'
    try:
      self.client.get_secret(request={'name': self.secret_path})
    except Exception:
      # Any lookup failure is treated as "secret does not exist yet"; a real
      # access problem will resurface from create_secret below.
      self.client.create_secret(
          request={
              'parent': self.project_path,
              'secret_id': self.secret_id,
              'secret': {
                  'replication': {
                      'automatic': {}
                  }
              }
          })

    # unittest does not call tearDown when setUp raises, but it always runs
    # registered cleanups. Registering here, as soon as the secret is known
    # to exist, prevents leaking it if add_secret_version fails.
    self.addCleanup(
        self.client.delete_secret, request={'name': self.secret_path})

    self.client.add_secret_version(
        request={
            'parent': self.secret_path,
            'payload': {
                'data': Secret.generate_secret_bytes()
            }
        })
    version_name = f'{self.secret_path}/versions/latest'
    self.gcp_secret = GcpSecret(version_name)
    self.secret_option = f'type:GcpSecret;version_name:{version_name}'

  def create_pipeline(self):
    """Return a fresh TestPipeline with ``not_use_test_runner_api`` set."""
    test_pipeline = TestPipeline()
    test_pipeline.not_use_test_runner_api = True
    return test_pipeline

  # This test validates that the gbek pipeline option is correctly passed
  # through: first with a well-formed secret option, then with a malformed
  # one that is expected to be rejected.
  @pytest.mark.uses_java_expansion_service
  @pytest.mark.uses_python_expansion_service
  def test_group_by_key_gbek(self, test_pipeline=None):
    """Run the external GroupByKey with a valid, then an invalid, gbek option.

    Args:
      test_pipeline: optional pipeline to use for the valid-secret run; a new
        one is created via ``create_pipeline`` when omitted.
    """
    test_pipeline = test_pipeline or self.create_pipeline()
    good_secret = self.secret_option
    test_pipeline.options.view_as(SetupOptions).gbek = good_secret
    CrossLanguageTestPipelines().run_group_by_key(test_pipeline)

    # Verify the option is actually consumed: this value has no `type:`
    # field, so running the pipeline is expected to fail with the error
    # below instead of silently ignoring the option.
    test_pipeline = self.create_pipeline()
    nonexistent_secret = 'version_name:nonexistent_secret'
    test_pipeline.options.view_as(SetupOptions).gbek = nonexistent_secret
    with self.assertRaisesRegex(
        Exception, 'Secret string must contain a valid type parameter'):
      CrossLanguageTestPipelines().run_group_by_key_no_assert(test_pipeline)


if __name__ == '__main__':
  # Show INFO-level log output when the suite is run directly as a script.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
Loading