In the previous lesson we mentioned that in real-time computing scenarios the vast majority of data sources are messaging systems, and that Kafka stands out among message middleware mainly for its high throughput and low latency; we also covered how Flink writes data to Kafka as a producer, along with the code to do so. In this lesson we will look at how Flink consumes data from Kafka and at the source code behind it, covering the following aspects.
How Flink Consumes Kafka
Flink's integration with Kafka is tightly coupled to the Kafka version. As mentioned in the previous lesson, using the Kafka connector requires the matching Jar dependency; for connectors such as Kafka that have version requirements, always check the official website for the corresponding dependency version.
Our local Kafka version is 2.1.0, so the class we need is FlinkKafkaConsumer. First add the dependency to pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
The following sections walk through the different ways Flink can consume Kafka data.
Consuming a Single Topic
In the previous lesson we set up a local Kafka environment, manually created a Topic named test, and wrote data into it.
Now, how do we consume the data in that Topic?
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// required only if you are on Kafka 0.8:
//properties.setProperty("zookeeper.connect", "localhost:2181");
// set the consumer group
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
// start consuming from the earliest offset
consumer.setStartFromEarliest();
// you can also specify the topic, partition and offset explicitly and consume from there
//HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
//map.put(new KafkaTopicPartition("test", 1), 10240L);
// if there are several partitions, a starting offset can be given for each of them
//map.put(new KafkaTopicPartition("test", 2), 10560L);
// each partition then starts from its specified position
//consumer.setStartFromSpecificOffsets(map);
env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
System.out.println(value);
}
});
env.execute("start consumer...");
}
When consuming data from Kafka you can also explicitly specify, for each Partition of a Topic, the offset to start consuming from.
Consuming Multiple Topics
Our business often has this situation: the same kind of data is sent to different Topics depending on its type, for example online order data is routed to a mobile Topic and a PC Topic depending on its source. We do not want to copy the same code and merely point it at a different Topic, so what should we do?
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// required only if you are on Kafka 0.8:
//properties.setProperty("zookeeper.connect", "localhost:2181");
// set the consumer group
properties.setProperty("group.id", "group_test");
ArrayList<String> topics = new ArrayList<>();
topics.add("test_A");
topics.add("test_B");
// passing in a list of topics neatly solves the problem
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
...
Passing in a list solves the multi-Topic problem. If you need to tell the two Topics' records apart, add a field to the data before it is sent to Kafka so that every record carries its source; a producer-side sketch of that idea follows.
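For illustration only, here is a minimal producer-side sketch of that idea (the "mobile|" / "pc|" tag format and the sample order values are made up for this example; properties is the same configuration object as above):
// each payload carries a source tag, so a single consumer job reading both topics
// can tell where a record came from
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> mobileOrders = env.fromElements("mobile|order-1001", "mobile|order-1002");
DataStream<String> pcOrders = env.fromElements("pc|order-2001");
// write the tagged records to their respective topics
mobileOrders.addSink(new FlinkKafkaProducer<>("test_A", new SimpleStringSchema(), properties));
pcOrders.addSink(new FlinkKafkaProducer<>("test_B", new SimpleStringSchema(), properties));
env.execute("tagged producer");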
Message Deserialization
In the examples above we always specified how messages are deserialized, namely with SimpleStringSchema. Note that SimpleStringSchema returns only the raw payload, without the topic, partition, or other metadata; if you need those, you can define a custom deserialization schema and shape the returned records yourself.
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    // whether the current element is the last one in the stream; returning false means
    // records keep arriving indefinitely
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    // returns a ConsumerRecord<String, String> that carries not only the payload but also
    // the topic, offset, partition and other metadata of each record
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                // the key may be null if the producer did not set one
                record.key() == null ? null : new String(record.key()),
                new String(record.value())
        );
    }
    // the type of the elements this schema produces
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}
With CustomDeSerializationSchema defined, you can use it directly when constructing the consumer.
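For example, a minimal usage sketch (reusing the env and properties objects from the earlier examples): pass the schema in place of SimpleStringSchema, and the stream's element type becomes ConsumerRecord<String, String>:
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>("test", new CustomDeSerializationSchema(), properties);
env.addSource(consumer).map(new MapFunction<ConsumerRecord<String, String>, String>() {
    @Override
    public String map(ConsumerRecord<String, String> record) throws Exception {
        // besides the payload, topic / partition / offset are now available per record
        return record.topic() + "-" + record.partition() + "@" + record.offset() + ": " + record.value();
    }
}).print();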
Dynamic Partition and Topic Discovery
In many scenarios, as the business grows we need to add Partitions to Kafka Topics. To prevent data loss caused by newly added Partitions going undetected, the consumer must be able to sense Partition changes; FlinkKafkaConsumer's dynamic partition discovery does exactly that.
Enabling it only requires the configuration below: the Topic metadata is then re-fetched every 10 ms, and newly discovered Partitions are automatically consumed from the earliest offset.
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
If the business also needs to discover Topics dynamically, you can pass a regular expression for the Topic names:
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
How to Set the Offset When Flink Consumes Kafka
When consuming Kafka, Flink needs to be told which offset, i.e. which position, to start from. Flink offers five ways to specify the starting position when reading from Kafka:
- from specified Topics, Partitions, and offsets
- from the earliest offset
- from a specified timestamp
- from the latest records
- from the offsets last committed by the consumer group
/**
 * Start from the specified offsets of the given topic and partitions
 */
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("test", 0), 10000L);
offsets.put(new KafkaTopicPartition("test", 1), 20000L);
offsets.put(new KafkaTopicPartition("test", 2), 30000L);
consumer.setStartFromSpecificOffsets(offsets);
/**
 * Start from the earliest offset of the topic
 */
consumer.setStartFromEarliest();
/**
 * Start from a specified timestamp
 */
consumer.setStartFromTimestamp(1559801580000L);
/**
 * Start from the latest records of the topic
 */
consumer.setStartFromLatest();
/**
 * Start from the offsets last committed by the given consumer group,
 * so the group.id property must be configured
 */
consumer.setStartFromGroupOffsets();
Source Code Walkthrough
Looking at the class hierarchy, FlinkKafkaConsumer extends FlinkKafkaConsumerBase, and FlinkKafkaConsumerBase is ultimately an implementation of SourceFunction.
The overall flow: FlinkKafkaConsumer first creates a KafkaFetcher; the KafkaFetcher in turn creates a KafkaConsumerThread and a Handover. The KafkaConsumerThread reads messages directly from Kafka and hands them to the Handover, which passes them on to KafkaFetcher.emitRecord to be emitted downstream.
Because FlinkKafkaConsumerBase implements the RichFunction interface, FlinkKafkaConsumerBase.open is called first when the job starts:
public void open(Configuration configuration) throws Exception {
    // determine how offsets are committed (auto commit / commit on checkpoints)
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    // create the partition discoverer
    this.partitionDiscoverer = createPartitionDiscoverer(
            topicsDescriptor,
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
    this.partitionDiscoverer.open();
    subscribedPartitionsToStartOffsets = new HashMap<>();
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
    if (restoredState != null) {
        // restored from a checkpoint / savepoint: start from the restored offsets
        for (KafkaTopicPartition partition : allPartitions) {
            if (!restoredState.containsKey(partition)) {
                restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }
        for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
            if (!restoredFromOldState) {
                // only subscribe to the restored partitions that are assigned to this subtask
                if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                            == getRuntimeContext().getIndexOfThisSubtask()) {
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            } else {
                subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
            }
        }
        if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
            subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                    LOG.warn(
                        "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                        entry.getKey());
                    return true;
                }
                return false;
            });
        }
        LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
    } else {
        // fresh start: seed the partitions according to the configured startup mode
        switch (startupMode) {
            case SPECIFIC_OFFSETS:
                if (specificStartupOffsets == null) {
                    throw new IllegalStateException(
                        "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                            ", but no specific offsets were specified.");
                }
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    Long specificOffset = specificStartupOffsets.get(seedPartition);
                    if (specificOffset != null) {
                        subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
                    } else {
                        subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                    }
                }
                break;
            case TIMESTAMP:
                if (startupOffsetsTimestamp == null) {
                    throw new IllegalStateException(
                        "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                            ", but no startup timestamp was specified.");
                }
                for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                        : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
                    subscribedPartitionsToStartOffsets.put(
                        partitionToOffset.getKey(),
                        (partitionToOffset.getValue() == null)
                            ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                            : partitionToOffset.getValue() - 1);
                }
                break;
            default:
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                }
        }
        if (!subscribedPartitionsToStartOffsets.isEmpty()) {
            switch (startupMode) {
                case EARLIEST:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        subscribedPartitionsToStartOffsets.size(),
                        subscribedPartitionsToStartOffsets.keySet());
                    break;
                case LATEST:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        subscribedPartitionsToStartOffsets.size(),
                        subscribedPartitionsToStartOffsets.keySet());
                    break;
                case TIMESTAMP:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        subscribedPartitionsToStartOffsets.size(),
                        startupOffsetsTimestamp,
                        subscribedPartitionsToStartOffsets.keySet());
                    break;
                case SPECIFIC_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        subscribedPartitionsToStartOffsets.size(),
                        specificStartupOffsets,
                        subscribedPartitionsToStartOffsets.keySet());
                    List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                    for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                        if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                            partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                        }
                    }
                    if (partitionsDefaultedToGroupOffsets.size() > 0) {
                        LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            partitionsDefaultedToGroupOffsets.size(),
                            partitionsDefaultedToGroupOffsets);
                    }
                    break;
                case GROUP_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        subscribedPartitionsToStartOffsets.size(),
                        subscribedPartitionsToStartOffsets.keySet());
            }
        } else {
            LOG.info("Consumer subtask {} initially has no partitions to read from.",
                getRuntimeContext().getIndexOfThisSubtask());
        }
    }
}
The core logic for actually reading data from Kafka Topics and Partitions lives in the run method:
public void run(SourceContext<T> sourceContext) throws Exception {
    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }
    // register metrics for successful / failed offset commits and the commit callback
    this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
    this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
    final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
    this.offsetCommitCallback = new KafkaCommitCallback() {
        @Override
        public void onSuccess() {
            successfulCommits.inc();
        }
        @Override
        public void onException(Throwable cause) {
            LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
            failedCommits.inc();
        }
    };
    // mark the subtask as temporarily idle if it has no partitions to read initially
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }
    LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
        getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);
    // create the fetcher that actually talks to Kafka
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            periodicWatermarkAssigner,
            punctuatedWatermarkAssigner,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);
    if (!running) {
        return;
    }
    // run the fetch loop, with or without periodic partition discovery
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }
}
Code for Consuming Kafka Data with Flink
We have now covered the ways Flink consumes Kafka, how messages are deserialized, and how Partitions and Topics are discovered dynamically. Coming back to our project, the complete code for consuming the Kafka data looks like this:
public class KafkaConsumer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
//set the consumer group
properties.setProperty("group.id", "group_test");
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
//start consuming from the earliest offset
consumer.setStartFromEarliest();
env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
System.out.println(value);
}
});
env.execute("start consumer...");
}
}
Run the code directly (right-click Run in the IDE) and the data is printed in the console.
As the output shows, the messages we previously sent to Kafka are printed out in full.
Summary
This lesson covered the ways Flink consumes Kafka, including specifying a single Topic or multiple Topics, message deserialization, and dynamic partition discovery, and walked through the source code behind Flink's Kafka consumption. With this lesson you should have a fairly complete picture of consuming Kafka from Flink and be able to choose the right consumption mode and configuration for your business scenario.
Selected Comments
**6513:
Do you also use the main method as the entry point in real projects? Is there an example of integrating with Spring Boot?
Instructor's reply:
When writing Flink code, try to avoid frameworks such as Spring, because there is no need for them. Depending on the necessary Flink packages plus a few utility classes is enough.
**冰:
You showed how to start consuming from a specified offset here, but how do I save the offsets when the program stops or is terminated so they can be used on restart?
Instructor's reply:
If you really need to persist the positions, you usually manage them yourself: store them somewhere such as MySQL, read them back on startup, and resume from those positions after every restart.
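A minimal sketch of that idea (loadOffsetsFromMysql is a hypothetical helper standing in for whatever storage you choose):
// hypothetical helper that reads the last saved positions, e.g. from a MySQL table (topic, partition, offset)
Map<KafkaTopicPartition, Long> savedOffsets = loadOffsetsFromMysql("test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
if (savedOffsets != null && !savedOffsets.isEmpty()) {
    // resume from the externally managed positions
    consumer.setStartFromSpecificOffsets(savedOffsets);
} else {
    // no saved positions yet: fall back to the committed group offsets
    consumer.setStartFromGroupOffsets();
}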
**良:
I use Flink to consume Kafka starting from specified offsets of a given topic and partitions, but the actual result does not match my expectation and the consumed partitions do not line up. What could be the reason? Sample code: Map<KafkaTopicPartition, Long> offsets; offsets.put(new KafkaTopicPartition(topic, 0), 1L); offsets.put(new KafkaTopicPartition(topic, 1), 2L); offsets.put(new KafkaTopicPartition(topic, 2), 3L); consumer.setStartFromSpecificOffsets(offsets); Output: ConsumerRecord(topic=new-topic-config-test, partition=0, offset=10105849, key=, value=Python從入門到放棄!) ConsumerRecord(topic=new-topic-config-test, partition=3, offset=10107121, key=, value=Python從入門到放棄!) ConsumerRecord(topic=new-topic-config-test, partition=2, offset=10102806, key=, value=Java從入門到放棄!)
Instructor's reply:
Check two things: first, that the parallelism matches the number of Kafka partitions; second, that all four Kafka partitions actually contain data.
*軒:
Where can I download the data used in the project (not the source code)?
Instructor's reply:
Sample data is included in the project; you can also generate some data yourself.
**7324:
How can Flink write records with the same key to the same Kafka partition?
Instructor's reply:
You can define your own Kafka partitioner; look into how FlinkKafkaPartitioner is used. In general we do not do this, though. If you do implement a custom partitioner for the Kafka sink, make sure the data stays as evenly distributed as possible so you do not create skew on the Kafka side.
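A minimal sketch of such a partitioner, assuming the FlinkKafkaPartitioner API of the flink-connector-kafka version used in this lesson; the class name and hashing choice are illustrative only:
// routes records with the same key to the same Kafka partition; beware of skew if one key dominates
public class KeyHashPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // fall back to the value bytes if the producer did not serialize a key
        byte[] bytes = (key != null) ? key : value;
        return partitions[(Arrays.hashCode(bytes) & Integer.MAX_VALUE) % partitions.length];
    }
}
// usage: pass the partitioner when building the producer
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "test", new SimpleStringSchema(), properties, Optional.of(new KeyHashPartitioner()));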