国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Apache Kafka 基于 S3 的數(shù)據(jù)導出、導入、備份、還原、遷移方案

這篇具有很好參考價值的文章主要介紹了Apache Kafka 基于 S3 的數(shù)據(jù)導出、導入、備份、還原、遷移方案。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

s3數(shù)據(jù)遷移,大數(shù)據(jù)專題,kafka,s3,導出,導入,備份,還原,遷移 博主歷時三年精心創(chuàng)作的《大數(shù)據(jù)平臺架構與原型實現(xiàn):數(shù)據(jù)中臺建設實戰(zhàn)》一書現(xiàn)已由知名IT圖書品牌電子工業(yè)出版社博文視點出版發(fā)行,點擊《重磅推薦:建大數(shù)據(jù)平臺太難了!給我發(fā)個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側二維碼進入京東手機購書頁面。
在系統(tǒng)升級或遷移時,用戶常常需要將一個 Kafka 集群中的數(shù)據(jù)導出(備份),然后在新集群或另一個集群中再將數(shù)據(jù)導入(還原)。通常,Kafka集群間的數(shù)據(jù)復制和同步多采用 Kafka MirrorMaker,但是,在某些場景中,受環(huán)境限制,兩個于 Kafka 集群之間的網(wǎng)絡可能無法連通,亦或是需要將 Kafka 的數(shù)據(jù)沉淀為文件存儲以備他用。此時,基于 Kafka Connect S3 Source / Sink Connector 的方案會是一種較為合適的選擇,本文就將介紹一下這一方案的具體實現(xiàn)。

數(shù)據(jù)的導出、導入、備份、還原通常都是一次性操作,為此搭建完備持久的基礎設施并無太大必要,省時省力,簡單便捷才是優(yōu)先的考量因素。為此,本文將提供一套開箱即用的解決方案,方案使用 Docker 搭建 Kafka Connect,所有操作均配備自動化 Shell 腳本,用戶只需設置一些環(huán)境變量并執(zhí)行相應腳本即可完成全部工作。這種基于 Docker 的單體模式可以應對中小型規(guī)模的數(shù)據(jù)同步和遷移,如果要尋求穩(wěn)定、健壯的解決方案,可以考慮將 Docker 版本的 Kafka Connect 遷移到 Kubernetes 或 Amamon MSK Connect,實現(xiàn)集群化部署。

1. 整體架構

首先介紹一下方案的整體架構。導出/導入和備份/還原其實是兩種高度類似的場景,但為了描述清晰,我們還是分開討論。先看一下導出/導入的架構示意圖:

s3數(shù)據(jù)遷移,大數(shù)據(jù)專題,kafka,s3,導出,導入,備份,還原,遷移

圖1. Kafka集群間的數(shù)據(jù)導出/導入

在這個架構中,Source 端的 Kafka 是數(shù)據(jù)流的起點,安裝了 S3 Sink Connector 的 Kafka Connect 會從 Source 端的 Kafka 中提取指定 Topic 的數(shù)據(jù),然后以 Json 或 Avro 文件的形式存儲到 S3 上;同時,另一個安裝了 S3 Source Connector 的 Kafka Connect 會從 S3 上讀取這些 Json 或 Avro 文件,然后寫入到 Sink 端 Kafka 的對應 Topic 中。如果 Source 端和 Sink 端的 Kafka 集群不在同一個 Region,可以在各自的 Region 分別完成導入和導出,然后在兩個 Region 之間使用 S3 的 Cross-Rejion Replication 進行數(shù)據(jù)同步。

該架構只需進行簡單的調整,即可用于 Kafka 集群的備份/還原,如下圖所示:先將 Kafka 集群的數(shù)據(jù)備份到 S3 上,待完成集群的升級、遷移或重建工作后,再從 S3 上將數(shù)據(jù)恢復到新建集群即可。
s3數(shù)據(jù)遷移,大數(shù)據(jù)專題,kafka,s3,導出,導入,備份,還原,遷移

圖2. Kafka集群的數(shù)據(jù)備份/還原

本文將以圖1所示的導出/導入架構為準給出完整的環(huán)境搭建說明和實操腳本,圖2所示的備份/還原架構同樣可以基于本文提供的指導和腳本實現(xiàn)。

2. 預設條件

本文聚焦于 Kafka Connect 的數(shù)據(jù)導出/導入和備份/還原操作,限于篇幅,無法詳細介紹架構中每個組件的搭建和配置方法,因此有如下預設條件需讀者在個人環(huán)境中提前準備:

