重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

怎么在Java中利用kafka发送消息

这期内容当中小编将会给大家带来有关怎么在Java中利用kafka发送消息,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

十年的牙克石网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。成都全网营销的优势是能够根据用户设备显示端的尺寸不同,自动调整牙克石建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联公司从事“牙克石网站设计”,“牙克石网站推广”以来,每个客户项目都认真落实执行。

1. maven依赖包

 
 org.apache.kafka 
 kafka-clients 
 0.9.0.1 

2. 生产者代码

package com.lnho.example.kafka;  
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerRecord;   
import java.util.Properties;   
public class KafkaProducerExample { 
 public static void main(String[] args) { 
  Properties props = new Properties(); 
  props.put("bootstrap.servers", "master:9092"); 
  props.put("acks", "all"); 
  props.put("retries", 0); 
  props.put("batch.size", 16384); 
  props.put("linger.ms", 1); 
  props.put("buffer.memory", 33554432); 
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");   
  Producer producer = new KafkaProducer<>(props); 
  for(int i = 0; i < 100; i++) 
   producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));   
  producer.close(); 
 } 
}

3. 消费者代码

package com.lnho.example.kafka;   
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import java.util.Arrays; 
import java.util.Properties;   
public class KafkaConsumerExample { 
 public static void main(String[] args) { 
  Properties props = new Properties(); 
  props.put("bootstrap.servers", "master:9092"); 
  props.put("group.id", "test"); 
  props.put("enable.auto.commit", "true"); 
  props.put("auto.commit.interval.ms", "1000"); 
  props.put("session.timeout.ms", "30000"); 
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
  KafkaConsumer consumer = new KafkaConsumer<>(props); 
  consumer.subscribe(Arrays.asList("topic1")); 
  while (true) { 
   ConsumerRecords records = consumer.poll(100); 
   for (ConsumerRecord record : records) 
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); 
  } 
 } 
}

上述就是小编为大家分享的怎么在Java中利用kafka发送消息了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。


本文题目:怎么在Java中利用kafka发送消息
浏览地址:http://cqcxhl.cn/article/pgsshj.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP