Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/install_guide/版本升级手册.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_del
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming');


-- 多集群管理权限2023-07-06新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming');

INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');
```

### 升级至 `3.3.0` 版本
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public PaginationResult<GroupOverviewVO> pagingClusterGroupsOverview(Long cluste
List<Group> groupList = groupService.listClusterGroups(clusterPhyId);

// 类型转化
List<GroupOverviewVO> voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList());
List<GroupOverviewVO> voList = groupList.stream().map(GroupConverter::convert2GroupOverviewVO).collect(Collectors.toList());

// 搜索groupName
voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name"));
Expand Down Expand Up @@ -301,7 +301,7 @@ public Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
return opGroupService.deleteGroupTopicOffset(
new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()),
operator
);
Expand All @@ -313,7 +313,7 @@ public Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
return opGroupService.deleteGroupTopicPartitionOffset(
new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()),
operator
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.xiaojukeji.know.streaming.km.core.service.group;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;

public interface OpGroupService {
/**
* 删除Topic
* 删除Offset
*/
Result<Void> deleteGroupOffset(DeleteGroupParam param, String operator);
Result<Void> deleteGroupTopicOffset(DeleteGroupTopicParam param, String operator);
Result<Void> deleteGroupTopicPartitionOffset(DeleteGroupTopicPartitionParam param, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
Expand Down Expand Up @@ -46,6 +45,8 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
private static final ILog LOGGER = LogFactory.getLog(OpGroupServiceImpl.class);

private static final String DELETE_GROUP_OFFSET = "deleteGroupOffset";
private static final String DELETE_GROUP_TOPIC_OFFSET = "deleteGroupTopicOffset";
private static final String DELETE_GROUP_TP_OFFSET = "deleteGroupTopicPartitionOffset";

@Autowired
private GroupDAO groupDAO;
Expand All @@ -66,7 +67,9 @@ protected VersionItemTypeEnum getVersionItemType() {

@PostConstruct
private void init() {
registerVCHandler(DELETE_GROUP_OFFSET, V_1_1_0, V_MAX, "deleteGroupOffset", this::deleteGroupOffsetByClient);
registerVCHandler(DELETE_GROUP_OFFSET, V_2_0_0, V_MAX, "deleteGroupOffsetByClient", this::deleteGroupOffsetByClient);
registerVCHandler(DELETE_GROUP_TOPIC_OFFSET, V_2_4_0, V_MAX, "deleteGroupTopicOffsetByClient", this::deleteGroupTopicOffsetByClient);
registerVCHandler(DELETE_GROUP_TP_OFFSET, V_2_4_0, V_MAX, "deleteGroupTopicPartitionOffsetByClient", this::deleteGroupTopicPartitionOffsetByClient);
}

@Override
Expand All @@ -80,70 +83,84 @@ public Result<Void> deleteGroupOffset(DeleteGroupParam param, String operator) {
return rv;
}

// 清理数据库中的数据
if (DeleteGroupTypeEnum.GROUP.equals(param.getDeleteGroupTypeEnum())) {
// 记录操作
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 清理Group数据
this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName());
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName());
} else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(param.getDeleteGroupTypeEnum())) {
// 记录操作
DeleteGroupTopicParam topicParam = (DeleteGroupTopicParam) param;
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), topicParam.getTopicName()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(topicParam))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 清理group + topic 数据
this.deleteGroupMemberInDB(topicParam.getClusterPhyId(), topicParam.getGroupName(), topicParam.getTopicName());
} else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(param.getDeleteGroupTypeEnum())) {
// 记录操作
DeleteGroupTopicPartitionParam partitionParam = (DeleteGroupTopicPartitionParam) param;
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), partitionParam.getTopicName(), partitionParam.getPartitionId()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(partitionParam))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 不需要进行清理
}
// 记录操作
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 清理Group数据
this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName());
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName());

return rv;
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}

/**************************************************** private method ****************************************************/
@Override
public Result<Void> deleteGroupTopicOffset(DeleteGroupTopicParam param, String operator) {
// 日志记录
LOGGER.info("method=deleteGroupTopicOffset||param={}||operator={}||msg=delete group topic offset", ConvertUtil.obj2Json(param), operator);

private Result<Void> deleteGroupOffsetByClient(VersionItemParam itemParam) {
DeleteGroupParam deleteGroupParam = (DeleteGroupParam) itemParam;

if (DeleteGroupTypeEnum.GROUP.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
return this.deleteGroupByClient(itemParam);
} else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
return this.deleteGroupTopicOffsetByClient(itemParam);
} else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
return this.deleteGroupTopicPartitionOffsetByClient(itemParam);
try {
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_TOPIC_OFFSET, param);
if (rv == null || rv.failed()) {
return rv;
}

// 清理数据库中的数据
// 记录操作
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), param.getTopicName()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

// 清理group + topic 数据
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName(), param.getTopicName());

return rv;
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}

@Override
public Result<Void> deleteGroupTopicPartitionOffset(DeleteGroupTopicPartitionParam param, String operator) {
// 日志记录
LOGGER.info("method=deleteGroupTopicPartitionOffset||param={}||operator={}||msg=delete group topic partition offset", ConvertUtil.obj2Json(param), operator);

try {
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_TP_OFFSET, param);
if (rv == null || rv.failed()) {
return rv;
}

return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "删除Offset时,删除的类型参数非法");
// 记录操作
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.DELETE.getDesc(),
ModuleEnum.KAFKA_GROUP.getDesc(),
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), param.getPartitionId()),
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
);
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);

return rv;
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}

private Result<Void> deleteGroupByClient(VersionItemParam itemParam) {
/**************************************************** private method ****************************************************/

private Result<Void> deleteGroupOffsetByClient(VersionItemParam itemParam) {
DeleteGroupParam param = (DeleteGroupParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
Expand All @@ -156,7 +173,7 @@ private Result<Void> deleteGroupByClient(VersionItemParam itemParam) {
deleteConsumerGroupsResult.all().get();
} catch (Exception e) {
LOGGER.error(
"method=deleteGroupByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!",
"method=deleteGroupOffsetByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!",
param.getClusterPhyId(), param.getGroupName(), e
);

Expand All @@ -172,7 +189,7 @@ private Result<Void> deleteGroupTopicOffsetByClient(VersionItemParam itemParam)
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());

DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(
param.getTopicName()),
param.getTopicName()),
new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic";

private static final String FE_DELETE_GROUP_OFFSET = "FEDeleteGroupOffset";
private static final String FE_DELETE_GROUP_TOPIC_OFFSET = "FEDeleteGroupTopicOffset";
private static final String FE_DELETE_GROUP_TOPIC_PARTITION_OFFSET = "FEDeleteGroupTopicPartitionOffset";

public FrontEndControlVersionItems(){}
public FrontEndControlVersionItems() {
// ignore
}

@Override
public int versionItemType() {
Expand Down Expand Up @@ -97,9 +101,13 @@ public List<VersionControlItem> init(){
itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX)
.name(FE_TRUNCATE_TOPIC).desc("清空Topic"));

// truncate topic
itemList.add(buildItem().minVersion(VersionEnum.V_1_1_0).maxVersion(VersionEnum.V_MAX)
// 删除Offset
itemList.add(buildItem().minVersion(VersionEnum.V_2_0_0).maxVersion(VersionEnum.V_MAX)
.name(FE_DELETE_GROUP_OFFSET).desc("删除GroupOffset"));
itemList.add(buildItem().minVersion(VersionEnum.V_2_4_0).maxVersion(VersionEnum.V_MAX)
.name(FE_DELETE_GROUP_TOPIC_OFFSET).desc("删除GroupTopicOffset"));
itemList.add(buildItem().minVersion(VersionEnum.V_2_4_0).maxVersion(VersionEnum.V_MAX)
.name(FE_DELETE_GROUP_TOPIC_PARTITION_OFFSET).desc("删除GroupTopicPartitionOffset"));
return itemList;
}
}
10 changes: 10 additions & 0 deletions km-persistence/src/main/resources/sql/dml-logi.sql
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,13 @@ INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_del
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2040', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming');


-- 多集群管理权限2023-07-06新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming');

INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');