Skip to content

Commit

Permalink
[Feature] #1121 support clearInactiveClusterPhyBrokers
Browse files Browse the repository at this point in the history
  • Loading branch information
chang-wd committed Nov 20, 2024
1 parent 4c10b4c commit 43aca84
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ public interface ClusterBrokersManager {
*/
PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto);

/**
* 删除status == 0 的所有broker -> 获取缓存查询结果 & broker 表查询结果并集
* 获取缓存查询结果 & broker 表查询结果并集
* @param clusterPhyId kafka 物理集群 id
* @param dto 封装分页查询参数对象
* @return 返回获取到的缓存查询结果 & broker 表查询结果并集
*/
PaginationResult<ClusterBrokersOverviewVO> deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto);


/**
* 根据物理集群id获取集群对应broker状态信息
* @param clusterPhyId 物理集群 id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
);
}

@Override
public PaginationResult<ClusterBrokersOverviewVO> deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
brokerService.deleteInactiveClusterPhyBrokers(clusterPhyId);
return this.getClusterPhyBrokersOverview(clusterPhyId, dto);
}

@Override
public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) {
ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO();
Expand Down
2 changes: 2 additions & 0 deletions km-console/packages/layout-clusters-fe/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const api = {
getTopicMetricPoints: (clusterPhyId: number, topicName: string) => getApi(`/clusters/${clusterPhyId}/topics/${topicName}/metric-points`),
// Broker列表接口
getBrokersList: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-overview`),
// 删除失效Broker
clearInactiveBrokers: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-clear`),
// Broker列表页健康检查指标
getBrokerMetricPoints: (clusterPhyId: number) => getApi(`/physical-clusters/${clusterPhyId}/latest-metrics`),
// Controller列表接口 /api/v3/clusters/{clusterPhyId}/controller-history「controller-change-log」
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@ const BrokerList: React.FC = (props: any) => {
});
};

// 请求接口获取数据
const clearInactiveBrokers = async ({ pageNo, pageSize, filters, sorter }: any) => {
if (urlParams?.clusterId === undefined) return;
// filters = filters || filteredInfo;
setLoading(true);
const params = {
searchKeywords: searchKeywords.slice(0, 128),
pageNo,
pageSize,
latestMetricNames: ['PartitionsSkew', 'Leaders', 'LeadersSkew', 'LogSize'],
sortField: sorter?.field || 'brokerId',
sortType: sorter?.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : 'asc',
};

request(API.clearInactiveBrokers(urlParams?.clusterId), { method: 'POST', data: params })
.then((res: any) => {
setPagination({
current: res.pagination?.pageNo,
pageSize: res.pagination?.pageSize,
total: res.pagination?.total,
});
const newData =
res?.bizData.map((item: any) => {
return {
...item,
...item?.latestMetrics?.metrics,
};
}) || [];
setData(newData);
setLoading(false);
})
.catch((err) => {
setLoading(false);
});
};

const onTableChange = (pagination: any, filters: any, sorter: any) => {
// setFilteredInfo(filters);
genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, sorter });
Expand Down Expand Up @@ -107,6 +143,12 @@ const BrokerList: React.FC = (props: any) => {
>
<IconFont className={`${tableHeaderPrefix}-left-refresh-icon`} type="icon-shuaxin1" />
</div>
<div
className={`${tableHeaderPrefix}-left-clear`}
onClick={() => clearInactiveBrokers({ pageNo: pagination.current, pageSize: pagination.pageSize })}
>
<IconFont className={`${tableHeaderPrefix}-left-clear-icon`} type="icon-Operation" />
</div>
</div>
<div className={`${tableHeaderPrefix}-right`}>
<SearchInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ public interface BrokerService {
boolean allServerDown(Long clusterPhyId);

boolean existServerDown(Long clusterPhyId);

void clearInactiveClusterPhyBrokers(Long clusterPhyId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ public List<Broker> listNotAliveBrokersFromDB(Long clusterPhyId) {
return this.listAllBrokersAndUpdateCache(clusterPhyId).stream().filter( elem -> !elem.alive()).collect(Collectors.toList());
}

/**
* 清理对应集群中下线的broker记录
* @param clusterPhyId
*/
@Override
public void clearInactiveClusterPhyBrokers(Long clusterPhyId) {
try {
this.getAllBrokerPOsFromDB(clusterPhyId).stream()
.filter(elem -> elem.getStatus().equals(Constant.DOWN))
.forEach(elem -> brokerDAO.deleteById(elem.getId()));
} catch (Exception e) {
log.error("method=deleteInactiveClusterPhyBrokers||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
}
}
@Override
public List<Broker> listAllBrokersFromDB(Long clusterPhyId) {
return this.listAllBrokersAndUpdateCache(clusterPhyId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(@
@RequestBody ClusterBrokersOverviewDTO dto) {
return clusterBrokersManager.getClusterPhyBrokersOverview(clusterPhyId, dto);
}

@ApiOperation(value = "集群无效brokers清理")
@PostMapping(value = "clusters/{clusterPhyId}/brokers-clear")
@ResponseBody
public PaginationResult<ClusterBrokersOverviewVO> clearInactiveClusterPhyBrokers(@PathVariable Long clusterPhyId,
@RequestBody ClusterBrokersOverviewDTO dto) {
return clusterBrokersManager.clearInactiveClusterPhyBrokers(clusterPhyId, dto);
}
}

0 comments on commit 43aca84

Please sign in to comment.