1+ /*
2+ * Copyright 2024 Responsive Computing, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package dev .responsive .kafka .internal .db .mongo ;
18+
19+ import static dev .responsive .kafka .api .config .ResponsiveConfig .STORAGE_HOSTNAME_CONFIG ;
20+ import static org .hamcrest .MatcherAssert .assertThat ;
21+
22+ import com .mongodb .client .MongoClient ;
23+ import dev .responsive .kafka .api .config .StorageBackend ;
24+ import dev .responsive .kafka .internal .db .partitioning .WindowSegmentPartitioner ;
25+ import dev .responsive .kafka .internal .utils .SessionUtil ;
26+ import dev .responsive .kafka .internal .utils .WindowedKey ;
27+ import dev .responsive .kafka .testutils .ResponsiveConfigParam ;
28+ import dev .responsive .kafka .testutils .ResponsiveExtension ;
29+ import java .util .ArrayList ;
30+ import java .util .Map ;
31+ import org .apache .kafka .common .utils .Bytes ;
32+ import org .apache .kafka .streams .KeyValue ;
33+ import org .hamcrest .Matchers ;
34+ import org .junit .jupiter .api .BeforeEach ;
35+ import org .junit .jupiter .api .Test ;
36+ import org .junit .jupiter .api .TestInfo ;
37+ import org .junit .jupiter .api .extension .RegisterExtension ;
38+
39+ class MongoWindowTableTest {
40+
41+ @ RegisterExtension
42+ public static final ResponsiveExtension EXT = new ResponsiveExtension (StorageBackend .MONGO_DB );
43+ private static final CollectionCreationOptions UNSHARDED = new CollectionCreationOptions (
44+ false ,
45+ 0
46+ );
47+ private static final byte [] DEFAULT_VALUE = new byte [] {1 };
48+
49+ private String name ;
50+ private MongoClient client ;
51+
52+ @ BeforeEach
53+ public void before (
54+ final TestInfo info ,
55+ @ ResponsiveConfigParam final Map <String , Object > props
56+ ) {
57+ name = info .getDisplayName ().replace ("()" , "" );
58+
59+ final String mongoConnection = (String ) props .get (STORAGE_HOSTNAME_CONFIG );
60+ client = SessionUtil .connect (mongoConnection , null , null );
61+ }
62+
63+ @ Test
64+ public void shouldSucceedSimpleSetGet () {
65+ // Given:
66+ final WindowSegmentPartitioner partitioner = new WindowSegmentPartitioner (10_000L , 1_000L );
67+ final var segment = partitioner .segmenter ().activeSegments (0 , 100 ).get (0 );
68+
69+ final MongoWindowedTable table = new MongoWindowedTable (client , name , partitioner ,
70+ false , UNSHARDED
71+ );
72+ final var flushManager = table .init (0 );
73+ flushManager .updateOffsetAndStreamTime (0 , 100 );
74+ flushManager .createSegment (segment );
75+
76+ // When:
77+ final var byteKey = Bytes .wrap ("key" .getBytes ());
78+ var writer = flushManager .createWriter (segment );
79+ writer .insert (
80+ new WindowedKey (byteKey , 0 ),
81+ DEFAULT_VALUE ,
82+ table .localEpoch (0 )
83+ );
84+ writer .flush ();
85+
86+ // Then:
87+ var value = table .fetch (0 , byteKey , 0 );
88+ assertThat (value , Matchers .equalTo (DEFAULT_VALUE ));
89+ value = table .fetch (0 , byteKey , 100 );
90+ assertThat (value , Matchers .nullValue ());
91+ value = table .fetch (0 , Bytes .wrap ("other" .getBytes ()), 0 );
92+ assertThat (value , Matchers .nullValue ());
93+ }
94+
95+ @ Test
96+ public void shouldSucceedRangeSetGet () {
97+ // Given:
98+ final WindowSegmentPartitioner partitioner = new WindowSegmentPartitioner (10_000L , 1_000L );
99+ final var segment = partitioner .segmenter ().activeSegments (0 , 100 ).get (0 );
100+
101+ final MongoWindowedTable table = new MongoWindowedTable (client , name , partitioner ,
102+ false , UNSHARDED
103+ );
104+ final var flushManager = table .init (0 );
105+ flushManager .updateOffsetAndStreamTime (0 , 6_000 );
106+ flushManager .createSegment (segment );
107+
108+ // When:
109+ final var byteKey = Bytes .wrap ("key" .getBytes ());
110+ final var windowedKey1 = new WindowedKey (byteKey , 500 );
111+ final var windowedKey2 = new WindowedKey (byteKey , 5_000 );
112+ var writer = flushManager .createWriter (segment );
113+ writer .insert (
114+ windowedKey1 ,
115+ DEFAULT_VALUE ,
116+ table .localEpoch (0 )
117+ );
118+ writer .insert (
119+ windowedKey2 ,
120+ DEFAULT_VALUE ,
121+ table .localEpoch (0 )
122+ );
123+ writer .flush ();
124+
125+ // Then:
126+ var it = table .fetch (0 , byteKey , 100 , 6_000 );
127+ var kvs = new ArrayList <KeyValue <WindowedKey , byte []>>();
128+ it .forEachRemaining (kvs ::add );
129+
130+ assertThat (kvs , Matchers .hasSize (2 ));
131+ assertThat (kvs .get (0 ).key .key , Matchers .equalTo (windowedKey1 .key ));
132+ assertThat (kvs .get (0 ).key .windowStartMs , Matchers .equalTo (windowedKey1 .windowStartMs ));
133+ assertThat (kvs .get (0 ).value , Matchers .equalTo (DEFAULT_VALUE ));
134+
135+ assertThat (kvs .get (1 ).key .key , Matchers .equalTo (windowedKey2 .key ));
136+ assertThat (kvs .get (1 ).key .windowStartMs , Matchers .equalTo (windowedKey2 .windowStartMs ));
137+ assertThat (kvs .get (1 ).value , Matchers .equalTo (DEFAULT_VALUE ));
138+ }
139+ }
0 commit comments