① 一臺基于 Amazon Linux2 的 EC2 實例(建議新建純凈實例),本文所有的實操腳本都將在該實例上執(zhí)行,該實例也是運行 Kafka Connect Docker Container 的宿主機

② 兩個 Kafka 集群,一個作為 Source,一個作為 Sink;如果只有一個 Kafka 集群也可完成驗證,該集群將既作 Source 又作Sink

③ 為聚焦 Kafka Connect S3 Source / Sink Connector 的核心配置,我們預設 Kafka 集群沒有開啟身份認證(即認證類型為 Unauthenticated),數(shù)據(jù)傳輸方式為 PLAINTEXT,以便簡化 Kafka Connect 的連接配置

④ 網(wǎng)絡連通性上要求 EC2 實例能訪問 S3、Source 端 Kafka 集群、Sink 端 Kafka 集群 。如果在實際環(huán)境中無法同時連通 Source 端和 Sink 端,則可以在兩臺分屬于不同網(wǎng)絡的 EC2 上進行操作,但它們必須都能訪問 S3。如果是跨 Region 或賬號隔離,則另需配置 S3 Cross-Region Replication 或手動拷貝數(shù)據(jù)文件

3. 全局配置

由于實際操作將不可避免地依賴到具體的 AWS 賬號以及本地環(huán)境里的各項信息(如AKSK,服務地址,各類路徑,Topic 名稱等),為了保證本文給出的操作腳本具有良好的可移植性,我們將所有與環(huán)境相關的信息抽離出來,以全局變量的形式在實操前集中配置。以下就是全局變量的配置腳本,讀者需要根據(jù)個人環(huán)境設定這些變量的取值:

# account-specific configs
export REGION="<your-region>"
export S3_BUCKET="<your-s3-bucket>"
export AWS_ACCESS_KEY_ID="<your-aws-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-aws-secret-access-key>"
export SOURCE_KAFKA_BOOTSTRAP_SEVERS="<your-source-kafka-bootstrap-servers>"
export SINK_KAFKA_BOOTSTRAP_SEVERS="<your-sink-kafka-bootstrap-servers>"
# kafka topics import and export configs
export SOURCE_TOPICS_LIST="<your-source-topic-list>"
export SINK_TOPICS_LIST="<your-sink-topic-list>"
export TOPIC_REGEX_LIST="<your-topic-regex-list>"
export SOURCE_TOPICS_REGEX="<your-source-topics-regex>"
export SINK_TOPICS_REPLACEMENT="<your-sink-topics-replacement>"    

為了便于演示和解讀,本文將使用下面的全局配置,其中前6項配置與賬號和環(huán)境強相關,仍需用戶自行修改,腳本中給出的僅為示意值,而后5項配置與 Kafka 數(shù)據(jù)的導入導出息息相關,不建議修改,因為后續(xù)的解讀將基于這里設定的值展開,待完成驗證后,您可再根據(jù)需要靈活修改后5項配置以完成實際的導入導出工作。

回到操作流程,登錄準備好的 EC2 實例,修改下面腳本中與賬號和環(huán)境相關的前6項配置,然后執(zhí)行修改后的腳本。此外,需要提醒注意的是:在后續(xù)操作中,部分腳本執(zhí)行后將不再返回,而是持續(xù)占用當前窗口輸出日志或 Kafka 消息,因此需要新開命令行窗口,每次新開窗口都需要執(zhí)行一次這里的全局配置腳本。

# 實操步驟(1): 全局配置
# account and environment configs
export REGION="us-east-1"
export S3_BUCKET="source-topics-data"
export AWS_ACCESS_KEY_ID="ABCDEFGHIGKLMNOPQRST"
export AWS_SECRET_ACCESS_KEY="abcdefghigklmnopqrstuvwxyz0123456789"
export SOURCE_KAFKA_BOOTSTRAP_SEVERS="b-1.cluster1.6ww5j7.c1.kafka.us-east-1.amazonaws.com:9092"
export SINK_KAFKA_BOOTSTRAP_SEVERS="b-1.cluster2.2au4b8.c2.kafka.us-east-1.amazonaws.com:9092"
# kafka topics import and export configs
export SOURCE_TOPICS_LIST="source-topic-1,source-topic-2"
export SINK_TOPICS_LIST="sink-topic-1,sink-topic-2"
export TOPIC_REGEX_LIST="source-topic-1:.*,source-topic-2:.*"
export SOURCE_TOPICS_REGEX="source-topic-(\\\d)" # to be resolved to "source-topic-(\\d)" in json configs
export SINK_TOPICS_REPLACEMENT="sink-topic-\$1" # to be resolved to "sink-topic-$1" in json configs

關于上述腳本中的后5項配置,有如下詳細說明:

配置項 樣值 說明
SOURCE_TOPICS_LIST source-topic-1,source-topic-2 該值將賦給 S3 Sink Connector 的 topics 配置項,該配置用于指明要被導出的 Topic 列表(使用逗號分隔)
SINK_TOPICS_LIST sink-topic-1,sink-topic-2 該值是 Sink 端與 Source Topics 一一對應的 Sink Topics 列表(使用逗號分隔),但它并不會出現(xiàn)在 S3 Sink Connector 的配置中,因為 S3 Sink Connector 可從 S3 的目錄結構中獲知存在哪些 Source 端的 Topic,而 Sink 端的 Topic 名稱是在 Source 端 Topic 名稱基礎上使用正則表達式映射出來的,該值僅應用在創(chuàng)建 Sink 端的 Topic 的腳本中(備注:技術上是可以不設置該變量的,它的值可從SOURCE_TOPICS_LISTTOPIC_REGEX_LIST、SINK_TOPICS_REPLACEMENT解析出來,但是這樣會增加腳本的復雜度,給讀者閱讀和理解腳本造成不便)
TOPIC_REGEX_LIST source-topic-1:.*,source-topic-2:.* 該值將賦給 S3 Source Connector 的 topic.regex.list 配置項,它的格式是<topic1>:<regex1>,<topic2>:<regex2>,...,該配置的作用是告訴 S3 Source Connector 每一個 Topic 對應的哪些文件是數(shù)據(jù)文件,正則表達式用于匹配文件名(需要注意的是:正則表達式并不會用于匹配文件的中間路徑,中間路徑(例如partition=0) 是由配置項 partitioner.class 控制的, S3 Source Connector 必須使用和 S3 Sink Connector 一致的 Patitioner 才能正確匹配文件路徑
SOURCE_TOPICS_REGEX source-topic-(\\\d) 該值將賦給 S3 Source Connector 的 transforms.xxx.regex 配置項,它是 Source 端 Kafka 集群上所有 Topic 的正則表達式,該項值通常都會出現(xiàn)正則分組(group),與之關聯(lián)的SINK_TOPICS_REPLACEMENT表達式將會引用這些分組映射成 Sink 端的目標Topic
SINK_TOPICS_REPLACEMENT sink-topic-\$1 該值將賦給 S3 Source Connector 的 transforms.xxx.replacement 配置項,它是 Sink 端 Kafka 集群上所有 Topic 的正則表達式,它通常會引用SOURCE_TOPICS_REGEX中的正則分組以便映射到 Sink 端的目標 Topic 上

我們就以腳本中設定的值為例,解讀一下這5項配置聯(lián)合起來將要實現(xiàn)的功能,同時也是本文將演示的主要內容:

在 Source 端的 Kafka 集群上存在兩個名為:source-topic-1source-topic-2的Topic,通過安裝有 S3 Sink Connector 的 Kafka Connect (Docker 容器)將兩個 Topic 的數(shù)據(jù)導出到 S3 的指定存儲桶中,然后再通過安裝有 S3 Source Connector 的 Kafka Connect (Docker 容器,可以和 S3 Source Connector 共存為一個Docker 容器)將 S3 存儲桶中的數(shù)據(jù)寫入到 Sink 端的 Kafka 集群上,其中原source-topic-1的數(shù)據(jù)將被寫入sink-topic-1,原source-topic-2的數(shù)據(jù)將被寫入sink-topic-2

特別地,如果是備份/還原場景,需要保持導出/導入的 Topic 名稱一致,此時,可直接刪除 S3 Source Connector 中 以transforms開頭的4項配置(將在下文中出現(xiàn)),或者將下面兩項改為:

export SOURCE_TOPICS_REGEX=".*"
export SINK_TOPICS_REPLACEMENT="\$0"

如果您只有一個 Kafka 集群,同樣可以完成本文的驗證工作,只需將SOURCE_KAFKA_BOOTSTRAP_SEVERSSINK_KAFKA_BOOTSTRAP_SEVERS同時設置為該集群即可,這樣,該集群既是 Source 端又是 Sink 端,由于配置中的 Source Topics 和 Sink Topics 并不同名,所以不會產(chǎn)生沖突。

4. 環(huán)境準備

4.1. 安裝工具包

在 EC2 上執(zhí)行以下腳本,安裝并配置jq,yq,docker,jdk,kafka-console-client五個必須的軟件包,您可以根據(jù)自身 EC2 的情況酌情選擇安裝全部或部分軟件。建議使用純凈的 EC2 實例,完成全部的軟件安裝:

# 實操步驟(2): 安裝工具包
# install jq
sudo yum -y install jq
jq --version

# install yq
sudo wget https://github.com/mikefarah/yq/releases/download/v4.35.1/yq_linux_amd64 -O /usr/bin/yq
sudo chmod a+x /usr/bin/yq
yq --version

# install docker
sudo yum -y install docker
# enable & start docker
sudo systemctl enable docker
sudo systemctl start docker
sudo systemctl status docker
# configure docker, add current user to docker user group
# and refresh docker group to take effect immediately
sudo usermod -aG docker $USER
newgrp docker
docker --version

# install docker compose
dockerConfigDir=${dockerConfigDir:-$HOME/.docker}
mkdir -p $dockerConfigDir/cli-plugins
wget "https://github.com/docker/compose/releases/download/v2.20.3/docker-compose-$(uname -s)-$(uname -m)" -O $dockerConfigDir/cli-plugins/docker-compose
chmod a+x $dockerConfigDir/cli-plugins/docker-compose
docker compose version

# install jdk
sudo yum -y install java-1.8.0-openjdk-devel
# configure jdk
sudo tee /etc/profile.d/java.sh << EOF
export JAVA_HOME=/usr/lib/jvm/java
export PATH=\$JAVA_HOME/bin:\$PATH
EOF
# make current ssh session and other common linux users can run java cli
source /etc/profile.d/java.sh
sudo -i -u root source /etc/profile.d/java.sh || true
sudo -i -u ec2-user source /etc/profile.d/java.sh || true
java -version

# install kafka console client
kafkaClientUrl="https://archive.apache.org/dist/kafka/3.5.1/kafka_2.12-3.5.1.tgz"
kafkaClientPkg=$(basename $kafkaClientUrl)
kafkaClientDir=$(basename $kafkaClientUrl ".tgz")
wget $kafkaClientUrl -P /tmp/
sudo tar -xzf /tmp/$kafkaClientPkg -C /opt
sudo tee /etc/profile.d/kafka-client.sh << EOF
export KAFKA_CLIENT_HOME=/opt/$kafkaClientDir
export PATH=\$KAFKA_CLIENT_HOME/bin:\$PATH
EOF

# make current ssh session and other common linux users can run kakfa console cli
source /etc/profile.d/kafka-client.sh
sudo -i -u root source /etc/profile.d/kafka-client.sh || true
sudo -i -u ec2-user source /etc/profile.d/kafka-client.sh || true

# verify if kafka client available
kafka-console-consumer.sh --version

# set aksk for s3 and other aws operation
aws configure set default.region $REGION
aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY

4.2. 創(chuàng)建 S3 存儲桶

整個方案以 S3 作為數(shù)據(jù)轉儲媒介,為此需要在 S3 上創(chuàng)建一個存儲桶。Source 端 Kafka 集群的數(shù)據(jù)將會導出到該桶中并以 Json 文件形式保存,向 Sink 端 Kafka 集群導入數(shù)據(jù)時,讀取的也是存儲在該桶中的 Json 文件。

# 實操步驟(3): 創(chuàng)建 S3 存儲桶
aws s3 rm --recursive s3://$S3_BUCKET || aws s3 mb s3://$S3_BUCKET

4.3. 在源 Kafka 上創(chuàng)建 Source Topics

為了確保 Topics 數(shù)據(jù)能完整備份和還原,S3 Source Connector 建議 Sink Topics 的分區(qū)數(shù)最好與 Source Topics 保持一致(詳情參考 [ 官方文檔 ] ),如果讓 Kafka 自動創(chuàng)建 Topic,則很有可能會導致 Source Topics 和 Sink Topics 的分區(qū)數(shù)不對等,所以,我們選擇手動創(chuàng)建 Source Topics 和 Sink Topics,并確保它們的分區(qū)數(shù)一致。以下腳本將創(chuàng)建:source-topic-1source-topic-2兩個Topic,各含9個分區(qū):

# 實操步驟(4): 在源 Kafka 上創(chuàng)建 Source Topics
for topic in $(IFS=,; echo $SOURCE_TOPICS_LIST); do
    # create topic
    kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --create --topic $topic --replication-factor 3 --partitions 9
    # describe topic
    kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --describe --topic $topic
done

4.4. 在目標 Kafka 上創(chuàng)建 Sink Topics

原因同上,以下腳本將創(chuàng)建:sink-topic-1sink-topic-2兩個 Topic,各含9個分區(qū):

# 實操步驟(5): 在目標 Kafka 上創(chuàng)建 Sink Topics
for topic in $(IFS=,; echo $SINK_TOPICS_LIST); do
    # create topic
    kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --create --topic $topic --replication-factor 3 --partitions 9
    # describe topic
    kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --describe --topic $topic
done

5. 制作 Kafka Connect 鏡像

接下來是制作帶 S3 Sink Connector 和 S3 Source Connector 的 Kafka Connect 鏡像,鏡像和容器均以kafka-s3-syncer命名,以下是具體操作:

# 實操步驟(6): 制作 Kafka Connect 鏡像
# note: do NOT use current dir as building docker image context dir,
# it is advised to create a new clean dir as image building context folder.
export DOCKER_BUILDING_CONTEXT_DIR="/tmp/kafka-s3-syncer"
mkdir -p $DOCKER_BUILDING_CONTEXT_DIR

# download and unpackage s3 sink connector plugin
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.5.4/confluentinc-kafka-connect-s3-10.5.4.zip \
    -O $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-10.5.4.zip
unzip -o $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-10.5.4.zip -d $DOCKER_BUILDING_CONTEXT_DIR

# download and unpackage s3 source connector plugin
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3-source/versions/2.4.5/confluentinc-kafka-connect-s3-source-2.4.5.zip \
    -O $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-source-2.4.5.zip
unzip -o $DOCKER_BUILDING_CONTEXT_DIR/confluentinc-kafka-connect-s3-source-2.4.5.zip -d $DOCKER_BUILDING_CONTEXT_DIR

# make dockerfile
cat << EOF > Dockerfile
FROM confluentinc/cp-kafka-connect:7.5.0
# provision s3 sink connector
COPY confluentinc-kafka-connect-s3-10.5.4 /usr/share/java/confluentinc-kafka-connect-s3-10.5.4
# provision s3 source connector
COPY confluentinc-kafka-connect-s3-source-2.4.5 /usr/share/java/confluentinc-kafka-connect-s3-source-2.4.5
EOF

# build image
docker build -t kafka-s3-syncer -f Dockerfile $DOCKER_BUILDING_CONTEXT_DIR
# check if plugin is deployed in container
docker run -it --rm kafka-s3-syncer ls -al /usr/share/java/

6. 配置并啟動 Kafka Connect

鏡像制作完成后,就可以啟動了 Kafka Connect 了。Kafka Connect 有很多配置項,具體可參考其 [ 官方文檔 ] ,需要提醒注意的是:在下面的配置中,我們使用的是 Kafka Connect 內置的消息轉換器:JsonConverter,如果你的輸入/輸出格式是 Avro 或 Parquet,則需要另行安裝對應插件并設置正確的Converter Class。

# 實操步驟(7): 配置并啟動 Kafka Connect
cat << EOF > docker-compose.yml
services:
  kafka-s3-syncer:
    image: kafka-s3-syncer
    hostname: kafka-s3-syncer
    container_name: kafka-s3-syncer
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: $SOURCE_KAFKA_BOOTSTRAP_SEVERS
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-s3-syncer
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-s3-syncer
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-s3-syncer-configs
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-s3-syncer-offsets
      CONNECT_STATUS_STORAGE_TOPIC: kafka-s3-syncer-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 3
      CONNECT_PLUGIN_PATH: /usr/share/java
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
EOF
# valid, format and print yaml with yq
yq . docker-compose.yml
docker compose up -d --wait
docker compose logs -f kafka-s3-syncer
# docker compose down # stop and remove container

上述腳本執(zhí)行后,命令窗口將不再返回,而是會持續(xù)輸出容器日志,因此下一步操作需要新開一個命令行窗口。

7. 配置并啟動 S3 Sink Connector

在第5節(jié)的操作中,我們已經(jīng)將 S3 Sink Connector 安裝到了 Kafka Connect 的 Docker 鏡像中,但是還需要顯式地配置并啟動它。新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1): 全局配置》,聲明全局變量,然后執(zhí)行以下腳本:

# 實操步驟(8): 配置并啟動 S3 Sink Connector
cat << EOF > s3-sink-connector.json
{
  "name": "s3-sink-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "topics": "$SOURCE_TOPICS_LIST",
    "s3.region": "$REGION",
    "s3.bucket.name": "$S3_BUCKET",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }
}
EOF
# valid, format and print json with jq
jq . s3-sink-connector.json
# delete connector configs if exsiting
curl -X DELETE localhost:8083/connectors/s3-sink-connector
# submit connector configs
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @s3-sink-connector.json
# start connector
curl -X POST localhost:8083/connectors/s3-sink-connector/start
# check connector status
# very useful! if connector has errors, it will show in message.
curl -s http://localhost:8083/connectors/s3-sink-connector/status | jq

8. 配置并啟動 S3 Source Connector

同上,在第5節(jié)的操作中,我們已經(jīng)將 S3 Source Connector 安裝到了 Kafka Connect 的 Docker 鏡像中,同樣需要顯式地配置并啟動它:

# 實操步驟(9): 配置并啟動 S3 Source Connector
cat << EOF > s3-source-connector.json
{
  "name": "s3-source-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "confluent.topic.bootstrap.servers": "$SOURCE_KAFKA_BOOTSTRAP_SEVERS",
    "mode": "RESTORE_BACKUP",
    "topics.dir": "topics",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "topic.regex.list": "$TOPIC_REGEX_LIST",
    "transforms": "mapping",
    "transforms.mapping.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.mapping.regex": "$SOURCE_TOPICS_REGEX",
    "transforms.mapping.replacement": "$SINK_TOPICS_REPLACEMENT",
    "s3.poll.interval.ms": "60000",
    "s3.bucket.name": "$S3_BUCKET",
    "s3.region": "$REGION"
  }
}
EOF
# valid, format and print json with jq
jq . s3-source-connector.json
# delete connector configs if exsiting
curl -X DELETE localhost:8083/connectors/s3-source-connector
# submit connector configs
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @s3-source-connector.json
# start connector
curl -X POST localhost:8083/connectors/s3-source-connector/start
# check connector status
# very useful! if connector has errors, it will show in message.
curl -s http://localhost:8083/connectors/s3-source-connector/status | jq

