Skip to content

Commit 7b5d737

Browse files
authored
Merge pull request #3766 from jsternberg/k8s-driver-statefulset
feat: add persistent storage options to k8s driver
2 parents c05897b + 0a0fc90 commit 7b5d737

File tree

5 files changed

+409
-167
lines changed

5 files changed

+409
-167
lines changed

driver/kubernetes/driver.go

Lines changed: 133 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,17 @@ type Driver struct {
4444

4545
// if you add fields, remember to update docs:
4646
// https://github.com/docker/docs/blob/main/content/build/drivers/kubernetes.md
47-
minReplicas int
48-
deployment *appsv1.Deployment
49-
configMaps []*corev1.ConfigMap
50-
deploymentClient kubeclient.DeploymentClient
51-
podClient kubeclient.PodClient
52-
configMapClient kubeclient.ConfigMapClient
53-
podChooser podchooser.PodChooser
54-
defaultLoad bool
55-
timeout time.Duration
47+
minReplicas int
48+
deployment *appsv1.Deployment
49+
statefulSet *appsv1.StatefulSet
50+
configMaps []*corev1.ConfigMap
51+
deploymentClient kubeclient.DeploymentClient
52+
statefulSetClient kubeclient.StatefulSetClient
53+
podClient kubeclient.PodClient
54+
configMapClient kubeclient.ConfigMapClient
55+
podChooser podchooser.PodChooser
56+
defaultLoad bool
57+
timeout time.Duration
5658
}
5759

