国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka+Fink 實戰(zhàn)+工具類

這篇具有很好參考價值的文章主要介紹了Kafka+Fink 實戰(zhàn)+工具類。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

  • LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 記錄日志
     *
     * @param request
     * @param shortLinkCode
     * @param accountNo
     * @return
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // ip、 瀏覽器信息
        String ip = CommonUtil.getIpAddr(request);
        // 全部請求頭
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        Map<String,String> availableMap = new HashMap<>();
        availableMap.put("user-agent",headerMap.get("user-agent"));
        availableMap.put("referer",headerMap.get("referer"));
        availableMap.put("accountNo",accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                //日志類型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志內(nèi)容
                .data(availableMap)
                //客戶端ip
                .ip(ip)
                // 時間
                .ts(CommonUtil.getCurrentTimestamp())
                //業(yè)務(wù)唯一標(biāo)識(短鏈碼)
                .bizId(shortLinkCode).build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        //打印日志 in 控制臺
        log.info(jsonLog);

        // 發(fā)送kafka
        kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

  • DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定義 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定義 消費組
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定義 消費組
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

        DataStreamSource<String> ds = env.addSource(kafkaConsumer);

        ds.print();

        SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端設(shè)備唯一標(biāo)識
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
        });

        // 分組
        KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
        });


        // 識別新老訪客    richMap open函數(shù),對狀態(tài)以及日期格式進行初始化

        SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

        jsonDSWithVisitorState.print("ods新老訪客");

        // 存儲到dwd
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

        jsonDSWithVisitorState.addSink(kafkaProducer);


        env.execute();
    }

    /**
     * 獲取referer
     * @param jsonObject
     * @return
     */
    public static String getReferer(JSONObject jsonObject){
        JSONObject dataJsonObj = jsonObject.getJSONObject("data");
        if(dataJsonObj.containsKey("referer")){

            String referer = dataJsonObj.getString("referer");
            if(StringUtils.isNotBlank(referer)){
                try {
                    URL url = new URL(referer);
                    return url.getHost();
                } catch (MalformedURLException e) {
                    log.error("提取referer失敗:{}",e.toString());
                }
            }
        }

        return "";

    }

    /**
     * 生成設(shè)備唯一標(biāo)識
     *
     * @param jsonObject
     * @return
     */
    public static String getDeviceId(JSONObject jsonObject){
        Map<String,String> map= new TreeMap<>();

        try{
            map.put("ip",jsonObject.getString("ip"));
            map.put("event",jsonObject.getString("event"));
            map.put("bizId",jsonObject.getString("bizId"));
            map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));

            return DeviceUtil.geneWebUniqueDeviceId(map);

        }catch (Exception e){
            log.error("生產(chǎn)唯一deviceId異常:{}", jsonObject);
            return null;
        }


    }


}

  • KafkaUtil

    @Slf4j
    public class KafkaUtil {
    
        /**
         * kafka 的 broker 地址
         */
        private static String KAFKA_SERVER = null;
    
        static {
            Properties properties = new Properties();
    
            InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
    
            try {
                properties.load(in);
            } catch (IOException e) {
                e.printStackTrace();
                log.error("加載Kafka配置文件失敗:{}",e.getMessage());
            }
    
            //獲取配置文件中的value
            KAFKA_SERVER = properties.getProperty("kafka.servers");
    
        }
    
        /**
         * 獲取flink的kafka消費者
         * @param topic
         * @param groupId
         * @return
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
    
            return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
        }
    
        /**
         * 獲取flink的kafka生產(chǎn)者
         * @param topic
         * @return
         */
        public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
            return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
        }
    }
    
    
  • TimeUtil文章來源地址http://www.zghlxwxcb.cn/news/detail-658286.html

    public class TimeUtil {
    
        /**
         * 默認(rèn)日期格式
         */
        private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
    
        /**
         * 默認(rèn)日期格式
         */
        private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER  = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
    
        private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
    
    
        /**
         * LocalDateTime 轉(zhuǎn) 字符串,指定日期格式
         * @param localDateTime
         * @param pattern
         * @return
         */
        public static String format(LocalDateTime localDateTime, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * Date 轉(zhuǎn) 字符串, 指定日期格式
         * @param time
         * @param pattern
         * @return
         */
        public static String format(Date time, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         *  Date 轉(zhuǎn) 字符串,默認(rèn)日期格式
         * @param time
         * @return
         */
        public static String format(Date time){
    
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         * timestamp 轉(zhuǎn) 字符串,默認(rèn)日期格式
         *
         * @param timestamp
         * @return
         */
        public static String format(long timestamp) {
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * 字符串 轉(zhuǎn) Date
         *
         * @param time
         * @return
         */
        public static Date strToDate(String time) {
            LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
            return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
    
        }
    
    }
    

到了這里,關(guān)于Kafka+Fink 實戰(zhàn)+工具類的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka實戰(zhàn)進階:一篇詳解與互聯(lián)網(wǎng)實戰(zhàn)PDF指南,帶你深入Apache Kafka的世界

    Kafka實戰(zhàn)進階:一篇詳解與互聯(lián)網(wǎng)實戰(zhàn)PDF指南,帶你深入Apache Kafka的世界

    Apache Kafka 是由Apache軟件基金會開發(fā)的一款開源消息系統(tǒng)項目,主要使用Scala語言編寫。該項目旨在為處理實時數(shù)據(jù)提供一個統(tǒng)一、高通量、低等待的平臺。Kafka作為一種分布式的、分區(qū)的、多復(fù)本的日志提交服務(wù),憑借其獨特的設(shè)計提供了豐富的消息系統(tǒng)功能。 特點 高吞吐量

    2024年01月19日
    瀏覽(24)
  • Spring Boot+Kafka實戰(zhàn)生產(chǎn)級Kafka消費組

    作者:禪與計算機程序設(shè)計藝術(shù) Kafka是一個開源分布式消息系統(tǒng),最初由LinkedIn開發(fā),之后成為Apache項目的一部分。Kafka主要用于大數(shù)據(jù)實時流處理,具有低延遲、高吞吐量等特點。本文將會從基本概念、術(shù)語說明、原理及應(yīng)用場景三個方面對Kafka進行詳細(xì)介紹。 Kafka作為一個

    2024年02月10日
    瀏覽(23)
  • 【Kafka】Kafka監(jiān)控工具Kafka-eagle簡介

    Kafka-eagle是一種基于Web的開源管理工具,可以用來監(jiān)控、管理多個Kafka集群。 下面是使用Docker部署Kafka-eagle的步驟: 下載并安裝Docker和Docker Compose。 創(chuàng)建文件夾,例如kafka-eagle,并在其中創(chuàng)建docker-compose.yml文件,將以下配置寫入: 在命令行中轉(zhuǎn)到kafka-eagle文件夾中,運行以下命

    2024年02月14日
    瀏覽(17)
  • kafka map kafka可視化工具

    kafka map kafka可視化工具

    kafka-map是使用Java17和React開發(fā)的一款kafka可視化工具。 目前支持的功能有: 多集群管理 集群狀態(tài)監(jiān)控(分區(qū)數(shù)量、副本數(shù)量、存儲大小、offset) 主題創(chuàng)建、刪除、擴容(刪除需配置delete.topic.enable = true) broker狀態(tài)監(jiān)控 消費者組查看、刪除 重置offset 消息查詢(支持String和j

    2024年03月28日
    瀏覽(163)
  • Kafka 可視化工具 Kafka Tool

    Kafka 可視化工具 Kafka Tool

    使用Kafka的小伙伴,有沒有為無法直觀地查看 Kafka 的 Topic 里的內(nèi)容而發(fā)過愁呢? 下面推薦給大家一款帶有可視化頁面的Kafka工具: Kafka Tool (目前最新版本是 2.0.4 ) 下載地址 http://www.kafkatool.com/download.html 下載界面 不同版本的Kafka對應(yīng)不同版本的工具,個人使用的是0.11,所

    2024年02月12日
    瀏覽(20)
  • KafKa 分區(qū),副本實戰(zhàn)

    5個broker (1主4從) 安裝目路/config/server.properties, 額外復(fù)制4份為 server-2.properties,server-3.properties,server-4.properties,server-5.properties 主要配置不同 server.properties server-2.properties server-3.properties server-4.properties server-5.properties 運行這5個broker 創(chuàng)建一個主題test,8個分區(qū),3個副本 bootstrap

    2024年02月11日
    瀏覽(18)
  • 【面試實戰(zhàn)】Kafka面試題

    是一個分布式流式處理平臺,流平臺一個關(guān)鍵的功能就是消息隊列。 項目中使用Kafka消息隊列,對評論、點贊、關(guān)注功能發(fā)布通知,封裝為Event實體類。 消費者負(fù)責(zé)將消息隊列中的Event取出,并將其封裝為Message對象,并持久化到數(shù)據(jù)庫中保存。 了解到除了Kafka還有其他的消息

    2024年02月07日
    瀏覽(12)
  • kafka消息系統(tǒng)實戰(zhàn)

    kafka消息系統(tǒng)實戰(zhàn)

    kafka是什么? ????????是一種高吞吐量的、分布式、發(fā)布、訂閱、消息系統(tǒng)? 1.導(dǎo)入maven坐標(biāo) 2.編寫提供者 3.編寫消費者 ?4.下載kafka 點此去官網(wǎng)下載——Apache Kafka 解壓后進入config目錄? 修改zookeeper.properties dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper 修改日志存放的路徑server.propertie

    2024年02月10日
    瀏覽(11)
  • 【kafka】Kafka 可視化工具Kafka Eagle安裝和使用

    【kafka】Kafka 可視化工具Kafka Eagle安裝和使用

    Kafka產(chǎn)線環(huán)境需要管理的Topic和Consumser越來越多,使用命令行工具進行管理會非常繁雜。因此,大數(shù)據(jù)平臺上需要一套Kafka的管理監(jiān)控系統(tǒng),Kafka-Eagle。 Kafka Eagle是一個用于監(jiān)控和管理kafka的開源組件,可以同時監(jiān)控多個kafka集群。 Kafka Eagle提供了完善的監(jiān)控頁面和kafka常用操作

    2023年04月15日
    瀏覽(21)
  • kafka學(xué)習(xí)-概念與簡單實戰(zhàn)

    kafka學(xué)習(xí)-概念與簡單實戰(zhàn)

    目錄 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生產(chǎn)者和消費者 2、開發(fā)實戰(zhàn) 2.1、消息發(fā)送 介紹 代碼實現(xiàn) 2.2、消息消費 介紹 代碼實現(xiàn) 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer ????????kafka的基本數(shù)據(jù)單元,由字節(jié)數(shù)組組成??梢岳斫?/p>

    2024年02月09日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包