3636import com .mongodb .client .result .UpdateResult ;
3737import dev .responsive .kafka .internal .db .MongoKVFlushManager ;
3838import dev .responsive .kafka .internal .db .RemoteKVTable ;
39+ import dev .responsive .kafka .internal .utils .Iterators ;
3940import java .time .Instant ;
4041import java .util .Date ;
4142import java .util .concurrent .ConcurrentHashMap ;
4243import java .util .concurrent .ConcurrentMap ;
4344import java .util .concurrent .TimeUnit ;
4445import org .apache .kafka .common .utils .Bytes ;
46+ import org .apache .kafka .streams .KeyValue ;
4547import org .apache .kafka .streams .state .KeyValueIterator ;
4648import org .bson .codecs .configuration .CodecProvider ;
4749import org .bson .codecs .configuration .CodecRegistry ;
@@ -105,10 +107,10 @@ public String name() {
105107 @ Override
106108 public MongoKVFlushManager init (final int kafkaPartition ) {
107109 final KVMetadataDoc metaDoc = metadata .findOneAndUpdate (
108- Filters .eq (KVMetadataDoc .PARTITION , kafkaPartition ),
110+ Filters .eq (KVMetadataDoc .KAFKA_PARTITION , kafkaPartition ),
109111 Updates .combine (
110- Updates .setOnInsert (KVMetadataDoc .PARTITION , kafkaPartition ),
111- Updates .setOnInsert (KVMetadataDoc .PARTITION , kafkaPartition ),
112+ Updates .setOnInsert (KVMetadataDoc .KAFKA_PARTITION , kafkaPartition ),
113+ Updates .setOnInsert (KVMetadataDoc .KAFKA_PARTITION , kafkaPartition ),
112114 Updates .setOnInsert (KVMetadataDoc .OFFSET , NO_COMMITTED_OFFSET ),
113115 Updates .inc (KVMetadataDoc .EPOCH , 1 ) // will set the value to 1 if it doesn't exist
114116 ),
@@ -129,7 +131,7 @@ public MongoKVFlushManager init(final int kafkaPartition) {
129131
130132 @ Override
131133 public byte [] get (final int kafkaPartition , final Bytes key , final long minValidTs ) {
132- final KVDoc v = docs .find (Filters .eq (KVDoc .ID , key .get ())).first ();
134+ final KVDoc v = docs .find (Filters .eq (KVDoc .KEY , key .get ())).first ();
133135 return v == null ? null : v .getValue ();
134136 }
135137
@@ -140,12 +142,20 @@ public KeyValueIterator<Bytes, byte[]> range(
140142 final Bytes to ,
141143 final long minValidTs
142144 ) {
143- throw new UnsupportedOperationException ();
145+ final Iterable <KVDoc > results = docs .find (Filters .and (
146+ Filters .eq (KVDoc .KAFKA_PARTITION , kafkaPartition ),
147+ Filters .gte (KVDoc .KEY , from .get ()),
148+ Filters .lte (KVDoc .KEY , to .get ())
149+ ));
150+ return Iterators .kv (results .iterator (), MongoKVTable ::extractKeyValue );
144151 }
145152
146153 @ Override
147154 public KeyValueIterator <Bytes , byte []> all (final int kafkaPartition , final long minValidTs ) {
148- throw new UnsupportedOperationException ();
155+ final Iterable <KVDoc > results = docs .find (
156+ Filters .eq (KVDoc .KAFKA_PARTITION , kafkaPartition )
157+ );
158+ return Iterators .kv (results .iterator (), MongoKVTable ::extractKeyValue );
149159 }
150160
151161 @ Override
@@ -158,7 +168,8 @@ public WriteModel<KVDoc> insert(
158168 final long epoch = kafkaPartitionToEpoch .get (kafkaPartition );
159169 return new UpdateOneModel <>(
160170 Filters .and (
161- Filters .eq (KVDoc .ID , key .get ()),
171+ Filters .eq (KVDoc .KEY , key .get ()),
172+ Filters .eq (KVDoc .KAFKA_PARTITION , kafkaPartition ),
162173 Filters .lte (KVDoc .EPOCH , epoch )
163174 ),
164175 Updates .combine (
@@ -175,7 +186,8 @@ public WriteModel<KVDoc> delete(final int kafkaPartition, final Bytes key) {
175186 final long epoch = kafkaPartitionToEpoch .get (kafkaPartition );
176187 return new UpdateOneModel <>(
177188 Filters .and (
178- Filters .eq (KVDoc .ID , key .get ()),
189+ Filters .eq (KVDoc .KEY , key .get ()),
190+ Filters .eq (KVDoc .KAFKA_PARTITION , kafkaPartition ),
179191 Filters .lte (KVDoc .EPOCH , epoch )
180192 ),
181193 Updates .combine (
@@ -190,7 +202,7 @@ public WriteModel<KVDoc> delete(final int kafkaPartition, final Bytes key) {
190202 @ Override
191203 public long fetchOffset (final int kafkaPartition ) {
192204 final KVMetadataDoc result = metadata .find (
193- Filters .eq (KVMetadataDoc .PARTITION , kafkaPartition )
205+ Filters .eq (KVMetadataDoc .KAFKA_PARTITION , kafkaPartition )
194206 ).first ();
195207 if (result == null ) {
196208 throw new IllegalStateException ("Expected to find metadata row" );
@@ -203,7 +215,7 @@ public UpdateResult setOffset(final int kafkaPartition, final long offset) {
203215
204216 return metadata .updateOne (
205217 Filters .and (
206- Filters .eq (KVMetadataDoc .PARTITION , kafkaPartition ),
218+ Filters .eq (KVMetadataDoc .KAFKA_PARTITION , kafkaPartition ),
207219 Filters .lte (KVMetadataDoc .EPOCH , epoch )
208220 ),
209221 Updates .combine (
@@ -219,7 +231,7 @@ public long localEpoch(final int kafkaPartition) {
219231
220232 public long fetchEpoch (final int kafkaPartition ) {
221233 final KVMetadataDoc result = metadata .find (
222- Filters .eq (KVMetadataDoc .PARTITION , kafkaPartition )
234+ Filters .eq (KVMetadataDoc .KAFKA_PARTITION , kafkaPartition )
223235 ).first ();
224236 if (result == null ) {
225237 throw new IllegalStateException ("Expected to find metadata row" );
@@ -233,4 +245,11 @@ public long approximateNumEntries(final int kafkaPartition) {
233245 return 0 ;
234246 }
235247
248+ private static KeyValue <Bytes , byte []> extractKeyValue (final KVDoc row ) {
249+ return new KeyValue <>(
250+ Bytes .wrap (row .getKey ()),
251+ row .getValue ()
252+ );
253+ }
254+
236255}
0 commit comments