Skip to content

[Bug] postgresql-x => postgresql-x local 模式 字段 NULL 异常 #1155

@yqwoe

Description

@yqwoe

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

create table source
(
    id            BIGINT NOT NULL,
    action_type   STRING,
    action_option STRING, #-> DEFAULT NULL
    target_type   STRING,
    source_type   STRING,
    source_id     BIGINT,
    target_id     BIGINT,
    created_at    TIMESTAMP,
    updated_at    TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
      'connector' = 'postgresql-x',
      'url' = 'jdbc:postgresql://localhost:5432/businesslogic_dev?currentSchema=public',
      'username' = 'yqwoe',
      'password' = 'yqwoe900316',
      'table-name' = 'actions'
      );

create table sink
(
    id            BIGINT NOT NULL,
    action_type   STRING,
    action_option STRING, #-> DEFAULT NULL
    target_type   STRING,
    source_type   STRING,
    source_id     BIGINT,
    target_id     BIGINT,
    created_at    TIMESTAMP,
    updated_at    TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
      'connector' = 'postgresql-x',
      'url' = 'jdbc:postgresql://localhost:5432/businesslogic_dev_bak?currentSchema=public',
      'username' = 'yqwoe',
      'password' = 'yqwoe900316',
      'table-name' = 'actions',
      'sink.all-replace' = 'true'
      );

insert into sink
select id,
       action_type,
       action_option,
       target_type,
       source_type,
       source_id,
       target_id,
       created_at,
       updated_at
from source;

数据库中 action_option 字段默认为NULL,数据也为NULL

执行 local模式报错

What you expected to happen

2022-08-16 15:50:40,315 - 4287 ERROR [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, action_type, action_option, target_type, source_type, source_id, target_id, created_at, updated_at]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, action_type, action_option, target_type, source_type, source_id, target_id, created_at, updated_at]) (1/1)#0] com.dtstack.chunjun.source.DtInputFormatSourceFunction:Exception happened, start to close format
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:85)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:135)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.element.column.NullColumn cannot be cast to org.apache.flink.table.data.StringData
	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
	... 10 more

How to reproduce

image

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions