国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka復(fù)習(xí):(3)自定義序列化器和反序列化器

這篇具有很好參考價(jià)值的文章主要介紹了kafka復(fù)習(xí):(3)自定義序列化器和反序列化器。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、實(shí)體類定義:


public class Company {
    private String name;
    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "Company{" +
                "name='" + name + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

    public Company(String name, String address) {
        this.name = name;
        this.address = address;
    }

    public Company() {
    }
}

二、自定義序列化器和反序列化器


import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    //進(jìn)行字節(jié)數(shù)組序列化
    @Override
    public byte[] serialize(String topic, Company data) {
        if(data == null){
            return null;
        }
        byte[] name, address;
        try{
            if(data.getName() != null){
                name = data.getName().getBytes("UTF-8");
            }else {
                name = new byte[0];
            }
            if(data.getAddress() != null){
                address = data.getAddress().getBytes("UTF-8");
            }else{
                address = new byte[0];
            }
            ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);

            byteBuffer.putInt(name.length);
            byteBuffer.put(name);
            byteBuffer.putInt(address.length);
            byteBuffer.put(address);
            return byteBuffer.array();
        }catch (UnsupportedEncodingException e){
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {

    }
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new SerializationException("Error:"+ex.getMessage());
        }
        return new Company(name,address);

    }

    @Override
    public void close() {

    }
}

三、定義生產(chǎn)者和消費(fèi)者文章來源地址http://www.zghlxwxcb.cn/news/detail-667727.html

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CompanyProducer {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //設(shè)置value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
        properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");
        KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);
        Company company = new Company();
        company.setAddress("Beijing");
        company.setName("Connection");
        ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);
        producer.send(record).get();
    }
}

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CompanyConsumer {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");
        KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));
        while(true){
            ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){
                System.out.println(consumerRecord.value());
            }
        }
    }
}

到了這里,關(guān)于kafka復(fù)習(xí):(3)自定義序列化器和反序列化器的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Unity-序列化和反序列化

    序列化是指把對象轉(zhuǎn)換為字節(jié)序列的過程,而反序列化是指把字節(jié)序列恢復(fù)為對象的過程。序列化最主要的用途就是傳遞對象和保存對象。 在Unity中保存和加載、prefab、scene、Inspector窗口、實(shí)例化預(yù)制體等都使用了序列化與反序列化。 1 自定義的具有Serializable特性的非抽象、

    2024年01月24日
    瀏覽(27)
  • Java序列化和反序列化

    目錄 一、序列化和反序列化 二、Java序列化演示 三、反序列化漏洞 1、含義 ?序列化就是內(nèi)存中的對象寫入到IO流中,保存的格式可以是二進(jìn)制或者文本內(nèi)容。反序列化就是IO流還原成對象。 2、用途 (1)傳輸網(wǎng)絡(luò)對象 (2)保存Session 1、序列化 java.io.ObjectOutputStream代表對象

    2023年04月25日
    瀏覽(24)
  • Java序列化和反序列化機(jī)制

    在閱讀 ArrayList 源碼的時(shí)候,注意到,其內(nèi)部的成員變量動態(tài)數(shù)組 elementData 被Java中的 transient 修飾 transient 意味著Java在序列化時(shí)會跳過該字段(不序列化該字段) 而Java在默認(rèn)情況下會序列化類(實(shí)現(xiàn)了 Java.io.Serializable 接口的類)的所有非瞬態(tài)(未被 transient 修飾

    2024年03月15日
    瀏覽(27)
  • [計(jì)算機(jī)網(wǎng)絡(luò)]---序列化和反序列化

    [計(jì)算機(jī)網(wǎng)絡(luò)]---序列化和反序列化

    前言 作者 :小蝸牛向前沖 名言 :我可以接受失敗,但我不能接受放棄 ?? 如果覺的博主的文章還不錯(cuò)的話,還請 點(diǎn)贊,收藏,關(guān)注??支持博主。如果發(fā)現(xiàn)有問題的地方歡迎?大家在評論區(qū)指正? 目錄 ?一、再談協(xié)議 二、序列化和反序化 1、網(wǎng)絡(luò)版本計(jì)算器的場景搭建 2、

    2024年02月20日
    瀏覽(21)
  • TCP定制協(xié)議,序列化和反序列化

    TCP定制協(xié)議,序列化和反序列化

    目錄 前言 1.理解協(xié)議 2.網(wǎng)絡(luò)版本計(jì)算器 2.1設(shè)計(jì)思路 2.2接口設(shè)計(jì) 2.3代碼實(shí)現(xiàn): 2.4編譯測試 總結(jié) ? ? ? ? 在之前的文章中,我們說TCP是面向字節(jié)流的,但是可能對于面向字節(jié)流這個(gè)概念,其實(shí)并不理解的,今天我們要介紹的是如何理解TCP是面向字節(jié)流的,通過編碼的方式,自

    2024年02月12日
    瀏覽(23)
  • java中的序列化和反序列化

    objectOutputStream 對象的序列化,以流的形式將對象寫入文件 構(gòu)造方法: objectOutputStream(OutputStream out) 傳入一個(gè)字節(jié)輸入流創(chuàng)建objectOutputStream對象 成員方法: void writeObject(object obj) 將指定的對象寫入objectOutputStream 使用步驟: 創(chuàng)建一個(gè)類,這個(gè)類實(shí)現(xiàn)Serializable接口,Serializable是一

    2024年02月14日
    瀏覽(19)
  • Java中序列化和反序列化解釋

    在Java中,序列化(Serialization)是指將對象的狀態(tài)轉(zhuǎn)換為字節(jié)流的過程,以便將其保存到文件、在網(wǎng)絡(luò)中傳輸或持久化到數(shù)據(jù)庫中。而反序列化(Deserialization)則是將字節(jié)流轉(zhuǎn)換回對象的過程,恢復(fù)對象的狀態(tài)。 序列化和反序列化主要用于以下場景: 1. 對象持久化:通過序列

    2024年02月07日
    瀏覽(23)
  • 從淺入深理解序列化和反序列化

    從淺入深理解序列化和反序列化

    什么是java序列化 序列化:把對象轉(zhuǎn)換為字節(jié)序列的過程 反序列:把字節(jié)序列恢復(fù)為對象的過程 對象序列化機(jī)制(object serialization)是java語言內(nèi)建的一種對象持久化方式,通過對象序列化,可以將對象的狀態(tài)信息保存為字節(jié)數(shù)組,并且可以在有需要的時(shí)候?qū)⑦@個(gè)字節(jié)數(shù)組通過

    2024年02月06日
    瀏覽(30)
  • iOS處理json,序列化和反序列化

    Mantle 是一個(gè)開源的 Objective-C 框架,用于在 iOS 和 macOS 應(yīng)用程序中實(shí)現(xiàn)模型層的序列化和反序列化。它提供了一種簡單而強(qiáng)大的方式來將 JSON數(shù)據(jù)格式轉(zhuǎn)換為自定義的數(shù)據(jù)模型對象,以及將數(shù)據(jù)模型對象轉(zhuǎn)換為字典或 JSON 格式。 Mantle具有如下特點(diǎn) 自動映射 Mantle自動將 JSON 數(shù)據(jù)

    2024年02月11日
    瀏覽(25)
  • 【精選】PHP&java 序列化和反序列化漏洞

    目錄 首先 其次 技巧和方法

    2024年01月23日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包