重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
最近写了个kafka的接收消息的功能,需要使用回调处理收到的消息。
为北镇等地区用户提供了全套网页设计制作服务,及北镇网站建设行业解决方案。主营业务为网站设计、成都网站建设、北镇网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
一个是基本的回调,一个是使用接口功能实现回调,对接口是个很好的学习。
1.正常回调
kafka的接收消息处。收到消息后,使用传入的Onmessage进行处理。
调用kafka接收消息的单元,并在调用方写好回调
在调用方实现回调需要执行的方法
感觉还是使用基本回调相对简单点,接口就当学习了。
另外跨包的接口的方法要大写!定位了好久发现个入门的问题。
环境:
现象:golang微服务内存占用超过1G,查看日志发现大量kafka相关错误日志,继而查看kafka集群,其中一个kafka节点容器挂掉了。
疑问 为什么kafka集群只有一个broker挂了,客户端就大量报错呢
通过beego admin页面获取 mem-1.memprof
可以看到调用栈为 withRecover backgroundMetadataUpdataer refreshMeaatdata RefreshMetada tryRefreshMetadata ...
sarama-cluster: NewClient
为什么kafka集群只有一个broker,但是NewClient确失败了?
在kafka容器里查看topic, 发现Replicas和Isr只有一个,找到kafka官方配置说明,自动生成的topic需要配置default.replication.factor这个参数,才会生成3副本。
本文主要研究一下golang的zap的ZapKafkaWriter
WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。