至此,整個環(huán)境搭建完畢,一個以 S3 作為中轉媒介的 Kafka 數(shù)據(jù)導出、導入、備份、還原鏈路已經(jīng)處于運行狀態(tài)。

9. 測試

現(xiàn)在,我們來驗證一下整個鏈路是否能正常工作。首先,使用kafka-console-consumer.sh監(jiān)控source-topic-1sink-topic-1兩個 Topic,然后使用腳本向source-topic-1持續(xù)寫入數(shù)據(jù),如果在sink-topic-1看到了相同的數(shù)據(jù)輸出,就說明數(shù)據(jù)成功地從source-topic-1導出然后又導入到了sink-topic-1中,相應的,在 S3 存儲桶中也能看到“沉淀”的數(shù)據(jù)文件。

9.1. 打開 Source Topic

新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1): 全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控source-topic-1中的數(shù)據(jù):

# 實操步驟(10): 打開 Source Topic
kafka-console-consumer.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --topic ${SOURCE_TOPICS_LIST%%,*}

9.2. 打開 Sink Topic

新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1): 全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控sink-topic-1中的數(shù)據(jù):

# 實操步驟(11): 打開 Sink Topic
kafka-console-consumer.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --topic ${SINK_TOPICS_LIST%%,*}

9.3. 向 Source Topic 寫入數(shù)據(jù)

