国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

記一次Flink遇到性能瓶頸

這篇具有很好參考價值的文章主要介紹了記一次Flink遇到性能瓶頸。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

前言

這周的主要時間花在Flink上面,做了一個簡單的從文本文件中讀取數(shù)據(jù),然后存入數(shù)據(jù)庫的例子,能夠正常的實(shí)現(xiàn)功能,但是遇到個問題,我有四臺機(jī)器,自己搭建了一個standalone的集群,不論我把并行度設(shè)置多少,跑起來的耗時都非常接近,實(shí)在是百思不得其解。機(jī)器多似乎并不能幫助它。 把過程記錄在此,看后面隨著學(xué)習(xí)的深入能不能解答出這個問題。
記一次Flink遇到性能瓶頸

嘗試過的修復(fù)方法

集群搭建

出現(xiàn)這個問題后,我從集群的角度來進(jìn)行了些修改,
1,機(jī)器是2核的,slots被設(shè)置成了6,那我就有點(diǎn)懷疑是這個設(shè)置問題,因?yàn)槠鋵?shí)只有2核,設(shè)置的多了,反而存在搶占資源,導(dǎo)致運(yùn)行達(dá)不到效果,改成2后效果一樣,沒有改進(jìn)。這個參數(shù)在
taskmanager.numberOfTaskSlots: 2
2,調(diào)整內(nèi)存, taskmanager 從2G調(diào)整為4G, 效果也沒有變化。
taskmanager.memory.process.size: 4000m
這里說下這個內(nèi)存,我們設(shè)置的是總的Memory,也就是這個Total Process Memory。
記一次Flink遇到性能瓶頸
剔除掉些比較固定的Memory,剩下的大頭就是這個Task Heap 和 Managed Memory。
所以我們調(diào)整大小后,它兩個也就相應(yīng)的增加了。 我查了下這兩個,可以理解為堆內(nèi)存和堆外內(nèi)存,
一個是存放我們程序的對象,會被垃圾回收器回收;一個是堆外內(nèi)存,比如RockDB 和 緩存 sort,hash 等的中間結(jié)果。

程序方面修改

最開始的時候我把保存數(shù)據(jù)庫操作寫在MapFunction里面,后來改到SinkFunction里面。
SinkFunction里面保存數(shù)據(jù)庫的方法也進(jìn)行了反復(fù)修改,從開始使用Spring的JdbcTemplate,換成后來直接使用最原始JDBC。 而且還踩了一個坑,開始的時候用的注入的JdbcTemplate, 本地運(yùn)行沒有問題,到了集群上面,發(fā)到別的機(jī)器的時候,注入的東西就是空的了。
換成原始的JDBC速度能提升不少, 我猜想這里的原因是jdbctemplate做了些多余的事情, JDBC打開一次,后面Invoke的時候就直接存了,效率要高些,所以速度上提升不少。
這里把部分代碼貼出來, 在Open的時候就預(yù)加載好PreparedStatement, Invoke的時候直接傳參數(shù),調(diào)用就可以了。

