国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

DataX同步達夢數據到HDFS

這篇具有很好參考價值的文章主要介紹了DataX同步達夢數據到HDFS。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

DataX同步達夢數據到HDFS
1、 前提條件

  1. 安裝達夢數據庫客戶端
  2. 安裝Python3.5 以上
  3. 導入dmPython模塊

導入dmPython流程

  1. 在達夢數據庫客戶端 \drivers\python\dmPython這個路徑下執(zhí)行
python setup.py install 

● 如果報錯在PATH中加入E:\dmdbms\bin 達夢數據庫的安裝路徑,并重新裝載dmPython
Traceback (most recent call last):

File "setup.py", line 103, in

raise DistutilsSetupError("cannot locate an Dameng software " /

distutils.errors.DistutilsSetupError: cannot locate an Dameng software installation

● 如果報下面錯誤則需要自己下載Microsoft Visual C++ 14.0

error: Microsoft Visual C++ 14.0 is required. Get it with "Mi
  1. 在命令行里 輸入 impot dmPython
1.  impot dmPython
報錯 :ImportError: DLL load failed while importing dmPython: 找不到指定的模塊。

2.  import sys
	sys.path
在末尾等到C:\\Users\\lee\\AppData\\Local\\Pro
Python38\\lib\\site-packages\\dmpython-2.3-py3.8-win-amd64.egg
這個路徑

3. 將達夢數據庫安裝目錄下 E:\dmdbms\drivers\dpi下的所有文件,
拷貝到sys.path的最后一個目錄下面,再次導入import dmPython成功

2、 生成Job腳本

# ecoding=utf-8
import json
import getopt
import os
import sys
import dmPython

#DM相關配置,需根據實際情況作出修改
DM_host = "124.**.**.249"
DM_port = "5**6"
DM_user = "MES_****_**_SP"
DM_passwd = "*******"

#HDFS NameNode相關配置,需根據實際情況作出修改
hdfs_nn_host = "pt101"
hdfs_nn_port = "8020"

#生成配置文件的目標路徑,可根據實際情況作出修改
output_path = "D:/"


def get_connection():
    return dmPython.connect(user=DM_user, password=DM_passwd, host=DM_host, port=int(DM_port))


def get_DM_meta(owner, table):
    connection = get_connection()
    cursor = connection.cursor()
    DMsql="SELECT COLUMN_NAME,DATA_TYPE FROM table_name WHERE owner="+"'"+owner+"'"+" AND TABLE_NAME="+"'"+table+"'"
    cursor.execute(DMsql)
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_DM_columns(owner, table):
    return list(map(lambda x: x[0], get_DM_meta(owner, table)))


def get_hive_columns(owner, table):
    def type_mapping(dm_type):
        mappings = {
            "NUMBER": "string",
            "INT": "int",
            "BLOB": "string",
            "CLOB": "string",
            "BIGINT": "bigint",
            "DOUBLE": "double",
            "FLOAT": "float",
            "TEXT": "string",
            "VARCHAR2": "string",
            "VARCHAR": "string",
            "TINYINT": "tinyint",
            "CHAR": "char",
            "TIMESTAMP": "string",
            "DECIMAL": "string",
            "DATETIME": "string",
            "DATE": "string",
            "SMALLINT": "smallint",
            "BIT": "boolean",
            "DEC": "string"
        }
        return mappings[dm_type]

    meta = get_DM_meta(owner, table)
    json.dumps(list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, get_DM_meta(owner, table)))))
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1])}, meta))


