Skip to content

Commit fb3d2c6

Browse files
authored
add compressionType,enableChunking for pulsar client , upgrade dependencies for security (#5191)
* add support for listener name which is described in https://pulsar.apache.org/docs/next/concepts-multiple-advertised-listeners/ * upgrade for CVE-2023-20883 CVE-2022-1471 CVE-2023-20860 * add enableChunking * downgrade pulsar client version * fix * 增加压缩算法
1 parent 3439c48 commit fb3d2c6

File tree

7 files changed

+84
-11
lines changed

7 files changed

+84
-11
lines changed

admin/pom.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<dependency>
3737
<groupId>org.springframework.boot</groupId>
3838
<artifactId>spring-boot-dependencies</artifactId>
39-
<version>2.5.4</version>
39+
<version>2.5.15</version>
4040
<type>pom</type>
4141
<scope>import</scope>
4242
</dependency>
@@ -48,7 +48,12 @@
4848
<dependency>
4949
<groupId>io.ebean</groupId>
5050
<artifactId>ebean</artifactId>
51-
<version>11.41.1</version>
51+
<version>11.45.1</version>
52+
</dependency>
53+
<dependency>
54+
<groupId>org.yaml</groupId>
55+
<artifactId>snakeyaml</artifactId>
56+
<version>2.0</version>
5257
</dependency>
5358
</dependencies>
5459
</dependencyManagement>

client-adapter/launcher/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<dependency>
1818
<groupId>org.springframework.boot</groupId>
1919
<artifactId>spring-boot-dependencies</artifactId>
20-
<version>2.5.4</version>
20+
<version>2.5.15</version>
2121
<type>pom</type>
2222
<scope>import</scope>
2323
</dependency>

client-adapter/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@
117117
<dependency>
118118
<groupId>org.springframework.boot</groupId>
119119
<artifactId>spring-boot</artifactId>
120-
<version>2.5.4</version>
120+
<version>2.5.15</version>
121121
</dependency>
122122
<dependency>
123123
<groupId>com.h2database</groupId>
@@ -238,7 +238,7 @@
238238
<dependency>
239239
<groupId>org.yaml</groupId>
240240
<artifactId>snakeyaml</artifactId>
241-
<version>1.29</version>
241+
<version>2.0</version>
242242
</dependency>
243243
<!-- test -->
244244
<dependency>

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,18 @@ public class PulsarMQConstants {
6262
public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";
6363

6464
/**
65-
* Pulsar admin服务器地址
65+
* Pulsar 监听器名字
6666
*/
6767
public static final String PULSARMQ_LISTENER_NAME = ROOT + "." + "listenerName";
6868

69+
70+
/**
71+
* Pulsar 开启chunking
72+
*/
73+
public static final String PULSARMQ_ENABLE_CHUNKING = ROOT + "." + "enableChunking";
74+
75+
/**
76+
* Pulsar 压缩算法
77+
*/
78+
public static final String PULSARMQ_COMPRESSION_TYPE = ROOT + "." + "compressionType";
6979
}

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ public class PulsarMQProducerConfig extends MQProperties {
4242
*/
4343
private String listenerName;
4444

45+
/**
46+
* enableChunking
47+
*/
48+
private boolean enableChunking;
49+
50+
/**
51+
* compressionType
52+
*/
53+
private String compressionType;
54+
4555
public String getServerUrl() {
4656
return serverUrl;
4757
}
@@ -81,4 +91,18 @@ public String getListenerName() {
8191
public void setListenerName(String listenerName) {
8292
this.listenerName = listenerName;
8393
}
94+
95+
public void setEnableChunking(boolean enableChunking) {
96+
this.enableChunking = enableChunking;
97+
}
98+
public boolean getEnableChunking() {
99+
return this.enableChunking;
100+
}
101+
102+
public void setCompressionType(String compressionType) {
103+
this.compressionType = compressionType;
104+
}
105+
public String getCompressionType() {
106+
return this.compressionType;
107+
}
84108
}

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,17 @@ private void loadPulsarMQProperties(Properties properties) {
137137
tmpProperties.setListenerName(listenerName);
138138
}
139139

140+
String enableChunkingStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_CHUNKING);
141+
if (!StringUtils.isEmpty(enableChunkingStr)) {
142+
tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr));
143+
}
144+
145+
String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE);
146+
if (!StringUtils.isEmpty(compressionType)) {
147+
tmpProperties.setCompressionType(compressionType);
148+
}
149+
150+
140151
if (logger.isDebugEnabled()) {
141152
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
142153
}
@@ -408,11 +419,34 @@ private Producer<byte[]> getProducer(String topic) {
408419
}
409420

410421
// 创建指定topic的生产者
411-
producer = client.newProducer()
412-
.topic(fullTopic)
422+
ProducerBuilder producerBuilder = client.newProducer();
423+
if(pulsarMQProperties.getEnableChunking()){
424+
producerBuilder.enableChunking(true);
425+
producerBuilder.enableBatching(false);
426+
}
427+
428+
if(!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) {
429+
switch(pulsarMQProperties.getCompressionType().toLowerCase()) {
430+
case "lz4":
431+
producerBuilder.compressionType(CompressionType.LZ4);
432+
break;
433+
case "zlib":
434+
producerBuilder.compressionType(CompressionType.ZLIB);
435+
break;
436+
case "zstd":
437+
producerBuilder.compressionType(CompressionType.ZSTD);
438+
break;
439+
case "snappy":
440+
producerBuilder.compressionType(CompressionType.SNAPPY);
441+
break;
442+
}
443+
}
444+
445+
producer = producerBuilder.topic(fullTopic)
413446
// 指定路由器
414447
.messageRouter(new MessageRouterImpl(topic))
415448
.create();
449+
416450
// 放入缓存
417451
PRODUCERS.put(topic, producer);
418452
}

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@
9999
<java_target_version>1.8</java_target_version>
100100
<file_encoding>UTF-8</file_encoding>
101101
<javadoc_skip>true</javadoc_skip>
102-
<spring_version>5.3.9</spring_version>
102+
<spring_version>5.3.26</spring_version>
103103
<log4j_version>2.17.0</log4j_version>
104-
<rocketmq_version>4.8.0</rocketmq_version>
104+
<rocketmq_version>4.9.8</rocketmq_version>
105105
<rabbitmq_version>5.18.0</rabbitmq_version>
106106
<mq_amqp_client>1.0.3</mq_amqp_client>
107107
<kafka_version>2.4.0</kafka_version>
108-
<pulsar_version>2.8.1</pulsar_version>
108+
<pulsar_version>2.11.4</pulsar_version>
109109
<mysql_driver_version>5.1.48</mysql_driver_version>
110110
<maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
111111
<maven-surefire.version>2.22.1</maven-surefire.version>

0 commit comments

Comments
 (0)