Preface
This article looks at PowerJob's SystemInfoController.
SystemInfoController
tech/powerjob/server/web/controller/SystemInfoController.java
```java
@Slf4j
@RestController
@RequestMapping("/system")
@RequiredArgsConstructor
public class SystemInfoController {

    private final JobInfoRepository jobInfoRepository;

    private final InstanceInfoRepository instanceInfoRepository;

    private final ServerInfoService serverInfoService;

    private final WorkerClusterQueryService workerClusterQueryService;

    @GetMapping("/listWorker")
    public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
        List<WorkerInfo> workerInfos = workerClusterQueryService.getAllWorkers(appId);
        return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList()));
    }

    @GetMapping("/overview")
    public ResultDTO<SystemOverviewVO> getSystemOverview(Long appId) {

        SystemOverviewVO overview = new SystemOverviewVO();

        // total number of jobs (excluding deleted ones)
        overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()));
        // number of running instances
        overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
        // number of recently failed instances (within the last 24h)
        Date date = DateUtils.addDays(new Date(), -1);
        overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));

        // server timezone
        overview.setTimezone(TimeZone.getDefault().getDisplayName());
        // server time
        overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));

        overview.setServerInfo(serverInfoService.fetchServiceInfo());

        return ResultDTO.success(overview);
    }
}
```
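SystemInfoController provides two endpoints, listWorker and getSystemOverview. listWorker returns the WorkerInfo list of the currently logged-in appId. getSystemOverview collects statistics for that appId: the total number of jobs (excluding deleted ones), the number of running instances, and the number of instances that failed within the last 24 hours; it also reports the server's timezone, current time, and server info.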
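To make the three counting rules concrete, here is a minimal in-memory sketch of what the repository queries filter on. It is not PowerJob code: the Job and Instance classes and the numeric status codes are hypothetical stand-ins (the real values come from SwitchableStatus and InstanceStatus), and the 24-hour window is computed as a fixed millisecond offset instead of commons-lang3 DateUtils.addDays so the sketch stays dependency-free.

```java
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-ins for the JPA queries used by getSystemOverview.
class OverviewSketch {
    // Assumed status codes for illustration only, not PowerJob's real enum values
    static final int JOB_DELETED = 99;
    static final int INSTANCE_RUNNING = 3;
    static final int INSTANCE_FAILED = 4;

    static class Job {
        final long appId;
        final int status;
        Job(long appId, int status) { this.appId = appId; this.status = status; }
    }

    static class Instance {
        final long appId;
        final int status;
        final Date gmtCreate;
        Instance(long appId, int status, Date gmtCreate) {
            this.appId = appId; this.status = status; this.gmtCreate = gmtCreate;
        }
    }

    // Same filter as countByAppIdAndStatusNot(appId, DELETED)
    static long jobCount(List<Job> jobs, long appId) {
        return jobs.stream().filter(j -> j.appId == appId && j.status != JOB_DELETED).count();
    }

    // Same filter as countByAppIdAndStatus(appId, RUNNING)
    static long runningCount(List<Instance> instances, long appId) {
        return instances.stream().filter(i -> i.appId == appId && i.status == INSTANCE_RUNNING).count();
    }

    // Same filter as countByAppIdAndStatusAndGmtCreateAfter(appId, FAILED, now - 24h)
    static long failedLast24h(List<Instance> instances, long appId, Date now) {
        Date since = new Date(now.getTime() - TimeUnit.DAYS.toMillis(1));
        return instances.stream()
                .filter(i -> i.appId == appId && i.status == INSTANCE_FAILED && i.gmtCreate.after(since))
                .count();
    }
}
```

A failed instance created two days ago is not counted, and instances of other appIds never are, since every query is scoped to the appId passed in.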
getAllWorkers
tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
```java
@DesignateServer
public List<WorkerInfo> getAllWorkers(Long appId) {
    List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
    workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
    return workers;
}

private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
    ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
    if (clusterStatusHolder == null) {
        log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
        return Collections.emptyMap();
    }
    return clusterStatusHolder.getAllWorkers();
}

public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
    return WorkerClusterManagerService.getAppId2ClusterStatus();
}
```
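getAllWorkers gets the WorkerInfo map through getWorkerInfosByAppId (an empty map, with a warning, if the appId has no workers yet), then sorts the workers by getSystemMetrics().calculateScore() in descending order, so the workers with the highest score come first.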
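The ordering can be sketched in isolation as follows. The Worker class is a hypothetical minimal view holding only an address and a precomputed score (standing in for getSystemMetrics().calculateScore()). Instead of the subtraction comparator used in getAllWorkers, the sketch uses Comparator.comparingInt(...).reversed(), which gives the same descending order without relying on the subtraction staying within int range.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical minimal worker view: only the address and the score matter here.
class WorkerRanking {
    static class Worker {
        final String address;
        final int score; // stands in for getSystemMetrics().calculateScore()
        Worker(String address, int score) { this.address = address; this.score = score; }
    }

    // Copy the map values and sort so the highest score comes first,
    // the same order as (o1, o2) -> o2.score - o1.score
    static List<Worker> rank(Collection<Worker> workers) {
        List<Worker> sorted = new ArrayList<>(workers);
        sorted.sort(Comparator.comparingInt((Worker w) -> w.score).reversed());
        return sorted;
    }
}
```

List.sort is stable, so workers with equal scores keep the order in which the underlying map yielded them.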
WorkerClusterManagerService
tech/powerjob/server/remote/worker/WorkerClusterManagerService.java
```java
@Slf4j
public class WorkerClusterManagerService {

    /**
     * Worker health info, appId -> ClusterStatusHolder
     */
    private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();

    /**
     * Update status
     * @param heartbeat heartbeat packet sent by a worker
     */
    public static void updateStatus(WorkerHeartbeat heartbeat) {
        Long appId = heartbeat.getAppId();
        String appName = heartbeat.getAppName();
        ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
        clusterStatusHolder.updateStatus(heartbeat);
    }

    /**
     * Remove worker info that is no longer needed
     * @param usingAppIds appIds to keep; data for all other appIds is removed
     */
    public static void clean(List<Long> usingAppIds) {
        Set<Long> keys = Sets.newHashSet(usingAppIds);
        APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
    }

    /**
     * Release cached info to prevent OOM
     */
    public static void cleanUp() {
        APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release);
    }

    protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
        return APP_ID_2_CLUSTER_STATUS;
    }
}
```
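WorkerClusterManagerService defines APP_ID_2_CLUSTER_STATUS, a concurrent map from each appId to its ClusterStatusHolder. updateStatus receives a WorkerHeartbeat, creates the holder for the heartbeat's appId if it does not exist yet, and then calls clusterStatusHolder.updateStatus(heartbeat). clean drops the entries of appIds that are no longer in use, and cleanUp calls release on every holder to free cached data and prevent OOM.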
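The registry pattern (lazy per-appId creation with computeIfAbsent, pruning with removeIf) can be sketched on its own. Holder here is a hypothetical stand-in for ClusterStatusHolder that only records worker addresses, and onHeartbeat/retainOnly are illustrative names, not PowerJob methods.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the appId -> ClusterStatusHolder registry.
class ClusterRegistrySketch {
    // Hypothetical holder that only tracks the addresses of reporting workers
    static class Holder {
        final String appName;
        final Set<String> workers = ConcurrentHashMap.newKeySet();
        Holder(String appName) { this.appName = appName; }
    }

    private static final Map<Long, Holder> APP_ID_2_HOLDER = new ConcurrentHashMap<>();

    // Like updateStatus: create one holder per appId on first heartbeat, then delegate to it
    static void onHeartbeat(long appId, String appName, String workerAddress) {
        APP_ID_2_HOLDER.computeIfAbsent(appId, ignore -> new Holder(appName)).workers.add(workerAddress);
    }

    // Like clean: drop every appId that is not in the "still in use" list
    static void retainOnly(List<Long> usingAppIds) {
        Set<Long> keep = new HashSet<>(usingAppIds);
        APP_ID_2_HOLDER.entrySet().removeIf(entry -> !keep.contains(entry.getKey()));
    }

    static Map<Long, Holder> snapshot() {
        return APP_ID_2_HOLDER;
    }
}
```

computeIfAbsent on a ConcurrentHashMap runs the factory at most once per key, so concurrent heartbeats from the same app end up sharing one holder.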
updateStatus
tech/powerjob/server/remote/worker/ClusterStatusHolder.java
```java
public void updateStatus(WorkerHeartbeat heartbeat) {
    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();

    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });
    long oldTime = workerInfo.getLastActiveTime();
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }

    workerInfo.refresh(heartbeat);

    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}
```
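updateStatus first looks up the WorkerInfo for workerAddress, creating and refreshing it on the first heartbeat from that address. If heartbeatTime is greater than or equal to lastActiveTime, it calls workerInfo.refresh(heartbeat) and records every entry of the heartbeat's containerInfos in containerId2Infos (containerId -> workerAddress -> container info); an older heartbeat is treated as expired, logged as a warning, and discarded.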
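The freshness check and the two-level container index can be sketched with simplified, hypothetical types: the heartbeat below carries only an address, a send time, and a list of container ids, and a plain timestamp stands in for WorkerInfo.lastActiveTime. The boolean return value is an illustrative addition so the outcome is observable; the original method returns void and only logs.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of ClusterStatusHolder.updateStatus with hypothetical simplified types.
class HolderSketch {
    static class Heartbeat {
        final String workerAddress;
        final long heartbeatTime;
        final List<Long> containerIds;
        Heartbeat(String workerAddress, long heartbeatTime, List<Long> containerIds) {
            this.workerAddress = workerAddress;
            this.heartbeatTime = heartbeatTime;
            this.containerIds = containerIds;
        }
    }

    // workerAddress -> last accepted heartbeat time (stands in for WorkerInfo.lastActiveTime)
    final Map<String, Long> address2LastActive = new ConcurrentHashMap<>();
    // containerId -> (workerAddress -> time at which the container was reported)
    final Map<Long, Map<String, Long>> containerId2Workers = new ConcurrentHashMap<>();

    /** Returns false when the heartbeat is older than the last accepted one and is dropped. */
    boolean update(Heartbeat hb) {
        // First heartbeat from an address initialises the entry with its own time, so it passes the check
        long oldTime = address2LastActive.computeIfAbsent(hb.workerAddress, ignore -> hb.heartbeatTime);
        if (hb.heartbeatTime < oldTime) {
            return false; // expired (out-of-order) heartbeat
        }
        address2LastActive.put(hb.workerAddress, hb.heartbeatTime);
        if (hb.containerIds != null) {
            for (Long containerId : hb.containerIds) {
                containerId2Workers.computeIfAbsent(containerId, ignore -> new ConcurrentHashMap<>())
                        .put(hb.workerAddress, hb.heartbeatTime);
            }
        }
        return true;
    }
}
```

Because the comparison uses `<`, a heartbeat carrying the same timestamp as the last accepted one is still applied, which matches the "greater than or equal" rule described above.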
getSystemMetrics
tech/powerjob/worker/common/utils/SystemInfoUtils.java
```java
public class SystemInfoUtils {

    private static final NumberFormat NF = NumberFormat.getNumberInstance();
    static {
        NF.setMaximumFractionDigits(4);
        NF.setMinimumFractionDigits(4);
        NF.setRoundingMode(RoundingMode.HALF_UP);
        // do not use thousands grouping separators
        NF.setGroupingUsed(false);
    }

    // JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.
    private static final Runtime runtime = Runtime.getRuntime();
    private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();

    public static SystemMetrics getSystemMetrics() {

        SystemMetrics metrics = new SystemMetrics();

        fillCPUInfo(metrics);
        fillMemoryInfo(metrics);
        fillDiskInfo(metrics);

        // compute the score on the worker to reduce load on the server
        metrics.calculateScore();
        return metrics;
    }

    private static void fillCPUInfo(SystemMetrics metrics) {
        metrics.setCpuProcessors(osMXBean.getAvailableProcessors());
        metrics.setCpuLoad(miniDouble(osMXBean.getSystemLoadAverage()));
    }

    private static void fillMemoryInfo(SystemMetrics metrics) {
        // JVM memory (maxMemory is the most memory the JVM may obtain from the OS, i.e. the -Xmx value;
        // totalMemory is the total memory the JVM currently holds)
        long maxMemory = runtime.maxMemory();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();

        metrics.setJvmMaxMemory(bytes2GB(maxMemory));
        // used memory: currently allocated total minus currently free
        metrics.setJvmUsedMemory(bytes2GB(usedMemory));
        // ratio of used memory
        metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory));
    }

    private static void fillDiskInfo(SystemMetrics metrics) {
        long free = 0;
        long total = 0;
        File[] roots = File.listRoots();
        for (File file : roots) {
            free += file.getFreeSpace();
            total += file.getTotalSpace();
        }

        metrics.setDiskUsed(bytes2GB(total - free));
        metrics.setDiskTotal(bytes2GB(total));
        metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal()));
    }

    private static double bytes2GB(long bytes) {
        return miniDouble(bytes / 1024.0 / 1024 / 1024);
    }

    private static double miniDouble(double origin) {
        return Double.parseDouble(NF.format(origin));
    }
}
```
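SystemInfoUtils provides getSystemMetrics, which fills in CPU, memory, and disk information through fillCPUInfo, fillMemoryInfo, and fillDiskInfo, and finally calls metrics.calculateScore(). CPU information comes from osMXBean.getAvailableProcessors() and osMXBean.getSystemLoadAverage(); memory information comes from Runtime (maxMemory, totalMemory, freeMemory); disk information is collected by iterating over File.listRoots() and summing each root's freeSpace and totalSpace. Byte counts are converted to GB, and all values are rounded to four decimal places.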
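The same JDK calls can be exercised in a standalone probe. This is a sketch rather than PowerJob code: the Snapshot class is a hypothetical holder, disk usage is left out, and rounding uses BigDecimal with HALF_UP instead of a NumberFormat built from the default locale, so the result does not depend on the locale's decimal separator (its tie-breaking may differ from DecimalFormat in rare edge cases).

```java
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.math.BigDecimal;
import java.math.RoundingMode;

// Standalone sketch of the JDK calls SystemInfoUtils relies on.
class MetricsProbe {
    static final class Snapshot {
        int cpuProcessors;
        double cpuLoad; // negative when the platform cannot report a load average
        double jvmMaxGB, jvmUsedGB, jvmUsage;
        double diskUsedGB, diskTotalGB;
    }

    // Round to 4 decimal places, HALF_UP; NaN and Infinity cannot pass through BigDecimal
    static double round4(double v) {
        if (Double.isNaN(v) || Double.isInfinite(v)) {
            return v;
        }
        return BigDecimal.valueOf(v).setScale(4, RoundingMode.HALF_UP).doubleValue();
    }

    static double bytes2GB(long bytes) {
        return round4(bytes / 1024.0 / 1024 / 1024);
    }

    static Snapshot probe() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        Runtime rt = Runtime.getRuntime();
        Snapshot s = new Snapshot();
        // CPU: available cores and the system load average for the last minute
        s.cpuProcessors = os.getAvailableProcessors();
        s.cpuLoad = round4(os.getSystemLoadAverage());
        // JVM memory: used = memory currently held by the JVM minus its free part
        long max = rt.maxMemory();
        long used = rt.totalMemory() - rt.freeMemory();
        s.jvmMaxGB = bytes2GB(max);
        s.jvmUsedGB = bytes2GB(used);
        s.jvmUsage = round4((double) used / max);
        // Disk: sum over all filesystem roots
        long free = 0;
        long total = 0;
        for (File root : File.listRoots()) {
            free += root.getFreeSpace();
            total += root.getTotalSpace();
        }
        s.diskUsedGB = bytes2GB(total - free);
        s.diskTotalGB = bytes2GB(total);
        return s;
    }
}
```

On a typical Linux host cpuLoad is non-negative; on Windows getSystemLoadAverage() reports a negative value, which is the case the scoring code below has to guard against.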
calculateScore
tech/powerjob/common/model/SystemMetrics.java
```java
public int calculateScore() {

    if (score > 0) {
        return score;
    }

    // Memory is vital to TaskTracker, so we set the multiplier factor as 2.
    double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
    // Calculate the remaining load of CPU. Multiplier is set as 1.
    double cpuScore = cpuProcessors - cpuLoad;
    // Windows can not fetch CPU load, set cpuScore as 1.
    if (cpuScore > cpuProcessors) {
        cpuScore = 1;
    }

    score = (int) (memScore + cpuScore);
    return score;
}
```
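SystemMetrics.calculateScore adds two parts, memScore and cpuScore. memScore is (jvmMaxMemory - jvmUsedMemory) * 2, i.e. the free JVM memory in GB weighted by 2; cpuScore is cpuProcessors - cpuLoad. When the load average is unavailable (as on Windows), getSystemLoadAverage() returns a negative value, which would push cpuScore above cpuProcessors; in that case cpuScore is set to 1. The sum is truncated to an int and cached in score, so later calls return the cached value as long as it is positive.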
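A worked example helps check the formula. The sketch below is a hypothetical pure-function rewrite of the same arithmetic (it omits the caching of a positive score): memory inputs are in GB, as produced by bytes2GB, and CPU inputs are the core count and load average.

```java
// Pure-function rewrite of the scoring rule described above, for illustration only.
class ScoreSketch {
    static int score(double jvmMaxMemory, double jvmUsedMemory, int cpuProcessors, double cpuLoad) {
        // free JVM memory in GB, weighted x2
        double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
        // spare CPU capacity, weighted x1
        double cpuScore = cpuProcessors - cpuLoad;
        // a negative load average (unavailable, e.g. on Windows) would inflate cpuScore; clamp to 1
        if (cpuScore > cpuProcessors) {
            cpuScore = 1;
        }
        return (int) (memScore + cpuScore); // truncates toward zero
    }

    public static void main(String[] args) {
        System.out.println(score(4.0, 1.5, 8, 2.5));  // 5.0 + 5.5 = 10.5, prints 10
        System.out.println(score(4.0, 1.5, 8, -1.0)); // cpuScore 9 > 8 becomes 1, 5.0 + 1 = 6, prints 6
    }
}
```

With 2.5 GB of free heap, each free GB is worth two idle cores, which reflects the source comment that memory matters more to the TaskTracker than CPU.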
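Summary
SystemInfoController provides the listWorker and getSystemOverview endpoints: listWorker returns the WorkerInfo list of the currently logged-in appId, and getSystemOverview reports that appId's total job count, running instance count, and number of instances failed in the last 24 hours. WorkerClusterManagerService defines APP_ID_2_CLUSTER_STATUS, which maps each appId to its ClusterStatusHolder; its updateStatus receives a WorkerHeartbeat and calls clusterStatusHolder.updateStatus(heartbeat). WorkerInfo carries SystemMetrics, which the worker builds with SystemInfoUtils.getSystemMetrics: fillCPUInfo, fillMemoryInfo, and fillDiskInfo fill in CPU, memory, and disk information, and metrics.calculateScore() computes the score that getAllWorkers uses to rank workers.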