前兩天由于項(xiàng)目需要,一個(gè)windows上的批處理任務(wù)(kitchen.bat),需要接到mq的消息通知后執(zhí)行,為了快速實(shí)現(xiàn)這里我們通過(guò)springboot寫(xiě)了一個(gè)jar程序,用于接收mq的消息,并調(diào)用bat文件。
本程序需要實(shí)現(xiàn)的功能
- 調(diào)用windows的批處理腳本bat,并支持傳參
- 可根據(jù)配置設(shè)置并發(fā),同時(shí)消費(fèi)多個(gè)mq消息調(diào)用多個(gè)批處理腳本
- 確保java程序能一直正常運(yùn)行(如果有假死或者宕機(jī)了可以自動(dòng)重啟)
- 批處理腳本執(zhí)行失敗了,則再將信息重新放回到mq的隊(duì)列尾部,等待下次執(zhí)行
需要用的技術(shù)
- Java的java.lang.Runtime類 用于調(diào)用windows服務(wù)器命令
- 通過(guò)環(huán)境變量配置程序運(yùn)行的參數(shù),如mq信息、和執(zhí)行的批處理腳本命令路徑、并發(fā)等
- 通過(guò)rabbitmq的手工ack來(lái)確定消息是否處理成功,及并發(fā)實(shí)現(xiàn)
- 通過(guò)
actuator
來(lái)判斷java程序是否健康 - 通過(guò)windows定時(shí)任務(wù)來(lái)定時(shí)檢查java程序是否正常提供服務(wù),如果不正常則觸發(fā)重啟jar應(yīng)用
- 通過(guò)maven+ant打包程序,將可執(zhí)行程序jar及相關(guān)腳本打包成一個(gè)zip文件,方便發(fā)給使用方使用
主要實(shí)現(xiàn)邏輯
開(kāi)發(fā)環(huán)境:jdk1.8 + maven3.x + rabbitmq
運(yùn)行環(huán)境:windows + jre1.8
Java調(diào)用bat批處理文件
package cn.iccboy.kitchen.common;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* @author iccboy
*/@Slf4j
public class CmdUtil {
/**
* 處理執(zhí)行進(jìn)程的流
*
* @param inputStream
* InputStream 執(zhí)行進(jìn)程的流
* @param tag
* int 標(biāo)志:1--InputStream;2--ErrorStream
*/
private static void processStreamHandler(final InputStream inputStream, int tag) {
// 處理流的線程
new Thread(() -> {
String line;
try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
while ((line = bufferedReader.readLine()) != null) {
if(tag == 1) {
log.info(line);
} else {
log.error(line);
}
}
} catch (Exception e) {
log.error("【異?!棵顖?zhí)行異常:{}", e.getMessage());
}
}).start();
}
public static int exec(String command, String... args) throws IOException {
String cmd = StrUtil.splicingWithSpace(command, args);
log.info("執(zhí)行命令:{}", cmd);
int ret = 99;
Process process = Runtime.getRuntime().exec(cmd);
processStreamHandler(process.getInputStream(), 1);
processStreamHandler(process.getErrorStream(), 2);
try {
ret = process.waitFor();
} catch (InterruptedException e) {
log.error("【異?!縫rocess.waitFor:{}" , e.getMessage());
}
log.info("執(zhí)行命令:{}, 返回狀態(tài)碼={}", cmd, ret);
return ret;
}
}
上面的程序中,一定要注意的是process.getErrorStream()
和 process.getInputStream()
一定要將命令行執(zhí)行輸出的信息(輸出流)和錯(cuò)誤信息(錯(cuò)誤流)都從緩沖區(qū)讀取出來(lái),不然會(huì)導(dǎo)致程序執(zhí)行阻塞。
process的阻塞: 在runtime執(zhí)行大點(diǎn)的命令中,輸入流和錯(cuò)誤流會(huì)不斷有流進(jìn)入存儲(chǔ)在JVM的緩沖區(qū)中,如果緩沖區(qū)的流不被讀取被填滿時(shí),就會(huì)造成runtime的阻塞。所以在進(jìn)行比如:大文件復(fù)制等的操作時(shí),需要不斷的去讀取JVM中的緩沖區(qū)的流,防止Runtime的死鎖阻塞。
程序健康檢查
這里通過(guò)actuator
來(lái)實(shí)現(xiàn),首先程序集成actuator
,由于是springboot項(xiàng)目,所以很方便。然后通過(guò)一個(gè)簡(jiǎn)單的java程序(CheckActuator)來(lái)訪問(wèn)actuator
的http地址,通過(guò)返回值來(lái)判斷jar程序是否運(yùn)行正常,然后通過(guò)windows的腳本(checkHealth.bat
)來(lái)調(diào)用CheckActuator
,根據(jù)返回值在進(jìn)行java程序的重啟等操作。
1. pom.xml增加actuator及prometheus的配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
上的版本會(huì)根據(jù)springboot對(duì)應(yīng)版本自動(dòng)集成
2. 配置actuator
在application.yml中增加如下配置
management:
health:
rabbit:
enabled: true
endpoints:
web:
exposure:
include: ["prometheus","health"]
endpoint:
health:
show-details: always
metrics:
export:
prometheus:
enabled: true
jmx:
enabled: true
3. 編寫(xiě)CheckActuator.java程序
當(dāng)然也可以通過(guò)windows的批處理命令直接訪問(wèn)actuator的地址,來(lái)判斷服務(wù)是否正常。
/**
* 注意:該類不能刪除?。。?! 不能改名?。。?!不能移動(dòng)位置?。。?!
*/
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
/**
* ===================================================
* 注意:該類不能刪除!?。?! 不能改名?。。?!不能移動(dòng)位置?。。?!
*
* 該類用于檢查程序是否健康(通過(guò)actuator進(jìn)行判斷是否健康)
*
* 主要供腳本checkHealth.bat進(jìn)行調(diào)用
* ===================================================
*/
public class CheckActuator {
private static final String HEALTH_FLAG = "\"status\":\"UP\"";
public static void main(String[] args) {
String url = "http://127.0.0.1:8000/actuator/health";
if(args != null && args.length != 0) {
url = args[0];
}
testUrlWithTimeOut(url);
}
public static void testUrlWithTimeOut(String urlString){
int timeOutMillSeconds = 2000;
URL url;
try {
url = new URL(urlString);
URLConnection conn = url.openConnection();
conn.setConnectTimeout(timeOutMillSeconds);
conn.connect();
InputStream in = conn.getInputStream();
BufferedReader reader = new BufferedReader( new InputStreamReader(in));
String line;
StringBuilder sb = new StringBuilder();
while ((line = reader.readLine()) != null) {
sb.append(line);
}
boolean healthFlag = sb.toString().contains(HEALTH_FLAG);
if(healthFlag) {
System.exit(0);
} else {
System.out.println("健康檢查異常:" + sb);
System.exit(1);
}
} catch (Exception e) {
System.out.println("網(wǎng)絡(luò)連接異常: e=" + e.getMessage());
System.exit(1);
}
}
}
我將上面的CheckActuator.java文件放到maven項(xiàng)目的test/java/跟目錄下,后面會(huì)通過(guò)ant命令將.class移動(dòng)到指定位置
- 健康檢測(cè)腳本checkHealth.bat
上面的springboot項(xiàng)目會(huì)通過(guò)http服務(wù),其運(yùn)行的端口是8000,下面腳本會(huì)通過(guò)8000端口來(lái)獲取對(duì)應(yīng)的進(jìn)程pid
::存活監(jiān)控!
@echo off
set strPath=%~dp0
echo %strPath%
mkdir %strPath%log
set "yMd=%date:~0,4%-%date:~5,2%-%date:~8,2% %time:~0,8%"
set strFile=%strPath%log/checkHealth-%date:~0,4%%date:~5,2%%date:~8,2%.log
java -classpath %strPath% CheckActuator
if ERRORLEVEL 1 (goto err) else (goto ok)
:err
echo %yMd% 程序連接失敗,進(jìn)行重啟! >> %strFile%
set port=8000
for /f "tokens=1-5" %%i in ('netstat -ano^|findstr ":%port%"') do (
echo kill the process %%m who use the port
taskkill /pid %%m -t -f
)
goto start
exit
:ok
echo %yMd% 程序運(yùn)行正常 >> %strFile%
exit
:start
chcp 65001
setlocal enabledelayedexpansion
set filename=""
for /f %%a in ('dir strPath *.jar /o-d /tc /b ') do (
set filename=%%~na%%~xa
echo 文件名: !filename!, 最新創(chuàng)建時(shí)間: %%~ta >> %strFile%
if not !filename! == "" (
goto startjar
)
)
:startjar
rem 注釋:查找最新文件結(jié)束,最新文件名為:%filename%
java -jar %strPath%%filename%
windows定時(shí)任務(wù)配置
- 新增-健康檢查定時(shí)任務(wù).bat
@echo off
set strPath=%~dp0
set checkBat=%strPath%checkHealth.bat
schtasks /create /tn xxx-health-check /tr %checkBat% /sc minute /mo 2
pause
上面的xxx-health-check
是定時(shí)任務(wù)的名字; /sc minute /mo 2
表示每2分鐘執(zhí)行一次命令。上面是通過(guò)命令配置的定時(shí)任務(wù),也可以通過(guò)windows的圖形管理界面【計(jì)劃任務(wù)】配置。
- 移除-健康檢查定時(shí)任務(wù).bat
@echo off
pause
schtasks /delete /tn xxx-health-check /f
pause
- 查看-健康檢查定時(shí)任務(wù).bat
@echo off
schtasks /query /V /FO LIST /tn xxx-health-check
pause
通過(guò)windows環(huán)境變量設(shè)置java程序的配置
application.yml 部分配置如下:
server:
port: ${K_PORT:8000}
servlet:
context-path: /
spring:
application:
name: xxx
rabbitmq:
host: ${K_MQ_HOST:172.18.1.100}
password: ${K_MQ_PASSWORD:123456}
port: ${K_MQ_PORT:5672}
username: ${K_MQ_USERNAME:mq}
connection-timeout: 15000
listener:
simple:
acknowledge-mode: manual #開(kāi)啟手動(dòng)ACK
concurrency: ${K_WORKS:1} # 并發(fā)
max-concurrency: ${K_WORKS:1} # 最大并發(fā)
prefetch: 1 # 每個(gè)消費(fèi)每次預(yù)去取幾個(gè)消息
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
shell:
paths: ${K_BAT_PATHS:C:\invoke.bat}
可通過(guò)設(shè)置系統(tǒng)的環(huán)境變量來(lái)改變配置,可設(shè)置的變量包含:
變量 | 說(shuō)明 | 默認(rèn)值 |
---|---|---|
K_PORT | 程序運(yùn)行的http服務(wù)端口 | 8000 |
K_MQ_HOST | rabbitmq 服務(wù)ip | 172.18.1.100 |
K_MQ_PORT | rabbitmq 服務(wù)端口 | 5672 |
K_MQ_USERNAME | rabbitmq 用戶名 | mq |
K_MQ_PASSWORD | rabbitmq 密碼 | 123456 |
K_BAT_PATHS | bat腳本路徑,可以配置多個(gè),通過(guò)英文逗號(hào)分隔,配置多個(gè)就會(huì)啟動(dòng)多個(gè)消費(fèi)者,如:C:\invoke_1.bat,C:\invoke_2.bat | C:\invoke.bat |
K_WORKS | 每個(gè)消費(fèi)者的并發(fā)數(shù)。如:K_BAT_PATHS配置了3個(gè)命令,K_WORKS 配置了 2 ,這表示有3*2=6個(gè)消費(fèi)者 | 1 |
消費(fèi)mq消息并執(zhí)行bat文件
package cn.iccboy.kitchen.mq;
import cn.iccboy.kitchen.common.CmdUtil;
import cn.iccboy.kitchen.common.ThreadUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static cn.iccboy.kitchen.mq.TopicRabbitMqConfig.EXCHANGE_DATA;
import static cn.iccboy.kitchen.mq.TopicRabbitMqConfig.KEY_INDEX_PROCESS;
/**
* @author iccboy
* @date 2023-08-05 15:35
*/
@Slf4j
public class CmdMqReceive {
@Setter
private String batPath;
@Setter
private Integer seq;
@RabbitListener(queues = TopicRabbitMqConfig.QUEUE_INDEX_PROCESS)
public void receive(Message<String> message, @Headers Map<String,Object> headers, Channel channel) throws IOException {
long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
log.info("[start]第{}執(zhí)行器,消息內(nèi)容:{}", seq, message.getPayload());
int status = CmdUtil.exec(batPath, message.getPayload());
if(status != 0) {
log.info("[err_1]第{}執(zhí)行器,消息內(nèi)容:{}加工腳本執(zhí)行異常,狀態(tài)碼={}",seq, message.getPayload(), status);
throw new RuntimeException("腳本執(zhí)行異常");
}
log.info("[end]第{}執(zhí)行器執(zhí)行完成:{}", seq, message.getPayload());
} catch (Exception e) {
ThreadUtil.sleep(1000);
log.error("[err]第{}執(zhí)行器,執(zhí)行異常重新進(jìn)入隊(duì)列:{}", seq, message.getPayload(), e);
//channel.basicNack(deliveryTag, false, true);
// 將處理錯(cuò)誤的消息放到重新隊(duì)列尾部
channel.basicPublish(EXCHANGE_DATA,
KEY_INDEX_PROCESS, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getPayload().getBytes(StandardCharsets.UTF_8));
} finally {
// 確認(rèn)已處理
channel.basicAck(deliveryTag,false);
}
}
}
通過(guò)批處理命令配置個(gè)數(shù),動(dòng)態(tài)生成對(duì)應(yīng)個(gè)數(shù)消費(fèi)者
package cn.iccboy.kitchen.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import java.util.List;
@Slf4j
@Configuration
@Import(DynamicBuildMqReceiveBean.ImportConfig.class)
public class DynamicBuildMqReceiveBean {
public static class ImportConfig implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private List<String> batPaths;
@Override
public void setEnvironment(Environment environment) {
try {
batPaths = environment.getProperty("shell.paths", List.class);
} catch (Exception ex) {
log.error("參數(shù)綁定", ex);
}
}
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
int seq = 0;
for (String batPath : batPaths) {
seq++;
// 注冊(cè)bean
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(CmdMqReceive.class);
MutablePropertyValues values = new MutablePropertyValues();
values.addPropertyValue("batPath", batPath);
values.addPropertyValue("seq", seq);
beanDefinition.setPropertyValues(values);
registry.registerBeanDefinition(CmdMqReceive.class.getName() + "#" + seq, beanDefinition);
}
}
}
}
上面通過(guò)ImportBeanDefinitionRegistrar
的方式 實(shí)現(xiàn)了動(dòng)態(tài)bean的生成
通過(guò)maven的ant插件實(shí)現(xiàn)打包
在項(xiàng)目的 pom.xml文件中增加如下配置
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>clean</id>
<phase>clean</phase>
<configuration>
<target>
<delete file="${basedir}/shell/CheckActuator.class"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<phase>test-compile</phase>
<configuration>
<target>
<copy overwrite="true" file="${project.build.directory}/test-classes/CheckActuator.class"
todir="${basedir}/shell" />
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
<execution>
<id>package</id>
<phase>package</phase>
<configuration>
<target>
<delete dir="${project.build.directory}/kitchen-mq-bin"/>
<mkdir dir="${project.build.directory}/kitchen-mq-bin"/>
<copy todir="${project.build.directory}/kitchen-mq-bin" overwrite="true">
<fileset dir="${basedir}/shell" erroronmissingdir="false">
<include name="*"/>
</fileset>
</copy>
<copy overwrite="true" file="${project.build.directory}/${project.name}-${project.version}.jar" todir="${project.build.directory}/kitchen-mq-bin" />
<zip destfile="${basedir}/kitchen-mq-bin.zip">
<fileset dir="${project.build.directory}/kitchen-mq-bin">
<include name="*"/>
</fileset>
</zip>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
項(xiàng)目結(jié)構(gòu)如下圖:
獲取執(zhí)行包文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-660251.html
- 執(zhí)行打包命令
mvn clean package
- 上面命令執(zhí)行完成后,在項(xiàng)目的跟目錄會(huì)產(chǎn)生一個(gè)壓縮包
kitchen-mq-bin.zip
,將壓縮包直接拷貝到目標(biāo)服務(wù)器,解壓即可。 - 解壓后,直接執(zhí)行
新增-健康檢查定時(shí)任務(wù).bat
即可。2分鐘后就會(huì)啟動(dòng)程序。
下圖是執(zhí)行命令后,多出的 zip文件包,以及包里面的文件文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-660251.html
到了這里,關(guān)于windows服務(wù)器下java程序健康檢測(cè)及假死崩潰后自動(dòng)重啟應(yīng)用、開(kāi)機(jī)自動(dòng)啟動(dòng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!