Skip to content

Commit 765b1a7

Browse files
Statestore: Implement ReadStateBytes (#1265)
* Templating out Framework implementation for Pluggable State Storage * Updating commit * Saving my progress, todo: add clientcapabilities to plugin go * Updated read to be similar to invoke actions and handle streaming in protov6server * Updated go mod * updating some chunking logic * Added chunking to readstatebytes * ran linter * Fixed test to pass CI possibly * updating getstatestore * updated other tests * remove files unrelated to schema/metadata * add stub impl for state store rpcs * doc fixes + field updates to match proposed protocol * update copyright headers * finish impl for get metadata and get provider schema * add validation error for state stores in v5 provider * err msg fixes * update to use main branch commit * initial impl, no package docs or protov6server/fwserver tests * rename to initialize * add more tests * add package docs and move client capability to internal * Add readStateBytes * addressed PR comments --------- Co-authored-by: Austin Valle <austinvalle@gmail.com>
1 parent 4746af9 commit 765b1a7

File tree

8 files changed

+712
-5
lines changed

8 files changed

+712
-5
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright IBM Corp. 2021, 2025
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
package fwserver
5+
6+
import (
7+
"context"
8+
9+
"github.com/hashicorp/terraform-plugin-framework/diag"
10+
"github.com/hashicorp/terraform-plugin-framework/internal/logging"
11+
"github.com/hashicorp/terraform-plugin-framework/statestore"
12+
)
13+
14+
type ReadStateBytesRequest struct {
15+
StateID string
16+
StateStore statestore.StateStore
17+
}
18+
19+
type ReadStateBytesResponse struct {
20+
StateBytes []byte
21+
Diagnostics diag.Diagnostics
22+
}
23+
24+
// ReadStateBytes implements the framework server ReadStateBytes RPC.
25+
func (s *Server) ReadStateBytes(ctx context.Context, req *ReadStateBytesRequest, resp *ReadStateBytesResponse) {
26+
if req == nil {
27+
return
28+
}
29+
30+
if stateStoreWithConfigure, ok := req.StateStore.(statestore.StateStoreWithConfigure); ok {
31+
logging.FrameworkTrace(ctx, "StateStore implements StateStoreWithConfigure")
32+
33+
configureReq := statestore.ConfigureRequest{
34+
StateStoreData: s.StateStoreConfigureData.StateStoreConfigureData,
35+
}
36+
configureResp := statestore.ConfigureResponse{}
37+
38+
logging.FrameworkTrace(ctx, "Calling provider defined StateStore Configure")
39+
stateStoreWithConfigure.Configure(ctx, configureReq, &configureResp)
40+
logging.FrameworkTrace(ctx, "Called provider defined StateStore Configure")
41+
42+
resp.Diagnostics.Append(configureResp.Diagnostics...)
43+
44+
if resp.Diagnostics.HasError() {
45+
return
46+
}
47+
}
48+
49+
readReq := statestore.ReadRequest{
50+
StateID: req.StateID,
51+
}
52+
53+
readResp := statestore.ReadResponse{}
54+
55+
logging.FrameworkTrace(ctx, "Calling provider defined StateStore Read")
56+
req.StateStore.Read(ctx, readReq, &readResp)
57+
logging.FrameworkTrace(ctx, "Called provider defined StateStore Read")
58+
59+
resp.Diagnostics.Append(readResp.Diagnostics...)
60+
resp.StateBytes = readResp.StateBytes
61+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright IBM Corp. 2021, 2025
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
package fwserver_test
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"testing"
10+
11+
"github.com/google/go-cmp/cmp"
12+
"github.com/google/go-cmp/cmp/cmpopts"
13+
"github.com/hashicorp/terraform-plugin-framework/diag"
14+
"github.com/hashicorp/terraform-plugin-framework/internal/fwserver"
15+
"github.com/hashicorp/terraform-plugin-framework/internal/testing/testprovider"
16+
"github.com/hashicorp/terraform-plugin-framework/statestore"
17+
)
18+
19+
func TestServerReadStateBytesResource(t *testing.T) {
20+
t.Parallel()
21+
22+
testCases := map[string]struct {
23+
server *fwserver.Server
24+
request *fwserver.ReadStateBytesRequest
25+
expectedResponse *fwserver.ReadStateBytesResponse
26+
}{
27+
"success-with-zero-results": {
28+
server: &fwserver.Server{
29+
Provider: &testprovider.Provider{},
30+
},
31+
request: &fwserver.ReadStateBytesRequest{
32+
StateStore: &testprovider.StateStore{
33+
ReadMethod: func(ctx context.Context, req statestore.ReadRequest, resp *statestore.ReadResponse) {
34+
resp.StateBytes = []byte{}
35+
},
36+
},
37+
StateID: "test_id",
38+
},
39+
expectedResponse: &fwserver.ReadStateBytesResponse{
40+
StateBytes: []byte{},
41+
},
42+
},
43+
"success-with-nil-results": {
44+
server: &fwserver.Server{
45+
Provider: &testprovider.Provider{},
46+
},
47+
request: &fwserver.ReadStateBytesRequest{
48+
StateStore: &testprovider.StateStore{
49+
ReadMethod: func(ctx context.Context, req statestore.ReadRequest, resp *statestore.ReadResponse) {
50+
resp.StateBytes = nil
51+
},
52+
},
53+
StateID: "test_id",
54+
},
55+
expectedResponse: &fwserver.ReadStateBytesResponse{
56+
StateBytes: nil,
57+
},
58+
},
59+
"success-with-data": {
60+
server: &fwserver.Server{
61+
Provider: &testprovider.Provider{},
62+
},
63+
request: &fwserver.ReadStateBytesRequest{
64+
StateStore: &testprovider.StateStore{
65+
ReadMethod: func(ctx context.Context, req statestore.ReadRequest, resp *statestore.ReadResponse) {
66+
resp.StateBytes = []byte("test-data")
67+
},
68+
},
69+
StateID: "test_id",
70+
},
71+
expectedResponse: &fwserver.ReadStateBytesResponse{
72+
StateBytes: []byte("test-data"),
73+
},
74+
},
75+
"success-with-configure": {
76+
server: &fwserver.Server{
77+
StateStoreConfigureData: fwserver.StateStoreConfigureData{
78+
StateStoreConfigureData: "test-statestore-configure-value",
79+
},
80+
Provider: &testprovider.Provider{},
81+
},
82+
request: &fwserver.ReadStateBytesRequest{
83+
StateStore: &testprovider.StateStoreWithConfigure{
84+
StateStore: &testprovider.StateStore{
85+
ReadMethod: func(ctx context.Context, req statestore.ReadRequest, resp *statestore.ReadResponse) {
86+
resp.StateBytes = []byte("test-data")
87+
},
88+
},
89+
ConfigureMethod: func(ctx context.Context, req statestore.ConfigureRequest, resp *statestore.ConfigureResponse) {
90+
stateStoreData, ok := req.StateStoreData.(string)
91+
92+
if !ok {
93+
resp.Diagnostics.AddError(
94+
"Unexpected ConfigureRequest.StateStoreData",
95+
fmt.Sprintf("Expected string, got: %T", req.StateStoreData),
96+
)
97+
return
98+
}
99+
100+
if stateStoreData != "test-statestore-configure-value" {
101+
resp.Diagnostics.AddError(
102+
"Unexpected ConfigureRequest.StateStoreData",
103+
fmt.Sprintf("Expected test-statestore-configure-value, got: %q", stateStoreData),
104+
)
105+
}
106+
},
107+
},
108+
StateID: "test_id",
109+
},
110+
expectedResponse: &fwserver.ReadStateBytesResponse{
111+
StateBytes: []byte("test-data"),
112+
},
113+
},
114+
"empty-state-id": {
115+
server: &fwserver.Server{
116+
Provider: &testprovider.Provider{},
117+
},
118+
request: &fwserver.ReadStateBytesRequest{
119+
StateStore: &testprovider.StateStore{
120+
ReadMethod: func(ctx context.Context, req statestore.ReadRequest, resp *statestore.ReadResponse) {
121+
resp.StateBytes = []byte{}
122+
},
123+
},
124+
StateID: "",
125+
},
126+
expectedResponse: &fwserver.ReadStateBytesResponse{
127+
StateBytes: []byte{},
128+
},
129+
},
130+
"with-diagnostics": {
131+
server: &fwserver.Server{
132+
Provider: &testprovider.Provider{},
133+
},
134+
request: &fwserver.ReadStateBytesRequest{
135+
StateStore: &testprovider.StateStore{
136+
ReadMethod: func(ctx context.Context, req statestore.ReadRequest, resp *statestore.ReadResponse) {
137+
resp.StateBytes = []byte("test-data")
138+
resp.Diagnostics.AddWarning("Test Warning", "This is a test warning")
139+
},
140+
},
141+
StateID: "test_id",
142+
},
143+
expectedResponse: &fwserver.ReadStateBytesResponse{
144+
StateBytes: []byte("test-data"),
145+
Diagnostics: diag.Diagnostics{
146+
diag.NewWarningDiagnostic("Test Warning", "This is a test warning"),
147+
},
148+
},
149+
},
150+
}
151+
152+
for name, testCase := range testCases {
153+
t.Run(name, func(t *testing.T) {
154+
t.Parallel()
155+
156+
response := &fwserver.ReadStateBytesResponse{}
157+
testCase.server.ReadStateBytes(context.Background(), testCase.request, response)
158+
159+
opts := cmp.Options{
160+
cmpopts.EquateEmpty(),
161+
}
162+
163+
if diff := cmp.Diff(response, testCase.expectedResponse, opts...); diff != "" {
164+
t.Errorf("unexpected difference: %s", diff)
165+
}
166+
})
167+
}
168+
}

internal/proto6server/serve.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,6 @@ func (s *Server) UnlockState(ctx context.Context, req *tfprotov6.UnlockStateRequ
7272
panic("UnlockState not implemented")
7373
}
7474

75-
func (s *Server) ReadStateBytes(ctx context.Context, req *tfprotov6.ReadStateBytesRequest) (*tfprotov6.ReadStateBytesStream, error) {
76-
// TODO: Implement in a separate PR for just ReadStateBytes
77-
panic("ReadStateBytes not implemented")
78-
}
79-
8075
func (s *Server) WriteStateBytes(ctx context.Context, req *tfprotov6.WriteStateBytesStream) (*tfprotov6.WriteStateBytesResponse, error) {
8176
// TODO: Implement in a separate PR for just WriteStateBytes
8277
panic("WriteStateBytes not implemented")
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright IBM Corp. 2021, 2025
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
package proto6server
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"errors"
10+
"fmt"
11+
"io"
12+
13+
"github.com/hashicorp/terraform-plugin-framework/internal/fwserver"
14+
"github.com/hashicorp/terraform-plugin-framework/internal/logging"
15+
"github.com/hashicorp/terraform-plugin-framework/internal/toproto6"
16+
"github.com/hashicorp/terraform-plugin-go/tfprotov6"
17+
)
18+
19+
func (s *Server) ReadStateBytes(ctx context.Context, proto6Req *tfprotov6.ReadStateBytesRequest) (*tfprotov6.ReadStateBytesStream, error) {
20+
ctx = s.registerContext(ctx)
21+
ctx = logging.InitContext(ctx)
22+
23+
fwResp := &tfprotov6.ReadStateBytesStream{}
24+
25+
statestore, diags := s.FrameworkServer.StateStore(ctx, proto6Req.TypeName)
26+
27+
if diags.HasError() {
28+
return &tfprotov6.ReadStateBytesStream{
29+
Chunks: func(push func(chunk tfprotov6.ReadStateByteChunk) bool) {
30+
push(tfprotov6.ReadStateByteChunk{
31+
Diagnostics: toproto6.Diagnostics(ctx, diags),
32+
})
33+
},
34+
}, nil
35+
}
36+
37+
readStateBytesReq := &fwserver.ReadStateBytesRequest{
38+
StateID: proto6Req.StateID,
39+
StateStore: statestore,
40+
}
41+
readStateBytesResp := &fwserver.ReadStateBytesResponse{}
42+
43+
s.FrameworkServer.ReadStateBytes(ctx, readStateBytesReq, readStateBytesResp)
44+
45+
if readStateBytesResp.Diagnostics.HasError() {
46+
return &tfprotov6.ReadStateBytesStream{
47+
Chunks: func(push func(chunk tfprotov6.ReadStateByteChunk) bool) {
48+
push(tfprotov6.ReadStateByteChunk{
49+
Diagnostics: toproto6.Diagnostics(ctx, readStateBytesResp.Diagnostics),
50+
})
51+
},
52+
}, nil
53+
}
54+
55+
// If ConfigureStateStore isn't called prior to ReadStateBytes
56+
if int(s.FrameworkServer.StateStoreConfigureData.ServerCapabilities.ChunkSize) == 0 {
57+
return &tfprotov6.ReadStateBytesStream{
58+
Chunks: func(push func(chunk tfprotov6.ReadStateByteChunk) bool) {
59+
push(tfprotov6.ReadStateByteChunk{
60+
Diagnostics: []*tfprotov6.Diagnostic{
61+
{
62+
Severity: tfprotov6.DiagnosticSeverityError,
63+
Summary: "Error reading state",
64+
Detail: fmt.Sprintf("No chunk size received from Terraform while reading state data for %s. This is a bug and should be reported.",
65+
proto6Req.StateID,
66+
),
67+
},
68+
},
69+
})
70+
},
71+
}, nil
72+
}
73+
74+
chunkSize := int(s.FrameworkServer.StateStoreConfigureData.ServerCapabilities.ChunkSize)
75+
76+
reader := bytes.NewReader(readStateBytesResp.StateBytes)
77+
totalLength := reader.Size()
78+
rangeStart := 0
79+
80+
fwResp.Chunks = func(yield func(tfprotov6.ReadStateByteChunk) bool) {
81+
for {
82+
readBytes := make([]byte, chunkSize)
83+
byteCount, err := reader.Read(readBytes)
84+
if err != nil && !errors.Is(err, io.EOF) {
85+
chunkWithDiag := tfprotov6.ReadStateByteChunk{
86+
Diagnostics: []*tfprotov6.Diagnostic{
87+
{
88+
Severity: tfprotov6.DiagnosticSeverityError,
89+
Summary: "Error reading state",
90+
Detail: fmt.Sprintf("An unexpected error occurred while reading state data for %s: %s",
91+
proto6Req.StateID,
92+
err,
93+
),
94+
},
95+
},
96+
}
97+
if !yield(chunkWithDiag) {
98+
return
99+
}
100+
}
101+
102+
if byteCount == 0 {
103+
return
104+
}
105+
106+
chunk := tfprotov6.ReadStateByteChunk{
107+
StateByteChunk: tfprotov6.StateByteChunk{
108+
Bytes: readBytes[:byteCount],
109+
TotalLength: totalLength,
110+
Range: tfprotov6.StateByteRange{
111+
Start: int64(rangeStart),
112+
End: int64(rangeStart + byteCount - 1),
113+
},
114+
},
115+
}
116+
if !yield(chunk) {
117+
return
118+
}
119+
120+
rangeStart += byteCount
121+
}
122+
}
123+
124+
return fwResp, nil
125+
}

0 commit comments

Comments
 (0)