3535 import io .pixelsdb .pixels .common .physical .StorageFactory ;
3636 import io .pixelsdb .pixels .common .utils .ConfigFactory ;
3737 import io .pixelsdb .pixels .common .utils .DateUtil ;
38+ import io .pixelsdb .pixels .common .utils .IndexUtils ;
3839 import io .pixelsdb .pixels .common .utils .RetinaUtils ;
3940 import io .pixelsdb .pixels .core .PixelsWriter ;
4041 import io .pixelsdb .pixels .core .TypeDescription ;
@@ -101,14 +102,14 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me
101102
102103 // 1. Calculate Primary Key and Bucket ID
103104 ByteString pkByteString = calculatePrimaryKeyBytes (colsInLine );
104- // Assume BucketCache has the necessary method and configuration
105- int bucketId = RetinaUtils .getBucketIdFromByteBuffer ( pkByteString );
106- VnodeIdentifier vnodeIdentifier = RetinaUtils . getInstance (). getVnodeIdentifierFromBucketId ( bucketId );
105+ int retinaBucketId = RetinaUtils . getBucketIdFromByteBuffer ( pkByteString );
106+ VnodeIdentifier vnodeIdentifier = RetinaUtils .getInstance (). getVnodeIdentifierFromBucketId ( retinaBucketId );
107+ int indexBucketId = IndexUtils . getBucketIdFromByteBuffer ( pkByteString );
107108 PerVirtualNodeWriter retinaNodeWriter = retinaWriters .computeIfAbsent (vnodeIdentifier , id ->
108109 {
109110 try
110111 {
111- return initializeRetinaWriter (bucketId );
112+ return initializeRetinaWriter (retinaBucketId );
112113 } catch (Exception e )
113114 {
114115 throw new RuntimeException ("Failed to initialize writer for bucket " + id , e );
@@ -122,7 +123,7 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me
122123 try
123124 {
124125 // 4. Update Index Entry
125- updateIndexEntry (retinaNodeWriter , pkByteString );
126+ updateIndexEntry (retinaNodeWriter , pkByteString , indexBucketId );
126127
127128 // 5. Check and Flush Row Batch
128129 if (retinaNodeWriter .rowBatch .size >= retinaNodeWriter .rowBatch .getMaxSize ())
@@ -222,7 +223,7 @@ private ByteString calculatePrimaryKeyBytes(String[] colsInLine)
222223 return ByteString .copyFrom ((ByteBuffer ) indexKeyBuffer .rewind ());
223224 }
224225
225- private void updateIndexEntry (PerVirtualNodeWriter bucketWriter , ByteString pkByteString ) throws IndexException
226+ private void updateIndexEntry (PerVirtualNodeWriter bucketWriter , ByteString pkByteString , int indexBucketId ) throws IndexException
226227 {
227228 IndexProto .PrimaryIndexEntry .Builder builder = IndexProto .PrimaryIndexEntry .newBuilder ();
228229 builder .getIndexKeyBuilder ()
@@ -237,7 +238,7 @@ private void updateIndexEntry(PerVirtualNodeWriter bucketWriter, ByteString pkBy
237238 .setFileId (bucketWriter .currFile .getId ())
238239 .setRgRowOffset (bucketWriter .rgRowOffset ++);
239240
240- bucketWriter .indexEntries .add (builder .build ());
241+ bucketWriter .indexBucketQueues [ indexBucketId ] .add (builder .build ());
241242 }
242243
243244 private void flushRowBatch (PerVirtualNodeWriter bucketWriter ) throws IOException , IndexException
@@ -252,10 +253,18 @@ private void flushRowBatch(PerVirtualNodeWriter bucketWriter) throws IOException
252253 bucketWriter .prevRgId = bucketWriter .rgId ;
253254 }
254255
255- // Push index entries to the corresponding IndexService (determined by targetNode address)
256- bucketWriter .indexService .putPrimaryIndexEntries (index .getTableId (), index .getId (), bucketWriter .indexEntries , bucketWriter .option );
257- bucketWriter .indexService .flushIndexEntriesOfFile (index .getTableId (), index .getId (),bucketWriter .currFile .getId (), true , bucketWriter .option );
258- bucketWriter .indexEntries .clear ();
256+ for (int i = 0 ; i < bucketWriter .totalBuckets ; i ++)
257+ {
258+ List <IndexProto .PrimaryIndexEntry > queue = bucketWriter .indexBucketQueues [i ];
259+ if (!queue .isEmpty ())
260+ {
261+ IndexOption option = bucketWriter .indexOptions [i ];
262+ bucketWriter .indexService .putPrimaryIndexEntries (index .getTableId (), index .getId (), queue , option );
263+ queue .clear ();
264+ }
265+ }
266+ bucketWriter .indexService .flushIndexEntriesOfFile (index .getTableId (), index .getId (),
267+ bucketWriter .currFile .getId (), true , bucketWriter .defaultIndexOption );
259268 }
260269
261270 private void closePixelsFile (PerVirtualNodeWriter bucketWriter ) throws IOException , IndexException
@@ -279,13 +288,16 @@ private class PerVirtualNodeWriter
279288 int prevRgId ;
280289 int rowCounter ;
281290 int vNodeId ;
282- IndexOption option ;
283291 NodeProto .NodeInfo targetNode ;
284- List <IndexProto .PrimaryIndexEntry > indexEntries = new ArrayList <>();
285292 VectorizedRowBatch rowBatch ;
286293 IndexService indexService ;
287294 RowIdAllocator rowIdAllocator ;
288295
296+ private final IndexOption defaultIndexOption ;
297+ private final int totalBuckets ;
298+ private final List <IndexProto .PrimaryIndexEntry >[] indexBucketQueues ;
299+ private final IndexOption [] indexOptions ;
300+
289301 public PerVirtualNodeWriter (PixelsWriter writer , File file , Path path , NodeProto .NodeInfo node , int vNodeId )
290302 {
291303 this .pixelsWriter = writer ;
@@ -301,15 +313,17 @@ public PerVirtualNodeWriter(PixelsWriter writer, File file, Path path, NodeProto
301313 this .indexService = indexServices .computeIfAbsent (node .getAddress (), nodeInfo ->
302314 RPCIndexService .CreateInstance (nodeInfo , indexServerPort ));
303315 this .rowIdAllocator = new RowIdAllocator (index .getTableId (), maxRowNum , this .indexService );
304- initIndexOption ();
305- }
306-
307- private void initIndexOption ()
308- {
309- this .option = IndexOption .builder ()
310- .vNodeId (this .vNodeId )
311- .build ();
316+ this .totalBuckets = IndexUtils .getInstance ().getBucketNum ();
317+ this .indexBucketQueues = new List [totalBuckets ];
318+ this .indexOptions = new IndexOption [totalBuckets ];
319+ this .defaultIndexOption = new IndexOption ();
320+ for (int i = 0 ; i < totalBuckets ; i ++)
321+ {
322+ this .indexBucketQueues [i ] = new ArrayList <>();
323+ this .indexOptions [i ] = IndexOption .builder ()
324+ .vNodeId (i )
325+ .build ();
326+ }
312327 }
313-
314328 }
315329 }
0 commit comments