Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
package com.xiaojukeji.know.streaming.km.core.service.acl;

import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.resource.ResourceType;

import java.util.List;

public interface KafkaAclService {
Result<List<AclBinding>> getAclFromKafka(Long clusterPhyId);

public interface KafkaAclService extends MetaDataService<AclBinding> {
List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId);

Integer countKafkaAclFromDB(Long clusterPhyId);

Integer countResTypeAndDistinctFromDB(Long clusterPhyId, ResourceType resourceType);

Integer countKafkaUserAndDistinctFromDB(Long clusterPhyId);

List<KafkaAclPO> getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType);

List<KafkaAclPO> getTopicAclFromDB(Long clusterPhyId, String topicName);

List<KafkaAclPO> getGroupAclFromDB(Long clusterPhyId, String groupName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.acl.ACLAtomParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import org.apache.kafka.common.resource.ResourceType;

import java.util.Date;
import java.util.List;

public interface OpKafkaAclService {
/**
Expand All @@ -19,14 +15,5 @@ public interface OpKafkaAclService {
*/
Result<Void> deleteKafkaAcl(ACLAtomParam aclAtomParam, String operator);

/**
* 删除ACL
*/
Result<Void> deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator);

Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO);

void batchUpdateAcls(Long clusterPhyId, List<KafkaAclPO> poList);

int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO;
Expand All @@ -36,11 +35,13 @@
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import scala.jdk.javaapi.CollectionConverters;

Expand Down Expand Up @@ -77,18 +78,49 @@ private void init() {
}

@Override
public Result<List<AclBinding>> getAclFromKafka(Long clusterPhyId) {
if (LoadedClusterPhyCache.getByPhyId(clusterPhyId) == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}

public Result<List<AclBinding>> getDataFromKafka(ClusterPhy clusterPhy) {
try {
return (Result<List<AclBinding>>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhyId, ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhyId));
Result<List<AclBinding>> dataResult = (Result<List<AclBinding>>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhy.getId(), ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhy.getId()));
if (dataResult.failed()) {
Result.buildFromIgnoreData(dataResult);
}

return Result.buildSuc(dataResult.getData());
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(e.getResultStatus());
}
}

@Override
public void writeToDB(Long clusterPhyId, List<AclBinding> dataList) {
    // Snapshot of the rows currently stored for this cluster, keyed by unique field.
    // Entries are removed as they are matched; whatever remains is stale.
    Map<String, KafkaAclPO> remainingDbPOs = this.getKafkaAclFromDB(clusterPhyId)
            .stream()
            .collect(Collectors.toMap(KafkaAclPO::getUniqueField, po -> po));

    final long syncTimeMs = System.currentTimeMillis();
    for (AclBinding aclBinding : dataList) {
        KafkaAclPO kafkaPO = KafkaAclConverter.convert2KafkaAclPO(clusterPhyId, aclBinding, syncTimeMs);
        KafkaAclPO matchedPO = remainingDbPOs.remove(kafkaPO.getUniqueField());
        if (matchedPO == null) {
            // Present in Kafka but not yet in DB: insert it.
            this.insertAndIgnoreDuplicate(kafkaPO);
        }
        // Rows that already exist need no update.
    }

    // Rows left unmatched no longer exist in Kafka: delete them.
    for (KafkaAclPO stalePO : remainingDbPOs.values()) {
        kafkaAclDAO.deleteById(stalePO);
    }
}

@Override
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
    // Remove every ACL row belonging to this physical cluster; returns rows deleted.
    LambdaQueryWrapper<KafkaAclPO> deleteWrapper = new LambdaQueryWrapper<>();
    deleteWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
    return kafkaAclDAO.delete(deleteWrapper);
}

@Override
public List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId) {
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
Expand Down Expand Up @@ -116,7 +148,7 @@ public Integer countResTypeAndDistinctFromDB(Long clusterPhyId, ResourceType res
return 0;
}

return (int)poList.stream().map(elem -> elem.getResourceName()).distinct().count();
return (int)poList.stream().map(KafkaAclPO::getResourceName).distinct().count();
}

@Override
Expand All @@ -130,15 +162,7 @@ public Integer countKafkaUserAndDistinctFromDB(Long clusterPhyId) {
return 0;
}

return (int)poList.stream().map(elem -> elem.getPrincipal()).distinct().count();
}

@Override
public List<KafkaAclPO> getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType) {
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(KafkaAclPO::getResourceType, resType);
return kafkaAclDAO.selectList(queryWrapper);
return (int)poList.stream().map(KafkaAclPO::getPrincipal).distinct().count();
}

@Override
Expand All @@ -152,15 +176,6 @@ public List<KafkaAclPO> getTopicAclFromDB(Long clusterPhyId, String topicName) {
return kafkaAclDAO.selectList(queryWrapper);
}