def generate_json(owner, source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "rdbmsreader",
                    "parameter": {
                        "username": DM_user,
                        "password": DM_passwd,
                        "column": get_DM_columns(owner, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_database+"."+source_table],
                            "jdbcUrl": ["jdbc:dm://" + DM_host + ":" + DM_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}${system.biz.date}",
                        "fileName": source_table,
                        "column": get_hive_columns(owner, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    owner = ""
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-u:-d:-t:', ['owner=', 'sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-u', '--owner'):
            owner = opt_value
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(owner,source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

3、 通過命令生成Job

python data_platform_datax_import_config.py -u user -d datebase-t table

4、啟動DataX進行數據傳輸文章來源地址http://www.zghlxwxcb.cn/news/detail-780108.html

  1. 將Job上傳在DataX的job目錄下
  2. 提前創(chuàng)建好文件夾
hadoop fs -mkdir -p /origin_data/datebase/table/2023-05-05
  1. 使用命令執(zhí)行DataX
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/datebase/table/2023-05-05" /opt/module/datax/job/路徑/腳本名稱.json

到了這里,關于DataX同步達夢數據到HDFS的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 數據同步工具DataX、Sqoop、Maxwell、Canal

    數據同步工具DataX、Sqoop、Maxwell、Canal

    常見的數據庫同步同步主要有:DataX、Sqoop、Maxwell、Canal 數據同步工具種類繁多,大致可分為兩類,一類是以DataX、Sqoop為代表的基于Select查詢的離線、批量同步工具,另一類是以Maxwell、Canal為代表的基于數據庫數據變更日志(例如MySQL的binlog,其會實時記錄所有的insert、upda

    2024年02月11日
    瀏覽(31)
  • DataX實現Mysql與ElasticSearch(ES)數據同步

    DataX實現Mysql與ElasticSearch(ES)數據同步

    jdk1.8及以上 python2 查看是否安裝成功 查看python版本號,判斷是否安裝成功 在datax/job下,json格式,具體內容及主要配置含義如下 mysqlreader為讀取mysql數據部分,配置mysql相關信息 username,password為數據庫賬號密碼 querySql:需要查詢數據的sql,也可通過colums指定需要查找的字段(

    2024年02月05日
    瀏覽(22)
  • 阿里巴巴開源DataX全量同步多個MySQL數據庫

    阿里巴巴開源DataX全量同步多個MySQL數據庫

    上次 寫了阿里巴巴高效的離線數據同步工具DataX: https://mp.weixin.qq.com/s/_ZXqA3H__Kwk-9O-9dKyOQ 安裝DataX這個開源工具,并且同步備份了幾張數據表。但是發(fā)現一個問題,就是每張表都需要單獨寫一個 job。如果數據表有幾百張是不是要寫幾百個,這個不太現實了。 正當一籌莫展之際

    2024年02月02日
    瀏覽(40)
  • Centos7.9通過datax-web2.0_用Datax3.0進行增量同步_增量刪除_數據更新---大數據之DataX工作筆記006

    ?1.注意這里的增量同步,不像之前用的DBsyncer或者是,NIFI中的利用binlog的形式,實現真正的實時的數據同步. ?2.這里的增量是,指定通過ID,或者時間來進行增量,比如大于2023-07-03 11:44:56的數據僅僅同步這個,或者是,id大于多少的這樣,這里建議用時間,因為如果有id用的字符串咋弄來

    2024年02月10日
    瀏覽(59)
  • DolphinScheduler 調度 DataX 實現 MySQL To ElasticSearch 增量數據同步實踐

    DolphinScheduler 調度 DataX 實現 MySQL To ElasticSearch 增量數據同步實踐

    基于SQL查詢的 CDC(Change Data Capture): 離線調度查詢作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過查詢去獲取表中最新的數據。也就是我們說的基于SQL查詢抽取; 無法保障數據一致性,查的過程中有可能數據已經發(fā)生了多次變更; 不保障實時性,基于離線調度存在天然的

    2024年02月03日
    瀏覽(24)
  • DataX將MySQL數據同步到HDFS中時,空值不處理可以嗎

    DataX將MySQL數據同步到HDFS中時,空值不處理可以嗎

    DataX將MySQL數據同步到HDFS中時,空值(NULL)存到HDFS中時,默認是存儲為空字符串(‘’)。 HFDS Writer并未提供nullFormat參數:也就是用戶并不能自定義null值寫到HFDS文件中的存儲格式。默認情況下,HFDS Writer會將null值存儲為空字符串(‘’),而Hive默認的null值存儲格式為N。所以

    2024年02月12日
    瀏覽(29)
  • datax 同步mongodb數據庫到hive(hdfs)和elasticserch(es)

    1.mongodb版本:3.6.3。(有點老了,后來發(fā)現flinkcdc都只能監(jiān)控一張表,多張表無法監(jiān)控) 2.datax版本:自己編譯的DataX-datax_v202210 3.hdfs版本:3.1.3 4.hive版本:3.1.2 1.增量數據:需要每隔1小時將mongodb中17個集合的數據同步至hive,因為有數據生成時間,才用datax查詢方式,將上一個

    2023年04月23日
    瀏覽(92)
  • 業(yè)務數據同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    業(yè)務數據同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介紹 Sqoop : SQ L-to-Had oop ( Apache已經終止Sqoop項目 ) 用途:把關系型數據庫的數據轉移到HDFS(Hive、Hbase)(重點使用的場景);Hadoop中的數據轉移到關系型數據庫中。Sqoop是java語言開發(fā)的,底層使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是數據塊的轉移,沒有使

    2024年02月15日
    瀏覽(43)
  • 數據同步工具調研選型:SeaTunnel 與 DataX 、Sqoop、Flume、Flink CDC 對比

    數據同步工具調研選型:SeaTunnel 與 DataX 、Sqoop、Flume、Flink CDC 對比

    Apache SeaTunnel 是一個非常易用的超高性能分布式數據集成產品,支持海量數據的離線及實時同步。每天可穩(wěn)定高效同步萬億級數據,已應用于數百家企業(yè)生產,也是首個由國人主導貢獻到 Apache 基金會的數據集成頂級項目。 SeaTunnel 主要解決數據集成領域的常見問題: * 數據源

    2024年02月04日
    瀏覽(28)
  • datax同步數據到ClickHouse時同步時間特別長,原因:Too many partitions for single INSERT block (more than 100).

    今天將 Hive 分區(qū)中數據同步到 ClickHouse 時,發(fā)現有的任務運行時間很短,但是有的任務運行時間特別長,看了一下數據量,發(fā)現有的接近千萬條數據,但是幾分鐘就同步完了,但是有的才幾萬條數據,要同步半個多小時,還有的任務幾百萬條數據,甚至要同步四五個小時。開

    2023年04月08日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包