新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1): 全局配置》,聲明全局變量,然后使用如下命令向source-topic-1中寫入數(shù)據(jù):

# 實操步驟(12): 向 Source Topic 寫入數(shù)據(jù)
# download a public dataset
wget https://data.ny.gov/api/views/5xaw-6ayf/rows.json?accessType=DOWNLOAD -O /tmp/sample.raw.json
# extract pure json data
jq -c .data /tmp/sample.raw.json > /tmp/sample.json
# feeding json records to kafka
for i in {1..100}; do
    kafka-console-producer.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --topic ${SOURCE_TOPICS_LIST%%,*} < /tmp/sample.json
done

9.4. 現(xiàn)象與結論

執(zhí)行上述寫入操作后,從監(jiān)控source-topic-1的命令行窗口中可以很快看到寫入的數(shù)據(jù),這說明 Source 端 Kafka 已經(jīng)開始持續(xù)產(chǎn)生數(shù)據(jù)了,隨后(約1分鐘),即可在監(jiān)控sink-topic-1的命令行窗口中看到相同的輸出數(shù)據(jù),這說明目標端的數(shù)據(jù)同步也已開始正常工作。此時,打開 S3 的存儲桶會發(fā)現(xiàn)大量 Json 文件,這些 Json 是由 S3 Sink Connector 從source-topic-1導出并存放到 S3 上的,然后 S3 Source Connector 又讀取了這些 Json 并寫入到了sink-topic-1中,至此,整個方案的演示與驗證工作全部結束。

