Flink之FileSink將數(shù)據(jù)寫入parquet文件
在使用FileSink將數(shù)據(jù)寫入列式存儲文件中時必須使用forBulkFormat
,列式存儲文件如ORCFile
、ParquetFile
,這里就以ParquetFile
為例結(jié)合代碼進(jìn)行說明.
在Flink1.15.3
中是通過構(gòu)造ParquetWriterFactory
然后調(diào)用forBulkFormat
方法將構(gòu)造好的ParquetWriterFactory
傳入,這里先講一下構(gòu)造ParquetWriterFactory
一共有三種方式
序列 | API |
---|---|
方式一 | AvroParquetWriters.forGenericRecord |
方式二 | AvroParquetWriters.forSpecificRecord |
方式三 | AvroParquetWriters.forReflectRecord |
其中方式三AvroParquetWriters.forReflectRecord
是我們常用的方法,使用起來也是復(fù)雜最低、代碼變更時靈活度較好方法,方式二AvroParquetWriters.forSpecificRecord
使用起來復(fù)雜度較高,但是代碼變更的時候靈活度相對較好的方法,方式一AvroParquetWriters.forGenericRecord
使用起來比較麻煩,而且代碼變更時需要更改的也比較多,這里主要介紹方式二和方式三的使用方式.
要說明一點再Flink1.15.3
中是通過AvroParquetWriters
來構(gòu)造ParquetWriterFactory
,如果是早期版本的Flink可能是要通過ParquetAvroWriters
來進(jìn)行構(gòu)造,當(dāng)然在1.15.3
中也可以通過這個方式進(jìn)行構(gòu)造,不過ParquetAvroWriters
已經(jīng)標(biāo)注為過時并且建議使用AvroParquetWriters
源碼內(nèi)容如下:
/**
* Convenience builder to create {@link ParquetWriterFactory} instances for the different Avro
* types.
*
* @deprecated use {@link AvroParquetWriters} instead. // 看這部分是建議使用AvroParquetWriters
*/
@Deprecated // 這里已經(jīng)標(biāo)注了過時
public class ParquetAvroWriters {
/**
* Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the
* schema of that specific type to build and write the columnar data.
*
* @param type The class of the type to write.
*/
public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
Class<T> type) {
return AvroParquetWriters.forSpecificRecord(type);
}
-
AvroParquetWriters.forReflectRecord(方式三)
這里就先介紹一下
AvroParquetWriters.forReflectRecord
的使用方式,我們在使用FileSink
時最好配合Checkpoint
使用,不然文件只會出現(xiàn)inprogress
狀態(tài),感興趣的可以自己實驗一下,我在Flink中FileSink的使用演示了加Checkpoint
和不加Checkpoint
的區(qū)別感興趣的可以看一下.代碼模板內(nèi)容比較簡單,直接代碼演示:
import com.jin.bean.User; import com.jin.schema.UserSchemaBean; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.ParquetWriterFactory; import org.apache.flink.formats.parquet.avro.AvroParquetWriters; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; /** * @Author: J * @Version: 1.0 * @CreateTime: 2023/6/28 * @Description: 測試 **/ public class FlinkFileSinkForParquet { public static void main(String[] args) throws Exception { // 創(chuàng)建流環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設(shè)置并行度 env.setParallelism(1); // 每30秒作為checkpoint的一個周期 env.enableCheckpointing(30000); // 兩次checkpoint間隔最少是20秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000); // 程序取消或者停止時不刪除checkpoint env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // checkpoint必須在60秒結(jié)束,否則將丟棄 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只能有一個checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 設(shè)置EXACTLY_ONCE語義,默認(rèn)就是這個 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint存儲位置 env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint"); // 添加數(shù)據(jù)源(這里使用的是自定義數(shù)據(jù)源CustomizeSource,方便測試) DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource()); // 將數(shù)據(jù)流中的數(shù)據(jù)存儲到bean對象中 SingleOutputStreamOperator<User> userMapStream = sourceStream.map(bean -> new User(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit())); // 構(gòu)建parquetWriterFactory ParquetWriterFactory<User> parquetWriterFactory2 = AvroParquetWriters.forReflectRecord(User.class); // 構(gòu)建FileSink FileSink<User> parquetFileSink = FileSink // 使用Bulk模式,并配置路徑和對應(yīng)的schema .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory2) // 分桶策略,使用默認(rèn)的 .withBucketAssigner(new DateTimeBucketAssigner<User>()) // 每100毫秒檢查一次分桶 .withBucketCheckInterval(100) // 滾動策略,Bulk的滾動策略只有一種,就是發(fā)生Checkpoint的時候才進(jìn)行滾動(為了保證列式文件的完整性) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); // 輸出到文件 userMapStream.sinkTo(parquetFileSink); env.execute(); } } @Getter @Setter @ToString @NoArgsConstructor @AllArgsConstructor class User { private String name; private int age; private String gender; private String hobbit; }
代碼中注釋很詳細(xì)了,具體使用看注釋即可。這里說明一下為什么
forBulkFormat
的滾動策略只有OnCheckpointRollingPolicy
而不是像forRowFormat
那樣可以通過時間和文件大小來控制文件滾動,注釋中我也講了是為了保證列式存儲文件的完整性,因為列式文件中記錄了很多信息,并不想行式存儲文件一行一行的寫就行,寫到某一行直接停了也不影響文件的使用,而列式存儲文件中不單單是記錄了數(shù)據(jù)本身還有對應(yīng)的字段類型、文件頭信息、文件尾信息、切片索引等很多信息,如果在寫入數(shù)據(jù)時某一刻直接停止了,而文件還沒有生成完整的信息那就會導(dǎo)致這個列士存儲文件根本不具備使用性,是無法進(jìn)行解析的。就比如說
ParquetFile
,它的文件結(jié)構(gòu)如下圖可以看到文件的結(jié)構(gòu)信息是很復(fù)雜的,如果感興了解一下可以看數(shù)據(jù)存儲格式這篇文章了解一下,這里就不細(xì)說了,內(nèi)容還是比較多的.
-
AvroParquetWriters.forSpecificRecord(方式二)
forSpecificRecord
的使用不像forReflectRecord
那樣自定義一個bean
接收數(shù)據(jù)就行了,使用forSpecificRecord
還要結(jié)合一下Apache avro
的官網(wǎng)看一下,下面我就介紹一下如何使用forSpecificRecord
.avro
的使用有兩種方式一是通過API
直接調(diào)用的方式,二通過配置avsc
文件然后進(jìn)行編譯的方式,在代碼中我們使用的第二種方式,使用第一種方式同樣會出現(xiàn)很多schema
的信息在代碼中寫死修改起來會比較復(fù)雜的問題,而且對avro
的API
也要足夠熟悉,學(xué)習(xí)成本還是有的.-
在
resource
目錄中創(chuàng)建avsc
文件,文件內(nèi)容如下{ "namespace": "com.jin.schema", "type": "record", "name": "UserSchemaBean", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"}, {"name": "gender", "type": "string"}, {"name": "hobbit", "type": "string"} ] }
文件中的內(nèi)容就是
schema
信息,這里我相信大家都能看得明白."namespace": "com.jin.schema"
編譯后自動創(chuàng)建的bean
的存儲位置,"name": "UserSchemaBean"
就是配置生成bean
的名稱,fields
中就是配置生成bean
的成員變量和對應(yīng)的數(shù)據(jù)類型.官網(wǎng)演示的
avsc
文件內(nèi)容如下:{"namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
編譯后就會根據(jù)
avsc
文件中的schema
信息在配置好的目錄中自動創(chuàng)建bean
. -
在
Maven
中添加avsc
文件編譯插件官網(wǎng)內(nèi)容如下:
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin>
要注意
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
是已經(jīng)配置完的avsc
文件的位置,像是我就是在原有的resource
目錄下配置的就要將內(nèi)容改成<sourceDirectory>${project.basedir}/src/main/resource/</sourceDirectory>
否則在編譯時就會報錯找不到對應(yīng)的目錄或文件,如果想直接使用<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
那就在項目的main
目錄下創(chuàng)建一個avro
目錄并將目錄性質(zhì)改為Source root
(這個如果不會可自行百度,關(guān)鍵字我都已經(jīng)提供了).我的項目中實際配置如下:
<!-- avro插件 --> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.10.0</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin>
選擇插件的版本時要注意依賴沖突問題,我們要先看一下Flink的
flink-avro
下的org.apache.avro:avro
是什么版本,如下圖:可以看到
1.15.3
的org.apache.avro:avro
的版本是1.10.0
,所以我選擇的插件也是這個版本. -
編譯
上面步驟都完成了就可以進(jìn)行編譯了,Maven->Lifecycle->compile,這里看一下編譯后的結(jié)果如下圖:
可以看到已經(jīng)根據(jù)我們配置的
avsc
文件自動創(chuàng)建了對應(yīng)的bean
,這里看一下成員變量內(nèi)容是否一致,如下:/** * All-args constructor. * @param name The new value for name * @param age The new value for age * @param gender The new value for gender * @param hobbit The new value for hobbit */ public UserSchemaBean(java.lang.CharSequence name, java.lang.Integer age, java.lang.CharSequence gender, java.lang.CharSequence hobbit) { this.name = name; this.age = age; this.gender = gender; this.hobbit = hobbit; }
可以看到成員變量信息也是完全一致,我這里值展示了小部分代碼,編譯后的
bean
中的代碼信息很多,不過我們不用關(guān)心這個,懂與不懂都不影響使用. -
代碼內(nèi)容
接下來就到主題了,實際的代碼內(nèi)容如下:
import com.jin.schema.UserSchemaBean; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.ParquetWriterFactory; import org.apache.flink.formats.parquet.avro.AvroParquetWriters; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; /** * @Author: J * @Version: 1.0 * @CreateTime: 2023/6/28 * @Description: 測試 **/ public class FlinkFileSinkForParquet { public static void main(String[] args) throws Exception { // 創(chuàng)建流環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設(shè)置并行度 env.setParallelism(1); // 每30秒作為checkpoint的一個周期 env.enableCheckpointing(30000); // 兩次checkpoint間隔最少是20秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000); // 程序取消或者停止時不刪除checkpoint env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // checkpoint必須在60秒結(jié)束,否則將丟棄 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只能有一個checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 設(shè)置EXACTLY_ONCE語義,默認(rèn)就是這個 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint存儲位置 env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint"); // 添加數(shù)據(jù)源(這里使用的是自定義數(shù)據(jù)源,方便測試) DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource()); // 將數(shù)據(jù)流中的對象轉(zhuǎn)成UserSchemaBean類型 SingleOutputStreamOperator<UserSchemaBean> mapStream = sourceStream.map(bean -> new UserSchemaBean(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit())); // 構(gòu)建parquetWriterFactory,這里傳入的就是編譯后的UserSchemaBean ParquetWriterFactory<UserSchemaBean> parquetWriterFactory = AvroParquetWriters.forSpecificRecord(UserSchemaBean.class); // 構(gòu)建FileSink FileSink<UserSchemaBean> parquetFileSink = FileSink // 使用Bulk模式,并配置路徑和對應(yīng)的schema .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory) // 分桶策略,使用默認(rèn)的 .withBucketAssigner(new DateTimeBucketAssigner<UserSchemaBean>()) // 每100毫秒檢查一次分桶 .withBucketCheckInterval(100) // 滾動策略,Bulk的滾動策略只有一種,就是發(fā)生Checkpoint的時候才進(jìn)行滾動(為了保證列式文件的完整性) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); // 輸出到文件 mapStream.sinkTo(parquetFileSink); env.execute(); } }
通過代碼我們可以看到,內(nèi)容基本就是一致的無非就是
forSpecificRecord
傳入的bean
不同而已,當(dāng)然還是建議使用AvroParquetWriters.forReflectRecord
這種方式,簡易高效,復(fù)雜的過程并不一定能提高我們的代碼能力.文章來源:http://www.zghlxwxcb.cn/news/detail-770193.html到這里這兩種方式我都介紹完了,希望看完這篇文章有所收獲.文章來源地址http://www.zghlxwxcb.cn/news/detail-770193.html
-
到了這里,關(guān)于Flink之FileSink將數(shù)據(jù)寫入parquet文件的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!