背景:
1、文件數(shù)據(jù)在A服務(wù)器(windows)(不定期在指定目錄下生成),項目應(yīng)用部署在B服務(wù)器(Linux);
2、項目應(yīng)用在B服務(wù)器,監(jiān)聽A服務(wù)器指定目錄,有新生成文件,進行讀取文件信息,持久化數(shù)據(jù);
3、提供兩塊內(nèi)容,第一安裝windows FTP服務(wù);第二項目源碼,希望可以幫助到你。
共計4種方案,試錯采用了第三種方案,第四種方案沒有試。
1、使用jcsh.jar提供方法讀取文件信息,但需要A服務(wù)器開通SSH遠程連接,一般linux服務(wù)器都是默認開通的,可直接讀取連接讀取,windows系統(tǒng)需安裝SSH,因現(xiàn)場環(huán)境A服務(wù)器是windows2003,故放棄這種方法。
2、曲線救國,通過腳本(腳本監(jiān)聽比較困難,故放棄)把A服務(wù)器信息定時存入B服務(wù)器(Linux),再通過jcsh.jar讀取文件信息。
3、通過A服務(wù)器安裝FTP服務(wù),B服務(wù)器安裝FTP客戶端,使用java動態(tài)監(jiān)聽該目錄下生成文件讀取信息。
4、把A服務(wù)器指定目錄進行共享(等同于共享的這個目錄就是B服務(wù)的目錄了),再進行讀取,因第三種方案成功,故沒有嘗試第四種方案。文章來源:http://www.zghlxwxcb.cn/news/detail-778388.html
windows安裝FTP服務(wù)
1、開啟ftp服務(wù):控制面板–程序和功能–啟用或關(guān)閉windows功能–標(biāo)紅框全部打開–點擊確定
2、新建站點:
控制面板–大圖標(biāo)–管理工具
IIS管理器
網(wǎng)站–添加FTP站點
以上就是windows安裝FTP服務(wù)的過程,我這演示了匿名創(chuàng)建站點,誰都可以訪問,還可以新建用戶,需要用戶登錄才能訪問。文章來源地址http://www.zghlxwxcb.cn/news/detail-778388.html
源碼
引入該依賴
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
FileChangeData
@Data
public class FileChangeData {
/**
* 文件信息
* */
private FTPFile ftpFile;
/**
* 文件改變類型
* */
private FileChangeType eventType;
/**
* 文件名稱
* */
private String fileName;
/**
* 文件大小
* */
private Long fileSize;
/**
* FTPClient
* */
private FTPClient ftpClient;
/**
* 獲取文件輸入流
* @return InputStream
* */
public InputStream getInputStream(String filePathName) {
//如果是刪除事件則不能夠獲取流
if (Objects.equals(eventType, FileChangeType.FILE_DELETED)) {
return null;
}
try {
return ftpClient.retrieveFileStream(filePathName);
} catch (IOException e) {
return null;
}
}
}
FileChangeEvent
public interface FileChangeEvent {
/**
* 文件發(fā)生改變時觸發(fā)此方法
* @param fileChangeData 文件發(fā)生了改變
* */
@Function
void change(FileChangeData fileChangeData) throws IOException;
}
FTPService
public interface FTPService {
/**
* ftp登陸
* @return boolean 是否登陸成功
* */
FTPClient login();
/**
* ftp登出
* @return boolean 是否登出成功
* */
boolean loginOut();
/**
* 獲取文件列表
* @return FTPFile[] 文件列表
* */
FTPFile[] listFile();
/**
* 監(jiān)聽文件夾的改變
* @param fileChangeEvent 文件改變事件
* */
void addListenerFileChange(FileChangeEvent fileChangeEvent);
}
ListenerChangeRunnable
public interface ListenerChangeRunnable extends Runnable {
/**
* 停止監(jiān)聽文件
* @return boolean 是否停止成功
* */
boolean stopListener();
}
FTPServiceImpl
@Service
public class FTPServiceImpl implements FTPService {
@Autowired
private FTPConfig ftpConfig;
private String SPLIT = ":";
private ThreadLocal<FTPClient> currentFTPClient;
private ThreadLocal<ListenerChangeRunnable> currentListener;
public FTPServiceImpl() {
this.currentFTPClient = new ThreadLocal<>();
this.currentListener = new ThreadLocal<>();
}
@Override
public FTPClient login() {
FTPClient ftpClient = new FTPClient();
try {
ftpClient.connect(ftpConfig.getFtpIp(), ftpConfig.getFtpPort());
ftpClient.login(ftpConfig.getUsername(), ftpConfig.getPassword());
// ftpClient.setControlEncoding("gb2312");
this.currentFTPClient.set(ftpClient);
return ftpClient;
} catch (Exception e) {
return null;
}
}
@Override
public boolean loginOut() {
try {
currentFTPClient.get().logout();
currentFTPClient.get().disconnect();
return Boolean.TRUE;
} catch (Exception e) {
return Boolean.FALSE;
}
}
@Override
public FTPFile[] listFile() {
FTPClient ftpClient = this.currentFTPClient.get();
try {
return ftpClient.listFiles();
} catch (Exception e) {
return null;
}
}
@Override
public void addListenerFileChange(FileChangeEvent fileChangeEvent) {
FTPClient ftpClient = this.currentFTPClient.get();
ListenerFileChangeThreadRunnable listenerFileChangeThread = new ListenerFileChangeThreadRunnable(ftpClient, fileChangeEvent);
this.currentListener.set(listenerFileChangeThread);
new Thread(listenerFileChangeThread).start();
}
}
ListenerFileChangeThreadRunnable
@Slf4j
public class ListenerFileChangeThreadRunnable implements ListenerChangeRunnable {
private final FTPClient ftpClient;
private volatile boolean stop;
private final Map<String, Long> fileMemory;
private final FileChangeEvent fileChangeEvent;
public ListenerFileChangeThreadRunnable(FTPClient ftpClient, FileChangeEvent fileChangeEvent) {
this.ftpClient = ftpClient;
this.fileChangeEvent = fileChangeEvent;
this.fileMemory = new HashMap<>();
}
@Override
public void run() {
while (!stop) {
try {
FTPFile[] ftpFiles = ftpClient.listFiles();
//判斷文件被刪除
if (fileMemory.size() > 0) {
Set<String> fileNames = new HashSet<>();
for (FTPFile ftpFile : ftpFiles) {
if (ftpFile.isDirectory()) {
log.info("文件夾不做刪除判斷");
continue;
}
fileNames.add(ftpFile.getName());
}
Set<Map.Entry<String, Long>> entries = fileMemory.entrySet();
for (Map.Entry<String, Long> map : entries) {
if (!fileNames.contains(map.getKey())) {
log.info("文件{}被刪除了", map.getKey());
FileChangeData fileChangeData = new FileChangeData();
fileChangeData.setEventType(FileChangeType.FILE_DELETED);
fileChangeData.setFileName(map.getKey());
fileChangeData.setFileSize(map.getValue());
fileMemory.remove(map.getKey());
entries.remove(map.getKey());
fileChangeEvent.change(fileChangeData);
}
}
}
//判斷文件是否有更改或新增
for (FTPFile ftpFile: ftpFiles) {
//判斷是否為文件夾
if (ftpFile.isDirectory()) {
// log.info("{}為文件不進行監(jiān)聽操作", ftpFile.getName());
continue;
}
FileChangeData fileChangeData = new FileChangeData();
fileChangeData.setFileName(ftpFile.getName());
// fileChangeData.setFileName("D:\\ftptest\\aaa\\"+ftpFile.getName());
fileChangeData.setFileSize(ftpFile.getSize());
fileChangeData.setFtpFile(ftpFile);
//文件是否存在于緩存文件列表中
if (fileMemory.containsKey(ftpFile.getName())) {
// log.info("文件{}在內(nèi)存中已經(jīng)存在,進行大小判斷", ftpFile.getName());
if (!Objects.equals(fileMemory.get(ftpFile.getName()), ftpFile.getSize())) {
// log.info("文件{}在內(nèi)存中已經(jīng)存在且大小不一致,進行更新緩存操作", ftpFile.getName());
fileMemory.put(ftpFile.getName(), ftpFile.getSize());
fileChangeData.setEventType(FileChangeType.FILE_UPDATE);
fileChangeEvent.change(fileChangeData);
}
continue;
}
// log.info("文件{}在內(nèi)存中不存在進行緩存操作", ftpFile.getName());
fileMemory.put(ftpFile.getName(), ftpFile.getSize());
fileChangeData.setEventType(FileChangeType.FILE_ADD);
fileChangeEvent.change(fileChangeData);
}
} catch (Exception e) {
continue;
//throw new RuntimeException(e);
}
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
continue;
//throw new RuntimeException(e);
}
}
}
@Override
public boolean stopListener() {
this.stop = Boolean.TRUE;
this.fileMemory.clear();
return this.stop;
}
}
FileChangeType
public enum FileChangeType {
FILE_UPDATE(0, "文件更新"),
FILE_ADD(1, "文件添加"),
FILE_DELETED(2, "文件刪除");
@Getter
private Integer type;
@Getter
private String desc;
FileChangeType(Integer type, String desc) {
this.type = type;
this.desc = desc;
}
}
FTPConfig
@Data
@Configuration
public class FTPConfig {
@Value("${ftp.ip:ftp的ip}")
private String ftpIp;
@Value("${ftp.port:ftp端口,默認21}")
private Integer ftpPort;
@Value("${ftp.username:ftp創(chuàng)建的用戶名}")
private String username;
@Value("${ftp.password:ftp創(chuàng)建的用戶名密碼}")
private String password;
}
SendEmailApplicationTests
@Component
class SendEmailApplicationTests implements ApplicationRunner {
@Autowired
private FTPService ftpService;
void ftpTest() {
FTPClient ftpClient = ftpService.login();
ftpService.addListenerFileChange(ftpFile -> {
System.out.println(String.format("文件%s被改變了,文件改變類型%s", ftpFile.getFileName(), ftpFile.getEventType().getDesc()));
InputStream inputStream = ftpClient.retrieveFileStream("/"+ ftpFile.getFileName());
if(inputStream!=null){
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream,"GBK"));
String s = null;
List<String> listStr = new ArrayList<>();//讀取的數(shù)據(jù)在listStr
while ((s = reader.readLine()) != null) {
System.out.println("===================>" + s);
listStr.add(s);
}
//處理業(yè)務(wù)邏輯
inputStream.close();
reader.close();
ftpClient.completePendingCommand();
}
});
}
@Override
public void run(ApplicationArguments args) throws Exception {
ftpTest();
}
}
到了這里,關(guān)于java通過FTP跨服務(wù)器動態(tài)監(jiān)聽讀取指定目錄下文件數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!