10. 清理

在驗證過程中,我們可能需要多次調整并重試,每次重試最好恢復到初始狀態(tài),以下腳本會幫助我們清理所有已創(chuàng)建的資源:

# 實操步驟(13): 清理操作
docker compose down
aws s3 rm --recursive s3://$S3_BUCKET || aws s3 mb s3://$S3_BUCKET
kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --delete --topic 'sink.*|source.*|kafka-s3-syncer.*|_confluent-command'
kafka-topics.sh --bootstrap-server $SOURCE_KAFKA_BOOTSTRAP_SEVERS --list
kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --delete --topic 'sink.*|source.*|kafka-s3-syncer.*'
kafka-topics.sh --bootstrap-server $SINK_KAFKA_BOOTSTRAP_SEVERS --list

11. 小結

本方案主要定位于輕便易用,在 S3 Sink Connector 和 S3 Source Connector 中還有很多與性能、吞吐量相關的配置,例如:s3.part.sizeflush.size,s3.poll.interval.ms,tasks.max等,讀者可以在實際需要自行調整,此外, Kafka Connect 也可以方便地遷移到 Kuberetes 或 Amamon Kafka Connect 中以實現(xiàn)集群化部署。


附錄:常見錯誤

問題1:啟動 Kafka Connect 報錯:java.lang.NoSuchMethodError: 'void org.apache.kafka.connect.util.KafkaBasedLog.send

該問題發(fā)現(xiàn)于 confluentinc-kafka-connect-s3-source-2.5.7 + kafka-connect-7.5.0 上,NoSuchMethodError 錯誤一般是由于多個組件依賴到了同一個 Jar 包的不同版本,但是最終加載了低版本的 Jar 包導致的。由于Kafka Connect給出的日志信息有限,無法定位具體是哪個 Jar 包的問題,將 confluentinc-kafka-connect-s3-source 降級為 2.4.5,可解決此問題。

問題2:啟動 S3 Source Connector 時報錯:java.lang.IllegalArgumentException: Illegal group reference

該問題是由錯誤配置引起的,在配置 S3 Source Connector 時,將transforms.mapping.replacement 錯誤地配置為:sink-topic-$(1),正則分組的變量形式是:$0,$1,…,而不是:$(0), $(1),…,改為:sink-topic-$1 后問題解決

附錄:參考資料

Amazon S3 Sink Connector 官方文檔

Amazon S3 Source Connector 官方文檔

Kafka Connect Transformations :: RegexRouter文章來源地址http://www.zghlxwxcb.cn/news/detail-733531.html

