Skip to content

Commit 8339ef1

Browse files
committed
k8s resource watch and triger action based on the events
1 parent df2ab4b commit 8339ef1

File tree

7 files changed

+407
-3
lines changed

7 files changed

+407
-3
lines changed

docs/design/test-harness-framework.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ The framework should automatically inject these filter values into the environme
469469
### Skipping features
470470
The test framework should provide the ability to explicitly exclude features during a test run. This could be done with the following flags:
471471

472-
* `--skip-feature` - a regular expression that skips features with matching names
472+
* `--skip-features` - a regular expression that skips features with matching names
473473
* `--skip-assessment` - a regular expression that skips assessment with matching name
474474
* `--skip-lables` - a comma-separated list of key/value pairs used to skip features with matching lables
475475

examples/watch_resources/README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Watching for Resource Changes
2+
3+
The test harness supports several methods for querying Kubernetes object types and watching for resource states. This example shows how to watch particular resource and how to register the functions to act upon based on the events recieved.
4+
5+
6+
# Watch for the deployment and triger action based on the event
7+
8+
Watch has to run as goroutine to get the different events based on the k8s resource state changes.
9+
```go
10+
func TestWatchForResources(t *testing.T) {
11+
...
12+
dep := appsv1.Deployment{
13+
ObjectMeta: metav1.ObjectMeta{Name: "watch-dep", Namespace: cfg.Namespace()},
14+
}
15+
16+
// watch for the deployment and triger action based on the event recieved.
17+
go cl.Resources().Watch(&appsv1.DeploymentList{}, &client.ListOptions{
18+
FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name),
19+
Namespace: dep.Namespace}, cl.RESTConfig()).WithAddFunc(onAdd).WithDeleteFunc(onDelete).Start(ctx)
20+
...
21+
}
22+
```
23+
24+
# Function/Action definition and registering these actions
25+
26+
```go
27+
// onAdd is the function executed when the kubernetes watch notifies the
28+
// presence of a new kubernetes deployment in the cluster
29+
func onAdd(obj interface{}) {
30+
dep := obj.(*appsv1.Deployment)
31+
depName := dep.GetName()
32+
fmt.Printf("Dep name recieved is %s", depName)
33+
if depName == "watch-dep" {
34+
fmt.Println("Dep name matches with actual name!")
35+
}
36+
}
37+
38+
// onDelete is the function executed when the kubernetes watch notifies
39+
// delete event on deployment
40+
func onDelete(obj interface{}) {
41+
dep := obj.(*appsv1.Deployment)
42+
depName := dep.GetName()
43+
if depName == "watch-dep" {
44+
fmt.Println("Deployment deleted successfully!")
45+
}
46+
}
47+
```
48+
49+
The above functions can be registered using Register functions(WithAddFunc(), WithDeleteFunc(), WithUpdateFunc()) defined under klient/k8s/watcher/watch.go as shown in the example.
50+
51+
# How to stop the watcher
52+
Create a global EventHandlerFuncs variable to store the watcher object and call Stop() as shown in example TestWatchForResourcesWithStop() test.
53+
54+
Note: User should explicitly invoke the Stop() after the watch once the feature is done to ensure no unwanted go routine thread leackage.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watch_resources
18+
19+
import (
20+
"os"
21+
"testing"
22+
23+
"sigs.k8s.io/e2e-framework/pkg/env"
24+
"sigs.k8s.io/e2e-framework/pkg/envconf"
25+
"sigs.k8s.io/e2e-framework/pkg/envfuncs"
26+
)
27+
28+
var testenv env.Environment
29+
30+
func TestMain(m *testing.M) {
31+
testenv = env.New()
32+
kindClusterName := envconf.RandomName("watch-for-resources", 16)
33+
namespace := envconf.RandomName("watch-ns", 16)
34+
testenv.Setup(
35+
envfuncs.CreateKindCluster(kindClusterName),
36+
envfuncs.CreateNamespace(namespace),
37+
)
38+
testenv.Finish(
39+
envfuncs.DeleteNamespace(namespace),
40+
envfuncs.DestroyKindCluster(kindClusterName),
41+
)
42+
os.Exit(testenv.Run(m))
43+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package watch_resources
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
appsv1 "k8s.io/api/apps/v1"
24+
v1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/labels"
27+
"k8s.io/klog/v2"
28+
"sigs.k8s.io/e2e-framework/klient/k8s/resources"
29+
"sigs.k8s.io/e2e-framework/klient/k8s/watcher"
30+
"sigs.k8s.io/e2e-framework/pkg/envconf"
31+
"sigs.k8s.io/e2e-framework/pkg/features"
32+
)
33+
34+
func TestWatchForResources(t *testing.T) {
35+
watchFeature := features.New("test watcher").WithLabel("env", "dev").
36+
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
37+
cl, err := cfg.NewClient()
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
dep := appsv1.Deployment{
43+
ObjectMeta: metav1.ObjectMeta{Name: "watch-dep", Namespace: cfg.Namespace()},
44+
}
45+
46+
// watch for the deployment and triger action based on the event recieved.
47+
go cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))).
48+
WithAddFunc(onAdd).WithDeleteFunc(onDelete).Start(ctx)
49+
50+
return ctx
51+
}).
52+
Assess("create watch deployment", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
53+
// create a deployment
54+
deployment := newDeployment(cfg.Namespace(), "watch-dep", 1)
55+
client, err := cfg.NewClient()
56+
if err != nil {
57+
t.Fatal(err)
58+
}
59+
if err := client.Resources().Create(ctx, deployment); err != nil {
60+
t.Fatal(err)
61+
}
62+
return context.WithValue(ctx, "test-dep", deployment)
63+
}).
64+
Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
65+
client, err := cfg.NewClient()
66+
if err != nil {
67+
t.Fatal(err)
68+
}
69+
depl := ctx.Value("test-dep").(*appsv1.Deployment)
70+
if err := client.Resources().Delete(ctx, depl); err != nil {
71+
t.Fatal(err)
72+
}
73+
return ctx
74+
}).Feature()
75+
76+
testenv.Test(t, watchFeature)
77+
78+
}
79+
80+
// TestWatchForResourcesWithStop() demonstartes how to start and stop the watcher
81+
var w *watcher.EventHandlerFuncs
82+
83+
func TestWatchForResourcesWithStop(t *testing.T) {
84+
watchFeature := features.New("test watcher with stop").WithLabel("env", "prod").
85+
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
86+
cl, err := cfg.NewClient()
87+
if err != nil {
88+
t.Fatal(err)
89+
}
90+
91+
dep := appsv1.Deployment{
92+
ObjectMeta: metav1.ObjectMeta{Name: "watchnstop-dep", Namespace: cfg.Namespace()},
93+
}
94+
95+
// watch for the deployment and triger action based on the event recieved.
96+
w = cl.Resources().Watch(&appsv1.DeploymentList{}, resources.WithFieldSelector(labels.FormatLabels(map[string]string{"metadata.name": dep.Name}))).
97+
WithAddFunc(onAdd).WithDeleteFunc(onDelete)
98+
99+
go func() {
100+
err := w.Start(ctx)
101+
if err != nil {
102+
t.Error(err)
103+
}
104+
}()
105+
106+
return ctx
107+
}).
108+
Assess("create watch deployment", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
109+
// create a deployment
110+
deployment := newDeployment(cfg.Namespace(), "watchnstop-dep", 1)
111+
client, err := cfg.NewClient()
112+
if err != nil {
113+
t.Fatal(err)
114+
}
115+
if err := client.Resources().Create(ctx, deployment); err != nil {
116+
t.Fatal(err)
117+
}
118+
return context.WithValue(ctx, "stop-dep", deployment)
119+
}).
120+
Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
121+
client, err := cfg.NewClient()
122+
if err != nil {
123+
t.Fatal(err)
124+
}
125+
depl := ctx.Value("stop-dep").(*appsv1.Deployment)
126+
if err := client.Resources().Delete(ctx, depl); err != nil {
127+
t.Fatal(err)
128+
}
129+
130+
w.Stop()
131+
132+
return ctx
133+
}).Feature()
134+
135+
testenv.Test(t, watchFeature)
136+
137+
}
138+
139+
func newDeployment(namespace string, name string, replicas int32) *appsv1.Deployment {
140+
return &appsv1.Deployment{
141+
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, Labels: map[string]string{"app": "watch-for-resources"}},
142+
Spec: appsv1.DeploymentSpec{
143+
Replicas: &replicas,
144+
Selector: &metav1.LabelSelector{
145+
MatchLabels: map[string]string{"app": "watch-for-resources"},
146+
},
147+
Template: v1.PodTemplateSpec{
148+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "watch-for-resources"}},
149+
Spec: v1.PodSpec{Containers: []v1.Container{{Name: "nginx", Image: "nginx"}}},
150+
},
151+
},
152+
}
153+
}
154+
155+
// onAdd is the function executed when the kubernetes watch notifies the
156+
// presence of a new kubernetes deployment in the cluster
157+
func onAdd(obj interface{}) {
158+
dep := obj.(*appsv1.Deployment)
159+
depName := dep.GetName()
160+
if depName == "watch-dep" || depName == "watchnstop-dep" {
161+
klog.InfoS("Deployment name matches with actual name!")
162+
}
163+
}
164+
165+
// onDelete is the function executed when the kubernetes watch notifies
166+
// delete event on deployment
167+
func onDelete(obj interface{}) {
168+
dep := obj.(*appsv1.Deployment)
169+
depName := dep.GetName()
170+
if depName == "watch-dep" || depName == "watchnstop-dep" {
171+
klog.InfoS("Deployment deleted successfully!")
172+
}
173+
}

