<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
<version>5.6.133</version>
</dependency>
package gaei.cn.x5l.x5lhive2cos.utils;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
import com.qcloud.cos.region.Region;
import org.apache.commons.lang3.SystemUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static java.util.stream.Collectors.toList;
/**
 * Command-line back-fill tool.
 *
 * <p>Lists the first-level sub-directories of a Hive table stored in a COS bucket
 * (directories of the form {@code <key>=<value>/}), compares the partition values with
 * the {@code sample_date} rows already recorded in MySQL for that database/table, and
 * inserts one row (state {@code 0}) for every missing partition that falls inside the
 * hard-coded back-fill months.
 *
 * <p>Usage: {@code CosSampleDate2Mysql <hiveDatabase> <hiveTable>}. On Windows 10 the
 * arguments are ignored and fixed development values are used instead.
 */
public class CosSampleDate2Mysql {
    // MySQL connection settings.
    // NOTE(review): credentials are hard-coded in source; consider loading them from configuration.
    private static final String MYSQL_URL = "jdbc:mysql://10.1.1.1:3316/test?useSSL=false";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "123456";
    // Target MySQL table (already back-tick quoted; it is a constant, never user input).
    private static final String MYSQL_TABLE = "`test`.`test`";

    // COS location of the Hive warehouse.
    private static final String COS_REGION = "ap-guangzhou";
    private static final String COS_BUCKET = "pro-x5l-1111111111";
    private static final String HIVE_ROOT = "/user/x5l/hive/";
    // A single list-objects request returns at most 1000 entries; larger listings are paged.
    private static final int COS_PAGE_SIZE = 1000;

    // Only partitions whose value contains one of these month tokens are back-filled.
    private static final String[] BACKFILL_MONTHS = {"202302", "202303", "202304", "202305"};

    /**
     * Entry point.
     *
     * @param args {@code args[0]} = Hive database name, {@code args[1]} = Hive table name
     *             (ignored on Windows 10, where fixed development values are used)
     * @throws IllegalArgumentException if fewer than two arguments are supplied on a
     *                                  non-Windows-10 host
     */
    public static void main(String[] args) {
        // Hive database / table whose partitions are inspected.
        String dbName;
        String tableName;
        if (SystemUtils.IS_OS_WINDOWS_10) {
            // Local development defaults.
            dbName = "database";
            tableName = "table_name";
        } else {
            if (args == null || args.length < 2) {
                throw new IllegalArgumentException(
                        "Usage: CosSampleDate2Mysql <hiveDatabase> <hiveTable>");
            }
            dbName = args[0];
            tableName = args[1];
        }

        // 1. Caller identity (secretId, secretKey).
        // NOTE(review): placeholders; real keys should not be committed to source.
        String secretId = "*******************************";
        String secretKey = "*******************************";
        COSCredentials cred = new BasicCOSCredentials(secretId, secretKey);
        // 2. Bucket region; https is requested explicitly.
        ClientConfig clientConfig = new ClientConfig(new Region(COS_REGION));
        clientConfig.setHttpProtocol(HttpProtocol.https);
        // 3. Build the client and make sure it is shut down on every exit path.
        COSClient cosClient = new COSClient(cred, clientConfig);
        try {
            backfill(cosClient, dbName, tableName);
        } finally {
            cosClient.shutdown();
        }
    }

