重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
在SpringBoot中如何使用redisTemplate重新消费Redis Stream中未ACK的消息,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
创新互联建站公司2013年成立,先为鹤城等服务建站,鹤城等地企业,进行企业商务咨询服务。为鹤城企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
消费组从stream中获取到消息后,会分配给自己组中其中的一个消费者进行消费,消费者消费完毕,需要给消费组返回ACK,表示这条消息已经消费完毕了。
当消费者从消费组获取到消息的时候,会先把消息添加到自己的pending消息列表,当消费者给消费组返回ACK的时候,就会把这条消息从pending队列删除。(每个消费者都有自己的pending消息队列)
消费者可能没有及时的返回ACK。例如消费者消费完毕后,宕机,没有及时返回ACK,此时就会导致这条消息占用2倍的内存(stream中保存一份, 消费者的的pending消息列表中保存一份)
XADD my_stream * hello world
随便添加一条消息,目的是为了初始化stream
XGROUP CREATE my_stream my_group $
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
XREADGROUP GROUP my_group my_consumer2 BLOCK 0 STREAMS my_stream >
XADD my_stream * message1 Hello XADD my_stream * message2 SpringBoot XADD my_stream * message3 Community
可以看到,一共Push了3条消息,它们的ID分别是
1605524648266-0 (message1 )
1605524657157-0 (message2)
1605524665215-0 (message3)
现在的状况是,消费者1,消费了2条消息(message1和message3),消费者2,消费了1条消息(message2)。都是消费成功了的,但是它们都还没有进行ACK。
在客户端,消费者消费到一条消息后会立即返回,需要重新执行命令,来回到阻塞状态
现在我们打算,把消费者1,消费的那条message1
进行ACK
XACK my_stream my_group 1605524648266-0
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 2 # 消费组中,所有消费者的pending消息数量 2) "1605524657157-0" # pending消息中的,最小消息ID 3) "1605524665215-0" # pending消息中的,最大消息ID 4) 1) 1) "my_consumer1" # 消费者1 2) "1" # 有1条待确认消息 2) 1) "my_consumer2" # 消费者2 2) "1" # 有2条待确认消息
127.0.0.1:6379> XPENDING my_stream my_group 0 + 10 my_consumer1 1) 1) "1605524665215-0" # 待ACK消息ID 2) "my_consumer1" # 所属消费者 3) (integer) 847437 # 消息自从被消费者获取后到现在过去的时间(毫秒) - idle time 4) (integer) 1 # 消息被获取的次数 - delivery counter
这条命令,表示查询消费组my_group
中消费者my_consumer1
的opending队列,开始ID是0,结束ID是最大,最多检索10个结果。
现在的情况就是,一共3条消息,消费者1消费了2条,ack了1条。消费者2消费了1条,没有ack。消费者1和2,各自的pending队列中都有一条未ack的消息
如何实现将未被成功消费的消息获取出来重新进行消费?之前的演示,目的都是为了造一些数据,所以是用的客户端命令,从这里开始,所有的演示,都会使用spring-data-redis
中的RedisTemplate
。
import java.time.Duration; import java.util.List; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessages; import org.springframework.data.redis.connection.stream.PendingMessagesSummary; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import io.springboot.jwt.SpringBootJwtApplication; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class RedisStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class); @Autowired private StringRedisTemplate stringRedisTemplate; @Test public void test() { StreamOperationsstreamOperations = this.stringRedisTemplate.opsForStream(); // 获取my_group中的pending消息信息,本质上就是执行XPENDING指令 PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group"); // 所有pending消息的数量 long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages(); // 消费组名称 String groupName= pendingMessagesSummary.getGroupName(); // pending队列中的最小ID String minMessageId = pendingMessagesSummary.minMessageId(); // pending队列中的最大ID String maxMessageId = pendingMessagesSummary.maxMessageId(); LOGGER.info("消费组:{},一共有{}条pending消息,最大ID={},最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId); // 每个消费者的pending消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); pendingMessagesPerConsumer.entrySet().forEach(entry -> { // 消费者 String consumer = entry.getKey(); // 消费者的pending消息数量 long consumerTotalPendingMessages = entry.getValue(); LOGGER.info("消费者:{},一共有{}条pending消息", consumer, consumerTotalPendingMessages); if (consumerTotalPendingMessages > 0) { // 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值 PendingMessages pendingMessages = streamOperations.pending("my_stream", Consumer.from("my_group", consumer), Range.closed("0", "+"), 10); // 遍历所有Opending消息的详情 pendingMessages.forEach(message -> { // 消息的ID RecordId recordId = message.getId(); // 消息从消费组中获取,到此刻的时间 Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery(); // 消息被获取的次数 long deliveryCount = message.getTotalDeliveryCount(); LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount); /** * 演示手动消费的这个判断非常的针对,目的就是要读取消费者“my_consumer1”pending消息中,ID=1605524665215-0的这条消息 */ if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) { // 通过streamOperations,直接读取这条pending消息, List > result = streamOperations.range("my_stream", Range.rightOpen("1605524665215-0", "1605524665215-0")); // 开始和结束都是同一个ID,所以结果只有一条 MapRecord record = result.get(0); // 这里执行日志输出,模拟的就是消费逻辑 LOGGER.info("消费了pending消息:id={}, value={}", record.getId(), record.getValue()); // 如果手动消费成功后,往消费组提交消息的ACK Long retVal = streamOperations.acknowledge("my_group", record); LOGGER.info("消息ack,一共ack了{}条", retVal); } }); } }); } }
这种方式就是,遍历消费组的pending消息情况,再遍历每个消费者的pending消息id列表,再根据id,直接去stream读取这条消息,进行消费Ack。
消费组:my_group,一共有2条pending消息,最大ID=1605524657157-0,最小ID=1605524665215-0 消费者:my_consumer1,一共有1条pending消息 openg消息,id=1605524665215-0, elapsedTimeSinceLastDelivery=PT1H9M4.061S, deliveryCount=1 消费了pending消息:id=1605524665215-0, value={message3=Community} 消息ack,一共ack了1条 消费者:my_consumer2,一共有1条pending消息 openg消息,id=1605524657157-0, elapsedTimeSinceLastDelivery=PT1H9M12.172S, deliveryCount=1
最终的结果就是,消费者1的唯一一条pending消息被Ack了,这里有几个点要注意
遍历消费者pending列表时候,最小/大消息id,可以根据XPENDING
指令中的结果来,我写0 - +
,只是为了偷懒
遍历到消费者pending消息的时候,可以根据elapsedTimeSinceLastDelivery
(idle time)和deliveryCount
(delivery counter)做一些逻辑判断,elapsedTimeSinceLastDelivery
越长,表示这条消息被消费了很久,都没Ack,deliveryCount
表示重新投递N次后(下文会讲),都没被消费成功,可能是消费逻辑有问题,或者是Ack有问题。
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 1 2) "1605524657157-0" 3) "1605524657157-0" 4) 1) 1) "my_consumer2" 2) "1"
消费者1,唯1条待ack的消息看,已经被我们遍历出来手动消费,手动ack了,所以只剩下消费者2还有1条pending消息。。
如果一个消费者,一直不能消费掉某条消息,或者说这个消费者因为某些消息,永远也不能上过线了,那么可以把这个消费者的pending消息,转移到其他的消费者pending列表中,重新消费
其实我们这里要做的事情,就是把“消费者2”的唯一1条pending消息“ 1605524657157-0”(message2),交给“消费者1”,重新进行消费。
XCLAIM my_stream my_group my_consumer1 10000 1605524657157-0
把1605524657157-0
这条消息,重新给my_group
中的my_consumer1
进行消费,前提条件是这条消息的idle time
大于了10秒钟(从获取消息到现在超过10秒都没Ack)。
import java.time.Duration; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import io.springboot.jwt.SpringBootJwtApplication; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class RedisStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class); @Autowired private StringRedisTemplate stringRedisTemplate; @Test public void test() { ListretVal = this.stringRedisTemplate.execute(new RedisCallback >() { @Override public List
doInRedis(RedisConnection connection) throws DataAccessException { // XCLAIM 指令的实现方法 return connection.streamCommands().xClaim("my_stream".getBytes(), "my_group", "my_consumer1", Duration.ofSeconds(10), RecordId.of("1605524657157-0")); } }); for (ByteRecord byteRecord : retVal) { LOGGER.info("改了消息的消费者:id={}, value={}", byteRecord.getId(), byteRecord.getValue()); } } }
改了消息的消费者:id=1605524657157-0, value={[B@10b4f345=[B@63de4fa}
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 1 2) "1605524657157-0" 3) "1605524657157-0" 4) 1) 1) "my_consumer1" 2) "1"
可以看到,消息 “1605524657157-0”(message2),已经从“消费者2”名下,转移到了”消费者1”,接下来要做的事情,就是遍历“消费者1”的pending列表,消费掉它。
最开始在控制,演示了通过客户端,进行消费者阻塞消费的时候,写了一条命令
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
其中最后那个>
,表示ID,是一个特殊字符,如果不是,当ID不是特殊字符>
时, XREADGROUP
不再是从消息队列中读取消息, 而是从消费者的的pending消息列表中读取历史消息。(一般将参数设为0-0,表示读取所有的pending消息)
127.0.0.1:6379> XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream 0 1) 1) "my_stream" 2) 1) 1) "1605524657157-0" 2) 1) "message2" 2) "SpringBoot"
读取到了,消费者1,pending消息中的唯一一条消息记录
import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import io.springboot.jwt.SpringBootJwtApplication; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) public class RedisStreamTest { private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class); @Autowired private StringRedisTemplate stringRedisTemplate; @SuppressWarnings("unchecked") @Test public void test() { StreamOperationsstreamOperations = this.stringRedisTemplate.opsForStream(); // 从消费者的pending队列中读取消息 List > retVal = streamOperations.read(Consumer.from("my_group", "my_consumer1"), StreamOffset.create("my_stream", ReadOffset.from("0"))); // 遍历消息 for (MapRecord record : retVal ) { // 消费消息 LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue()); // 手动ack消息 streamOperations.acknowledge("my_group", record); } } }
这种方式,就是直接从消费者的pending队列中读取数据,手动进行消费,然后Ack
消息id=1605524657157-0, 消息value={message2=SpringBoot}
127.0.0.1:6379> XPENDING my_stream my_group 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
没了,一条都没,全部已经Ack了。
死信,就是一直没法被消费的消息,可以根据这个两个属性idle time
和delivery counter
进行判断
idle time
当消息被消费者读取后,就会开始计时,如果一个pending消息的idle time
很长,表示这消息,可能是在Ack时发生了异常,或者说还没来得及Ack,消费者就宕机了,导致一直没有被Ack,当消息发生了转移,它会清零,重新计时。
delivery counter
,它表示转移的次数,每当一条消息的消费者发生变更的时候,它的值都会+1,如果一条pending消息的delivery counter
值很大,表示它在多个消费者之间进行了多次转移都没法成功消费,可以人工的读取,消费掉。
redis5的stream,可以说功能还是蛮强大(设计上狠狠借鉴了一把Kakfa)。如果应用规模并不大,需要一个MQ服务,我想Stream的你可以试试看,比起自己搭建kakfa,RocketMQ之类的,来的快当而且更好维护。
看完上述内容,你们掌握在SpringBoot中如何使用RedisTemplate重新消费Redis Stream中未ACK的消息的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!