A Drop-in Alternative to ETL and FDW in PostgreSQL
01. Introduction
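“In my previous two articles I covered PostgreSQL's support for Python. Many features, such as FDW and ETL, can in fact be packaged inside the PostgreSQL database itself. This article takes that as its starting point and walks through a demonstration.”
FDW support in PostgreSQL is not very convenient to set up or use. In particular, when a foreign table is joined at the table level, performance drops sharply, because FDW data is never landed locally. We can therefore use PostgreSQL's Python support to build our own database-to-database transfer tool, which lands the remote data locally before it is used. Readers can compare the convenience of the two approaches for themselves.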
02. Installing PostgreSQL 16.1
Install the dependencies
yum install -y bison flex readline-devel zlib zlib-devel gcc gcc-c++ make openssl-devel python3 python3-devel libicu-devel ncurses-devel sqlite-devel tk-devel
Add the user
useradd postgres
visudo
Add the following line below line 101 (the exact line number may differ on your system)
postgres ALL=(ALL) NOPASSWD: ALL
Find the download link on the official site. This article installs from source.
wget https://ftp.postgresql.org/pub/source/v16.1/postgresql-16.1.tar.gz
Extract the archive and enter the extracted directory
mv postgresql-16.1.tar.gz /home/postgres
su - postgres
tar -zxf postgresql-16.1.tar.gz
cd postgresql-16.1
Compiling with Python support is essential here: --with-python builds the plpython3u extension.
./configure --prefix=/home/postgres/pg --with-openssl --with-python
make && make install
Edit the environment variables
cd
vim .bash_profile
Add the following environment variables
export PATH=/home/postgres/pg/bin:$PATH
export PGDATA=/home/postgres/pg/data
Load the environment variables
source ~/.bash_profile
Initialize the database
initdb -D $PGDATA -U postgres -W
(enter the superuser password twice)
pg_ctl start
pg_ctl status
Connect to the database and create the extension
CREATE EXTENSION plpython3u CASCADE;
03. Creating a function for cross-database access
First, install the Python modules needed to connect to the databases
postgres=# \! pip3 install -i https://mirrors.aliyun.com/pypi/simple/ cx_Oracle pyodbc pymysql --user
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Requirement already satisfied: cx_Oracle in ./.local/lib/python3.6/site-packages (8.3.0)
Collecting pyodbc
Downloading https://mirrors.aliyun.com/pypi/packages/27/5c/5e472d714dea2a634bd79df6b8ace55737a9f50c8fbb3b15521fceda4694/pyodbc-4.0.39-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (330 kB)
|████████████████████████████████| 330 kB 2.8 MB/s
Collecting pymysql
Downloading https://mirrors.aliyun.com/pypi/packages/4f/52/a115fe175028b058df353c5a3d5290b71514a83f67078a6482cff24d6137/PyMySQL-1.0.2-py3-none-any.whl (43 kB)
|████████████████████████████████| 43 kB 2.4 MB/s
Installing collected packages: pyodbc, pymysql
Successfully installed pymysql-1.0.2 pyodbc-4.0.39
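Before creating any functions, it can help to confirm that the drivers are importable. The sketch below is only a sanity check; the helper name missing_modules is hypothetical, and it assumes you run it with the same python3 and the same OS user as the PostgreSQL server, since the modules above were installed with --user.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # The three drivers installed above
    missing = missing_modules(["cx_Oracle", "pyodbc", "pymysql"])
    if missing:
        print("Not importable:", ", ".join(missing))
    else:
        print("All drivers are importable")
```

If a driver is reported as not importable here, the import inside a plpython3u function would be expected to fail as well.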
Connecting to a remote Oracle database requires the matching client. This article uses Oracle 19c.
wget https://download.oracle.com/otn_software/linux/instantclient/1921000/oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
sudo rpm -ivh oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
Edit the environment variables
vim /etc/profile
Set the following environment variable
export LD_LIBRARY_PATH=/usr/lib/oracle/19.21/client64/lib:$LD_LIBRARY_PATH
Load the environment variables
source /etc/profile
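The PostgreSQL server process only sees LD_LIBRARY_PATH as it was when the server started, so if the server was already running it needs a restart (pg_ctl restart) from a shell where the variable is set. A small sketch for checking that the Oracle client directory is on the path; the helper name path_list_contains is hypothetical, and the directory shown is the one used in this article.

```python
import os


def path_list_contains(path_list, directory):
    """Check whether a colon-separated path list includes the given directory."""
    wanted = os.path.normpath(directory)
    entries = [entry for entry in path_list.split(":") if entry]
    return any(os.path.normpath(entry) == wanted for entry in entries)


if __name__ == "__main__":
    current = os.environ.get("LD_LIBRARY_PATH", "")
    client_dir = "/usr/lib/oracle/19.21/client64/lib"
    print("Oracle client on LD_LIBRARY_PATH:", path_list_contains(current, client_dir))
```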
In PostgreSQL, create a function that can connect to MySQL, Oracle, and SQL Server.
CREATE OR REPLACE FUNCTION fdw_db(db_type varchar(100), host varchar(100), port integer, username varchar(100), password varchar(100), db_name varchar(100), tablename varchar(100))
RETURNS text AS $$
from decimal import Decimal
import cx_Oracle
import pyodbc
import pymysql

def to_sql_literal(value):
    # Render one fetched value as a SQL literal for the VALUES clause
    if value is None:
        return 'NULL'
    if isinstance(value, (int, float, Decimal)):
        return str(value)
    # Quote everything else and double any embedded single quotes
    return "'" + str(value).replace("'", "''") + "'"

def read_data_from_database(db_type, host, port, username, password, db_name, table_name):
    kind = db_type.lower()
    # Open a connection to the requested source database
    if kind == 'oracle':
        connection = cx_Oracle.connect(f"{username}/{password}@{host}:{port}/{db_name}")
    elif kind == 'sqlserver':
        # The DRIVER name must match an ODBC driver configured on this host
        connection = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={host};port={port};DATABASE={db_name};UID={username};PWD={password}")
    elif kind == 'mysql':
        connection = pymysql.connect(host=host, user=username, password=password, database=db_name, port=port)
    else:
        raise ValueError("Unsupported database type. Supported types: 'oracle', 'sqlserver', 'mysql'")
    # Read every row of the requested table
    try:
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
    finally:
        connection.close()
    # Convert the rows into a VALUES list usable by INSERT INTO
    result_values = ['(' + ', '.join(to_sql_literal(value) for value in row) + ')' for row in result]
    # Return the joined VALUES clause
    return ', '.join(result_values)

return read_data_from_database(db_type, host, port, username, password, db_name, tablename)
$$ LANGUAGE plpython3u;
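Converting fetched rows into a VALUES list is the step in fdw_db that is most sensitive to the data itself: NULLs and embedded single quotes both need care. Here is a standalone sketch of that step, runnable outside the database. The names to_sql_literal and build_values_clause are hypothetical, and the quoting rules shown are one reasonable choice, not the only one.

```python
from decimal import Decimal


def to_sql_literal(value):
    """Render one Python value as a SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float, Decimal)):
        return str(value)
    # Quote everything else and double any embedded single quotes
    return "'" + str(value).replace("'", "''") + "'"


def build_values_clause(rows):
    """Join rows into '(a, b), (c, d)' form for INSERT ... VALUES."""
    return ", ".join(
        "(" + ", ".join(to_sql_literal(value) for value in row) + ")"
        for row in rows
    )


if __name__ == "__main__":
    sample = [(1, "John", None), (2, "O'Brien", 3.5)]
    print(build_values_clause(sample))
```

Where the driver and target allow it, bound parameters are generally safer than building literals by hand. The string form is kept here because fdw_db returns text.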
Using Oracle as the test source, create the test table conn_fdw in both Oracle and PostgreSQL.
In PostgreSQL:
-- Create table conn_fdw
CREATE TABLE conn_fdw (
id integer,
name VARCHAR(50),
age integer,
city VARCHAR(50),
salary integer
);
In Oracle:
-- Create table conn_fdw
CREATE TABLE conn_fdw (
id NUMBER,
name VARCHAR2(50),
age NUMBER,
city VARCHAR2(50),
salary NUMBER
);
Insert data in Oracle
-- Insert 20 rows
INSERT INTO conn_fdw VALUES (1, 'John', 30, 'New York', 50000);
INSERT INTO conn_fdw VALUES (2, 'Alice', 25, 'Los Angeles', 60000);
INSERT INTO conn_fdw VALUES (3, 'Bob', 35, 'Chicago', 70000);
INSERT INTO conn_fdw VALUES (4, 'Eva', 28, 'San Francisco', 55000);
INSERT INTO conn_fdw VALUES (5, 'Mike', 32, 'Seattle', 65000);
INSERT INTO conn_fdw VALUES (6, 'Sophia', 29, 'Boston', 75000);
INSERT INTO conn_fdw VALUES (7, 'David', 27, 'Denver', 52000);
INSERT INTO conn_fdw VALUES (8, 'Emily', 31, 'Austin', 68000);
INSERT INTO conn_fdw VALUES (9, 'Daniel', 26, 'Phoenix', 58000);
INSERT INTO conn_fdw VALUES (10, 'Olivia', 33, 'Houston', 72000);
INSERT INTO conn_fdw VALUES (11, 'Liam', 24, 'Portland', 49000);
INSERT INTO conn_fdw VALUES (12, 'Ava', 34, 'Atlanta', 71000);
INSERT INTO conn_fdw VALUES (13, 'Logan', 30, 'Miami', 62000);
INSERT INTO conn_fdw VALUES (14, 'Mia', 28, 'Dallas', 54000);
INSERT INTO conn_fdw VALUES (15, 'Jackson', 29, 'Minneapolis', 67000);
INSERT INTO conn_fdw VALUES (16, 'Sophie', 31, 'Detroit', 59000);
INSERT INTO conn_fdw VALUES (17, 'William', 27, 'Philadelphia', 70000);
INSERT INTO conn_fdw VALUES (18, 'Emma', 32, 'San Diego', 66000);
INSERT INTO conn_fdw VALUES (19, 'James', 26, 'Raleigh', 63000);
INSERT INTO conn_fdw VALUES (20, 'Avery', 35, 'Tampa', 71000);
-- Commit so that other sessions can see the rows
COMMIT;
Next, process the data returned over the remote connection in SQL: create a second function that calls the fdw_db function created above.
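CREATE OR REPLACE FUNCTION inset_fdw_db(db_type varchar(100), host varchar(100),
    port integer, username varchar(100),
    password varchar(100), db_name varchar(100),
    tablename varchar(100), target_table varchar(100))
RETURNS void AS $$
DECLARE
    data_values text;
BEGIN
    -- Fetch the remote rows as a ready-made VALUES list
    SELECT fdw_db(db_type, host, port, username, password, db_name, tablename) INTO data_values;
    -- Nothing to insert when the remote table is empty
    IF data_values IS NULL OR data_values = '' THEN
        RETURN;
    END IF;
    -- The regclass cast checks that the target table exists and quotes its name
    EXECUTE format('INSERT INTO %s VALUES %s', target_table::text::regclass, data_values);
END;
$$ LANGUAGE plpgsql;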
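Because fdw_db returns the whole table as a single text value, a large source table ends up as one very large INSERT. One possible refinement, offered as an assumption and not part of the original design, is to split the rendered tuples into batches. The helper name batched_inserts and the default batch size are hypothetical.

```python
def batched_inserts(target_table, value_tuples, batch_size=1000):
    """Yield one INSERT statement per batch of already-rendered tuples.

    value_tuples is a list of strings such as "(1, 'John')".
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(value_tuples), batch_size):
        batch = ", ".join(value_tuples[start:start + batch_size])
        yield "INSERT INTO " + target_table + " VALUES " + batch


if __name__ == "__main__":
    tuples = ["(1, 'John')", "(2, 'Alice')", "(3, 'Bob')"]
    for statement in batched_inserts("public.conn_fdw", tuples, batch_size=2):
        print(statement)
```

Inside a plpython3u function, each statement could then be run on its own, which keeps any single statement to a bounded size.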
Call it
SELECT inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
Check the result in the database
The data has now been landed locally
postgres=# select * from CONN_FDW;
id | name | age | city | salary
----+------+-----+------+--------
(0 rows)
postgres=# SELECT inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
inset_fdw_db
--------------
(1 row)
postgres=# select * from CONN_FDW;
id | name | age | city | salary
----+---------+-----+---------------+--------
1 | John | 30 | New York | 50000
2 | Alice | 25 | Los Angeles | 60000
3 | Bob | 35 | Chicago | 70000
4 | Eva | 28 | San Francisco | 55000
5 | Mike | 32 | Seattle | 65000
6 | Sophia | 29 | Boston | 75000
7 | David | 27 | Denver | 52000
8 | Emily | 31 | Austin | 68000
9 | Daniel | 26 | Phoenix | 58000
10 | Olivia | 33 | Houston | 72000
11 | Liam | 24 | Portland | 49000
12 | Ava | 34 | Atlanta | 71000
13 | Logan | 30 | Miami | 62000
14 | Mia | 28 | Dallas | 54000
15 | Jackson | 29 | Minneapolis | 67000
16 | Sophie | 31 | Detroit | 59000
17 | William | 27 | Philadelphia | 70000
18 | Emma | 32 | San Diego | 66000
19 | James | 26 | Raleigh | 63000
20 | Avery | 35 | Tampa | 71000
(20 rows)
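To try the landing pattern without an Oracle instance, the sketch below uses two in-memory SQLite databases as stand-ins for the remote source and the local target, then compares row counts. The SQLite stand-ins and the function names land_table and demo are assumptions made for illustration. Note that it uses parameter binding in place of the string concatenation used in fdw_db.

```python
import sqlite3


def land_table(source, target, table):
    """Copy every row of `table` from source to target; return rows copied."""
    rows = source.execute(f"SELECT * FROM {table}").fetchall()
    if not rows:
        return 0
    placeholders = ", ".join("?" for _ in rows[0])
    target.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
    target.commit()
    return len(rows)


def demo():
    ddl = ("CREATE TABLE conn_fdw "
           "(id INTEGER, name TEXT, age INTEGER, city TEXT, salary INTEGER)")
    source = sqlite3.connect(":memory:")  # stand-in for the remote database
    target = sqlite3.connect(":memory:")  # stand-in for the local database
    source.execute(ddl)
    target.execute(ddl)
    source.executemany(
        "INSERT INTO conn_fdw VALUES (?, ?, ?, ?, ?)",
        [(1, "John", 30, "New York", 50000),
         (2, "Alice", 25, "Los Angeles", 60000)],
    )
    copied = land_table(source, target, "conn_fdw")
    landed = target.execute("SELECT COUNT(*) FROM conn_fdw").fetchone()[0]
    return copied, landed


if __name__ == "__main__":
    print(demo())
```

Comparing the copied count with the landed count is a cheap check that also applies to the real Oracle-to-PostgreSQL run.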
Summary
This approach applies not only to database-to-database transfers but also to loading data from files on a file path. Embedding Python code in PostgreSQL can in fact replace some middleware, with better control and more room for customization.
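As a rough illustration of the file case, the sketch below turns CSV text into the same kind of VALUES list. The helper name csv_to_values_clause is hypothetical, and it assumes that empty fields should become NULL and that every other field can be passed as a quoted literal, leaving the conversion to the column type to PostgreSQL.

```python
import csv
import io


def csv_to_values_clause(text_stream, has_header=True):
    """Read CSV rows from a text stream and render them as a VALUES list."""
    reader = csv.reader(text_stream)
    if has_header:
        next(reader, None)  # skip the header row if present
    tuples = []
    for row in reader:
        literals = [
            "NULL" if field == "" else "'" + field.replace("'", "''") + "'"
            for field in row
        ]
        tuples.append("(" + ", ".join(literals) + ")")
    return ", ".join(tuples)


if __name__ == "__main__":
    sample = io.StringIO("id,name\n1,John\n2,O'Brien\n3,\n")
    print(csv_to_values_clause(sample))
```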