kafka json格式—代码示例

houduangongchengshi

温馨提示:这篇文章已超过287天没有更新,请注意相关的内容是否还可用!

kafka json格式—代码示例

Kafka是一个分布式流处理平台,它支持以JSON格式进行数据传输。JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于阅读和编写,并且可以被多种编程语言解析和生成。

在Kafka中,JSON格式的消息可以被序列化和反序列化,以便在生产者和消费者之间进行传递和处理。JSON格式的消息由键值对组成,其中键是一个字符串,值可以是任何JSON支持的数据类型,如字符串、数字、布尔值、数组或对象。

下面是一个示例代码,展示了如何使用Kafka发送和接收JSON格式的消息:

我们需要创建一个Kafka生产者,配置生产者的属性,包括Kafka集群的地址和序列化器。在发送消息之前,我们需要将消息转换为JSON格式,并将其包装在ProducerRecord对象中:

import org.apache.kafka.clients.producer.*;

import org.apache.kafka.common.serialization.StringSerializer;

import org.json.JSONObject;

public class KafkaProducerExample {

public static void main(String[] args) {

// 配置Kafka生产者的属性

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建Kafka生产者

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 创建JSON对象

JSONObject jsonMessage = new JSONObject();

jsonMessage.put("name", "John");

jsonMessage.put("age", 30);

jsonMessage.put("city", "New York");

// 将JSON对象转换为字符串并发送消息

ProducerRecord<String, String> record = new ProducerRecord<>("topic", jsonMessage.toString());

producer.send(record, new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

if (exception != null) {

System.err.println("Failed to send message: " + exception.getMessage());

} else {

System.out.println("Message sent successfully: " + metadata.topic() + ", " + metadata.partition() + ", " + metadata.offset());

}

}

});

// 关闭Kafka生产者

producer.close();

}

}

接下来,我们需要创建一个Kafka消费者,配置消费者的属性,并订阅要消费的主题。当接收到JSON格式的消息时,我们可以将其解析为JSON对象,并从中提取所需的数据:

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.json.JSONObject;

import java.time.Duration;

import java.util.Collections;

import java.util.Properties;

public class KafkaConsumerExample {

public static void main(String[] args) {

// 配置Kafka消费者的属性

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 创建Kafka消费者

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅要消费的主题

consumer.subscribe(Collections.singletonList("topic"));

// 持续消费消息

while (true) {

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {

// 解析JSON消息

JSONObject jsonMessage = new JSONObject(record.value());

String name = jsonMessage.getString("name");

int age = jsonMessage.getInt("age");

String city = jsonMessage.getString("city");

// 处理消息

System.out.println("Received message: name=" + name + ", age=" + age + ", city=" + city);

}

}

}

}

通过以上示例代码,我们可以看到如何在Kafka中使用JSON格式进行消息传输和处理。生产者将消息转换为JSON字符串并发送,消费者接收消息后解析JSON并提取所需的数据进行处理。这种方式非常灵活,可以适应各种类型的数据传输和处理需求。

文章版权声明:除非注明,否则均为莫宇前端原创文章,转载或复制请以超链接形式并注明出处。

取消
微信二维码
微信二维码
支付宝二维码