Elastic MySQL 連接器是 MySQL 數(shù)據(jù)源的連接器。它可以幫我們把 MySQL 里的數(shù)據(jù)同步到 Elasticsearch 中去。在今天的文章里,我來詳細(xì)地描述如何一步一步地實(shí)現(xiàn)。
在下面的展示中,我將使用 Elastic Stack 8.8.2 來進(jìn)行展示。
無縫集成:將 Elasticsearch 連接到 MongoDB
Enterprise:使用 MySQL connector 同步 MySQL 數(shù)據(jù)到 Elasticsearch
可用性和先決條件
此連接器在 Elastic 版本 8.5.0 及更高版本中作為本機(jī)連接器提供。 要將此連接器用作本機(jī)連接器,請滿足所有本機(jī)連接器?(Native Connector)要求。
此連接器也可用作 Python 連接器框架的連接器客戶端。 要將此連接器用作連接器客戶端,請滿足所有連接器客戶端要求。
除了上面鏈接的共享要求之外,此連接器沒有其他先決條件。
用法
要將此連接器用作本機(jī)連接器,請使用連接器工作流程。 請參閱本機(jī)連接器。
要將此連接器用作連接器客戶端,請參閱連接器客戶端和框架。
在如下的展示中,我將使用連接器客戶端來進(jìn)行使用。
安裝
Elasticsearch
我們可參考我之前的文章 “如何在 Linux,MacOS 及 Windows 上進(jìn)行安裝 Elasticsearch” 來安裝 Elasticsearch。特別地,我們需要按照 Elastic Stack 8.x 的安裝指南來進(jìn)行安裝。
在 Elasticsearch 終端輸出中,找到 elastic 用戶的密碼和 Kibana 的注冊令牌。 這些是在 Elasticsearch 第一次啟動時打印的。
我們記下這個密碼,并在下面的配置中進(jìn)行使用。同時它也會生成相應(yīng)的證書文件:
$ pwd
/Users/liuxg/elastic/elasticsearch-8.8.2/config/certs
$ ls
http.p12 http_ca.crt transport.p12
為了方便下面的配置,我們把 http_ca.crt 證書拷貝到如下的一個目錄中:
mkdir -p ~/connectors-python-config
cp http_ca.crt ~/connectors-python-config
保存密碼、注冊令牌和證書路徑名。 你將在后面的步驟中需要它們。如果你對這些操作還不是很熟的話,請參考我之前的文章 “Elastic Stack 8.0 安裝 - 保護(hù)你的 Elastic Stack 現(xiàn)在比以往任何時候都簡單”。
安裝 Kibana
我們接下來安裝 Kibana。我們可以參考我之前的文章 “如何在 Linux,MacOS 及 Windows 上安裝 Elastic 棧中的 Kibana” 來進(jìn)行我們的安裝。特別地,我們需要安裝 Kibana 8.2 版本。如果你還不清楚如何安裝 Kibana 8.2,那么請閱讀我之前的文章 “Elastic Stack 8.0 安裝 - 保護(hù)你的 Elastic Stack 現(xiàn)在比以往任何時候都簡單”。在啟動 Kibana 之前,我們可以修改 Kibana 的配置文件如下。添加如下的句子到 config/kibana.yml 中去:
config/kibana.yml
enterpriseSearch.host: http://localhost:3002
然后,我們使用如下的命令來啟動 Kibana:
bin/kibana
我們在瀏覽器中輸入上面輸出的地址然后輸入相應(yīng)的 enrollment token 就可以把 Kibana 啟動起來。
Java安裝
你需要安裝 Java。版本在?Java 8?或者?Java 11。我們可以參考鏈接來查找需要的 Java 版本。
Enterprise search?安裝
我們在地址 Download Elastic Enterprise Search | Elastic 找到我們需要的版本進(jìn)行下載。并按照頁面上相應(yīng)的指令來進(jìn)行按照。如果你想針對你以前的版本進(jìn)行安裝的話,請參閱地址 https://www.elastic.co/downloads/past-releases#app-search。
等我們下載完 Enterprise Search 的安裝包,我們可以使用如下的命令來進(jìn)行解壓縮:
$ pwd
/Users/liuxg/elastic
$ ls
elasticsearch-8.8.2 kibana-8.8.2
elasticsearch-8.8.2-darwin-aarch64.tar.gz kibana-8.8.2-darwin-aarch64.tar.gz
enterprise-search-8.8.2.tar.gz
$ tar xzf enterprise-search-8.8.2.tar.gz
$ cd enterprise-search-8.8.2
$ ls
LICENSE NOTICE.txt README.md bin config lib metricbeat
如上所示,它含有一個叫做 config 的目錄。我們在啟動? Enterprise Search 之前,必須做一些相應(yīng)的配置。我們需要修改 config/enterprise-search.yml 文件。在這個文件中添加如下的內(nèi)容:
config/enterprise-search.yml
allow_es_settings_modification: true
secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
elasticsearch.username: elastic
elasticsearch.password: "JUYrx8L3WOeG6zysQY2D"
elasticsearch.host: https://127.0.0.1:9200
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.8.2/config/certs/http_ca.crt
kibana.external_url: http://localhost:5601
在上面,請注意 elasticsearch.password 是我們在 Elasticsearch 安裝過程中生成的密碼。elasticsearch.ssl.certificate_authority 必須根據(jù)自己的 Elasticsearch 安裝路徑中生成的證書進(jìn)行配置。在上面的配資中,我們還沒有配置 secret_management.encryption_keys。我們可以使用上面的配置先運(yùn)行,然后讓系統(tǒng)幫我們生成。在配置上面的密碼時,我們需要添加上引號。我發(fā)現(xiàn)在密碼中含有 * 字符會有錯誤的信息。我們使用如下的命令來啟動:
bin/enterprise-search
在啟動的過程中,我們可以看到生成的用戶名及密碼信息:
username: enterprise_search
password: r9kcpyb5x2g9dken
我們記下這個用戶名及密碼。在啟動的過程中,我們還可以看到一個生成的?secret_session_key:
我們也把它拷貝下來,并添加到配置文件中去:
allow_es_settings_modification: true
secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
elasticsearch.username: elastic
elasticsearch.password: "JUYrx8L3WOeG6zysQY2D"
elasticsearch.host: https://127.0.0.1:9200
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.8.2/config/certs/http_ca.crt
kibana.external_url: http://localhost:5601
secret_session_key: 3a6d8ab8993a9818728eabd6513fd1c448263be6f5497c8d286bc8be05b87edffd95073582e3277f1e8fb8f753a3ab07a5749ce4394a16f69bdc4acb3d2826ae
feature_flag.elasticsearch_search_api: true
為了能夠使得我們能夠在 App Search 中使用 Elasticsearch 搜索,我們必須設(shè)置
feature_flag.elasticsearch_search_api: true。?我們再次重新啟動 enterprise search:
./bin/enterprise-search
這次啟動后,我們再也不會看到任何的配置輸出了。這樣我們的 enterprise search 就配置好了。
MySQL
對于本教程,你需要一個供 Logstash 讀取的源 MySQL 實(shí)例。 MySQL Community Downloads 站點(diǎn)的?MySQL Community Server?部分提供了免費(fèi)版本的 MySQL。我們可以通過如下的命令來登錄 MySQL:
mysql -u root -p
在上面,我們使用 root 的密碼來進(jìn)行登錄。針對我的情況,密碼為 1234。等我們登錄后,我們運(yùn)行如下的命令:
CREATE DATABASE sample_db;
USE sample_db;
CREATE TABLE person (
person_id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
age INT
);
CREATE TABLE address (
address_id INT AUTO_INCREMENT PRIMARY KEY,
address VARCHAR(255)
);
INSERT INTO person (name, age) VALUES ('Alice', 30);
INSERT INTO person (name, age) VALUES ('Bob', 25);
INSERT INTO person (name, age) VALUES ('Carol', 35);
INSERT INTO address (address) VALUES ('123 Elm St');
INSERT INTO address (address) VALUES ('456 Oak St');
INSERT INTO address (address) VALUES ('789 Pine St');
在上面,我們創(chuàng)建了數(shù)據(jù)庫 sample_db,也同時創(chuàng)建了兩個表格 address 及 person。
?
同步數(shù)據(jù)到 Elasticsearch
步驟一:下載示例配置文件
下載示例配置文件。 你可以手動下載或運(yùn)行以下命令:
curl https://raw.githubusercontent.com/elastic/connectors-python/main/config.yml --output ~/connectors-python-config/config.yml
我們可以查看文件:
$ pwd
/Users/liuxg/connectors-python-config
$ ls
config.yml http_ca.crt
如果你的目錄名稱不同,或者你想使用不同的配置文件名,請記住更新 --output 參數(shù)值。
步驟二:更新自管理連接器的配置文件
使用以下設(shè)置更新配置文件以匹配你的環(huán)境:
elasticsearch.host
elasticsearch.password
connector_id
service_type
使用 mysql 作為 service_type 值。 不要忘記取消 yaml 文件源部分中 mysql 的注釋。
如果你針對 Elasticsearch 和 Kibana 的 Docker 化版本運(yùn)行連接器服務(wù),你的配置文件將如下所示:
elasticsearch:
host: http://host.docker.internal:9200
username: elastic
password: <YOUR_PASSWORD>
connector_id: <CONNECTOR_ID_FROM_KIBANA>
service_type: mysql
sources:
# UNCOMMENT "mysql" below to enable the MySQL connector
#mongodb: connectors.sources.mongo:MongoDataSource
#s3: connectors.sources.s3:S3DataSource
#dir: connectors.sources.directory:DirectoryDataSource
#mysql: connectors.sources.mysql:MySqlDataSource
#network_drive: connectors.sources.network_drive:NASDataSource
#google_cloud_storage: connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource
#azure_blob_storage: connectors.sources.azure_blob_storage:AzureBlobStorageDataSource
#postgresql: connectors.sources.postgresql:PostgreSQLDataSource
#oracle: connectors.sources.oracle:OracleDataSource
#mssql: connectors.sources.mssql:MSSQLDataSource
請注意,你下載的配置文件可能包含更多條目,因此你需要手動復(fù)制/更改適用于您的設(shè)置。 通常,你只需要更新 elasticsearch.host、elasticsearch.password、connector_id 和 service_type 即可運(yùn)行連接器服務(wù)。
我們來從 Kibana 界面得到這些配置:
?
~/connectors-python-config/config.yml
elasticsearch:
host: https://192.168.0.3:9200
api_key: "OUkyM1E0a0JrWktfLVd2OTRPZkE6TmkxbUNuN3dROGlrT2cwWlNVaEZKQQ=="
ca_certs: "/usr/share/certs/http_ca.crt"
ssl: true
bulk:
queue_max_size: 1024
queue_max_mem_size: 25
display_every: 100
chunk_size: 1000
max_concurrency: 5
chunk_max_mem_size: 5
concurrent_downloads: 10
request_timeout: 120
max_wait_duration: 120
initial_backoff_duration: 1
backoff_multiplier: 2
log_level: info
service:
idling: 30
heartbeat: 300
max_errors: 20
max_errors_span: 600
max_concurrent_content_syncs: 1
max_concurrent_access_control_syncs: 1
job_cleanup_interval: 300
log_level: INFO
connector_id: '8423Q4kBkZK_-Wv9z-en'
service_type: 'mysql'
sources:
# mongodb: connectors.sources.mongo:MongoDataSource
# s3: connectors.sources.s3:S3DataSource
# dir: connectors.sources.directory:DirectoryDataSource
mysql: connectors.sources.mysql:MySqlDataSource
# network_drive: connectors.sources.network_drive:NASDataSource
# google_cloud_storage: connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource
# google_drive: connectors.sources.google_drive:GoogleDriveDataSource
# azure_blob_storage: connectors.sources.azure_blob_storage:AzureBlobStorageDataSource
# postgresql: connectors.sources.postgresql:PostgreSQLDataSource
# oracle: connectors.sources.oracle:OracleDataSource
# sharepoint_server: connectors.sources.sharepoint_server:SharepointServerDataSource
# mssql: connectors.sources.mssql:MSSQLDataSource
# jira: connectors.sources.jira:JiraDataSource
# confluence: connectors.sources.confluence:ConfluenceDataSource
# dropbox: connectors.sources.dropbox:DropboxDataSource
# servicenow: connectors.sources.servicenow:ServiceNowDataSource
# sharepoint_online: connectors.sources.sharepoint_online:SharepointOnlineDataSource
# github: connectors.sources.github:GitHubDataSource
在上面,請注意:
-
host 是 Elasticsearch 的訪問地址
-
api_key 是用來訪問 Elasticsearch 的 API key。如果你使用用戶名和密碼組合,這個就不需要了
-
ca_certs 是用來訪問 Elasticsearch 的證書。這個是針對 self-managed 的 Elasticsearch 集群而言的
-
sevice_type 必須是 mysql
-
connector_id 是在上面的配置中生成的。用來標(biāo)識該連接器
步驟三:運(yùn)行 Docker 鏡像
docker run \
-v ~/connectors-python-config:/config \
--volume="$PWD/http_ca.crt:/usr/share/certs/http_ca.crt:ro" \
--network "elastic" \
--tty \
--rm \
docker.elastic.co/enterprise-search/elastic-connectors:8.8.2.0-SNAPSHOT \
/app/bin/elastic-ingest \
-c /config/config.yml
當(dāng)運(yùn)行完上面的命令后,我們再次回到 Kibana 的界面:
接下來我們來配置 MySQL。由于我們的連接器客戶端是在 docker 容器里運(yùn)行的,二我們的 MySQL 只能在 localhost:3306 進(jìn)行訪問。容器里的代碼是沒有辦法訪問到外面的 localhost 地址的。為此,我參照之前的文章 “Kibana:創(chuàng)建一個 webhook alert - Elastic Stack 8.2”。運(yùn)行如下的命令:
bore local 3306 --to bore.pub
這樣 MySQL 就可以被一個公網(wǎng)地址?bore.pub:3332 所訪問。我們接下來使用這個地址來進(jìn)行配置:
我們定于每天的 UTC 零點(diǎn)時間來進(jìn)行同步。當(dāng)然,我們也可以選擇不定時同步。點(diǎn)擊 Save:
我們點(diǎn)擊上面的 Sync:
為了驗(yàn)證它是否能夠正確地同步新的文檔,我們在 MySQL 中添加一個新的文檔:
我們在 Kibana 中再次手動 Sync:
?
由于一些原因,在測試中,我發(fā)現(xiàn)在最新的 connector 發(fā)布中,它含有 Sync rules,而在我運(yùn)行的版本中是沒有的。它需要在最新的發(fā)布版中才有,但是 snapshot 的運(yùn)行中有一個錯誤。如果你沒有看到這個 Sync rules,請使用如下的命令來啟動:
POST .elastic-connectors/_update/YOUR_CONNECTOR_ID
{
"doc": {
"features": {
"sync_rules": {
"advanced": {
"enabled": true
},
"basic": {
"enabled": true
}
}
}
}
}
我們可以通過? Sync rule 來同步我們需要的數(shù)據(jù),比如:
[
{
"tables": [
"person"
],
"query": "SELECT * FROM sample_db.person LIMIT 1;"
},
{
"tables": [
"address"
],
"query": "SELECT * FROM sample_db.address LIMIT 1;"
}
]
?
這樣,當(dāng)同步的時候,它只會同步 address 及 person 里的一條數(shù)據(jù)。?
?
同樣,我們可以定義如下的 WHERE query:
[
{
"tables": ["person"],
"query": "SELECT * FROM sample_db.person WHERE sample_db.person.age > 25;"
}
]
它只會同步年齡大于 25 歲的 person 里的文檔。我們甚至可以做 JOIN?query:
[
{
"tables": ["person", "tables"],
"query": "SELECT * FROM sample_db.person INNER JOIN sample_db.address ON sample_db.person.person_id = sample_db.address.address_id;"
}
]
在 Kibana 中查看同步的過來的文檔
我們可以通過如下的方法來查找索引:
GET _cat/indices
我們可以通過如下的命令來查看它的文檔:
GET search-mysql/_search
使用 Docker 來安裝 MySQL
在上面,我們使用本機(jī)來安裝 MySQL。在實(shí)際的測試中,我們可以使用 Docker 更為方便地安裝 MySQL:
docker run --name mysql_container -p 3306:3306 -e MYSQL_ROOT_PASSWORD=changeme -e MYSQL_USER=elastic -e MYSQL_PASSWORD=changeme -d mysql:latest
授予用戶權(quán)限:
docker exec -it mysql_container mysql -u root -p
GRANT ALL PRIVILEGES ON sample_db.* TO 'elastic'@'%';
FLUSH PRIVILEGES;
創(chuàng)建數(shù)據(jù)庫及表格:
CREATE DATABASE sample_db;
USE sample_db;
CREATE TABLE person (
person_id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
age INT
);
CREATE TABLE address (
address_id INT AUTO_INCREMENT PRIMARY KEY,
address VARCHAR(255)
);
INSERT INTO person (name, age) VALUES ('Alice', 30);
INSERT INTO person (name, age) VALUES ('Bob', 25);
INSERT INTO person (name, age) VALUES ('Carol', 35);
INSERT INTO address (address) VALUES ('123 Elm St');
INSERT INTO address (address) VALUES ('456 Oak St');
INSERT INTO address (address) VALUES ('789 Pine St');
在配置的時候,我們可以參考如下的內(nèi)容來進(jìn)行配置:
由于 Connector 和 MySQL 都同時運(yùn)行于 Docker 中,我們可以使用地址??http://host.docker.internal?來填寫 MySQL 的訪問地址。
使用 Connector 源碼來運(yùn)行 Connector
在 Kibana 中,它也讓我們從源碼處下載源碼并編譯運(yùn)行 Connector。下面我們來展示如何操作。我們首先使用如下的命令來下載所需要版本的源碼:
git clone --branch 8.8 https://github.com/elastic/connectors-python
在上面,我們下載 8.8 版本的源碼。我們進(jìn)入到源碼的根目錄中,并拷貝相應(yīng)的證書到該目錄中:
cd connectors-python
$ pwd
/Users/liuxg/python/connectors-python
cp ~/elastic/elasticsearch-8.8.2/config/certs/http_ca.crt .
$ ls
CHANGELOG.rst elasticsearch_connectors.egg-info
Dockerfile http_ca.crt
LICENSE include
MANIFEST.in lib
Makefile logo-enterprise-search.png
NOTICE.txt pyrightconfig.json
README.md pyvenv.cfg
README.rst requirements
bin scripts
config.yml setup.cfg
connectors setup.py
docs
在我們運(yùn)行代碼之前,我們需要修改源碼中的如下文件:
connectors/kibana.py
我們接下來使用如下的命令來進(jìn)行編譯:
我們需要配置配置如下的文件:
config.yml?
elasticsearch:
host: https://192.168.0.3:9200
api_key: "UFpFTVpJa0JnaFo2aVBURUVxczc6TmcxZ0JpeDNTM1dXNXk0ak02d1piZw=="
ca_certs: "./http_ca.crt"
ssl: true
bulk:
queue_max_size: 1024
queue_max_mem_size: 25
display_every: 100
chunk_size: 1000
max_concurrency: 5
chunk_max_mem_size: 5
concurrent_downloads: 10
request_timeout: 120
max_wait_duration: 120
initial_backoff_duration: 1
backoff_multiplier: 2
log_level: info
service:
idling: 30
heartbeat: 300
max_errors: 20
max_errors_span: 600
max_concurrent_syncs: 1
job_cleanup_interval: 300
log_level: INFO
native_service_types:
- mongodb
- mysql
- network_drive
- s3
- google_cloud_storage
- azure_blob_storage
- postgresql
- oracle
- confluence
- dir
- sharepoint
- mssql
- jira
# Connector client settings
connector_id: 'PJELZIkBghZ6iPTE9qs_'
service_type: 'mysql'
sources:
# mongodb: connectors.sources.mongo:MongoDataSource
# s3: connectors.sources.s3:S3DataSource
# dir: connectors.sources.directory:DirectoryDataSource
mysql: connectors.sources.mysql:MySqlDataSource
# network_drive: connectors.sources.network_drive:NASDataSource
# google_cloud_storage: connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource
# azure_blob_storage: connectors.sources.azure_blob_storage:AzureBlobStorageDataSource
# postgresql: connectors.sources.postgresql:PostgreSQLDataSource
# oracle: connectors.sources.oracle:OracleDataSource
# sharepoint: connectors.sources.sharepoint:SharepointDataSource
# mssql: connectors.sources.mssql:MSSQLDataSource
# jira: connectors.sources.jira:JiraDataSource
# confluence: connectors.sources.confluence:ConfluenceDataSource
這個文件和之前的那個很相似,只是因?yàn)槲覀儾辉?docker 里運(yùn)行,所以 ca_certs 的路徑不同。
我們使用如下的命令來進(jìn)行運(yùn)行:
make run
我們可以在 Kibana 中進(jìn)行如下的配置:
我們接下來按照之前的步驟運(yùn)行即可。?
我們可以看到使用源碼運(yùn)行也可以是使得數(shù)據(jù)進(jìn)行同步。文章來源:http://www.zghlxwxcb.cn/news/detail-568010.html
總結(jié)?
在本文中,我們非常詳細(xì)地描述如何使用 MySQL connector 來同步 MySQL 和 Elasticsearch 的索引。它使用起來非常方便。如果大家對 Logstash 很熟悉的話,請參閱我之前的文章 “Elastic:開發(fā)者上手指南” 中的 “數(shù)據(jù)庫數(shù)據(jù)同步”?章節(jié)。我們還可以使用 Pipeline 對數(shù)據(jù)進(jìn)行清洗。這個就不做展示了。文章來源地址http://www.zghlxwxcb.cn/news/detail-568010.html
到了這里,關(guān)于Enterprise:使用 MySQL connector 同步 MySQL 數(shù)據(jù)到 Elasticsearch的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!