[FLINK-26423] Integrate log store to StoreSink#28
Conversation
9c20f08 to
700ee21
Compare
|
Sorry for being late. The current branch has conflicts with the master branch, and you can rebase master when you're free. |
9c2b019 to
efda42b
Compare
|
Rebased |
| RowType.of(new IntType())); | ||
|
|
||
| private final CommittableSerializer serializer = | ||
| new CommittableSerializer(fileSerializer, (SimpleVersionedSerializer) fileSerializer); |
There was a problem hiding this comment.
Nit: I'm not sure whether it's a good practice to pass fileSerializer as logCommittableSerializer for convenience. These two serializer types should be orthgoal.
There was a problem hiding this comment.
Let's use StringCommittableSerializer
|
|
||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(GlobalCommitterOperator.class); |
There was a problem hiding this comment.
Should it beLoggerFactory.getLogger(AbstractCommitterOperator.class)?
| private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint; | ||
|
|
||
| /** The committable's serializer. */ | ||
| private final SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer; |
There was a problem hiding this comment.
Just curious, why not use CommittableSerializer?
There was a problem hiding this comment.
One is CommittableSerializer, one is global ManifestCommittableSerializer.
And this should be abstract class.
|
|
||
| private CommitRequestImpl(CommT committable) { | ||
| this.committable = committable; | ||
| this.state = CommitRequestState.RECEIVED; |
There was a problem hiding this comment.
Nit: since each request goes through requests.forEach(CommitRequestImpl::setSelected), there's no need to init this.state to CommitRequestState.RECEIVED
There was a problem hiding this comment.
Anyway, It needs to be initialized, and we don't want to set a null value
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.table.store.connector.sink.global; |
There was a problem hiding this comment.
Is it a good idea to put LocalCommitterOperator under the sink.global package?
There was a problem hiding this comment.
It is created because of global
| /** An operator that processes committables of a {@link Sink}. */ | ||
| public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void> | ||
| implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput { | ||
| /** A {@link AbstractCommitterOperator} to process global committer. */ |
| Sink<SinkRecord> sink = logSinkProvider.createSink(); | ||
| if (sink instanceof TwoPhaseCommittingSink) { | ||
| TwoPhaseCommittingSink<SinkRecord, LogCommT> twoPhaseSink = | ||
| (TwoPhaseCommittingSink<SinkRecord, LogCommT>) logSinkProvider.createSink(); |
There was a problem hiding this comment.
Why create the sink twice?
|
|
||
| // in eventual mode, failure will result in duplicate data | ||
| FileStore fileStore = | ||
| buildFileStore( |
There was a problem hiding this comment.
It seems that buildFileStore always creates a file store with pk. Do we need to add the test that covers non-pk conditions?
There was a problem hiding this comment.
Yes, we can.
We need to refactor the FileStoreITCase, and even the Sink interface itself, which is a DataStream layer class that is more complex to build than a simple SQL can accomplish.
I create a jira, and subsequently we need to think through a problem, StoreSink exposed API should be what kind of, currently about keyed is rather confusing.
There was a problem hiding this comment.
LadyForest
left a comment
There was a problem hiding this comment.
Hi @JingsongLi, thanks for your contribution. It looks to me in general. I only left some minor comments.
LadyForest
left a comment
There was a problem hiding this comment.
Looks good to me. Thanks for your update!
…apache#28) * Enhance IcebergManifestFile to correctly handle ByteBuffer conversion and added a test for partition summary validation * Updated test for different string lengths * Updated tests for input length exceeding VARCHAR limit * Refactor IcebergConversions and IcebergManifestFile for improved ByteBuffer handling; update tests for string partition null padding * Add unit tests for IcebergConversions handling of VARCHAR types, including edge cases and concurrent access * Added comments explaining change
* [core] Add parquet write page limit parameter (apache#4632) * [flink] Fix that 'values-time' partition expire might throw NPE (apache#4646) * [spark] Support show create and describe view (apache#4645) * [test][spark] Add insert with column list test case (apache#4654) * [core] Expire partiitons add default delete num (apache#4652) * [flink] Optimizing parallelism for fixed bucekt and non-partitioned table (apache#4643) * [flink] support flink sourceIdleTime metric in ReadOperator (apache#4644) * [parquet] Fix minor format codes in parquet readers * [core] Optimize memory usage for expiring snapshots and tags (apache#4655) This closes apache#4655. * [spark] Paimon parser only resolve own supported procedures (apache#4662) * [core] Optimize overwrite commit to use CommitResult to retry (apache#4661) This closes apache#4661. * [core] Optimize drop partitions to avoid stack overflow (apache#4663) This closes apache#4663. * [hive] Make HiveMetastoreClient.addPartition thread safe (apache#4669) * [core] Support auth in REST Catalog (apache#4648) * [flink-cdc] kafka_sync_database supports table name mapping when prefix and postfix could not fit the need. (apache#4660) * [minor] Fix RESTCatalog.close should close client and refreshExecutor * [core] Introduce timeout for commit retry avoid long time loop (apache#4668) * [core] Introduce data-file.path-directory (apache#4672) * [core] Trim key field in reading, map it to value field (apache#4651) This closes apache#4651. --------- Co-authored-by: tsreaper <tsreaper96@gmail.com> * [hive][spark] Support creating external table without schema when the table already exists (apache#4638) * [hotfix] Fix flaky test of orc tests in ArrowBatchConverterTest (apache#4673) * [orc] Row group filter push down cannot work with bitmap index * [docs] add glue metastore repo for iceberg compability (apache#4674) * [core] Improve fault tolerance for data spill to disk. (apache#4675) * [doc] updated url links in documentation (apache#4679) * [hotfix] CloneAction throw more clear exception when no table in source catalog (apache#4682) * [core] Check file size after write bundle of records (apache#4685) * [doc] Fix links in sql-write * [core] Introduce RollbackToWatermarkProcedure for rollback (apache#4687) * [core] Add database API implementation in RESTCatalog (apache#4676) * [spark] Fix the build of read type in binlog table (apache#4689) * [core] Clean up invalid branch cache and not cache system table in caching catalog (apache#4681) * [core] Remove useless codes in CachingCatalog * [core] Rename BulkFormatMapping to FormatReaderMapping * [core] Rename BulkFormatMappingBuilder to FormatReaderMapping.Builder * [spark] Avoid explicitly creating catalog in PaimonTableValuedFunctions (apache#4690) * [core] Update drop Database API and remove api in URL (apache#4691) * [doc] update totalRecordCount and deltaRecordCount in understand-files.md (apache#4694) * [core] fix the issue where streaming reading of overwrite data would fail when retract type data appeared. (apache#4697) * [core] Introduce data-file.thin-mode in primary key table write (apache#4666) This closes apache#4666. * [core] Retry if snapshot commit hint failed. (apache#4701) * [flink] Replace legacy SourceFunction with v2 Source (apache#4614) This closes apache#4614. * [core] Store `cardinality` in the deletion vector meta (apache#4699) * [flink] kafka_sync_database supports different prefix and suffix for different db (apache#4704) * [core] Tolerate the NoSuchObjectException when report the partition statistic (apache#4708) * [core][spark] Enable limit pushdown and count optimization for dv table (apache#4709) * [flink] avoid using 'newHashMapWithExpectedSize' which is internal in flink (apache#4713) * [flink] Enable limit pushdown and count optimization for dv table (apache#4712) * [doc] Specific true column names for manifest files * [spark] Fix relativeBucketPath with data-file.path-directory (apache#4714) * [core] Introduce PurgeFilesProcedure to purge table (apache#4717) * [core] Remove Catalog.getTableLocation interface (apache#4718) * [core][spark] Fix create external table with schema evolution (apache#4719) * [orc] Optimize configuration creating in orc file format (apache#4716) * [core] Clean constants, caseSensitive, loader in Catalog (apache#4721) * [pom] prefer central repo for releases; limit apache-snapshots to snapshots (apache#4707) * [core] Refactor MetastoreClient methods to simplify catalog (apache#4726) * [core] Support alter database (apache#4700) * [core] Introduce Variant Data (apache#4729) * [flink] kafka_sync_database supports db whitelist and blacklist (apache#4732) * [core] Introduce CacheStats and expose ScanStats (apache#4678) * [core] Minor refactor for cache metrics * [cdc] Correct equalsIgnoreFieldId in UpdatedDataFieldsProcessFunctionBase * [cdc] add exception message for CdcRecordStoreMultiWriteOperator (apache#4734) * [core] Fix predicate literals cast in filter pushdown after schema evolution (apache#4705) * [hive] Fix sync hms partition with data-file.path-directory (apache#4735) * [flink] make warehouse in Flink action optional (apache#4728) * [core] Skip case checking in catalog (apache#4730) * [hive] Batch list tables and skip checking table exists in filesystem with hive catalog (apache#4737) * [hotfix] Remove unused SchemaEvolutionUtil methods (apache#4739) * [doc] Add doc for spark drop column with hive catalog * [dependency] Upgrade paimon shade version (apache#4740) * [doc] Document hive.metastore.disallow.incompatible.col.type.changes for Flink and Spark client * [spark] Fix load function with SparkGenericCatalog (apache#4741) * [flink] Support nested projection pushdown (apache#4667) This closes apache#4667. * [spark] Fix delete with partial non-convertible partition filter (apache#4738) * [hive] Ignore path comparison when creating external table (apache#4747) * [Parquet] Revert parquet patch apache#3883 which tries to construct column vectors like orc (apache#4745) * [parquet] parquet reader should not retrun VectorizedRowIterator for nested schema (apache#4749) * [core] Add _EXTERNAL_PATH in DataFileMeta This closes apache#4751 * [hotfix] Fix typo in NestedColumnReader and NestedPrimitiveColumnReader (apache#4752) Co-authored-by: yuzelin <zelin.yzl@alibaba-inc.com> * [core] fix parquet can not read row with last column is array. (apache#4755) * [core] Fix serializer error in 'Add _EXTERNAL_PATH in DataFileMeta' * [test] Add current version compatibility tests for manifest committable and split * [flink][cdc] Add support for retry cnt instead of busy wait and additional support to skip corrupt records in cdc writer (apache#4295) * [spark] Make show table extended compatible with spark3.4- (apache#4754) * [hotfix] Rename cdc schema change options in CdcRecordStoreWriteOperator (apache#4756) * [format] support parquet reader reading field with 'FIXED_LEN_BYTE_ARRAY' type (apache#4759) * [release] Update version to 1.1-SNAPSHOT * [core] Introduce VariantType (apache#4757) * [spark] Integrate Variant with Spark4 (apache#4764) * [core] Support read external path in DataFileMeta (apache#4761) * [core] External Path in DataFileMeta should be the file path (apache#4766) * [hotfix] Add spark3 profile in deploy_staging_jars (apache#4768) * [core] Support Table API in RESTCatalog (apache#4736) * [core] optimize the binlog table read performance (apache#4773) * [rest] Fix GetTableResponse should return fields (apache#4769) * [flink] Introduce max_two_pt for Flink lookup join (apache#4772) * [doc] Add max_two_pt to 'lookup.dynamic-partition' option * [docs] Fix typo in docs/content/pk-table&append-table (apache#4776) * [doc] Add EXTERNAL_PATH to manifest doc * [core] SortLookupStoreWriter should support empty record (apache#4777) * [core] Support alter table API (apache#4775) * [core] Optimize fileFormat discovery and avoid creating fileFormat (apache#4782) * [spark] Fix writing null struct col (apache#4787) * [docs] Use `config.yaml` for flink version >= 1.19 (apache#4784) * [core] Add check of older_than when RemoveOrphanFiles (apache#4779) * [core] Support customized tag name in Batch TagCreationMode (apache#4778) * [hotfix] Fix invalid link in Partial Update doc (apache#4789) * [core] Support to query indexes (apache#4788) * [common] Using a faster deserialization method in RoaringBitmap32 (apache#4765) * [parquet] Support using file index result to filter row ranges (apache#4780) * [doc] Fix incorrect header level in manage-branches.md * [core] Add schema validation for record-level.time-field (apache#4758) * [test] Remove the loggingTimer to fix the CDC build error (apache#4799) * [hive] Fix listTablesImpl possible timeout issue (apache#4800) * [core] Support partition API (apache#4786) * [rest] Partition methods should check table first in RESTCatalog * [rest] Remove useless fetchOptionsFromServer in RESTCatalog * [doc] Add a note to the delete branch operation to explain its behavior (apache#4803) * [parquet] Fix file index result filter the row ranges missing rowgroup offset problem (apache#4806) * [core] Introduce Partition to Catalog.listPartitions (apache#4807) * [spark] Purge file need refresh table avoid FileNotFound (apache#4809) * [iceberg] Add Iceberg database / table options as alias when generating Iceberg metadata (apache#4811) * [iceberg] Add iceberg options to docs * [spark] Bump Spark 3.5.4 (apache#4819) * [hotfix] Fix flaky test testCleanWithBranch (apache#4826) * [spark] Remove unreasonable test output (apache#4825) * [core] Refactor TagManager to remove unnecessary tag existence check (apache#4820) * [test] Disable PrimaryKeyFileStoreTableITCase.testFullCompactionChangelogProducerStreamingRandom * [hotfix] Fix typo of create_tag procedure doc (apache#4829) * [doc] Add release 1.0 doc * [common] Upgrade the RoaringBitmap version from 1.0.5 to 1.2.1 (apache#4832) * [doc] Modify master to 1.1-SNAPSHOT * [spark] SparkGenericCatalog support tag DDL (apache#4833) * [hotfix] Refactor RemoveOrphanFilesActionITCase in flink-1.18 to avoid flaky test (apache#4840) * [core] overwrite should be true when commit change log (apache#4838) * [core] remove nullable check for record-level.time-field (apache#4837) * [core] support write to the external path (apache#4770) * [core] Refactor ExternalPathProvider to generate file in DataFilePathFactory * [parquet] Merge the file index and the deletion vector and push down filtering (apache#4812) * [core] Support customize action for partition mark done. (apache#4817) * [doc] Improving the mysql-cdc.md usage Documentation (apache#4792) * [core] Introduce incremental-to-auto-tag for reading changes between auto tags and let incremental-between be tag-first (apache#4808) * [core] support null value for record-level.time-field (apache#4839) * [flink][action] add '`' to the fields of merge into action to avoid exceptions when the field name is an SQL keyword. (apache#4846) * [core] SnapshotReaderImpl.toChangesPlan should use snapshot for tags (apache#4841) * [doc] Fix names in fileindex spec * [doc] Add 2025 on NOTICE (apache#4850) * [core] Introduce nested type cast to string (apache#4848) * [core] Use cast to string to show partition in system table (apache#4852) * [flink] Fix the refresh executor not work after reopen (apache#4851) * [core] add SuccessFileTagCallback for tag creation (apache#4847) * [spark] Introduce paimon_incremental_between_timestamp and paimon_incremental_to_auto_tag tvf (apache#4855) * [refactor] Refactor LookupJoin: add logs for troubleshoot and polish DynamicPartitionLoader (apache#4828) * [hotfix][doc] Fix some typo and add a three-level catalogue (apache#4858) * [core] Support auto create tag with custom duration (apache#4857) * [flink] Flink batch delete supports partial-update.remove-record-on-sequence-group option (apache#4861) * [core] Introduce batch partition methods in catalog (apache#4866) * [core] Fix that incremental-to-auto-tag return wrong result if snapshots are expired (apache#4869) * [core] Fix retract behavior for FieldProductAgg when accumulator is null (apache#4842) * [core] Introduce catalog loader in Catalog to replace MetastoreClient (apache#4874) * [iceberg] Introduce feature to migrate table from iceberg to paimon (apache#4639) This closes apache#4639. * [core] Add UT for expire partitions with hms config metastore.partitioned-table (apache#4875) * [core] Fix remove orphan files with data file path directory (apache#4871) * [flink] Add Tests and ITCases in flink for RESTCatalog (apache#4805) * [core] Introduce PrivilegeManagerLoader to avoid hardcode FileBasedPrivilegeManager (apache#4877) * [core] Support parsing row type json without field id (apache#4876) * [log] logging specific join keys and results for lookup join (apache#4856) * [doc] Update metadata url for mysql cdc * [test] remove incorrect and confusing tests in PartialUpdateMergeFunctionTest (apache#4882) * [e2e] upgrade the flink-sql-connector-kafka version (apache#4881) * [flink] Supports debezium-bson formats of kafka data which collected from mongodb via debezium (apache#4870) * [rest] Supports global system tables (apache#4880) * [core] Do not use Collections.unmodifiableMap to avoid stack overflow (apache#4883) * [hotfix] remove the useless comment (apache#4889) * [core] Fix invalidate tables with same tableNames in other db issue (apache#4895) * [core] Add a http-report action to reporting partition done to remote servers. (apache#4862) * [rest] Add name to GetTableResponse and remove path (apache#4894) * [hotfix] fix some typos in CombinedTableCompactorSink (apache#4890) * [core] Introduce conversion from parquet type to paimon type (apache#4888) * [spark] Clean empty directory after removing orphan files (apache#4824) * [docs] update link about Apache Doris (apache#4898) * [hotfix] fix Trino version to 440 (apache#4896) * [flink] Add a action/procedure to remove unexisting files from manifests (apache#4781) * [core] Support partition API and update get table (apache#4879) * [core] Extract loadTable in CatalogUtils (apache#4904) * [flink] Introduce precommit compact for newly created files in unaware bucket tables (apache#4902) * [doc] Add doc for precommit-compact * [core] Fix that sequence group fields are mistakenly aggregated by default aggregator in partial update (apache#4897) * [core] Introduce SnapshotCommit to abstract atomically commit (apache#4911) * [core] Support view API in Rest catalog (apache#4908) * [hotfix][doc] fix the url link in document (apache#4914) * [core] Add min_partition_stats and max_partition_stats columns to manifests system table (apache#4922) * [hotfix] Modify the type conversion method (apache#4928) * [test] Fix the unstable random tests in PrimaryKeyFileStoreTableITCase (apache#4933) * [test] Fix the unstable testCloneWithSchemaEvolution (apache#4932) * [hotfix][doc] Add quotes to the partition (apache#4931) * [core] Remove unnecessary toString call in `DataFilePathFactoryTest` (apache#4924) * [core] Make FileIOLoader extends Serializable * [spark] Introduce SparkV2FilterConverter (apache#4915) * [rest] Add http conf and ExponentialHttpRetryInterceptor to handle retry In RESTCatalog (apache#4929) * [common] A FileIO API to list files iteratively (apache#4834) * [core] Make CatalogContext implements Serializable (apache#4936) * [core] Refactor the orphan clean and expire function for external paths (apache#4903) * [core] Introduce DataFilePathFactories to unify cache factories * [test] Fix the unstable testNoChangelogProducerStreamingRandom (apache#4940) * [flink] Introduce scan bounded to force bounded in streaming job (apache#4941) * [docs] Fix typo of BIG_ENDIAN (apache#4945) * [hotfix] Minor fix for FileIO.listFilesIterative * [refactor] Clean unused codes in Lock (apache#4948) * [core] Refine CommitMessage toString (apache#4950) * [rest] Refactor AuthProviders to remove credential concept (apache#4959) * [core] Support data token in RESTCatalog (apache#4944) * [core] Clear cache when deleting the snapshot (apache#4966) * [iceberg] Support skipping AWS Glue archive (apache#4962) * [hotfix] remove_orphan_files action shouldn't check table argument (table=null means clean whole database) (apache#4961) * [core] Fix NPE when retracting collect and merge-map (apache#4960) = * [hotfix] Update the maven version requirements in the readme (apache#4955) * [rest] Refactor RESTTokenFileIO to cache FileIO in static cache (apache#4965) * [hotfix] Fix flaky test AppendOnlyFileStoreTableTest#testBatchOrderWithCompaction (apache#4964) * [doc][spark] Add read metadata columns (apache#4953) * [core] Optimized iterative list implementations for FileIO (apache#4952) * [core] Remove Catalog.fileio method (apache#4973) * [core] Fix that sequence fields are mistakenly aggregated by default aggregator in AggregateMergeFunction (apache#4977) * [spark] Fix update table with char type (apache#4972) * [spark] Fix rollback not correctly identify tag or snapshot (apache#4947) * [rest] Optimize partition methods to let rest return table not partitioned (apache#4979) * [doc] Pypaimon api table_scan plan splits. (apache#4978) * [filesystem] Support Tencent COSN (apache#4854) * [core] ObjectRefresh with iterative list and batched commit (apache#4980) * [doc] Add sys prefix to procedure (apache#4981) * [core] Throw exception if increment query with rescale bucket (apache#4984) * [core] Populate more metadata to object table (apache#4987) * [hotfix] [docs] Fix cdc doc url and some typos (apache#4968) * [spark] Fallback to spark except query if increment query with rescale bucket (apache#4989) * [parquet] Refactory parquet reader using spark code. (apache#4982) * [test][flink] Add tests back in PreAggregationITCase which deleted by apache#4982 * [parquet] Parquet ColumnarBatch should return ColumnarRowIterator for nested schema * [parquet] Introduce LongIterator to Parquet RowIndexGenerator (apache#4991) * [hotfix] Fix NPE in ColumnarRowIterator.reset * [cdc] Add option to prevent logging of corrupted records (apache#4918) * [flink] Replace per record in ReadOperator to work with object reuse * [core] Refactory ColumnarRowIterator using LongIterator. (apache#4992) * Merged Yelp customizations into Paimon (apache#29) * add debezium avro format * Support debezium avro format in cdc action. * fix ut * fix * fix * fix * Recovery cdc test * add KafkaDebeziumAvroSyncTableActionITCase * add test * add avro cdc test * fix test * add doc * handle complex types in TypeUtils when converting string to Object, also enable handling comlpex types in Avro * dont use id in datafield for equals. Triggers DataField to not match and thinks is a schema change * fix DataField comparison mismatch for nested RowTypes * PRODENAB-66: add CODEOWNERS and PR template Signed-off-by: Max Falk <gfalk@yelp.com> * add owners Signed-off-by: Max Falk <gfalk@yelp.com> * [common][flink] Add support for complex types in kafka debezium avro cdc action * Fix itest and separate the code flow for complex types * Use toString instead of asText * Add Yelp files * PRODENAB-149 fix infinite loop when a field is a unparseable value using counter and log skipped messages in main * PRODENAB-149: skip unreadable records for all cdc writers Signed-off-by: Max Falk <gfalk@yelp.com> * Run mvn spotless:apply * Squash Yelpy setup and skipping unreadable rows This squashes all our previous patches to paimon into a single commit. Contains: * Yelpy repo setup: https://github.yelpcorp.com/misc/flink-lib-paimon/commit/d1512a7ad3b7f691e6875c15795efd303c0bfcf3 * Skipping unreadable rows: https://github.yelpcorp.com/misc/flink-lib-paimon/commit/11350ab57215935138c7d122e20523c84da198bb Signed-off-by: Max Falk <gfalk@yelp.com> * PRODENAB-221: Fix JSON deserialization of strings * athena cannot query the existing metadata for the iceberg compatibility mode. trying with this commit to see if it helps get athena working * squash Yelpy commits Unreadable rows: https://github.yelpcorp.com/misc/flink-lib-paimon/commit/b59f96ede555e26d326b1d156529decf37ba18f2 Deserialization of strings: https://github.yelpcorp.com/misc/flink-lib-paimon/commit/788410dd3dbd460fd89e5170a1f942193f0a63ad Signed-off-by: Max Falk <gfalk@yelp.com> * revert metadata athena format Signed-off-by: Max Falk <gfalk@yelp.com> * add missing newline Signed-off-by: Max Falk <gfalk@yelp.com> * add enum support for flink debezium format * DA-4260: skip glue archive with -Dglue.skip-archive=true * enable glue.skip-archive by default * DA-4260: move glue.skip-archive to a table option * Set the default glue.skip-archive to false * set CASCADE true, as before * Use constants instead of hardcoding values * Enhance IcebergManifestFile to correctly handle ByteBuffer conversion (apache#28) * Enhance IcebergManifestFile to correctly handle ByteBuffer conversion and added a test for partition summary validation * Updated test for different string lengths * Updated tests for input length exceeding VARCHAR limit * Refactor IcebergConversions and IcebergManifestFile for improved ByteBuffer handling; update tests for string partition null padding * Add unit tests for IcebergConversions handling of VARCHAR types, including edge cases and concurrent access * Added comments explaining change * Fixed bad merge --------- Signed-off-by: Max Falk <gfalk@yelp.com> Co-authored-by: zhuangchong <zhuang.kerwin@gmail.com> Co-authored-by: umesh <umesh@yelp.com> Co-authored-by: Max Falk <gfalk@yelp.com> Co-authored-by: Ashish Khatkar <ashish@Ashishs-Laptop.local> Co-authored-by: Ashish Khatkar <akhatkar64@gmail.com> Co-authored-by: Adel Atallah <adel@yelp.com> Co-authored-by: Sofya Irwin <sofya@yelp.com> Co-authored-by: Halit Olali <halitolali@yelp.com> Co-authored-by: jkukreja <jkukreja@yelp.com> Co-authored-by: Sina Siadat <sina@yelp.com> --------- Signed-off-by: Max Falk <gfalk@yelp.com> Co-authored-by: aiden.dong <782112163@qq.com> Co-authored-by: yuzelin <33053040+yuzelin@users.noreply.github.com> Co-authored-by: Zouxxyy <zouxinyu.zxy@alibaba-inc.com> Co-authored-by: askwang <135721692+askwang@users.noreply.github.com> Co-authored-by: HunterXHunter <cnlingmingqiang@gmail.com> Co-authored-by: herefree <841043203@qq.com> Co-authored-by: Jingsong <jingsonglee0@gmail.com> Co-authored-by: lining <lining.jln@alibaba-inc.com> Co-authored-by: JackeyLee007 <JackeyLee007@126.com> Co-authored-by: xuzifu666 <1206332514@qq.com> Co-authored-by: YeJunHao <41894543+leaves12138@users.noreply.github.com> Co-authored-by: tsreaper <tsreaper96@gmail.com> Co-authored-by: Giannis Polyzos <ipolyzos.se@gmail.com> Co-authored-by: Yujiang Zhong <42907416+zhongyujiang@users.noreply.github.com> Co-authored-by: yangjf2019 <54518670+yangjf2019@users.noreply.github.com> Co-authored-by: jiangmengmeng <1804226997@qq.com> Co-authored-by: liming.1018 <liming.1018@bytedance.com> Co-authored-by: yunfengzhou-hub <yuri.zhouyunfeng@outlook.com> Co-authored-by: WenjunMin <aitozi@apache.org> Co-authored-by: LsomeYeah <94825748+LsomeYeah@users.noreply.github.com> Co-authored-by: zyz33 <35164637+zhangyazhe@users.noreply.github.com> Co-authored-by: Doroszlai, Attila <6454655+adoroszlai@users.noreply.github.com> Co-authored-by: Jiao Mingye <35512473+mxdzs0612@users.noreply.github.com> Co-authored-by: HouliangQi <neuyilan@163.com> Co-authored-by: yuzelin <zelin.yzl@alibaba-inc.com> Co-authored-by: Wenchao Wu <60921147+Stephen0421@users.noreply.github.com> Co-authored-by: Ashish Khatkar <akhatkar64@gmail.com> Co-authored-by: yinyao <144221862+RunningDB@users.noreply.github.com> Co-authored-by: Weijie Guo <reswqa@163.com> Co-authored-by: wangwj <hongli.wwj@gmail.com> Co-authored-by: xiangyu0xf <xiangyu0xf@gmail.com> Co-authored-by: Tan-JiaLiang <tanjialiang1997@gmail.com> Co-authored-by: XiaoHongbo <1346652787@qq.com> Co-authored-by: HunterXHunter <1356469429@qq.com> Co-authored-by: Lucian <131578653+Moonlight-CL@users.noreply.github.com> Co-authored-by: cxzl25 <3898450+cxzl25@users.noreply.github.com> Co-authored-by: Xuannan <suxuannan95@gmail.com> Co-authored-by: chuangchuang <lizc9@xiaopeng.com> Co-authored-by: Mingyu Chen (Rayner) <yunyou@selectdb.com> Co-authored-by: chenjian2664 <chenjian2664@gmail.com> Co-authored-by: Xiaoguang Zhu <smdsbz@qq.com> Co-authored-by: Gang Wu <ustcwg@gmail.com> Co-authored-by: Sina Siadat <siadat@gmail.com> Co-authored-by: HunterXHunter <mingqianglin1010@gmail.com> Co-authored-by: liujinhui <965147871@qq.com> Co-authored-by: Xiaoguang Zhu <zhuxiaoguang.zxg@alibaba-inc.com> Co-authored-by: Adel Atallah <2213999+atallahade@users.noreply.github.com> Co-authored-by: zhuangchong <zhuang.kerwin@gmail.com> Co-authored-by: umesh <umesh@yelp.com> Co-authored-by: Max Falk <gfalk@yelp.com> Co-authored-by: Ashish Khatkar <ashish@Ashishs-Laptop.local> Co-authored-by: Adel Atallah <adel@yelp.com> Co-authored-by: Sofya Irwin <sofya@yelp.com> Co-authored-by: Halit Olali <halitolali@yelp.com> Co-authored-by: jkukreja <jkukreja@yelp.com> Co-authored-by: Sina Siadat <sina@yelp.com>
StoreSink is a hybrid sink. We need: