Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
Expand Down Expand Up @@ -78,7 +79,7 @@ public Broker getBrokerMetadata(Long clusterPhyId, Integer brokerId) throws NotE

try {
BrokerMetadata metadata = this.getData(kafkaZkClient.currentZooKeeper(), BrokerIdZNode.path(brokerId), false, BrokerMetadata.class);
BrokerMetadata.parseAndUpdateBrokerMetadata(metadata);

return this.convert2Broker(clusterPhyId, brokerId, metadata);
} catch (KeeperException ke) {
logger.error("method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception", clusterPhyId, brokerId, ke);
Expand Down Expand Up @@ -279,7 +280,7 @@ private Broker convert2Broker(Long clusterPhyId, Integer brokerId, BrokerMetadat
metadata.setJmxPort(brokerMetadata.getJmxPort());
metadata.setStartTimestamp(brokerMetadata.getTimestamp());
metadata.setRack(brokerMetadata.getRack());
metadata.setStatus(1);
metadata.setStatus(Constant.ALIVE);
metadata.setEndpointMap(brokerMetadata.getEndpointMap());
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
Expand Down Expand Up @@ -51,7 +50,6 @@
* }
*
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerMetadata implements Serializable {
private static final long serialVersionUID = 3918113492423375809L;
Expand All @@ -74,34 +72,92 @@ public class BrokerMetadata implements Serializable {

private String rack;

@JsonIgnore
public String getExternalHost() {
if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) {
// external如果不存在,就返回host
return host;
public List<String> getEndpoints() {
return endpoints;
}

public void setEndpoints(List<String> endpoints) {
this.endpoints = endpoints;
}

public Map<String, IpPortData> getEndpointMap() {
if (endpointMap == null) {
this.parseBrokerMetadata();
}

return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp();
return endpointMap;
}

public String getHost() {
if (endpointMap == null) {
this.parseBrokerMetadata();
}

return host;
}

public void setHost(String host) {
this.host = host;
}

@JsonIgnore
public String getInternalHost() {
if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) {
// internal如果不存在,就返回host
return host;
public Integer getPort() {
if (endpointMap == null) {
this.parseBrokerMetadata();
}
return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp();

return port;
}

public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) {
brokerMetadata.setEndpointMap(new HashMap<>());
public void setPort(Integer port) {
this.port = port;
}

public Integer getJmxPort() {
return jmxPort;
}

public void setJmxPort(Integer jmxPort) {
this.jmxPort = jmxPort;
}

public Integer getVersion() {
return version;
}

if (brokerMetadata.getEndpoints().isEmpty()) {
public void setVersion(Integer version) {
this.version = version;
}

public Long getTimestamp() {
return timestamp;
}

public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public String getRack() {
return rack;
}

public void setRack(String rack) {
this.rack = rack;
}

private synchronized void parseBrokerMetadata() {
if (this.endpointMap != null) {
return;
}

if (this.endpoints == null || this.endpoints.isEmpty()) {
this.endpointMap = new HashMap<>(0);
return;
}

Map<String, IpPortData> tempEndpointMap = new HashMap<>();

// example EXTERNAL://10.179.162.202:7092
for (String endpoint: brokerMetadata.getEndpoints()) {
for (String endpoint: this.endpoints) {
int idx1 = endpoint.indexOf("://");
int idx2 = endpoint.lastIndexOf(":");
if (idx1 == -1 || idx2 == -1 || idx1 == idx2) {
Expand All @@ -111,19 +167,37 @@ public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) {
String brokerHost = endpoint.substring(idx1 + "://".length(), idx2);
String brokerPort = endpoint.substring(idx2 + 1);

brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));
tempEndpointMap.put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));

if (KafkaConstant.INTERNAL_KEY.equals(endpoint.substring(0, idx1))) {
// 优先使用internal的地址进行展示
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort));
this.host = brokerHost;
this.port = ConvertUtil.string2Integer(brokerPort);
}

if (null == brokerMetadata.getHost()) {
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort));
if (null == this.host) {
this.host = brokerHost;
this.port = ConvertUtil.string2Integer(brokerPort);
}
}

this.endpointMap = tempEndpointMap;
}

public static void main(String[] args) {
String str = "{\t\n" +
"\t\"listener_security_protocol_map\":{\"EXTERNAL\":\"SASL_PLAINTEXT\",\"INTERNAL\":\"SASL_PLAINTEXT\"},\n" +
"\t\"endpoints\":[\"EXTERNAL://10.179.162.202:7092\",\"INTERNAL://10.179.162.202:7093\"],\n" +
"\t\"jmx_port\":8099,\n" +
"\t\"host\":null,\n" +
"\t\"timestamp\":\"1627289710439\",\n" +
"\t\"port\":-1,\n" +
"\t\"version\":4\n" +
"}";

BrokerMetadata bm = JSON.parseObject(str, BrokerMetadata.class);
System.out.println(bm.getHost());
System.out.println(JSON.toJSON(bm));
}
}