    /** Runs the list-compare-insert workflow for one Hive table. */
    private static void backfill(COSClient cosClient, String dbName, String tableName) {
        List<String> partitionDirs;
        try {
            partitionDirs = listPartitionPrefixes(
                    cosClient, COS_BUCKET, HIVE_ROOT + dbName + "/" + tableName + "/");
        } catch (CosServiceException e) {
            e.printStackTrace();
            return;
        } catch (CosClientException e) {
            e.printStackTrace();
            return;
        }

        // Reduce each directory path to its partition value.
        List<String> sampleDates = new ArrayList<>();
        for (String dir : partitionDirs) {
            String value = extractPartitionValue(dir);
            if (value == null) {
                System.err.println("Skipping non-partition directory: " + dir);
                continue;
            }
            sampleDates.add(value);
        }

        // One connection for both the lookup and the inserts, closed automatically.
        // Exception (not SQLException) is caught because DBConn.conn's declared
        // exceptions are not visible here.
        try (Connection conn = DBConn.conn(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD)) {
            if (conn == null) {
                throw new IllegalStateException("DBConn.conn returned no connection for " + MYSQL_URL);
            }

            // Partitions already recorded in MySQL. If this lookup fails we abort rather than
            // treat the table as empty, which would insert duplicates.
            // NOTE(review): the lookup uses the Hive names, while the insert below may use the
            // renamed "dwd" names; confirm re-runs cannot duplicate rows for renamed tables.
            Set<String> existing = loadExistingSampleDates(conn, dbName, tableName);

            // Missing partitions, restricted to the back-fill months, in reversed listing order.
            List<String> result = sampleDates.stream()
                    .filter(item -> !existing.contains(item))
                    .filter(CosSampleDate2Mysql::isInBackfillWindow)
                    .collect(toList());
            Collections.reverse(result);
            System.out.println("Mysql中需要補(bǔ)充的時(shí)間分區(qū):");
            for (String element : result) {
                System.out.println(element);
            }

            // Tables with the Hive period prefix are recorded under the "dwd" database
            // with the prefix swapped.
            String targetDb = dbName;
            String targetTable = tableName;
            if (tableName.contains("dwd_hive_tbox_period_")) {
                targetDb = "dwd";
                targetTable = tableName.replace("dwd_hive_tbox_period_", "dwd_tsp_tbox_period_");
            }

            int written = insertSampleDates(conn, targetDb, targetTable, result);
            System.out.println(targetTable + "數(shù)據(jù)寫入完成,成功寫入的數(shù)據(jù)條數(shù)為:" + written);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Lists every common prefix (sub-directory) directly under {@code prefix}, following
     * the listing marker until the result is no longer truncated.
     *
     * @return all common prefixes across all pages, in listing order
     */
    private static List<String> listPartitionPrefixes(COSClient cosClient, String bucketName, String prefix) {
        ListObjectsRequest request = new ListObjectsRequest();
        request.setBucketName(bucketName);
        // Only keys starting with this prefix are listed.
        request.setPrefix(prefix);
        // "/" as delimiter: list only the current directory level.
        request.setDelimiter("/");
        request.setMaxKeys(COS_PAGE_SIZE);

        List<String> prefixes = new ArrayList<>();
        ObjectListing listing;
        do {
            listing = cosClient.listObjects(request);
            prefixes.addAll(listing.getCommonPrefixes());
            // Advance to the next page; without this a truncated listing repeats forever.
            request.setMarker(listing.getNextMarker());
        } while (listing.isTruncated());
        return prefixes;
    }

    /**
     * Extracts the partition value from a directory path such as
     * {@code user/x5l/hive/db/table/sample_date=20230201/}.
     *
     * @return the text between the first and second {@code '='} of the last path segment,
     *         or {@code null} when that segment has no {@code key=value} form
     */
    private static String extractPartitionValue(String commonPrefix) {
        // String.split drops trailing empty strings, so the last element is the
        // deepest directory name whether or not the path has a leading slash.
        String[] segments = commonPrefix.split("/");
        if (segments.length == 0) {
            return null;
        }
        String[] keyValue = segments[segments.length - 1].split("=");
        return keyValue.length > 1 ? keyValue[1] : null;
    }

    /** Returns true when the partition value contains one of the back-fill month tokens. */
    private static boolean isInBackfillWindow(String sampleDate) {
        for (String month : BACKFILL_MONTHS) {
            if (sampleDate.contains(month)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reads the {@code sample_date} values already stored for the given database/table.
     *
     * @throws SQLException if the query cannot be prepared or executed
     */
    private static Set<String> loadExistingSampleDates(Connection conn, String dbName, String tableName)
            throws SQLException {
        String sql = "select sample_date from " + MYSQL_TABLE + " where `database` = ? and table_name = ?";
        Set<String> existing = new HashSet<>();
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, dbName);
            ps.setString(2, tableName);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    existing.add(rs.getString("sample_date"));
                }
            }
        }
        return existing;
    }

    /**
     * Inserts one row per partition value, continuing past individual failures.
     *
     * @return the number of rows actually inserted
     * @throws SQLException if the insert statement cannot be prepared or closed
     */
    private static int insertSampleDates(Connection conn, String dbName, String tableName,
                                         List<String> sampleDates) throws SQLException {
        String sql = "insert into " + MYSQL_TABLE
                + " (`database`, `table_name`, `sample_date`, `state`, `update_time`)"
                + " values(?, ?, ?, 0, now())";
        int written = 0;
        int count = 1;
        // One statement reused for all rows; values are bound, never concatenated.
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, dbName);
            ps.setString(2, tableName);
            for (String sampleDate : sampleDates) {
                System.out.println("正在寫入第" + count++ + "條數(shù)據(jù)");
                try {
                    ps.setString(3, sampleDate);
                    if (ps.executeUpdate() > 0) {
                        written++;
                    }
                } catch (SQLException e) {
                    // Best effort: report the failed row and keep going.
                    e.printStackTrace();
                }
            }
        }
        return written;
    }
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-534812.html
文章來源:http://www.zghlxwxcb.cn/news/detail-534812.html
到了這里,關於【Hadoop-Cos】存儲對象Cos通過Java-SDK獲取目錄結構的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!