@Override
public List<KafkaAclPO> getGroupAclFromDB(Long clusterPhyId, String groupName) {
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(KafkaAclPO::getResourceType, ResourceType.GROUP.code());
queryWrapper.eq(KafkaAclPO::getResourceName, groupName);
return kafkaAclDAO.selectList(queryWrapper);
}

/**************************************************** private method ****************************************************/

private Result<List<AclBinding>> getAclByZKClient(VersionItemParam itemParam){
Expand All @@ -170,7 +185,7 @@ private Result<List<AclBinding>> getAclByZKClient(VersionItemParam itemParam){
for (ZkAclStore store: CollectionConverters.asJava(ZkAclStore.stores())) {
Result<List<AclBinding>> rl = this.getSpecifiedTypeAclByZKClient(param.getClusterPhyId(), store.patternType());
if (rl.failed()) {
return rl;
return Result.buildFromIgnoreData(rl);
}

aclList.addAll(rl.getData());
Expand Down Expand Up @@ -229,4 +244,19 @@ private Result<List<AclBinding>> getSpecifiedTypeAclByZKClient(Long clusterPhyId

return Result.buildSuc(kafkaAclList);
}

/**
 * Insert the ACL row; a duplicate-key conflict is treated as success because it
 * means an identical row is already fully stored and nothing needs replacing.
 */
private Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
    try {
        kafkaAclDAO.insert(kafkaAclPO);
    } catch (DuplicateKeyException ignored) {
        // Row already exists in full; deliberately ignore and report success.
    } catch (Exception e) {
        log.error("method=insertAndIgnoreDuplicate||kafkaAclPO={}||errMsg=exception", kafkaAclPO, e);
        return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
    }
    return Result.buildSuc();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO;
Expand All @@ -32,7 +31,6 @@
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
Expand All @@ -41,8 +39,6 @@

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;

Expand Down Expand Up @@ -169,11 +165,6 @@ public Result<Void> deleteKafkaAcl(ACLAtomParam aclAtomParam, String operator) {
return rv;
}

@Override
public Result<Void> deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator) {
return Result.buildSuc();
}

@Override
public Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
try {
Expand All @@ -190,34 +181,6 @@ public Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
}
}

@Override
public void batchUpdateAcls(Long clusterPhyId, List<KafkaAclPO> poList) {
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);

Map<String, KafkaAclPO> dbPOMap = kafkaAclDAO.selectList(lambdaQueryWrapper).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity()));
for (KafkaAclPO po: poList) {
KafkaAclPO dbPO = dbPOMap.remove(po.getUniqueField());
if (dbPO == null) {
// 新增的ACL
this.insertAndIgnoreDuplicate(po);
}
}

// 删除已经不存在的
for (KafkaAclPO dbPO: dbPOMap.values()) {
kafkaAclDAO.deleteById(dbPO);
}
}

@Override
public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) {
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.le(KafkaAclPO::getUpdateTime, beforeTime);
return kafkaAclDAO.delete(lambdaQueryWrapper);
}

/**************************************************** private method ****************************************************/

private Result<Void> deleteInDB(KafkaAclPO kafkaAclPO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.meta.KafkaMetaService;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
Expand All @@ -14,7 +14,7 @@
/**
* 查看Connector
*/
public interface ConnectorService extends KafkaMetaService<KSConnector> {
public interface ConnectorService extends MetaDataService<KSConnector> {
/**
* 获取所有的连接器名称列表
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.meta;
package com.xiaojukeji.know.streaming.km.core.service.meta;

import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
Expand All @@ -13,7 +13,7 @@
/**
* Kafka元信息服务接口
*/
public interface KafkaMetaService<T> {
public interface MetaDataService<T> {
/**
* 从Kafka中获取数据
* @param connectCluster connect集群
Expand All @@ -26,19 +26,26 @@ public interface KafkaMetaService<T> {
* @param clusterPhy kafka集群
* @return 全部资源集合, 成功的资源列表
*/
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }
default Result<List<T>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new ArrayList<>()); }

/**
* 元信息同步至DB中
* @param clusterId 集群ID
* @param fullNameSet 全部资源列表
* @param fullResSet 全部资源列表
* @param dataList 成功的资源列表
*/
default void writeToDB(Long clusterId, Set<String> fullNameSet, List<T> dataList) {}
default void writeToDB(Long clusterId, Set<String> fullResSet, List<T> dataList) {}

/**
* 元信息同步至DB中
* @param clusterId 集群ID
* @param dataList 成功的资源列表
*/
default void writeToDB(Long clusterId, List<T> dataList) {}

/**
* 依据kafka集群ID删除数据
* @param clusterPhyId kafka集群ID
*/
default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; }
int deleteInDBByKafkaClusterId(Long clusterPhyId);
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;

import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;

import java.util.List;

public interface ZookeeperService {
/**
* 从ZK集群中获取ZK信息
*/
Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig);

void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList);

public interface ZookeeperService extends MetaDataService<ZookeeperInfo> {
List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId);

/**
Expand Down
Loading