public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
    private PreparedStatement updatePS;
    private PreparedStatement insertPS;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HikariDataSource dataSource = new HikariDataSource();
        connection = getConnection(dataSource);
        if(connection != null)
        {
            String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
            updatePS = this.connection.prepareStatement(updateSQL);

            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (updatePS != null) {
            updatePS.close();
        }
        if (insertPS != null) {
            insertPS.close();
        }
        //關(guān)閉連接和釋放資源
        if (connection != null) {
            connection.close();
        }

    }

    /**
     * 每條數(shù)據(jù)的插入都要調(diào)用一次 invoke() 方法
     *
     * @param marketPrice
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MarketPrice marketPrice, Context context) throws Exception {

        log.info("start save for {}", marketPrice.getPerformanceId().toString() );

        updatePS.setDouble(1,marketPrice.getOpenPrice());
        updatePS.setDouble(2,marketPrice.getHighPrice());
        updatePS.setDouble(3,marketPrice.getLowPrice());
        updatePS.setDouble(4,marketPrice.getClosePrice());
        updatePS.setString(5, marketPrice.getPerformanceId().toString());
        updatePS.setInt(6, marketPrice.getPriceAsOfDate());
        int result = updatePS.executeUpdate();


        log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);

        if(result == 0)
        {
            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
            insertPS.setString(1, marketPrice.getPerformanceId().toString());
            insertPS.setInt(2, marketPrice.getPriceAsOfDate());
            insertPS.setDouble(3,marketPrice.getOpenPrice());
            insertPS.setDouble(4,marketPrice.getHighPrice());
            insertPS.setDouble(5,marketPrice.getLowPrice());
            insertPS.setDouble(6,marketPrice.getClosePrice());

            result = insertPS.executeUpdate();
            log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
        }
    }

}

總結(jié)

從多個方面去改進(jìn),結(jié)果發(fā)現(xiàn)還是一樣的,就是使用一臺機(jī)器和使用三臺機(jī)器,時間上一樣的,再懷疑我只能懷疑是某臺機(jī)器有問題,然后運(yùn)行的時候,由最慢的機(jī)器決定了速度。 我在使用MapFunction的時候有觀察到,有的時候,某臺機(jī)器已經(jīng)處理上千條,而有的只處理了幾十條,到最后完成的時候,大家處理的數(shù)量又是很接近的。這樣能夠解釋為什么機(jī)器多了,速度卻是一樣的。但是我沒有辦法找出哪臺機(jī)器來。 我自己的本地運(yùn)行,并行數(shù)設(shè)置的多,速度上面是有提升的,到了集群就碰到這樣的現(xiàn)象,后面看能不能解決它, 先記錄在此。文章來源地址http://www.zghlxwxcb.cn/news/detail-414576.html

到了這里,關(guān)于記一次Flink遇到性能瓶頸的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 記一次Selenium框架的爬蟲遇到下拉框頁面的解決經(jīng)歷

    記一次Selenium框架的爬蟲遇到下拉框頁面的解決經(jīng)歷

    最近有一個項(xiàng)目需要使用爬蟲從某網(wǎng)站抓取全國的醫(yī)院名稱,等級,地址等信息 爬取的url為https://some/website/that/i/can/tell/you/sorry 用瀏覽器打開這個url會發(fā)現(xiàn),切換不同的省市需要點(diǎn)擊左上角的下拉框進(jìn)行選擇 通常遇到這種下拉框頁面,我們第一時間想到使用Selenium框架的Sel

    2024年01月21日
    瀏覽(28)
  • 記一次翻頁性能優(yōu)化

    記一次翻頁性能優(yōu)化

    ???由于是公司項(xiàng)目,所以不方便給出代碼或者視頻,只能列一些自己畫的流程圖。 ???大致情況如上,前端有7個顯示區(qū)。在對其進(jìn)行滾動翻頁的時候,存在以下問題: ???通過分析代碼,調(diào)查log發(fā)現(xiàn),翻頁切換平均耗時在600ms。其主要的業(yè)務(wù)邏輯如下: 主要問題有

    2024年02月05日
    瀏覽(42)
  • 記一次模糊查詢踩坑 Flink+ES

    公司需要對商品名稱進(jìn)行模糊模糊查詢,考慮到商品表存量數(shù)據(jù)千萬級,直接數(shù)據(jù)庫模糊查詢效率肯定極其低下,所以選擇使用 ElasticSearch 對商品信息進(jìn)行模糊查詢。 因?yàn)樾枰嬖械牟樵兘涌?,保持原有查詢接口的入?yún)⒊鰠?,所以需要全?增量同步MySQL數(shù)據(jù)到ES進(jìn)行索引

    2024年02月05日
    瀏覽(29)
  • 記一次Flink通過Kafka寫入MySQL的過程

    記一次Flink通過Kafka寫入MySQL的過程

    一、前言 總體思路:source --transform --sink ,即從source獲取相應(yīng)的數(shù)據(jù)來源,然后進(jìn)行數(shù)據(jù)轉(zhuǎn)換,將數(shù)據(jù)從比較亂的格式,轉(zhuǎn)換成我們需要的格式,轉(zhuǎn)換處理后,然后進(jìn)行sink功能,也就是將數(shù)據(jù)寫入的相應(yīng)的數(shù)據(jù)庫DB中或者寫入Hive的HDFS文件存儲。 思路: pom部分放到最后面。 二

    2024年01月24日
    瀏覽(27)
  • 記一次 JMeter 壓測 HTTPS 性能問題

    記一次 JMeter 壓測 HTTPS 性能問題

    在使用 JMeter 壓測時,發(fā)現(xiàn)同一后端服務(wù),在單機(jī) 500 并發(fā)下,HTTP 和 HTTPS 協(xié)議壓測 RT 差距非常大。同時觀測后端服務(wù)各監(jiān)控指標(biāo)水位都很低,因此懷疑性能瓶頸在 JMeter 施壓客戶端。 切入點(diǎn):垃圾回收 首先在施壓機(jī)觀察到 CPU 使用率和內(nèi)存使用率都很高,詳細(xì)看下各線程

    2024年01月21日
    瀏覽(31)
  • 記一次rax應(yīng)用用戶體驗(yàn)性能優(yōu)化

    記一次rax應(yīng)用用戶體驗(yàn)性能優(yōu)化

    對于前端開發(fā)攻城獅們來說,性能優(yōu)化是一個永恒的話題。隨著前端需求復(fù)雜度的不斷升高,在項(xiàng)目中想始終保持著良好的性能也逐漸成為了一個有挑戰(zhàn)的事情。本次分享簡述我們在 Rax 項(xiàng)目中常用的一些性能優(yōu)化方式,并將從近期的一個實(shí)際業(yè)務(wù)需求出發(fā),講述我在 Rax C端

    2024年02月21日
    瀏覽(22)
  • 記一次SpringBoot應(yīng)用性能調(diào)優(yōu)過程

    記一次SpringBoot應(yīng)用性能調(diào)優(yōu)過程

    使用SpringBoot、MyBatis-Plus開發(fā)一個接口轉(zhuǎn)發(fā)的能,將第三方接口注冊到平臺中,由平臺對外提供統(tǒng)一的地址,平臺轉(zhuǎn)發(fā)時記錄接口的轉(zhuǎn)發(fā)日志信息。開發(fā)完成后使用Jmeter進(jìn)行性能測試,使用100個線程、持續(xù)壓測180秒,測試結(jié)果如下,每秒僅支持8個并發(fā)。 服務(wù)器 作用 CPU核數(shù) 內(nèi)

    2024年02月03日
    瀏覽(19)
  • 記一次卡頓的性能優(yōu)化經(jīng)歷實(shí)操

    記一次卡頓的性能優(yōu)化經(jīng)歷實(shí)操

    本篇的性能優(yōu)化不是八股文類的優(yōu)化方案,而是針對具體場景,具體分析,從排查卡頓根因到一步步尋找解決方案,甚至是規(guī)避等方案來最終解決性能問題的經(jīng)歷實(shí)操 所以,解決方案可能不通用,不適用于你的場景,但這個解決過程是如何一步步去處理的,解決思路是怎么樣

    2024年02月02日
    瀏覽(32)
  • 測試2年遇到瓶頸,如何跨過這個坎,實(shí)現(xiàn)漲薪5k?

    測試2年遇到瓶頸,如何跨過這個坎,實(shí)現(xiàn)漲薪5k?

    最近和字節(jié)跳動的一個老朋友閑聊,感觸頗深,據(jù)他說公司近期招聘的測試工程師,大多數(shù)候選人都有一個“通病”: 在工作2-3年的時候遇到瓶頸,而且是一道很難跨越的坎。 為什么會遇到這種情況?因?yàn)榇蟛糠譁y試工程師在工作了一段時間后,都可以完成最初的基本知識

    2023年04月26日
    瀏覽(20)
  • 性能分析5部曲:瓶頸分析與問題定位,如何快速解決瓶頸?

    性能分析5部曲:瓶頸分析與問題定位,如何快速解決瓶頸?

    一、引言 很多做性能測試的同學(xué)都問過我這樣一個問題:魚哥(Carl_奕然),你說性能測試的重點(diǎn)是什么? 我的回答很簡單:瓶頸分析與問題定位。 在性能項(xiàng)目的整個周期,不管是腳本設(shè)計,腳本編寫還是腳本執(zhí)行,都還算簡單。 難點(diǎn)在于如何定位瓶頸,分析瓶頸,解決瓶頸。

    2024年02月20日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包