前言
這周的主要時間花在Flink上面,做了一個簡單的從文本文件中讀取數(shù)據(jù),然后存入數(shù)據(jù)庫的例子,能夠正常的實(shí)現(xiàn)功能,但是遇到個問題,我有四臺機(jī)器,自己搭建了一個standalone的集群,不論我把并行度設(shè)置多少,跑起來的耗時都非常接近,實(shí)在是百思不得其解。機(jī)器多似乎并不能幫助它。 把過程記錄在此,看后面隨著學(xué)習(xí)的深入能不能解答出這個問題。
嘗試過的修復(fù)方法
集群搭建
出現(xiàn)這個問題后,我從集群的角度來進(jìn)行了些修改,
1,機(jī)器是2核的,slots被設(shè)置成了6,那我就有點(diǎn)懷疑是這個設(shè)置問題,因?yàn)槠鋵?shí)只有2核,設(shè)置的多了,反而存在搶占資源,導(dǎo)致運(yùn)行達(dá)不到效果,改成2后效果一樣,沒有改進(jìn)。這個參數(shù)在
taskmanager.numberOfTaskSlots: 2
2,調(diào)整內(nèi)存, taskmanager 從2G調(diào)整為4G, 效果也沒有變化。
taskmanager.memory.process.size: 4000m
這里說下這個內(nèi)存,我們設(shè)置的是總的Memory,也就是這個Total Process Memory。
剔除掉些比較固定的Memory,剩下的大頭就是這個Task Heap 和 Managed Memory。
所以我們調(diào)整大小后,它兩個也就相應(yīng)的增加了。 我查了下這兩個,可以理解為堆內(nèi)存和堆外內(nèi)存,
一個是存放我們程序的對象,會被垃圾回收器回收;一個是堆外內(nèi)存,比如RockDB 和 緩存 sort,hash 等的中間結(jié)果。
程序方面修改
最開始的時候我把保存數(shù)據(jù)庫操作寫在MapFunction里面,后來改到SinkFunction里面。
SinkFunction里面保存數(shù)據(jù)庫的方法也進(jìn)行了反復(fù)修改,從開始使用Spring的JdbcTemplate,換成后來直接使用最原始JDBC。 而且還踩了一個坑,開始的時候用的注入的JdbcTemplate, 本地運(yùn)行沒有問題,到了集群上面,發(fā)到別的機(jī)器的時候,注入的東西就是空的了。
換成原始的JDBC速度能提升不少, 我猜想這里的原因是jdbctemplate做了些多余的事情, JDBC打開一次,后面Invoke的時候就直接存了,效率要高些,所以速度上提升不少。
這里把部分代碼貼出來, 在Open的時候就預(yù)加載好PreparedStatement, Invoke的時候直接傳參數(shù),調(diào)用就可以了。文章來源:http://www.zghlxwxcb.cn/news/detail-414576.html
public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
private PreparedStatement updatePS;
private PreparedStatement insertPS;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HikariDataSource dataSource = new HikariDataSource();
connection = getConnection(dataSource);
if(connection != null)
{
String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
updatePS = this.connection.prepareStatement(updateSQL);
String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
insertPS = this.connection.prepareStatement(insertSQL);
}
}
@Override
public void close() throws Exception {
super.close();
if (updatePS != null) {
updatePS.close();
}
if (insertPS != null) {
insertPS.close();
}
//關(guān)閉連接和釋放資源
if (connection != null) {
connection.close();
}
}
/**
* 每條數(shù)據(jù)的插入都要調(diào)用一次 invoke() 方法
*
* @param marketPrice
* @param context
* @throws Exception
*/
@Override
public void invoke(MarketPrice marketPrice, Context context) throws Exception {
log.info("start save for {}", marketPrice.getPerformanceId().toString() );
updatePS.setDouble(1,marketPrice.getOpenPrice());
updatePS.setDouble(2,marketPrice.getHighPrice());
updatePS.setDouble(3,marketPrice.getLowPrice());
updatePS.setDouble(4,marketPrice.getClosePrice());
updatePS.setString(5, marketPrice.getPerformanceId().toString());
updatePS.setInt(6, marketPrice.getPriceAsOfDate());
int result = updatePS.executeUpdate();
log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);
if(result == 0)
{
String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
insertPS = this.connection.prepareStatement(insertSQL);
insertPS.setString(1, marketPrice.getPerformanceId().toString());
insertPS.setInt(2, marketPrice.getPriceAsOfDate());
insertPS.setDouble(3,marketPrice.getOpenPrice());
insertPS.setDouble(4,marketPrice.getHighPrice());
insertPS.setDouble(5,marketPrice.getLowPrice());
insertPS.setDouble(6,marketPrice.getClosePrice());
result = insertPS.executeUpdate();
log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
}
}
}
總結(jié)
從多個方面去改進(jìn),結(jié)果發(fā)現(xiàn)還是一樣的,就是使用一臺機(jī)器和使用三臺機(jī)器,時間上一樣的,再懷疑我只能懷疑是某臺機(jī)器有問題,然后運(yùn)行的時候,由最慢的機(jī)器決定了速度。 我在使用MapFunction的時候有觀察到,有的時候,某臺機(jī)器已經(jīng)處理上千條,而有的只處理了幾十條,到最后完成的時候,大家處理的數(shù)量又是很接近的。這樣能夠解釋為什么機(jī)器多了,速度卻是一樣的。但是我沒有辦法找出哪臺機(jī)器來。 我自己的本地運(yùn)行,并行數(shù)設(shè)置的多,速度上面是有提升的,到了集群就碰到這樣的現(xiàn)象,后面看能不能解決它, 先記錄在此。文章來源地址http://www.zghlxwxcb.cn/news/detail-414576.html
到了這里,關(guān)于記一次Flink遇到性能瓶頸的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!