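Getting Started with Programming Apache Sedona, a GIS Big Data Processing Framework
Introduction
Apache Sedona™ is a cluster computing system for processing large-scale spatial data. Sedona extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed spatial datasets and spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines. A Gitee mirror and Gitee-hosted Sedona documentation are also available and continuously updated.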
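Code structure
- common: Java core package. It wraps the underlying JTS and GeoTools operations (such as coordinate system transformation) behind its own interfaces, and provides Circle (an extension to JTS); distance calculation by the Haversine method and on a spheroid; conversion of formats such as WKT and GeoJSON; QUADTREE and RTREE indexes; and geohash computation. It is called by upper-layer applications such as Spark and Flink.
- core: core package that adapts Sedona to Spark. It provides the basic objects SpatialRDD, PointRDD, LineStringRDD, CircleRDD, and PolygonRDD; the spatial query operations joinJudgement (by geometric topological relationship), knnJudgement (by geometric distance), and rangeJudgement (range query over a tree index); the formatMapper readers and converters for csv, wkt, geoJson, shapefile, and netcdf; and spatialPartitioning partitioners such as QuadtreePartitioning and KDBTreePartitioner.
- flink: Flink adapter. It calls the functions provided under functions in common.
- python-adapter: Python adapter. It calls the functions provided under functions in common.
- sql: Spark SQL adapter. It calls the functions provided under functions in common.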
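To make the Haversine distance mentioned for the common package more concrete, here is a minimal standalone sketch of the formula. The class name, method name, and the mean Earth radius constant are assumptions made for illustration; this is not Sedona's own implementation.

```java
// Hypothetical helper for illustration only; not part of Sedona.
final class HaversineSketch {
    // Assumed spherical Earth model: mean radius in meters.
    static final double EARTH_RADIUS_M = 6_371_008.8;

    /** Great-circle distance in meters between two lon/lat points given in degrees. */
    static double distanceMeters(double lon1, double lat1, double lon2, double lat2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        // Haversine term: sin^2(dPhi/2) + cos(phi1) * cos(phi2) * sin^2(dLambda/2)
        double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
                + Math.cos(phi1) * Math.cos(phi2)
                * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
        // Clamp to 1.0 to guard against rounding slightly above 1 for antipodal points.
        return 2 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }
}
```

The spherical model is only an approximation; the spheroid method listed alongside it accounts for the flattening of the Earth.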
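Usage
Using Sedona with Spark
1. Installation
Add the following Maven dependencies (see the official Sedona documentation for details):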
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-spark-shaded-3.0_2.12</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-viz-3.0_2.12</artifactId>
<version>1.4.0</version>
</dependency>
<!-- Optional: https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geotools-wrapper</artifactId>
<version>1.4.0-28.2</version>
</dependency>
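2. Initialize SparkSession
import org.apache.sedona.core.serde.SedonaKryoRegistrator;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession = SparkSession.builder()
.master("local[*]") // Delete this when running in cluster mode
.appName("readTestScala") // Change this to a proper name
// Enable Sedona custom Kryo serializer
.config("spark.serializer", KryoSerializer.class.getName()) // org.apache.spark.serializer.KryoSerializer
.config("spark.kryo.registrator", SedonaKryoRegistrator.class.getName()) // org.apache.sedona.core.serde.SedonaKryoRegistrator
.getOrCreate();
3. Register functions
// Registers the Sedona ST_ functions with the session (org.apache.sedona.sql.utils.SedonaSQLRegistrator)
SedonaSQLRegistrator.registerAll(sparkSession);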
4. Usage examples
4.1 Loading data as a DataFrame
4.1.1 Loading data from a file
Suppose there is a TSV file whose geometries are in WKT format, stored at /Download/usa-county.tsv
POLYGON (..., ...) Cuming County
POLYGON (..., ...) Wahkiakum County
POLYGON (..., ...) De Baca County
POLYGON (..., ...) Lancaster County
Load
// Read the tab-separated file without a header; columns are named _c0, _c1, ...
Dataset<Row> rawDf = sparkSession.read().format("csv").option("delimiter", "\t").option("header", "false").load("/Download/usa-county.tsv");
rawDf.createOrReplaceTempView("rawdf");
rawDf.show();
Result
| _c0|_c1|_c2| _c3| _c4| _c5| _c6|_c7|_c8| _c9|_c10| _c11|_c12|_c13| _c14| _c15| _c16| _c17|
+--------------------+---+---+--------+-----+-----------+--------------------+---+---+-----+----+-----+----+----+----------+--------+-----------+------------+
|POLYGON ((-97.019...| 31|039|00835841|31039| Cuming| Cuming County| 06| H1|G4020|null| null|null| A|1477895811|10447360|+41.9158651|-096.7885168|
|POLYGON ((-123.43...| 53|069|01513275|53069| Wahkiakum| Wahkiakum County| 06| H1|G4020|null| null|null| A| 682138871|61658258|+46.2946377|-123.4244583|
|POLYGON ((-104.56...| 35|011|00933054|35011| De Baca| De Baca County| 06| H1|G4020|null| null|null| A|6015539696|29159492|+34.3592729|-104.3686961|
|POLYGON ((-96.910...| 31|109|00835876|31109| Lancaster| Lancaster County| 06| H1|G4020| 339|30700|null| A|2169240202|22877180|+40.7835474|-096.6886584|
4.1.2 Creating a geometry column with ST_ functions
SELECT ST_GeomFromWKT(_c0) AS countyshape, _c1, _c2
FROM rawdf
4.1.3 Reading from a GeoJSON file
// Requires: import static org.apache.spark.sql.functions.expr;
// geojson_path is the path to the GeoJSON file.
String schema = "type string, crs string, totalFeatures long, features array<struct<type string, geometry string, properties map<string, string>>>";
sparkSession.read().schema(schema).json(geojson_path)
.selectExpr("explode(features) as features") // Explode the envelope to get one feature per row.
.select("features.*") // Unpack the features struct.
.withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)")) // Convert the geometry string.
.printSchema();
4.1.4 Reading from a database
// For any JDBC data source, including PostGIS.
Dataset<Row> df = sparkSession.read().format("jdbc")
// Other options.
.option("query", "SELECT id, ST_AsBinary(geom) as geom FROM my_table")
.load()
.withColumn("geom", expr("ST_GeomFromWKB(geom)"));
// This is a simplified version that works for PostGIS.
Dataset<Row> df = sparkSession.read().format("jdbc")
// Other options.
.option("dbtable", "my_table")
.load()
.withColumn("geom", expr("ST_GeomFromWKB(geom)"));
4.2 CRS (coordinate reference system) transformation
SELECT ST_Transform(countyshape, "epsg:4326", "epsg:3857") AS newcountyshape, _c1, _c2, _c3, _c4, _c5, _c6, _c7
FROM spatialdf
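As a rough illustration of what a transformation from epsg:4326 to epsg:3857 does to a single coordinate, the sketch below applies the standard spherical Web Mercator formulas by hand. The class and method names are hypothetical, and the code assumes input in longitude/latitude order; Sedona itself delegates the real transformation to GeoTools, so treat this as a sketch rather than a description of how ST_Transform is implemented.

```java
// Hypothetical illustration of the EPSG:4326 -> EPSG:3857 mapping; not Sedona code.
final class WebMercatorSketch {
    // Sphere radius used by EPSG:3857 (the WGS 84 semi-major axis), in meters.
    static final double RADIUS_M = 6_378_137.0;

    /** Converts lon/lat in degrees to Web Mercator {x, y} in meters. */
    static double[] toWebMercator(double lonDeg, double latDeg) {
        // x grows linearly with longitude.
        double x = RADIUS_M * Math.toRadians(lonDeg);
        // y stretches with latitude: R * ln(tan(pi/4 + lat/2)).
        double y = RADIUS_M * Math.log(Math.tan(Math.PI / 4 + Math.toRadians(latDeg) / 2));
        return new double[] {x, y};
    }
}
```

Because y grows without bound toward the poles, Web Mercator is normally used only for latitudes up to roughly 85 degrees north and south.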
4.3 Spatial queries
4.3.1 Range query
ST_Contains, ST_Intersects, ST_Within
SELECT *
FROM spatialdf
WHERE ST_Contains (ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape)
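The predicate in the range query above can be pictured with a small standalone check. The sketch assumes the envelope arguments are in the order (minX, minY, maxX, maxY) and that the candidate is a non-degenerate polygon given by its vertices; since an axis-aligned rectangle is convex, such a polygon is contained when all of its vertices fall inside the rectangle. The class and method names are hypothetical, and Sedona evaluates the real predicate through JTS.

```java
// Hypothetical illustration of the range predicate's semantics; not Sedona code.
final class EnvelopeRangeSketch {
    /**
     * Returns true when every vertex lies inside the closed rectangle
     * [minX, maxX] x [minY, maxY]. Each vertex is a {x, y} pair.
     */
    static boolean envelopeContains(double minX, double minY, double maxX, double maxY,
                                    double[][] vertices) {
        if (vertices.length == 0) {
            return false; // treat an empty geometry as not contained
        }
        for (double[] v : vertices) {
            if (v[0] < minX || v[0] > maxX || v[1] < minY || v[1] > maxY) {
                return false; // one vertex outside is enough to reject
            }
        }
        return true;
    }
}
```

For example, with the window (1.0, 100.0, 1000.0, 1100.0) from the query, a square with corners (10, 200) and (20, 300) passes, while any shape with a vertex at x = 1001 does not.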
4.3.2 Distance query
ST_Distance
SELECT countyname, ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS distance
FROM spatialdf
ORDER BY distance DESC
LIMIT 5
4.3.3 Join query
SELECT *
FROM polygondf, pointdf
WHERE ST_Contains(polygondf.polygonshape,pointdf.pointshape)
SELECT *
FROM polygondf, pointdf
WHERE ST_Intersects(polygondf.polygonshape,pointdf.pointshape)
SELECT *
FROM pointdf, polygondf
WHERE ST_Within(pointdf.pointshape, polygondf.polygonshape)
SELECT *
FROM pointdf1, pointdf2
WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2
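The last query above, a distance join, returns every pair of points closer than the threshold. The following sketch spells out that semantics as a naive nested loop over two in-memory point arrays, using planar Euclidean distance. The class and method names are hypothetical; Sedona does not evaluate joins this way at scale, and instead relies on the spatial partitioning and indexes described in the code structure section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of distance-join semantics; not Sedona code.
final class DistanceJoinSketch {
    /**
     * Returns index pairs {i, j} such that the Euclidean distance between
     * left[i] and right[j] is strictly less than the threshold.
     * Each point is a {x, y} pair.
     */
    static List<int[]> join(double[][] left, double[][] right, double threshold) {
        List<int[]> pairs = new ArrayList<>();
        for (int i = 0; i < left.length; i++) {
            for (int j = 0; j < right.length; j++) {
                double dx = left[i][0] - right[j][0];
                double dy = left[i][1] - right[j][1];
                if (Math.hypot(dx, dy) < threshold) {
                    pairs.add(new int[] {i, j});
                }
            }
        }
        return pairs;
    }
}
```

The nested loop compares every pair, so its cost grows with the product of the two input sizes, which is why a distributed engine needs partitioning and indexing for this kind of query.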
5 Saving data
Taking a write to PostGIS as an example:
my_postgis_db# create table my_table (id int8, geom geometry);
df.withColumn("geom", expr("ST_AsEWKB(geom)"))
.write().format("jdbc")
.option("truncate","true") // Don't let Spark recreate the table; only takes effect in overwrite mode.
// Other options.
.save();
// If you didn't create the table before writing you can change the type afterward.
my_postgis_db# alter table my_table alter column geom type geometry;
6 Converting between SpatialRDD and DataFrame
6.1 SpatialRDD to DataFrame
Dataset<Row> spatialDf = Adapter.toDf(spatialRDD, sparkSession);
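6.2 DataFrame to SpatialRDD
A DataFrame is converted to a SpatialRDD with Adapter.toSpatialRdd, for example Adapter.toSpatialRdd(spatialDf, "countyshape"), where the second argument names the geometry column. Note that the Scala snippet below goes in the other direction: it converts a SpatialRDD to a DataFrame with an explicit schema.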
val schema = StructType(Array(
StructField("county", GeometryUDT, nullable = true),
StructField("name", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("age", IntegerType, nullable = true)
))
val spatialDf = Adapter.toDf(spatialRDD, schema, sparkSession)