1. The DataStream API approach
There are plenty of Flink SQL examples online for syncing data, but with messy source data they kept failing with hard-to-diagnose errors. I was forced to fall back to the DataStream API so the data could be cleaned before being written out. For some reason there is very little material on the API approach, and what exists is incomplete, so after feeling my way through and finally getting the job done (and learning a lot), I am sharing it here.
pom.xml
<modelVersion>4.0.0</modelVersion>
<groupId>com.cece</groupId>
<artifactId>Mongo-ES</artifactId>
<version>1.0</version>

<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.3</version>
    </dependency>
    <!-- keep the Scala suffix consistent with the rest of the project (Scala 2.12) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.11.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mongodb-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                                <exclude>org.apache.hadoop:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signature files from META-INF; otherwise they can
                                     cause SecurityExceptions when the shaded JAR is used. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <!-- The ServicesResourceTransformer merges META-INF/services files so the
                                 factory entries of the connectors and formats do not overwrite each other. -->
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
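With the shade plugin in place, mvn clean package produces a self-contained jar under target/ (Mongo-ES-1.0.jar for this artifactId and version). Assuming the main class shown later lives in a com.cece package (adjust the fully qualified name to your actual layout), submitting the job looks roughly like this:

mvn clean package
flink run -c com.cece.FlinkCdcSyn_API target/Mongo-ES-1.0.jar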
2. Main program: configuration
public class Config {

    // MongoDB connection (placeholders; fill in real values)
    public static final String MONGODB_URL = "1xxx";
    public static final String MONGODB_USER = "sxx";
    public static final String MONGODB_PWD = "xx";
    public static final String MONGODB_DATABASE = "xx";
    public static final String MONGODB_COLLECTION = "xx";

    // Elasticsearch connection (placeholders)
    public static final String ES_URL = "xx";
    public static final int ES_PORT = xx;
    public static final String ES_USER = "xxx";
    public static final String ES_PWD = "xxxx";
    public static final String ES_INDEX = "xxxx";

    // Flink runtime tuning
    public static final int BUFFER_TIMEOUT_MS = 100;
    public static final int CHECKPOINT_INTERVAL_MS = 3 * 60 * 1000;
    public static final int CHECKPOINT_TIMEOUT_MS = 1 * 60 * 1000;
    public static final int CHECKPOINT_MIN_PAUSE_MS = 1 * 60 * 1000;
}
3. Main program
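The class below references the Config constants (MONGODB_URL, ES_INDEX, and so on) without the Config. prefix, so it presumably relies on a static import of that class. For completeness, here is a sketch of the import block it needs, assuming both classes sit in a com.cece package; exact package paths can differ slightly between Flink and connector versions, so treat this as a guide rather than a verified list.

package com.cece;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.cece.Config.*;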
public class FlinkCdcSyn_API {

    public static void main(String[] args) throws Exception {
        // 1. Build the Flink environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(10)));

        // 2. Build the SourceFunction via Flink CDC
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts(MONGODB_URL)
                .username(MONGODB_USER)
                .password(MONGODB_PWD)
                .databaseList(MONGODB_DATABASE)
                .collectionList(MONGODB_COLLECTION)
                .deserializer(new JsonDebeziumDeserializationSchema())
                // .deserializer(new CoustomParse())
                .build();

        // 3. Pre-process the data: a single ES keyword value must not exceed 32766 bytes
        //    (see the pruning helper sketch after this listing)
        SingleOutputStreamOperator<String> stream = env.addSource(mongoDBSourceFunction)
                .setParallelism(1)
                .name("mongo_to_es")
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        try {
                            // keep only records that parse as JSON, drop everything else
                            JSONObject obj = JSON.parseObject(s);
                            return true;
                        } catch (Exception e) {
                            System.out.println("malformed JSON: " + s);
                            return false;
                        }
                    }
                // Without this step ES rejects the document with
                // "whose UTF8 encoding is longer than the max length 32766", so oversized fields are dropped
                }).map(new MapFunction<String, String>() {
                    @Override
                    public String map(String s) throws Exception {
                        JSONObject obj = JSON.parseObject(s);
                        String str = obj.getString("operationType");
                        if ("insert".equals(str) || "update".equals(str)) {
                            JSONObject obj1 = obj.getJSONObject("fullDocument");
                            if (obj1.toString().getBytes("utf-8").length > 36000) {
                                Set<Map.Entry<String, Object>> entries = obj1.entrySet();
                                Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
                                while (iterator.hasNext()) {
                                    // drop any single field whose serialized form is larger than 30000 bytes
                                    if (iterator.next().toString().getBytes("utf-8").length > 30000) {
                                        iterator.remove();
                                    }
                                }
                                obj.fluentPut("fullDocument", obj1.toString());
                            }
                        }
                        return obj.toString();
                    }
                });
        // 4. Build the ES sink; insert/update/delete are handled separately
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(ES_URL, ES_PORT, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
                httpHosts, new ElasticsearchSinkFunction<String>() {

                    public ActionRequest createIndexRequest(String element) {
                        JSONObject obj = JSON.parseObject(element);
                        String str = obj.getString("operationType");
                        // the document key is either an ObjectId ({"_id": {"$oid": "..."}}) or a plain string id
                        String id = null;
                        try {
                            id = obj.getJSONObject("documentKey").getJSONObject("_id").getString("$oid");
                        } catch (Exception e) {
                            try {
                                id = obj.getJSONObject("documentKey").getString("_id");
                            } catch (Exception ex) {
                                System.out.println("unexpected documentKey format: " + obj);
                            }
                        }
                        if ("insert".equals(str)) {
                            JSONObject data = obj.getJSONObject("fullDocument");
                            data.remove("_id");
                            // re-render extended JSON (dates, ObjectIds, ...) in relaxed mode before indexing
                            Document parse = Document.parse(data.toString());
                            String s = parse.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
                            return Requests.indexRequest()
                                    .index(ES_INDEX)
                                    .id(id)
                                    .source(s, XContentType.JSON);
                        } else if ("update".equals(str)) {
                            JSONObject data = obj.getJSONObject("fullDocument");
                            data.remove("_id");
                            Document parse = Document.parse(data.toString());
                            String s = parse.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
                            return new UpdateRequest(ES_INDEX, id).doc(s, XContentType.JSON).retryOnConflict(3);
                        } else {
                            return new DeleteRequest(ES_INDEX, id);
                        }
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
        // flush after every single action so changes show up in ES immediately
        // (simple, but low-throughput; raise this for bulk back-fills)
        esSinkBuilder.setBulkFlushMaxActions(1);
        esSinkBuilder.setBulkFlushInterval(2000);

        // basic authentication against the ES cluster
        esSinkBuilder.setRestClientFactory(restClientBuilder -> {
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                    BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(ES_USER, ES_PWD));
                    return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            });
        });

        // 5. Handle failed requests: re-queue the ones rejected because of a full bulk queue,
        //    drop malformed documents, and only log everything else (the job keeps running)
        esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
                if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                    // full queue; re-add the document for indexing
                    indexer.add(action);
                } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                    // malformed document; drop the request without failing the sink
                } else {
                    // all other failures are only logged here; rethrow the failure instead if the job should fail
                    System.out.println("failed request: " + action.toString());
                }
            }
        });

        stream.addSink(esSinkBuilder.build());
        env.execute();
    }
}
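The oversized-field handling in step 3 is easier to reason about and test when it is pulled out of the map() function. Below is a minimal sketch of such a helper (a hypothetical class, not part of the original job) that reproduces the same per-field 30000-byte cut-off with the fastjson types already used above:

import com.alibaba.fastjson.JSONObject;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

public class FieldPruner {

    // Removes every top-level entry whose "key=value" form exceeds maxBytes when UTF-8 encoded,
    // mirroring the inline iterator.remove() logic in the map() function above.
    public static JSONObject pruneLargeFields(JSONObject doc, int maxBytes) {
        Iterator<Map.Entry<String, Object>> it = doc.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Object> entry = it.next();
            if (entry.toString().getBytes(StandardCharsets.UTF_8).length > maxBytes) {
                it.remove();
            }
        }
        return doc;
    }
}

With this in place, the body of the size check in map() reduces to pruneLargeFields(obj1, 30000) before writing obj1 back into the record.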
4. Other issues
One big pitfall: with this program I could only capture changes from a single MongoDB collection. As soon as I pointed it at several collections it kept failing with the error below:
org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 40415 (Location40415): 'BSON field '$changeStream.allChangesForCluster' is an unknown field.' on server common1:27017. The full response is {"operationTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "ok": 0.0, "errmsg": "BSON field '$changeStream.allChangesForCluster' is an unknown field.", "code": 40415, "codeName": "Location40415", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "mvX+tJx602hlN5SW61z3sqMI6bI=", "subType": "00"}}, "keyId": 7165073358986412034}}}
Caused by: com.mongodb.MongoCommandException: Command failed with error 40415 (Location40415): 'BSON field '$changeStream.allChangesForCluster' is an unknown field.' on server common1:27017. The full response is {"operationTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "ok": 0.0, "errmsg": "BSON field '$changeStream.allChangesForCluster' is an unknown field.", "code": 40415, "codeName": "Location40415", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "mvX+tJx602hlN5SW61z3sqMI6bI=", "subType": "00"}}, "keyId": 7165073358986412034}}}
For a long time I assumed it was a version mismatch or a jar conflict. I spent well over a month adjusting versions and excluding dependencies, chasing it from missing dependencies to conflicting ones, from permission problems all the way to installing MongoDB myself, asking everyone I could along the way.
In the end I managed to join the CDC developers' group, where a developer told me that MongoDB 3.6.3 simply cannot watch multiple collections with change streams: the database- and deployment-level change streams the connector needs for that were only added in MongoDB 4.0. Leaving this note here for whoever hits it next.
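Before pointing databaseList/collectionList at more than one collection, it is therefore worth checking the server version up front. A minimal sketch, using the mongo-java-driver that is already on the classpath (the connection string is a placeholder; reuse the MongoDB settings from Config in a real job):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class MongoVersionCheck {

    public static void main(String[] args) {
        // placeholder URI
        try (MongoClient client = MongoClients.create("mongodb://user:pwd@host:27017")) {
            Document buildInfo = client.getDatabase("admin")
                    .runCommand(new Document("buildInfo", 1));
            // database/deployment-wide change streams (multi-collection capture) need 4.0+
            System.out.println("MongoDB server version: " + buildInfo.getString("version"));
        }
    }
}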