Redis Stream的结构如图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。

每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用xadd
指令追加消息时自动创建。
每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id
在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create
进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id
变量。
每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到。
同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id
往前移动。每个消费者者有一个组内唯一名称。
消费者(Consumer)内部会有个状态变量pending_ids
,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL
,也就是Pending Entries List
,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
消息ID
消息ID的形式是timestampInMillis-sequence
,例如1527846880572-5
,它表示当前的消息在毫米时间戳1527846880572
时产生,并且是该毫秒内产生的第5条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数
,而且必须是后面加入的消息的ID要大于前面的消息ID。
消息内容
消息内容就是键值对,形如hash结构的键值对,这没什么特别之处。
增删改查
消息队列相关命令:
- XADD – 添加消息到末尾
- XTRIM – 对流进行修剪,限制长度
- XDEL – 删除消息
- XLEN – 获取流包含的元素数量,即消息长度
- XRANGE – 获取消息列表,会自动过滤已经删除的消息
- XREVRANGE – 反向获取消息列表,ID 从大到小
- XREAD – 以阻塞或非阻塞方式获取消息列表
127.0.0.1:6379> XADD user * name chris age 20
"1615116774494-0"
127.0.0.1:6379> XADD user * name may age 20
"1615116808386-0"
127.0.0.1:6379> XADD user * name tony age 20
"1615116813152-0"
127.0.0.1:6379> XLEN user
(integer) 3
127.0.0.1:6379> XRANGE user - +
1) 1) "1615116774494-0"
2) 1) "name"
2) "chris"
3) "age"
4) "20"
2) 1) "1615116808386-0"
2) 1) "name"
2) "may"
3) "age"
4) "20"
3) 1) "1615116813152-0"
2) 1) "name"
2) "tony"
3) "age"
4) "20"
127.0.0.1:6379> XRANGE user 1615116808386-0 +
1) 1) "1615116808386-0"
2) 1) "name"
2) "may"
3) "age"
4) "20"
2) 1) "1615116813152-0"
2) 1) "name"
2) "tony"
3) "age"
4) "20"
127.0.0.1:6379> XRANGE user - 1615116808386-0
1) 1) "1615116774494-0"
2) 1) "name"
2) "chris"
3) "age"
4) "20"
2) 1) "1615116808386-0"
2) 1) "name"
2) "may"
3) "age"
4) "20"
127.0.0.1:6379> xdel user 1615116808386-0
(integer) 1
127.0.0.1:6379> XLEN user
(integer) 2
127.0.0.1:6379> XRANGE user - +
1) 1) "1615116774494-0"
2) 1) "name"
2) "chris"
3) "age"
4) "20"
2) 1) "1615116813152-0"
2) 1) "name"
2) "tony"
3) "age"
4) "20"
独立消费
我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread
,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组(Consumer Group)的存在,就好比Stream就是一个普通的列表(list)。
# 从Stream头部读取两条消息
127.0.0.1:6379> XREAD count 2 STREAMS user 0-0
1) 1) "user"
2) 1) 1) "1615116774494-0"
2) 1) "name"
2) "chris"
3) "age"
4) "20"
2) 1) "1615116813152-0"
2) 1) "name"
2) "tony"
3) "age"
4) "20"
# 从Stream尾部读取一条消息,毫无疑问,这里不会返回任何消息
127.0.0.1:6379> XREAD count 2 STREAMS user $
(nil)
# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
,block 0表示永远阻塞,直到消息到来,block 1000表示阻塞1s,如果1s内没有任何消息到来,就返回nil
127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS user $
# 我们从新打开一个窗口,在这个窗口往Stream里塞消息

# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容 # 而且还显示了一个等待时间,这里我们等待了147.10s
127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS user $
1) 1) "user"
2) 1) 1) "1615122579688-0"
2) 1) "name"
2) "nana"
3) "age"
4) "10"
(147.10s)
客户端如果想要使用xread进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息ID。下次继续调用xread时,将上次返回的最后一个消息ID作为参数传递进去,就可以继续消费后续的消息。
创建消费组
消费者组相关命令:
- XGROUP CREATE – 创建消费者组
- XREADGROUP GROUP – 读取消费者组中的消息
- XACK – 将消息标记为”已处理”
- XGROUP SETID – 为消费者组设置新的最后递送消息ID
- XGROUP DELCONSUMER – 删除消费者
- XGROUP DESTROY – 删除消费者组
- XPENDING – 显示待处理消息的相关信息
- XCLAIM – 转移消息的归属权
- XINFO – 查看流和消费者组的相关信息;
- XINFO GROUPS – 打印消费者组的信息;
- XINFO STREAM – 打印流信息
Stream通过 xgroup create指令创建消费组(Consumer Group),需要传递起始消息ID参数用来初始化last_delivered_id 变量。
127.0.0.1:6379> XGROUP CREATE user group1 0-0 //从头开始消费
OK
127.0.0.1:6379> XGROUP CREATE user group2 $ //从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
OK
127.0.0.1:6379> XINFO stream user
1) "length" //总消息长度 :3个消息
2) (integer) 3
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1615122579688-0"
9) "groups" // 两个消费者
10) (integer) 2
11) "first-entry"
12) 1) "1615116774494-0"
2) 1) "name"
2) "chris"
3) "age"
4) "20"
13) "last-entry"
14) 1) "1615122579688-0"
2) 1) "name"
2) "nana"
3) "age"
4) "10"
127.0.0.1:6379> XINFO groups user
1) 1) "name"
2) "group1"
3) "consumers" //消费者
4) (integer) 0
5) "pending" //处理的消息
6) (integer) 0
7) "last-delivered-id" //消费游标id
8) "0-0"
2) 1) "name"
2) "group2"
3) "consumers" //消费者
4) (integer) 0
5) "pending" //处理的消息
6) (integer) 0
7) "last-delivered-id" //消费游标id
8) "1615122579688-0"
消费
Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。
# >号表示从当前消费组的last_delivered_id后面开始读
# 每当消费者读取一条消息,last_delivered_id变量就会前进
127.0.0.1:6379> XREADGROUP GROUP group1 g1 COUNT 1 STREAMS user >
1) 1) "user"
2) 1) 1) "1615116774494-0"
2) 1) "name"
2) "chris"
3) "age"
4) "20"
127.0.0.1:6379> XREADGROUP GROUP group1 g1 COUNT 1 STREAMS user >
1) 1) "user"
2) 1) 1) "1615116813152-0"
2) 1) "name"
2) "tony"
3) "age"
4) "20"
127.0.0.1:6379> XINFO GROUPS user
1) 1) "name"
2) "group1" //我们前面消费了2条消息
3) "consumers" //消费者1
4) (integer) 1
5) "pending" //消费消息
6) (integer) 2
7) "last-delivered-id" //消费游标ID
8) "1615116813152-0"
2) 1) "name"
2) "group2"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1615122579688-0"
127.0.0.1:6379> XREADGROUP GROUP group1 g2 COUNT 1 STREAMS user > //加入一个消费者,也是从组的消费游标的位置开始的
1) 1) "user"
2) 1) 1) "1615122579688-0"
2) 1) "name"
2) "nana"
3) "age"
4) "10"
127.0.0.1:6379> XREADGROUP GROUP group2 g2 COUNT 1 BLOCK 0 STREAMS user > //阻塞等待