5860
func (d *Driver) IsMobyDriver() bool {
@@ -65,31 +67,18 @@ func (d *Driver) Config() driver.InitConfig {
6567

6668
func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
6769
return progress.Wrap("[internal] booting buildkit", l, func(sub progress.SubLogger) error {
68-
_, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
69-
if err != nil {
70-
if !apierrors.IsNotFound(err) {
71-
return errors.Wrapf(err, "error for bootstrap %q", d.deployment.Name)
72-
}
73-
74-
for _, cfg := range d.configMaps {
75-
// create ConfigMap first if exists
76-
_, err = d.configMapClient.Create(ctx, cfg, metav1.CreateOptions{})
77-
if err != nil {
78-
if !apierrors.IsAlreadyExists(err) {
79-
return errors.Wrapf(err, "error while calling configMapClient.Create for %q", cfg.Name)
80-
}
81-
_, err = d.configMapClient.Update(ctx, cfg, metav1.UpdateOptions{})
82-
if err != nil {
83-
return errors.Wrapf(err, "error while calling configMapClient.Update for %q", cfg.Name)
84-
}
85-
}
70+
if d.deployment != nil {
71+
if err := bootstrap(ctx, d, d.deploymentClient, d.deployment.Name, d.deployment); err != nil {
72+
return err
8673
}
74+
}
8775

88-
_, err = d.deploymentClient.Create(ctx, d.deployment, metav1.CreateOptions{})
89-
if err != nil {
90-
return errors.Wrapf(err, "error while calling deploymentClient.Create for %q", d.deployment.Name)
76+
if d.statefulSet != nil {
77+
if err := bootstrap(ctx, d, d.statefulSetClient, d.statefulSet.Name, d.statefulSet); err != nil {
78+
return err
9179
}
9280
}
81+
9382
return sub.Wrap(
9483
fmt.Sprintf("waiting for %d pods to be ready, timeout: %s", d.minReplicas, units.HumanDuration(d.timeout)),
9584
func() error {
@@ -98,11 +87,76 @@ func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
9887
})
9988
}
10089

90+
type appClient[S any] interface {
91+
Get(ctx context.Context, name string, opts metav1.GetOptions) (*S, error)
92+
Create(ctx context.Context, spec *S, opts metav1.CreateOptions) (*S, error)
93+
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
94+
}
95+
96+
func bootstrap[S any](ctx context.Context, d *Driver, client appClient[S], name string, spec *S) error {
97+
if _, err := client.Get(ctx, name, metav1.GetOptions{}); err != nil {
98+
if !apierrors.IsNotFound(err) {
99+
return errors.Wrapf(err, "error for bootstrap %q", name)
100+
}
101+
102+
for _, cfg := range d.configMaps {
103+
// create ConfigMap first if exists
104+
if _, err = d.configMapClient.Create(ctx, cfg, metav1.CreateOptions{}); err != nil {
105+
if !apierrors.IsAlreadyExists(err) {
106+
return errors.Wrapf(err, "error while calling configMapClient.Create for %q", cfg.Name)
107+
}
108+
109+
if _, err = d.configMapClient.Update(ctx, cfg, metav1.UpdateOptions{}); err != nil {
110+
return errors.Wrapf(err, "error while calling configMapClient.Update for %q", cfg.Name)
111+
}
112+
}
113+
}
114+
115+
if _, err = client.Create(ctx, spec, metav1.CreateOptions{}); err != nil {
116+
return errors.Wrapf(err, "error while calling Create for %q", d.deployment.Name)
117+
}
118+
}
119+
return nil
120+
}
121+
101122
func (d *Driver) wait(ctx context.Context) error {
123+
if d.deployment != nil {
124+
if err := d.waitDeployments(ctx); err != nil {
125+
return err
126+
}
127+
}
128+
129+
if d.statefulSet != nil {
130+
if err := d.waitStatefulSets(ctx); err != nil {
131+
return err
132+
}
133+
}
134+
return nil
135+
}
136+
137+
func (d *Driver) waitDeployments(ctx context.Context) error {
138+
return wait(ctx, d, d.deploymentClient, d.deployment.Name, func(s *appsv1.Deployment) error {
139+
if s.Status.ReadyReplicas < int32(d.minReplicas) {
140+
return errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, s.Status.ReadyReplicas)
141+
}
142+
return nil
143+
})
144+
}
145+
146+
func (d *Driver) waitStatefulSets(ctx context.Context) error {
147+
return wait(ctx, d, d.statefulSetClient, d.statefulSet.Name, func(s *appsv1.StatefulSet) error {
148+
if s.Status.ReadyReplicas < int32(d.minReplicas) {
149+
return errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, s.Status.ReadyReplicas)
150+
}
151+
return nil
152+
})
153+
}
154+
155+
func wait[S any](ctx context.Context, d *Driver, client appClient[S], name string, check func(*S) error) error {
102156
// TODO: use watch API
103157
var (
104158
err error
105-
depl *appsv1.Deployment
159+
spec *S
106160
)
107161

108162
timeoutChan := time.After(d.timeout)
@@ -116,31 +170,50 @@ func (d *Driver) wait(ctx context.Context) error {
116170
case <-timeoutChan:
117171
return err
118172
case <-ticker.C:
119-
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
173+
spec, err = client.Get(ctx, name, metav1.GetOptions{})
120174
if err == nil {
121-
if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
175+
if err = check(spec); err == nil {
122176
return nil
123177
}
124-
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, depl.Status.ReadyReplicas)
125178
}
126179
}
127180
}
128181
}
129182

130-
func (d *Driver) Info(ctx context.Context) (*driver.Info, error) {
131-
depl, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
132-
if err != nil {
133-
// TODO: return err if err != ErrNotFound
134-
return &driver.Info{
135-
Status: driver.Inactive,
136-
}, nil
183+
func (d *Driver) Info(ctx context.Context) (_ *driver.Info, err error) {
184+
var depl *appsv1.Deployment
185+
if d.deployment != nil {
186+
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
187+
if err != nil {
188+
// TODO: return err if err != ErrNotFound
189+
return &driver.Info{
190+
Status: driver.Inactive,
191+
}, nil
192+
}
193+
if depl.Status.ReadyReplicas <= 0 {
194+
return &driver.Info{
195+
Status: driver.Stopped,
196+
}, nil
197+
}
137198
}
138-
if depl.Status.ReadyReplicas <= 0 {
139-
return &driver.Info{
140-
Status: driver.Stopped,
141-
}, nil
199+
200+
var stat *appsv1.StatefulSet
201+
if d.statefulSet != nil {
202+
stat, err = d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
203+
if err != nil {
204+
// TODO: return err if err != ErrNotFound
205+
return &driver.Info{
206+
Status: driver.Inactive,
207+
}, nil
208+
}
209+
if depl.Status.ReadyReplicas <= 0 {
210+
return &driver.Info{
211+
Status: driver.Stopped,
212+
}, nil
213+
}
142214
}
143-
pods, err := podchooser.ListRunningPods(ctx, d.podClient, depl)
215+
216+
pods, err := podchooser.ListRunningPods(ctx, d.podClient, depl, stat)
144217
if err != nil {
145218
return nil, err
146219
}
@@ -182,11 +255,22 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
182255
return nil
183256
}
184257

185-
if err := d.deploymentClient.Delete(ctx, d.deployment.Name, metav1.DeleteOptions{}); err != nil {
186-
if !apierrors.IsNotFound(err) {
187-
return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
258+
if d.deployment != nil {
259+
if err := d.deploymentClient.Delete(ctx, d.deployment.Name, metav1.DeleteOptions{}); err != nil {
260+
if !apierrors.IsNotFound(err) {
261+
return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
262+
}
188263
}
189264
}
265+
266+
if d.statefulSet != nil {
267+
if err := d.statefulSetClient.Delete(ctx, d.statefulSet.Name, metav1.DeleteOptions{}); err != nil {
268+
if !apierrors.IsNotFound(err) {
269+
return errors.Wrapf(err, "error while calling statefulSetClient.Delete for %q", d.statefulSet.Name)
270+
}
271+
}
272+
}
273+
190274
for _, cfg := range d.configMaps {
191275
if err := d.configMapClient.Delete(ctx, cfg.Name, metav1.DeleteOptions{}); err != nil {
192276
if !apierrors.IsNotFound(err) {

driver/kubernetes/factory.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
131131
d.defaultLoad = defaultLoad
132132
d.timeout = timeout
133133

134-
d.deployment, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
134+
d.deployment, d.statefulSet, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
135135
if err != nil {
136136
return nil, err
137137
}
@@ -143,20 +143,23 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
143143
return nil, err
144144
}
145145
d.deploymentClient = clients.Deployments
146+
d.statefulSetClient = clients.StatefulSets
146147
d.podClient = clients.Pods
147148
d.configMapClient = clients.ConfigMaps
148149

149150
switch loadbalance {
150151
case LoadbalanceSticky:
151152
d.podChooser = &podchooser.StickyPodChooser{
152-
Key: cfg.ContextPathHash,
153-
PodClient: d.podClient,
154-
Deployment: d.deployment,
153+
Key: cfg.ContextPathHash,
154+
PodClient: d.podClient,
155+
Deployment: d.deployment,
156+
StatefulSet: d.statefulSet,
155157
}
156158
case LoadbalanceRandom:
157159
d.podChooser = &podchooser.RandomPodChooser{
158-
PodClient: d.podClient,
159-
Deployment: d.deployment,
160+
PodClient: d.podClient,
161+
Deployment: d.deployment,
162+
StatefulSet: d.statefulSet,
160163
}
161164
}
162165
return d, nil
@@ -199,6 +202,8 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
199202
deploymentOpt.RequestsMemory = v
200203
case k == "requests.ephemeral-storage":
201204
deploymentOpt.RequestsEphemeralStorage = v
205+
case k == "persistent-volume-claim.requests.storage":
206+
deploymentOpt.RequestsPersistentStorage = v
202207
case k == "limits.cpu":
203208
deploymentOpt.LimitsCPU = v
204209
case k == "limits.memory":

driver/kubernetes/kubeclient/client.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ type DeploymentClient interface {
1717
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
1818
}
1919

20+
type StatefulSetClient interface {
21+
Get(ctx context.Context, name string, opts metav1.GetOptions) (*appsv1.StatefulSet, error)
22+
Create(ctx context.Context, deployment *appsv1.StatefulSet, opts metav1.CreateOptions) (*appsv1.StatefulSet, error)
23+
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
24+
}
25+
2026
type ConfigMapClient interface {
2127
Create(ctx context.Context, configMap *corev1.ConfigMap, opts metav1.CreateOptions) (*corev1.ConfigMap, error)
2228
Update(ctx context.Context, configMap *corev1.ConfigMap, opts metav1.UpdateOptions) (*corev1.ConfigMap, error)
@@ -29,9 +35,10 @@ type PodClient interface {
2935
}
3036

3137
type Clients struct {
32-
Deployments DeploymentClient
33-
ConfigMaps ConfigMapClient
34-
Pods PodClient
38+
Deployments DeploymentClient
39+
StatefulSets StatefulSetClient
40+
ConfigMaps ConfigMapClient
41+
Pods PodClient
3542
}
3643

3744
func New(config *rest.Config, namespace string) (*Clients, error) {
@@ -51,9 +58,10 @@ func New(config *rest.Config, namespace string) (*Clients, error) {
5158
}
5259

5360
return &Clients{
54-
Deployments: &deploymentClient{client: appsClient, namespace: namespace},
55-
ConfigMaps: &configMapClient{client: coreClient, namespace: namespace},
56-
Pods: &podClient{client: coreClient, namespace: namespace},
61+
Deployments: &deploymentClient{client: appsClient, namespace: namespace},
62+
StatefulSets: &statefulSetClient{client: appsClient, namespace: namespace},
63+
ConfigMaps: &configMapClient{client: coreClient, namespace: namespace},
64+
Pods: &podClient{client: coreClient, namespace: namespace},
5765
}, nil
5866
}
5967

@@ -110,6 +118,48 @@ func (c *deploymentClient) Delete(ctx context.Context, name string, opts metav1.
110118
Error()
111119
}
112120

121+
type statefulSetClient struct {
122+
client rest.Interface
123+
namespace string
124+
}
125+
126+
func (c *statefulSetClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*appsv1.StatefulSet, error) {
127+
result := &appsv1.StatefulSet{}
128+
err := c.client.Get().
129+
UseProtobufAsDefault().
130+
Namespace(c.namespace).
131+
Resource("statefulsets").
132+
Name(name).
133+
VersionedParams(&opts, ParameterCodec()).
134+
Do(ctx).
135+
Into(result)
136+
return result, err
137+
}
138+
139+
func (c *statefulSetClient) Create(ctx context.Context, statefulSet *appsv1.StatefulSet, opts metav1.CreateOptions) (*appsv1.StatefulSet, error) {
140+
result := &appsv1.StatefulSet{}
141+
err := c.client.Post().
142+
UseProtobufAsDefault().
143+
Namespace(c.namespace).
144+
Resource("statefulsets").
145+
VersionedParams(&opts, ParameterCodec()).
146+
Body(statefulSet).
147+
Do(ctx).
148+
Into(result)
149+
return result, err
150+
}
151+
152+
func (c *statefulSetClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
153+
return c.client.Delete().
154+
UseProtobufAsDefault().
155+
Namespace(c.namespace).
156+
Resource("statefulsets").
157+
Name(name).
158+
Body(&opts).
159+
Do(ctx).
160+
Error()
161+
}
162+
113163
type configMapClient struct {
114164
client rest.Interface
115165
namespace string

0 commit comments

Comments
 (0)