flink 1.13 streamload寫入doris
前言
官方教程詳細介紹了基于flink 1.16的各種寫入方式,本文主要介紹的是基于flink 1.13的RowData 數(shù)據(jù)流(RowDataSerializer)寫入。
<!-- Build configuration: Java 8, Flink 1.13.1, Doris connector 1.0.3 (Scala 2.12). -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.1</flink.version>
</properties>
<!-- Doris Flink connector matching the Flink minor version (1.13) and Scala 2.12. -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.13_2.12</artifactId>
<version>1.0.3</version>
</dependency>
<!-- Blink planner, provided at runtime by the Flink distribution. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
一、目標表
-- doris sink table
-- Fixes over the original DDL:
--   1. Column was declared as `date` but DUPLICATE KEY, PARTITION BY RANGE and the
--      Flink sink field list all reference `receive_date` -> renamed to `receive_date`.
--   2. Missing comma after the `city` column definition made the statement unparsable.
CREATE TABLE IF NOT EXISTS events (
`user_id` INT NOT NULL COMMENT '用戶id',
`receive_date` DATE COMMENT '接收日期',
`event_type` varchar(256) COMMENT '事件類型',
`province` varchar(128) COMMENT '省份',
`city` varchar(128) COMMENT '城市',
`receive_time` DATETIME
)
DUPLICATE KEY(`user_id`, `receive_date`)
COMMENT '事件表'
PARTITION BY RANGE (`receive_date`)
(
FROM
("2023-01-01") TO ("2023-09-14") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH (`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"compression"="LZ4",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "1"
);
二、map
public class EventMapFunction extends RichMapFunction<Event, RowData> {
    private static final Logger log = LoggerFactory.getLogger(EventMapFunction.class);

    /**
     * Converts an {@link Event} bean into a 6-field {@link GenericRowData} whose field order
     * must match DorisSinkUtil.getEventFields(): user_id, receive_date, event_type,
     * province, city, receive_time.
     *
     * <p>On conversion failure the error is logged and a partially populated row is
     * returned (fields not yet set stay null), preserving the original best-effort behavior.
     *
     * @param event source event parsed from Kafka
     * @return row ready for the Doris RowDataSerializer
     */
    @Override
    public RowData map(Event event) {
        GenericRowData genericRowData = new GenericRowData(6);
        try {
            genericRowData.setField(0, event.getUserId());
            // String-valued fields must be wrapped in StringData; passing a raw
            // java.lang.String makes the RowData serializer fail at runtime.
            genericRowData.setField(1, StringData.fromString(event.getReceiveDate()));
            genericRowData.setField(2, StringData.fromString(event.getEventType()));
            genericRowData.setField(3, StringData.fromString(event.getProvince()));
            genericRowData.setField(4, StringData.fromString(event.getCity()));
            genericRowData.setField(5, StringData.fromString(event.getReceiveTime()));
        } catch (Exception e) {
            // Pass the throwable as the last argument so SLF4J records the full stack
            // trace; the original string concatenation ("..." + e) dropped it.
            log.error("Event data map error", e);
        }
        return genericRowData;
    }
}
三、DorisSink
public class DorisSinkUtil {

    // Utility class: static methods only, no instances.
    private DorisSinkUtil() {
    }

    /**
     * Builds a Doris stream-load sink for {@link RowData} records, loading connection
     * settings (fenodes, credentials) from external configuration via {@code readValue}.
     *
     * @param table       target table identifier (e.g. "db.events")
     * @param labelPrefix NOTE(review): currently unused — connector 1.0.3 generates
     *                    stream-load labels itself; parameter kept for API compatibility.
     *                    Confirm whether a label prefix should be wired in.
     * @return sink function that stream-loads JSON rows into Doris
     */
    public static SinkFunction<RowData> getDorisSink(String table, String labelPrefix) {
        // Stream-load payload format: one JSON object per line, no outer array.
        Properties properties = new Properties();
        properties.setProperty("read_json_by_line", "true");
        properties.setProperty("format", "json");
        properties.setProperty("strip_outer_array", "true");
        return DorisSink.sink(
                getEventFields(),
                getEventDataType(),
                DorisReadOptions.builder().build(),
                DorisExecutionOptions.builder()
                        // NOTE(review): batch size 3 triggers a stream load every 3 rows —
                        // very small for production throughput; consider raising it.
                        .setBatchSize(3)
                        .setBatchIntervalMs(0L)
                        .setMaxRetries(3)
                        .setStreamLoadProp(properties)
                        .build(),
                DorisOptions.builder()
                        .setFenodes(readValue("doris.fenodes"))
                        .setTableIdentifier(table)
                        .setUsername(readValue("doris.username"))
                        .setPassword(readValue("doris.password"))
                        .build());
    }

    /**
     * Column names of the target table; index-aligned with {@link #getEventDataType()}
     * and with the field positions set in EventMapFunction.
     */
    public static String[] getEventFields() {
        return new String[]{
                "user_id",
                "receive_date",
                "event_type",
                "province",
                "city",
                "receive_time"
        };
    }

    /** Logical types for each column, in the same order as {@link #getEventFields()}. */
    public static LogicalType[] getEventDataType() {
        return new LogicalType[]{
                new IntType(),
                new VarCharType(256),
                new VarCharType(256),
                new VarCharType(128),
                new VarCharType(128),
                new VarCharType(256)
        };
    }
}
四、job主類
public static void main(String[] args) {
    try {
        // Shared parallelism for every operator in the pipeline (was hard-coded 4 times).
        final int parallelism = 12;
        // 1. Create the Flink streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // No event-time watermarks are used downstream, so disable periodic emission.
        env.getConfig().setAutoWatermarkInterval(0);
        // 2. Build the Kafka source from external configuration.
        KafkaSource<String> eventSource = getKafkaSource(
                readValue("kafka.broker.id"),
                readValue("kafka.event.topic"),
                readValue("kafka.event.group.id"));
        // 3. Consume the source without watermarks.
        SingleOutputStreamOperator<String> eventSourceStream =
                env.fromSource(eventSource, WatermarkStrategy.noWatermarks(), "kafkaSource_event")
                        .setParallelism(parallelism);
        // 4. Parse raw messages into Event beans.
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator =
                eventSourceStream.flatMap(new EventParseFlatMap()).setParallelism(parallelism);
        // 5. Convert Event beans to RowData for the Doris RowDataSerializer.
        SingleOutputStreamOperator<RowData> eventStream =
                eventSingleOutputStreamOperator.map(new EventMapFunction()).setParallelism(parallelism);
        // 6. Build the Doris sink.
        SinkFunction<RowData> eventSink =
                DorisSinkUtil.getDorisSink(readValue("doris.table.event"), Constants.EVENT_LABEL_PREFIX);
        // 7. Write to Doris.
        eventStream.addSink(eventSink).setParallelism(parallelism);
        env.execute("EventJob");
    } catch (Exception e) {
        LOG.error("EventJob failed to activate", e);
    }
}
文章來源:http://www.zghlxwxcb.cn/news/detail-758109.html
到了這里,關于flink streamload寫入doris的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!