从另一个客户端写入消息
127.0.0.1:6379> XREADGROUP GROUP group2 g2 COUNT 1 BLOCK 0 STREAMS user > //收到
1) 1) "user"
2) 1) 1) "1615190777520-0"
2) 1) "name"
2) "tiantian"
3) "age"
4) "20"
(204.47s)
127.0.0.1:6379> xinfo GROUPS user //查看消费组信息
1) 1) "name"
2) "group1"
3) "consumers" //消费者2
4) (integer) 2
5) "pending" //消费未确认3
6) (integer) 3
7) "last-delivered-id"
8) "1615122579688-0"
2) 1) "name"
2) "group2" //这个是从尾部消费,也就是消费最新消息 ,我们用它来阻塞获取了tiantian
3) "consumers" //消费者1
4) (integer) 1
5) "pending" //消费未确认
6) (integer) 1
7) "last-delivered-id" //消费游标
8) "1615190777520-0"
127.0.0.1:6379> XINFO CONSUMERS user group1 //查看消费组的消费者信息
1) 1) "name"
2) "g1" //消费者1
3) "pending"
4) (integer) 2 //消费2条,未确认
5) "idle"
6) (integer) 926443 //空闲时间
2) 1) "name"
2) "g2" //消费者2
3) "pending"
4) (integer) 1 //消费1条,未确认
5) "idle"
6) (integer) 74567 //空闲时间
127.0.0.1:6379> XACK user group1 1615122579688-0//确认消息 ACK ,nana
(integer) 1
127.0.0.1:6379> XINFO CONSUMERS user group1 //消费者2的 nana 已经确认
1) 1) "name"
2) "g1"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 1101325
2) 1) "name"
2) "g2"
3) "pending" //为0
4) (integer) 0
5) "idle"
6) (integer) 920559
127.0.0.1:6379> XACK user group1 1615116774494-0 //确认chris
(integer) 1
127.0.0.1:6379> XINFO CONSUMERS user group1
1) 1) "name"
2) "g1"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1246351
2) 1) "name"
2) "g2"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1065585
Stream消息太多怎么办
Xdel 6.0是有效的删除消息,5.0版本删除标记,用xlen查还存在,如果是xdel无法真正删除情况下,我们可以在xadd的指令提供一个定长长度maxlen,就可以将老的消息干掉,确保最多不超过指定长度。
消息如果忘记ACK会怎样
Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息处理完了但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。

PEL如何避免消息丢失
在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是PEL里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到PEL中的消息ID列表。不过此时xreadgroup的起始消息ID不能为参数>,而必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的PEL消息以及自last_delivered_id
之后的新消息。
结论
1.如果使用xrange和xrevrange命令,则Stream和list功能类同
2.如果使用xread命令,则有其非常独特的地方
2.1与redis的pub/sub不同,pub/sub多个客户端是收到相同的数据,而stream的多个客户端是竞争关系,每个客户端收到的数据是不相同的;
2.2pub/sub中一旦触发数据获取,不会记录下上一次拿的位置,意味着客户端无法重复去拿以前的数据,而blpop方式一旦pop,数据就会永久的删除,也无法重复去拿以前的数据。而Stream会永久的存放数据,并且客户端会保留上一次拿的id,甚至通过修改id可以拿回以前的数据。和kafka的机制类似;
2.3.Stream提供了消费者组(kafka也有),不同组接收到的数据完全一样(前提是条件一样),但是组内的消费者则是竞争关系(还是和kafka一样);
2.4.可以设置为阻塞与非阻塞模式;
2.5.多客户端时,遵循FIFO特性。
理论内容学习来源 :Redis官方, codehole_,菜鸟教程,知乎等 。