klient/decoder/decoder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func DecodeEach(ctx context.Context, manifest io.Reader, handlerFn HandlerFunc,
132132
return nil
133133
}
134134

135-
// Decode a stream of documents of any Kind using either the innate typing of the scheme.
135+
// DecodeAll is a stream of documents of any Kind using either the innate typing of the scheme.
136136
// Falls back to the unstructured.Unstructured type if a matching type cannot be found for the Kind.
137137
// Options may be provided to configure the behavior of the decoder.
138138
func DecodeAll(ctx context.Context, manifest io.Reader, options ...DecodeOption) ([]k8s.Object, error) {
@@ -144,7 +144,7 @@ func DecodeAll(ctx context.Context, manifest io.Reader, options ...DecodeOption)
144144
return objects, err
145145
}
146146

147-
// Decode any single-document YAML or JSON input using either the innate typing of the scheme.
147+
// DecodeAny decodes any single-document YAML or JSON input using either the innate typing of the scheme.
148148
// Falls back to the unstructured.Unstructured type if a matching type cannot be found for the Kind.
149149
// Options may be provided to configure the behavior of the decoder.
150150
func DecodeAny(manifest io.Reader, options ...DecodeOption) (k8s.Object, error) {

klient/k8s/resources/resources.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/client-go/rest"
2828
cr "sigs.k8s.io/controller-runtime/pkg/client"
2929
"sigs.k8s.io/e2e-framework/klient/k8s"
30+
"sigs.k8s.io/e2e-framework/klient/k8s/watcher"
3031
)
3132

3233
type Resources struct {
@@ -66,6 +67,11 @@ func New(cfg *rest.Config) (*Resources, error) {
6667
return res, nil
6768
}
6869

70+
// GetConfig hepls to get config type *rest.Config
71+
func (r *Resources) GetConfig() *rest.Config {
72+
return r.config
73+
}
74+
6975
func (r *Resources) WithNamespace(ns string) *Resources {
7076
r.namespace = ns
7177
return r
@@ -182,3 +188,19 @@ func (r *Resources) Label(obj k8s.Object, label map[string]string) {
182188
func (r *Resources) GetScheme() *runtime.Scheme {
183189
return r.scheme
184190
}
191+
192+
func (r *Resources) Watch(object k8s.ObjectList, opts ...ListOption) *watcher.EventHandlerFuncs {
193+
listOptions := &metav1.ListOptions{}
194+
195+
for _, fn := range opts {
196+
fn(listOptions)
197+
}
198+
199+
o := &cr.ListOptions{Raw: listOptions}
200+
201+
return &watcher.EventHandlerFuncs{
202+
ListOptions: o,
203+
K8sObject: object,
204+
Cfg: r.GetConfig(),
205+
}
206+
}

0 commit comments

Comments
 (0)