国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用FlinkCDC從mysql同步數(shù)據(jù)到ES,并實(shí)現(xiàn)數(shù)據(jù)檢索

這篇具有很好參考價值的文章主要介紹了使用FlinkCDC從mysql同步數(shù)據(jù)到ES,并實(shí)現(xiàn)數(shù)據(jù)檢索。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

一、背景

隨著公司的業(yè)務(wù)量越來越大,查詢需求越來越復(fù)雜,mysql已經(jīng)不支持變化多樣的復(fù)雜查詢了。

于是,使用cdc捕獲MySQL的數(shù)據(jù)變化,同步到ES中,進(jìn)行數(shù)據(jù)的檢索。

一、環(huán)境準(zhǔn)備

1、創(chuàng)建ES索引

// 創(chuàng)建索引并指定映射
PUT /course
{
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "text"
			},
			"label": {
				"type": "text"
			},
			"content": {
			  "type": "text"
			}
		}
	}
}

// 查詢course下所有數(shù)據(jù)(備用)
GET /course/_search
// 刪除索引及數(shù)據(jù)(備用)
DELETE /course

2、創(chuàng)建mysql數(shù)據(jù)表

CREATE TABLE `course` (
  `id` varchar(32) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `label` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

二、使用FlinkCDC同步數(shù)據(jù)

1、導(dǎo)包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.18.0</version>
</dependency>



2、demo

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


/**
 * cdc
 */
public class CDCTest {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.10")
                .port(3306)
                .databaseList("mytest")
                .tableList("mytest.course")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 開啟檢查點(diǎn)
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // 1個并行任務(wù)
            .setParallelism(1)
            .addSink(new RichSinkFunction<String>() {
                private final static ElasticSearchUtil es = new ElasticSearchUtil("192.168.56.10");
                @Override
                public void invoke(String value, Context context) throws Exception {
                    super.invoke(value, context);
                    JSONObject jsonObject = JSON.parseObject(value);
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.setOp(jsonObject.getString("op"));
                    dataInfo.setBefore(jsonObject.getJSONObject("before"));
                    dataInfo.setAfter(jsonObject.getJSONObject("after"));
                    dataInfo.setDb(jsonObject.getJSONObject("source").getString("db"));
                    dataInfo.setTable(jsonObject.getJSONObject("source").getString("table"));

                    if (dataInfo.getDb().equals("mytest") && dataInfo.getTable().equals("course")) {

                        String id = dataInfo.getAfter().get("id").toString();
                        if(dataInfo.getOp().equals("d")) {
                            es.deleteById("course", id);
                        } else {
                            es.put(dataInfo.getAfter(), "course", id);
                        }
                    }
                }
            })

            .setParallelism(1); // 對接收器使用并行性1來保持消息順序

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.util.Map;

/**
 * 收集的數(shù)據(jù)類型
 * @author cuixiangfei
 * @since 20234-03-20
 */
public class DataInfo {

    // 操作 c是create;u是update;d是delete;r是read
    private String op;

    private String db;

    private String table;

    private Map<String, Object> before;

    private Map<String, Object> after;


    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getDb() {
        return db;
    }

    public void setDb(String db) {
        this.db = db;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    public boolean checkOpt() {
        if (this.op.equals("r")) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "DataInfo{" +
                "op='" + op + '\'' +
                ", db='" + db + '\'' +
                ", table='" + table + '\'' +
                ", before=" + before +
                ", after=" + after +
                '}';
    }

    public static void main(String[] args) {
        String value = "{\"before\":{\"id\":\"333\",\"name\":\"333\",\"label\":\"333\",\"content\":\"3333\"},\"after\":{\"id\":\"333\",\"name\":\"33322\",\"label\":\"333\",\"content\":\"3333\"},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710923957000,\"snapshot\":\"false\",\"db\":\"mytest\",\"sequence\":null,\"table\":\"course\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000008\",\"pos\":1318,\"row\":0,\"thread\":9,\"query\":null},\"op\":\"u\",\"ts_ms\":1710923957825,\"transaction\":null}";
        JSONObject jsonObject = JSON.parseObject(value);

        System.out.println(jsonObject.get("op"));
        System.out.println(jsonObject.get("before"));
        System.out.println(jsonObject.get("after"));
        System.out.println(jsonObject.getJSONObject("source").get("db"));
        System.out.println(jsonObject.getJSONObject("source").get("table"));
    }
}

3、es工具類

springboot集成elasticSearch(附帶工具類)

三、測試

1、先創(chuàng)建幾條數(shù)據(jù)

INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('1', '11', '111', '1111');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('2', '22 33', '222 333', '2222 3333');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('3', '33 44', '33 444', '3333 4444');

2、啟動cdc

3、查詢es

cdc mysql es,大數(shù)據(jù):Flink,mysql,elasticsearch,數(shù)據(jù)庫文章來源地址http://www.zghlxwxcb.cn/news/detail-850257.html

4、增刪改幾條數(shù)據(jù)進(jìn)行測驗(yàn)

到了這里,關(guān)于使用FlinkCDC從mysql同步數(shù)據(jù)到ES,并實(shí)現(xiàn)數(shù)據(jù)檢索的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • ES大量數(shù)據(jù)條件檢索準(zhǔn)確性問題

    事例:如查詢0~1000的結(jié)果集時,分頁查詢亂序,會搜索出來,也會搜索不出來,目前查詢到的結(jié)果是分片不一致導(dǎo)致的,需要指定唯一分片查詢 ES中基于分片的搜索方式,是分2個階段進(jìn)行的,即Query階段和Fetch階段。 ES的搜索類型有2種; query then fetch(默認(rèn)的搜索方式) 基于

    2024年02月04日
    瀏覽(13)
  • 向量數(shù)據(jù)庫入坑:傳統(tǒng)文本檢索方式的降維打擊,使用 Faiss 實(shí)現(xiàn)向量語義檢索

    在上一篇文章《聊聊來自元宇宙大廠 Meta 的相似度檢索技術(shù) Faiss》中,我們有聊到如何快速入門向量檢索技術(shù),借助 Meta AI(Facebook Research)出品的 faiss 實(shí)現(xiàn)“最基礎(chǔ)的文本內(nèi)容相似度檢索工具”,初步接觸到了“語義檢索”這種對于傳統(tǒng)文本檢索方式具備“降維打擊”的新

    2024年02月16日
    瀏覽(96)
  • 向量數(shù)據(jù)庫:usearch的簡單使用+實(shí)現(xiàn)圖片檢索應(yīng)用

    向量數(shù)據(jù)庫:usearch的簡單使用+實(shí)現(xiàn)圖片檢索應(yīng)用

    usearch是快速開源搜索和聚類引擎×,用于C++、C、Python、JavaScript、Rust、Java、Objective-C、Swift、C#、GoLang和Wolfram ??中的向量和??字符串× 一個簡單的例子(注:本例子在運(yùn)行時向index中不斷添加項目,并將最后的index持久化為一個文件,在運(yùn)行時由于添加項目內(nèi)存占用會不斷增

    2024年02月02日
    瀏覽(97)
  • MySQL檢索數(shù)據(jù)和排序數(shù)據(jù)

    MySQL檢索數(shù)據(jù)和排序數(shù)據(jù)

    目錄 一、select語句 1.檢索單個列(SELECT 列名 FROM 表名;) 2.檢索多個列(SELECT 列名1,列名2,列名3? FROM 表名;) ?3.檢索所有的列(SELECT * FROM 表名;) 4.檢索不同的行(SELECT 列名 FROM 表名;) 5.限制結(jié)果(SELECT 列名 FROM 表名 LIMIT 行數(shù);) 6.使用完全限定的表名(SELECT 表名.列名 F

    2024年02月15日
    瀏覽(26)
  • Spring AI - 使用向量數(shù)據(jù)庫實(shí)現(xiàn)檢索式AI對話

    Spring AI - 使用向量數(shù)據(jù)庫實(shí)現(xiàn)檢索式AI對話

    ?Spring AI 并不僅限于針對大語言模型對話API進(jìn)行了統(tǒng)一封裝,它還可以通過簡單的方式實(shí)現(xiàn)LangChain的一些功能。本篇將帶領(lǐng)讀者實(shí)現(xiàn)一個簡單的檢索式AI對話接口。 ?在一些場景下,我們想讓AI根據(jù)我們提供的數(shù)據(jù)進(jìn)行回復(fù)。因?yàn)閷υ捰凶畲骉oken的限制,因此很多場景下我們

    2024年04月14日
    瀏覽(93)
  • 深入學(xué)習(xí)MYSQL-數(shù)據(jù)檢索

    深入學(xué)習(xí)MYSQL-數(shù)據(jù)檢索

    前言 由于大部分基礎(chǔ)知識都已經(jīng)學(xué)過了,這里只把覺得應(yīng)該記錄一下的知識點(diǎn)做個筆記。然后以下筆記和sql均來自書籍(MYSQL必會知識),會根據(jù)看的其它書記繼續(xù)調(diào)整和優(yōu)化筆記。 LIMIT 注:這個平時的SQL查詢沒有什么區(qū)別,我主要展示一下在命令行里面怎么展示結(jié)果。 總共8條

    2024年02月05日
    瀏覽(23)
  • MySQL正則表達(dá)式檢索數(shù)據(jù)

    MySQL正則表達(dá)式檢索數(shù)據(jù)

    目錄 一、使用正則表達(dá)式進(jìn)行基本字符匹配 1.使用regexp 2.使用正則表達(dá)式? .? 二、進(jìn)行OR匹配 1.為搜索兩個串之一,使用? ?|?? 2.匹配幾個字符之一[] 3.匹配范圍 ?4.匹配特殊字符 過濾數(shù)據(jù)允許使用 匹配、比較、通配符 操作來尋找數(shù)據(jù),但是隨著過濾條件的復(fù)雜性增

    2024年02月14日
    瀏覽(24)
  • 【MySQL】一文帶你了解檢索數(shù)據(jù)

    【MySQL】一文帶你了解檢索數(shù)據(jù)

    ?? 博客主頁:博主鏈接 ?? 本文由 M malloc 原創(chuàng),首發(fā)于 CSDN?? ?? 學(xué)習(xí)專欄推薦:LeetCode刷題集! ?? 歡迎點(diǎn)贊 ?? 收藏 ?留言 ?? 如有錯誤敬請指正! ?? 未來很長,值得我們?nèi)Ρ几案篮玫纳? ------------------??分割線??------------------------- —————————

    2024年02月09日
    瀏覽(20)
  • 【MySQL多表查詢】:讓你的數(shù)據(jù)檢索更高效

    【MySQL多表查詢】:讓你的數(shù)據(jù)檢索更高效

    前言 ? 歡迎來到小K的MySQL專欄,本節(jié)將為大家?guī)?MySQL 中多表查詢相關(guān)知識的講解 一、多表關(guān)系 ?項目開發(fā)中,在進(jìn)行數(shù)據(jù)庫表結(jié)構(gòu)設(shè)計時,會根據(jù)業(yè)務(wù)需求及業(yè)務(wù)模塊之間的關(guān)系,分析并設(shè)計表結(jié)構(gòu),由于業(yè)務(wù)之間相互關(guān)聯(lián),所以各個表結(jié)構(gòu)之間也存在著各種聯(lián)系,基

    2024年02月09日
    瀏覽(20)
  • 這些年Web前端面試的那些套路,優(yōu)化后,ES-做到了幾十億數(shù)據(jù)檢索-3-秒返回,前端音頻框架

    這些年Web前端面試的那些套路,優(yōu)化后,ES-做到了幾十億數(shù)據(jù)檢索-3-秒返回,前端音頻框架

    默認(rèn)情況下 routing參數(shù)是文檔ID (murmurhash3),可通過 URL中的 _routing 參數(shù)指定數(shù)據(jù)分布在同一個分片中,index和search的時候都需要一致才能找到數(shù)據(jù)。 如果能明確根據(jù)_routing進(jìn)行數(shù)據(jù)分區(qū),則可減少分片的檢索工作,以提高性能 。 在我們的案例中,查詢字段都是固定的,不提供全

    2024年04月26日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包