到了這里,關于Apache Kafka 基于 S3 的數(shù)據(jù)導出、導入、備份、還原、遷移方案的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • mysql-DBA(1)-數(shù)據(jù)庫備份恢復-導入導出-日志解釋

    mysql-DBA(1)-數(shù)據(jù)庫備份恢復-導入導出-日志解釋

    log: hdd data :ssd? ,備份和導出都慢,緩沖池有污染。 邏輯備份:把所有的命令轉換成sql語句。 修改配置文件: -A 備份所有 -B 備份哪個數(shù)據(jù)庫 --master-data=1 同步 內容: 備份參數(shù): 1.備份成文件,里面就是sql語句 2.routine: 3.trigger 觸發(fā)器 4.event: 定時任務 5.-B 數(shù)據(jù)庫 1.有-B 表

    2024年03月09日
    瀏覽(99)
  • MySQL數(shù)據(jù)庫的備份、恢復、導出、導入(bin log和mydump)

    一、使用 bin log 來恢復數(shù)據(jù) 一、bin log的三種格式 1、statement:基于SQL語句的復制(statement-based replication,SBR) 2、row:基于行的復制(row-based replication,RBR) 3、mixed:混合模式復制(mixed-based replication,MBR) 4、查看模式和更改模式 二、配置bin log策略 三、獲取bin log文件列表

    2024年02月21日
    瀏覽(99)
  • Apache POI實現(xiàn)Excel導入讀取數(shù)據(jù)和寫入數(shù)據(jù)并導出

    Apache POI POI介紹 Apache POI是用Java編寫的免費開源的跨平臺的Java API,Apache POI提供API給Java程序對Microsoft Office格式檔案讀和寫的功能,其中使用最多的就是使用POI操作Excel文件。 maven坐標: POI結構: 入門案例 ExcelTest .java文件 從Excel文件讀取數(shù)據(jù)

    2024年02月12日
    瀏覽(32)
  • mysql 備份 導入 導出

    mysql 備份 導入 導出

    Navicat備份原理分析以及測試_nb3文件轉換為sql_李三光鐵粉的博客-CSDN博客 導出文件小 ,但是導入速度慢 。nb3文件 5m 解壓后是 元信息和gz壓縮包 , 里面存放了 表結構信息和需要插入的數(shù)據(jù) 導入流程是,先解壓文件,在執(zhí)行sql插入,所有速度比轉儲功能導入慢 導出文件的,

    2024年02月06日
    瀏覽(16)
  • Windows系統(tǒng)Outlook郵件備份導出與導入教程

    Windows系統(tǒng)Outlook郵件備份導出與導入教程

    注意:微軟商店UWP版本outlook客戶端暫時不支持郵件備份!而Microsoft Office2003-目前(2021)中的outlook客戶端才支持郵件備份。所以,想要備碧桂園集團郵箱郵件,請安裝或者登錄Microsoft?Office中的outlook客戶端以進行郵件的備份。 這個是Microsoft ?S tore里的U WP 版outlook客戶端,暫不支

    2024年01月18日
    瀏覽(30)
  • macOS系統(tǒng)Outlook郵件備份導出與導入教程

    macOS系統(tǒng)Outlook郵件備份導出與導入教程

    首先安裝Office?for?Mac客戶端,且里面包含Outlook應用程序且登錄了郵箱。 打開outlook客戶端,且保持在窗口最前,待上面導航出現(xiàn)outlook客戶端導航欄,點擊 文件 ,點擊 導出 。 根據(jù)需求,選擇您所要導出的內容,后點擊繼續(xù)。 在存儲為:修改存檔的名稱,在位置:選擇存放

    2024年01月17日
    瀏覽(32)
  • 基于EasyExcel的數(shù)據(jù)導入導出(復制可用)

    基于EasyExcel的數(shù)據(jù)導入導出(復制可用)

    目錄 ? 前言: 新建SpringBoot項目,引入下面的依賴 數(shù)據(jù)導入導出執(zhí)行原理和思路: 用戶端邏輯: 后臺開發(fā)邏輯: 代碼實現(xiàn) 下拉框策略 批注策略 數(shù)據(jù)讀取監(jiān)聽 Excel工具類 創(chuàng)建導入數(shù)據(jù)模板類 創(chuàng)建數(shù)據(jù)導出模板 Web接口 結果展示 模板下載 數(shù)據(jù)導入 數(shù)據(jù)導出 ? 代碼復制粘貼

    2024年02月05日
    瀏覽(26)
  • 基于Luckysheet實現(xiàn)的協(xié)同編輯在線表格支持在線導入數(shù)據(jù)庫,前端導出,前端導入,后端導出

    基于Luckysheet實現(xiàn)的協(xié)同編輯在線表格支持在線導入數(shù)據(jù)庫,前端導出,前端導入,后端導出

    提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內容: 這兩年,在線表格協(xié)作工具越來越火,但開源界一直沒有相關的實現(xiàn),被壟斷在幾個大廠手上,隨著Luckysheet 的橫空出世,開源界終于也有一個漂亮能打的在

    2024年02月11日
    瀏覽(30)
  • 基于再生龍(clonezilla)的系統(tǒng)鏡像的備份和還原

    基于再生龍(clonezilla)的系統(tǒng)鏡像的備份和還原

    1.1、啟動U盤,建議8G及以上 1.2、再生龍鏡像 https://udomain.dl.sourceforge.net/project/clonezilla/clonezilla_live_stable/3.0.1-8/clonezilla-live-3.0.1-8-amd64.iso 1.3、U盤啟動制作工具Rufus軟件 Rufus - Download 1.4、存儲數(shù)據(jù)U盤或者移動硬盤,建議50G以上 2.1、開啟Rufus軟件 2.2、插入啟動U盤 2.3、鏡像選擇再

    2024年02月05日
    瀏覽(43)
  • Mysql備份命令Mysqldump導入、導出以及壓縮成zip、gz格式

    命令:mysqldump -u用戶名 -p數(shù)據(jù)庫密碼 數(shù)據(jù)庫名 文件名 如果用戶名需要密碼,則需要在此命令執(zhí)行后輸入一次密碼核對;如果數(shù)據(jù)庫用戶名不需要密碼,則不要加“-p”參數(shù),導入的時候相同。注意輸入的用戶名需要擁有對應數(shù)據(jù)庫的操作權限,否則無法導出數(shù)據(jù)。由于是作

    2024年02月11日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包