Skip to content

Commit 9cc477c

Browse files
authored
Add system test app (#84)
* add the smoke test app and run the smoke test in PRs Adds the smoke test app from sindri to the public repo under a new module kafka-client-examples::simple-example. Adds steps to the PR builder to run smoke tests * update master builds * working main build * use local pulumi * java11 * LOGGER->LOG --------- Co-authored-by: Rohan Desai <rohan@responsive.dev>
1 parent feebcc6 commit 9cc477c

File tree

9 files changed

+355
-2
lines changed

9 files changed

+355
-2
lines changed

.github/workflows/github-main.yaml

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
name: CI/CD Main
22
on:
33
push:
4-
branches: main
4+
branches:
5+
- main
56
jobs:
67
build:
78
permissions:
@@ -49,3 +50,40 @@ jobs:
4950
with:
5051
arguments: pushCRD pushDocker pushHelm -PdockerRegistry=${{ steps.login-ecr-public.outputs.registry }}/j8q9y0n6 -PhelmRegistry=${{ steps.login-ecr-public.outputs.registry }}/j8q9y0n6
5152
if: github.ref == 'refs/heads/main'
53+
54+
- name: Configure AWS Credentials
55+
uses: aws-actions/configure-aws-credentials@v2
56+
with:
57+
role-to-assume: arn:aws:iam::292505934682:role/github-responsive-pub-main
58+
role-session-name: github-publish-artifacts
59+
aws-region: us-west-2
60+
61+
- name: Login to Amazon ECR
62+
id: login-ecr
63+
uses: aws-actions/amazon-ecr-login@v1
64+
65+
- uses: actions/checkout@v3
66+
with:
67+
repository: responsivedev/sindri
68+
path: sindri
69+
ref: refs/heads/master
70+
token: ${{ secrets.TOOLS_GHA_TOKEN }}
71+
72+
- name: Install Kind
73+
uses: helm/kind-action@v1.5.0
74+
with:
75+
install_only: true
76+
77+
- name: Run Smoke Test
78+
run: "sindri/scripts/run-smoke-test -s master -w system-tests-pub -p true -i public.ecr.aws/j8q9y0n6/responsiveinc/simple-example -t `./gradlew kafka-client:cV | grep \"Project version\" | sed 's/Project version: //'`"
79+
env:
80+
DD_API_KEY: ${{ secrets.DD_API_KEY }}
81+
PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
82+
83+
- name: Push next pub version to Sindri
84+
working-directory: sindri
85+
run: |
86+
git config --global user.name 'Responsive Tools'
87+
git config --global user.email 'tools@responsive.dev'
88+
git commit -am "update responsive-pub artifact versions"
89+
git push

.github/workflows/github-pr.yaml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,26 @@ jobs:
3030
- name: Build & Test
3131
uses: gradle/gradle-build-action@v2
3232
with:
33-
arguments: build
33+
arguments: build kafka-client-examples:simple-example:buildDocker
34+
35+
- uses: actions/checkout@v3
36+
with:
37+
repository: responsivedev/sindri
38+
path: sindri
39+
ref: refs/heads/master
40+
token: ${{ secrets.TOOLS_GHA_TOKEN }}
41+
42+
- name: Login to Amazon ECR
43+
id: login-ecr
44+
uses: aws-actions/amazon-ecr-login@v1
45+
46+
- name: Install Kind
47+
uses: helm/kind-action@v1.5.0
48+
with:
49+
install_only: true
50+
51+
- name: Run Smoke Test
52+
run: "sindri/scripts/run-smoke-test -l -w system-tests-pub -s master -p false -i simple-example -t `./gradlew kafka-client:cV | grep \"Project version\" | sed 's/Project version: //'`"
53+
env:
54+
DD_API_KEY: ${{ secrets.DD_API_KEY }}
55+
PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Build for the simple-example system-test app: a small Kafka Streams
// application packaged as a docker image and exercised by the smoke tests.
plugins {
    id("responsive.java-application-conventions")
    id("responsive.docker")
}

application {
    mainClass.set("dev.responsive.examples.simpleapp.Main")
}

dependencies {
    // todo: how to set the version here?
    implementation(project(":kafka-client"))
    implementation("com.google.guava:guava:32.1.1-jre")
    implementation("org.apache.kafka:kafka-clients:3.4.0")
    implementation("org.apache.kafka:kafka-streams:3.4.0")
    implementation("io.opentelemetry.javaagent:opentelemetry-javaagent:1.25.0")
    implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
    implementation("org.apache.commons:commons-text:1.10.0")
    implementation("com.scylladb:java-driver-core:4.15.0.0")
    implementation("com.scylladb:java-driver-query-builder:4.15.0.0")
    implementation("com.scylladb:java-driver-mapper-runtime:4.15.0.0")
}

java {
    toolchain {
        // NOTE(review): presumably pinned to match the system-test base image
        // runtime — confirm before bumping
        languageVersion.set(JavaLanguageVersion.of(11))
    }
}

// keep the example app's version in lockstep with the published kafka-client
version = project(":kafka-client").version

responsive_docker.dockerImage.set("simple-example:$version")
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
FROM public.ecr.aws/j8q9y0n6/responsivedev/system-test-base:0.1.0

# Application jars (simple-example plus its runtime dependencies) on a fixed
# classpath location the launch script expects.
COPY libs/*.jar /usr/share/java/responsive-simpleapp/
COPY scripts/* /

# Exec-form CMD with an explicit shell: the shell-form `CMD /run-simpleapp`
# would be wrapped in an implicit `sh -c`, which is not guaranteed to forward
# SIGTERM to the app. Running the script directly as PID 1's process lets the
# container stop cleanly.
CMD ["/bin/sh", "/run-simpleapp"]
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
---
# OpenTelemetry JMX metric-gatherer rules: maps Kafka consumer / Kafka Streams
# MBean attributes onto the metrics exported by the java agent.
rules:
  # kafka client metrics

  - bean: kafka.consumer:type=consumer-fetch-manager-metrics,partition=*,topic=*,client-id=*
    metricAttribute:
      partition: param(partition)
      topic: param(topic)
      clientid: param(client-id)
    mapping:
      records-lag:
        metric: kafka.streams.records.lag
        type: gauge
        desc: the current lag of the partition
        unit: '{messages}'
  - bean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*
    metricAttribute:
      clientid: param(client-id)
    mapping:
      records-lag-max:
        metric: kafka.streams.records.lag.max
        type: gauge
        # fixed copy-pasted desc: this attribute is the maximum lag across
        # partitions, not the current lag of a single partition
        desc: the maximum record lag across partitions
        unit: '{messages}'
  - bean: kafka.streams:type=stream-thread-metrics,thread-id=*
    metricAttribute:
      thread: param(thread-id)
    mapping:
      process-total:
        metric: kafka.streams.thread.process.total
        type: gauge
        desc: total records processed
        unit: '{records}'
  - bean: kafka.streams:type=stream-topic-metrics,thread-id=*,task-id=*,processor-node-id=*,topic=*
    metricAttribute:
      thread: param(thread-id)
      task: param(task-id)
      processor: param(processor-node-id)
      topic: param(topic)
    mapping:
      records-consumed-total:
        metric: kafka.streams.topic.consumed.total
        type: gauge
        desc: total records consumed
        unit: '{records}'
  - bean: kafka.streams:type=stream-task-metrics,thread-id=*,task-id=*
    metricAttribute:
      thread: param(thread-id)
      task: param(task-id)
    mapping:
      process-total:
        metric: kafka.streams.task.process.total
        type: gauge
        desc: total records processed
        unit: '{records}'
  - bean: kafka.streams:type=stream-thread-metrics,thread-id=*
    metricAttribute:
      thread: param(thread-id)
    mapping:
      blocked-time-ns-total:
        metric: kafka.streams.thread.blocked.time.total.ns
        type: gauge
        desc: total time the stream thread was blocked
        unit: '{nanoseconds}'
  - bean: kafka.streams:type=stream-thread-metrics,thread-id=*
    metricAttribute:
      thread: param(thread-id)
    mapping:
      thread-start-time:
        metric: kafka.streams.thread.start.time
        type: gauge
        desc: the time the kafka streams thread was started
        unit: '{milliseconds}'
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/sh
# Launches the simple-example Kafka Streams app with the OpenTelemetry java
# agent and remote JMX enabled.
# Required env: CONTROLLER_ENDPOINT, POD_IP, CONFIG_PATH.

# OpenTelemetry agent: export JMX-derived metrics (per /otel-jmx.config) over
# OTLP to the controller endpoint every 10s.
EXPORTER_OPTS="-javaagent:/usr/share/java/responsive-simpleapp/opentelemetry-javaagent-1.25.0.jar
 -Dotel.metrics.exporter=otlp
 -Dotel.service.name=simpleapp
 -Dotel.jmx.config=/otel-jmx.config
 -Dotel.exporter.otlp.endpoint=${CONTROLLER_ENDPOINT}
 -Dotel.exporter.otlp.metrics.endpoint=${CONTROLLER_ENDPOINT}
 -Dotel.resource.attributes=responsiveApplicationId=responsive/responsive-simpleapp
 -Dotel.metric.export.interval=10000
"

# Remote JMX without auth/SSL (cluster-internal only). JMX and RMI pinned to
# the same port, advertised via the pod IP so it is reachable in-cluster.
JMX_OPTS="-Dcom.sun.management.jmxremote=true
 -Dcom.sun.management.jmxremote.port=7192
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.local.only=false
 -Dcom.sun.management.jmxremote.rmi.port=7192
 -Djava.rmi.server.hostname=${POD_IP}
"

# exec so the JVM replaces this shell: container signals (SIGTERM) then reach
# the app directly and its shutdown hook can close Kafka Streams cleanly.
exec java -Dorg.apache.logging.log4j.level=INFO ${EXPORTER_OPTS} ${JMX_OPTS} -cp "/usr/share/java/responsive-simpleapp/*" dev.responsive.examples.simpleapp.Main ${CONFIG_PATH}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package dev.responsive.examples.simpleapp;
2+
3+
import java.io.FileInputStream;
4+
import java.io.IOException;
5+
import java.io.InputStream;
6+
import java.util.Map;
7+
import java.util.Properties;
8+
import java.util.stream.Collectors;
9+
10+
public class Main {
11+
public static void main(String[] args) throws IOException {
12+
final Map<Object, Object> rawCfg;
13+
try (InputStream input = new FileInputStream(args[0])) {
14+
final Properties properties = new Properties();
15+
properties.load(input);
16+
rawCfg = properties.keySet().stream()
17+
.collect(Collectors.toMap(k -> k, properties::get));
18+
}
19+
final SimpleApplication application = new SimpleApplication(rawCfg);
20+
Runtime.getRuntime().addShutdownHook(new Thread(application::stop));
21+
application.start();
22+
application.await();
23+
}
24+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package dev.responsive.examples.simpleapp;
2+
3+
import com.datastax.oss.driver.api.core.CqlSession;
4+
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
5+
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
6+
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
7+
import dev.responsive.kafka.api.ResponsiveStores;
8+
import java.net.InetSocketAddress;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Properties;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
import org.apache.kafka.clients.admin.Admin;
14+
import org.apache.kafka.clients.admin.NewTopic;
15+
import org.apache.kafka.common.config.AbstractConfig;
16+
import org.apache.kafka.common.config.ConfigDef;
17+
import org.apache.kafka.common.config.ConfigDef.Importance;
18+
import org.apache.kafka.common.serialization.Serdes;
19+
import org.apache.kafka.streams.KafkaStreams;
20+
import org.apache.kafka.streams.StreamsBuilder;
21+
import org.apache.kafka.streams.kstream.Consumed;
22+
import org.apache.kafka.streams.kstream.KStream;
23+
import org.apache.kafka.streams.kstream.KTable;
24+
import org.apache.kafka.streams.kstream.Produced;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
public class SimpleApplication {
29+
private static final Logger LOG = LoggerFactory.getLogger(SimpleApplication.class);
30+
31+
public static final class Config extends AbstractConfig {
32+
private static final String PREFIX = "responsive.simple.app.";
33+
private static final String SOURCE = PREFIX + "source";
34+
private static final String NAME = PREFIX + "name";
35+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
36+
.define(NAME, ConfigDef.Type.STRING, "", Importance.LOW, "test run name")
37+
.define(SOURCE, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, "source topic");
38+
39+
private Config(final Map<?, ?> properties) {
40+
super(CONFIG_DEF, properties);
41+
}
42+
43+
private String getSourceTopic() {
44+
return getString(NAME) + "-" + getString(SOURCE);
45+
}
46+
}
47+
48+
private final KafkaStreams kafkaStreams;
49+
private final Config config;
50+
private boolean stopped = false;
51+
52+
public SimpleApplication(final Map<?, ?> properties) {
53+
config = new Config(properties);
54+
maybeCreateTopics();
55+
maybeCreateKeyspace();
56+
LOG.info("build topology");
57+
kafkaStreams = buildTopology(config, properties);
58+
}
59+
60+
public synchronized void start() {
61+
if (!stopped) {
62+
LOG.info("start kafka streams");
63+
kafkaStreams.start();
64+
}
65+
}
66+
67+
public synchronized void stop() {
68+
if (!stopped) {
69+
kafkaStreams.close();
70+
stopped = true;
71+
this.notify();
72+
}
73+
}
74+
75+
public synchronized void await() {
76+
while (!stopped) {
77+
try {
78+
this.wait();
79+
} catch (final InterruptedException e) {
80+
throw new RuntimeException(e);
81+
}
82+
}
83+
}
84+
85+
private static KafkaStreams buildTopology(final Config cfg, Map<?, ?> rawCfg) {
86+
final StreamsBuilder builder = new StreamsBuilder();
87+
final String source = cfg.getSourceTopic();
88+
final KStream<byte[], byte[]> stream =
89+
builder.stream(source, Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()));
90+
final AtomicLong msgCounter = new AtomicLong(0);
91+
final KStream<byte[], byte[]> result = stream.mapValues((k, v) -> {
92+
final var msgCount = msgCounter.incrementAndGet();
93+
if (msgCount % 1000 == 0) {
94+
System.out.printf("received %d %s%n", msgCount, new String(v));
95+
}
96+
return v;
97+
});
98+
final KTable<byte[], Long> counts = stream.groupByKey()
99+
.count(ResponsiveStores.materialized(cfg.getString(Config.NAME)));
100+
result.to(source + "-out", Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));
101+
counts.toStream().to(source + "-counts", Produced.with(Serdes.ByteArray(), Serdes.Long()));
102+
final Properties properties = new Properties();
103+
properties.putAll(rawCfg);
104+
return ResponsiveKafkaStreams.create(builder.build(properties), properties);
105+
}
106+
107+
private void maybeCreateTopics() {
108+
try (final Admin admin = Admin.create(config.originals())) {
109+
final String source = config.getSourceTopic();
110+
for (final var topic : List.of(source, source + "-out", source + "-counts")) {
111+
LOG.info("create topic {}", topic);
112+
try {
113+
admin.createTopics(List.of(new NewTopic(topic, 1, (short) 1)));
114+
} catch (final RuntimeException e) {
115+
LOG.info("Error creating topic: " + e);
116+
}
117+
}
118+
}
119+
}
120+
121+
private void maybeCreateKeyspace() {
122+
LOG.info("create keyspace test");
123+
try (final CqlSession session = cqlSession()) {
124+
final CreateKeyspace createKeyspace = SchemaBuilder.createKeyspace("test")
125+
.ifNotExists()
126+
.withSimpleStrategy(1);
127+
session.execute(createKeyspace.build());
128+
}
129+
}
130+
131+
private CqlSession cqlSession() {
132+
return CqlSession.builder()
133+
.addContactPoint(new InetSocketAddress("scylla-svc", 9042))
134+
.withLocalDatacenter("datacenter1")
135+
.build();
136+
}
137+
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ plugins {
2121
}
2222

2323
include("kafka-client")
24+
include("kafka-client-examples:simple-example")
2425
include("controller-api")
2526
include("operator")
2627
include("tools")

0 commit comments

Comments
 (0)