Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public List<String> listGroupsFromKafka(ClusterPhy clusterPhy) throws AdminOpera
}

props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers());
props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId()));

adminClient = KSPartialKafkaAdminClient.create(props);
KSListGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups(
Expand Down Expand Up @@ -178,6 +179,7 @@ public KSGroupDescription getGroupDescriptionFromKafka(ClusterPhy clusterPhy, St
}

props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers());
props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId()));

adminClient = KSPartialKafkaAdminClient.create(props);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -76,10 +77,12 @@ private void closeKafkaAdminClient(Long clusterPhyId) {

LOGGER.info("close kafka AdminClient starting, clusterPhyId:{}", clusterPhyId);

boolean allSuccess = this.closeAdminClientList(adminClientList);
boolean allSuccess = this.closeAdminClientList(clusterPhyId, adminClientList);

if (allSuccess) {
LOGGER.info("close kafka AdminClient success, clusterPhyId:{}", clusterPhyId);
} else {
LOGGER.error("close kafka AdminClient exist failed and can ignore this error, clusterPhyId:{}", clusterPhyId);
}
} catch (Exception e) {
LOGGER.error("close kafka AdminClient failed, clusterPhyId:{}", clusterPhyId, e);
Expand Down Expand Up @@ -116,6 +119,7 @@ private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapSe

adminClientList = new ArrayList<>();
for (int i = 0; i < clientCnt; ++i) {
props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("ApacheAdminClient||clusterPhyId=%d||Cnt=%d", clusterPhyId, i));
adminClientList.add(AdminClient.create(props));
}

Expand All @@ -125,25 +129,27 @@ private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapSe
} catch (Exception e) {
LOGGER.error("create kafka AdminClient failed, clusterPhyId:{} props:{}", clusterPhyId, props, e);

this.closeAdminClientList(adminClientList);
this.closeAdminClientList(clusterPhyId, adminClientList);
} finally {
modifyClientMapLock.unlock();
}

return KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId).get((int)(System.currentTimeMillis() % clientCnt));
}

private boolean closeAdminClientList(List<AdminClient> adminClientList) {
private boolean closeAdminClientList(Long clusterPhyId, List<AdminClient> adminClientList) {
if (adminClientList == null) {
return true;
}

boolean allSuccess = true;
for (AdminClient adminClient: adminClientList) {
try {
adminClient.close();
// 关闭客户端,超时时间为30秒
adminClient.close(Duration.ofSeconds(30));
} catch (Exception e) {
// ignore
LOGGER.error("close kafka AdminClient exist failed, clusterPhyId:{}", clusterPhyId, e);
allSuccess = false;
}
}
Expand Down