
FlinkCDC: Syncing data from MongoDB to Elasticsearch (ES), new version

This article walks through syncing data from MongoDB to Elasticsearch (ES) with Flink CDC. I hope it is useful as a reference; if anything is wrong or incomplete, corrections are welcome.

1. The DataStream API approach

There are plenty of write-ups online that sync data with Flink SQL, but with messy source data that approach kept failing for no obvious reason. Out of necessity I switched to the DataStream API so I could clean the data before syncing it. Material on the API approach turned out to be scarce and incomplete, so after a lot of trial and error I am sharing what finally worked.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cece</groupId>
    <artifactId>Mongo-ES</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12</scala.version>
    </properties>
    <dependencies>







        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>






        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.11.0</version>
        </dependency>




        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>

<!--         https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder; otherwise this may cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <!-- Factory classes from connector and format dependencies would otherwise overwrite each other during packaging; the ServicesResourceTransformer merges them -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

2. Main program: configuration

public class Config {
    public static final String MONGODB_URL = "1xxx";
    public static final String MONGODB_USER = "sxx";
    public static final String MONGODB_PWD = "xx";



    public static final String MONGODB_DATABASE = "xx";
    public static final String MONGODB_COLLECTION = "xx";



    public static final String ES_URL = "xx";
    public static final int ES_PORT = xx;
    public static final String ES_USER = "xxx";
    public static final String ES_PWD = "xxxx";
    public static final String ES_INDEX = "xxxx";



    public static final int BUFFER_TIMEOUT_MS = 100;
    public static final int CHECKPOINT_INTERVAL_MS = 3 * 60 * 1000;
    public static final int CHECKPOINT_TIMEOUT_MS = 1 * 60 * 1000;
    public static final int CHECKPOINT_MIN_PAUSE_MS = 1 * 60 * 1000;
}

3. Main program

public class FlinkCdcSyn_API {


    public static void main(String[] args) throws Exception {
        // 1. Build the Flink execution environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(10)));

        // 2. Build the SourceFunction with Flink CDC
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts(MONGODB_URL)
                .username(MONGODB_USER)
                .password(MONGODB_PWD)

                .databaseList(MONGODB_DATABASE)
                .collectionList(MONGODB_COLLECTION)

                .deserializer(new JsonDebeziumDeserializationSchema())
//                .deserializer(new CoustomParse())
                .build();

        // 3. Pre-process the data: an ES keyword value may not exceed 32766 bytes

        SingleOutputStreamOperator<String> stream = env.addSource(mongoDBSourceFunction)
                .setParallelism(1)
                .name("mongo_to_es")
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        try {
                        // Check whether the record is valid JSON; drop it if not
                            JSONObject obj = JSON.parseObject(s);
                            return true;
                        } catch (Exception e) {
                            System.out.println("json格式錯誤:"+ s) ;
                            return  false;
                        }
                    }
                    // Without this, ES rejects documents with "whose UTF8 encoding is longer than the max length 32766"; drop oversized fields
                }).map(new MapFunction<String, String>() {
                    @Override
                    public String map(String s) throws Exception {
                        JSONObject obj = JSON.parseObject(s);
                        String str = obj.getString("operationType");

                        if("insert".equals(str) || "update".equals(str)){
                            JSONObject obj1 = obj.getJSONObject("fullDocument");
                            if(obj1.toString().getBytes("utf-8").length > 36000){


                            Set<Map.Entry<String, Object>> entries = obj1.entrySet();
                            Iterator<Map.Entry<String, Object>> iterator = entries.iterator();

                            while(iterator.hasNext()){
//                                    String s1 = iterator.next().toString();
//                                    System.out.println("iterator含義:" + s1);
                                if(iterator.next().toString().getBytes("utf-8").length > 30000) {
                                    iterator.remove();
                                }
                            }
                            obj.fluentPut("fullDocument",obj1.toString());
                        }
                        }

                        return obj.toString();
                    }
                });

        // 4. Build the Elasticsearch sink; insert/update/delete are handled separately
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(ES_URL, ES_PORT, "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
                httpHosts, new ElasticsearchSinkFunction<String>() {

            public ActionRequest createIndexRequest(String element) {
                JSONObject obj = JSON.parseObject(element);
                String str = obj.getString("operationType");
                String id = null;

                try {
                    id = obj.getJSONObject("documentKey").getJSONObject("_id").getString("$oid");
                } catch (Exception e) {
                    try {
                        id = obj.getJSONObject("documentKey").getString("_id");
                    } catch (Exception ex) {
                        System.out.println("格式不對:" + obj);
                    }

                }

                if("insert".equals(str)){
                    JSONObject data = obj.getJSONObject("fullDocument");

                    data.remove("_id").toString();
                    //Object o = data.toJSON(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
                    Document parse = Document.parse(data.toString());

                    String s = parse.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
                      return Requests.indexRequest()
                            .index(ES_INDEX)
                            .id(id)
                            .source(s,XContentType.JSON);
                }else if("update".equals(str)){
                    JSONObject data = obj.getJSONObject("fullDocument");
                    data.remove("_id").toString();
                    Document parse = Document.parse(data.toString());
                    String s = parse.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
                          return   new UpdateRequest(ES_INDEX,id).doc(s,XContentType.JSON).retryOnConflict(3);

                }else{
                    DeleteRequest deleteRequest = new DeleteRequest(ES_INDEX,id);
                   // System.out.println("delete語句:" + obj.toString());
                    return deleteRequest;
                }


            }
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        });
        esSinkBuilder.setBulkFlushMaxActions(1);
        esSinkBuilder.setRestClientFactory(restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(ES_USER, ES_PWD));

                            return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }

                    });
                });
        esSinkBuilder.setBulkFlushInterval(2000);
        // 5. Handle failed requests: re-queue those rejected because the ES queue is full, drop malformed documents, log the rest
        esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
                if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                    // full queue; re-add document for indexing
                    indexer.add(action);
                } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                    // malformed document; simply drop request without failing sink
                } else {
                    // For all other failures the request is only logged here; rethrow `failure`
                    // instead if the sink should fail on unexpected errors
                    System.out.println("Failed request: " + action.toString());
                }
            }
        });
        stream.addSink(esSinkBuilder.build());

        env.execute();
    }
}
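
For reference, the source builder above has a commented-out custom deserializer (CoustomParse) that never appears in the post. Below is a minimal sketch (my own class, not the original author's) of what such a deserializer can look like against the com.ververica CDC 2.2.0 DebeziumDeserializationSchema interface. The way the record value is turned into a string here is only a placeholder assumption; the point is to show where custom parsing (for example, dropping oversized fields before they ever reach the map operator) would live.

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

public class CustomParse implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        // Placeholder: forward the change event as a string. A real implementation would
        // inspect the change-stream payload (operationType, fullDocument, documentKey, ...)
        // and emit exactly the JSON shape the downstream operators expect.
        Object value = record.value();
        if (value != null) {
            out.collect(value.toString());
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}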

4. Other issues

There is a big pitfall: with this program I could only capture changes from a single collection. Monitoring multiple collections consistently failed with the following error:

org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 40415 (Location40415): 'BSON field '$changeStream.allChangesForCluster' is an unknown field.' on server common1:27017. The full response is {"operationTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "ok": 0.0, "errmsg": "BSON field '$changeStream.allChangesForCluster' is an unknown field.", "code": 40415, "codeName": "Location40415", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "mvX+tJx602hlN5SW61z3sqMI6bI=", "subType": "00"}}, "keyId": 7165073358986412034}}}
Caused by: com.mongodb.MongoCommandException: Command failed with error 40415 (Location40415): 'BSON field '$changeStream.allChangesForCluster' is an unknown field.' on server common1:27017. The full response is {"operationTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "ok": 0.0, "errmsg": "BSON field '$changeStream.allChangesForCluster' is an unknown field.", "code": 40415, "codeName": "Location40415", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1668869353, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "mvX+tJx602hlN5SW61z3sqMI6bI=", "subType": "00"}}, "keyId": 7165073358986412034}}}



For a long time I assumed it was a version mismatch or a jar conflict. I spent over a month adjusting versions, excluding dependencies, chasing supposedly missing dependencies, checking permissions, and even installing MongoDB myself, asking for help wherever I could. In the end I was lucky enough to join the CDC developers' group, where a developer told me that MongoDB 3.6.3 does not support change streams across multiple collections. I leave this note here for whoever comes after me.
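
If you hit the same error, it is worth checking the server version before digging into dependencies: database- and deployment-level change streams, which the CDC source needs when several collections are configured, were only added in MongoDB 4.0. A quick check using the mongo-java-driver already declared in the pom is sketched below; the connection string is a placeholder, and in practice you would reuse the values from Config.

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import org.bson.Document;

public class CheckMongoVersion {
    public static void main(String[] args) {
        // Placeholder connection string; substitute the real host and credentials
        MongoClient client = new MongoClient(new MongoClientURI("mongodb://user:pwd@host:27017"));
        try {
            Document buildInfo = client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            // Watching several collections requires MongoDB 4.0+; 3.6 only supports
            // a change stream on a single collection.
            System.out.println("MongoDB server version: " + buildInfo.getString("version"));
        } finally {
            client.close();
        }
    }
}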
