Redis Stream的结构如图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。

每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。

每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。

每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到。

同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者者有一个组内唯一名称。

消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息ID

消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第5条消息。消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。

消息内容

消息内容就是键值对,形如hash结构的键值对,这没什么特别之处。

增删改查

消息队列相关命令:

  • XADD – 添加消息到末尾
  • XTRIM – 对流进行修剪,限制长度
  • XDEL – 删除消息
  • XLEN – 获取流包含的元素数量,即消息长度
  • XRANGE – 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE – 反向获取消息列表,ID 从大到小
  • XREAD – 以阻塞或非阻塞方式获取消息列表
127.0.0.1:6379> XADD user * name chris age 20
 "1615116774494-0"
 127.0.0.1:6379> XADD user * name may age 20
 "1615116808386-0"
 127.0.0.1:6379> XADD user * name tony age 20
 "1615116813152-0"
 127.0.0.1:6379> XLEN user
 (integer) 3
 127.0.0.1:6379> XRANGE user - +
 1) 1) "1615116774494-0"
    2) 1) "name"
       2) "chris"
       3) "age"
       4) "20"
 2) 1) "1615116808386-0"
    2) 1) "name"
       2) "may"
       3) "age"
       4) "20"
 3) 1) "1615116813152-0"
    2) 1) "name"
       2) "tony"
       3) "age"
       4) "20"
 127.0.0.1:6379> XRANGE user 1615116808386-0 +
 1) 1) "1615116808386-0"
    2) 1) "name"
       2) "may"
       3) "age"
       4) "20"
 2) 1) "1615116813152-0"
    2) 1) "name"
       2) "tony"
       3) "age"
       4) "20"
 127.0.0.1:6379> XRANGE user - 1615116808386-0
 1) 1) "1615116774494-0"
    2) 1) "name"
       2) "chris"
       3) "age"
       4) "20"
 2) 1) "1615116808386-0"
    2) 1) "name"
       2) "may"
       3) "age"
       4) "20"
 127.0.0.1:6379> xdel user 1615116808386-0
 (integer) 1
 127.0.0.1:6379> XLEN user
 (integer) 2
 127.0.0.1:6379> XRANGE user - +
 1) 1) "1615116774494-0"
    2) 1) "name"
       2) "chris"
       3) "age"
       4) "20"
 2) 1) "1615116813152-0"
    2) 1) "name"
       2) "tony"
       3) "age"
       4) "20"

独立消费

我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组(Consumer Group)的存在,就好比Stream就是一个普通的列表(list)。

# 从Stream头部读取两条消息
127.0.0.1:6379> XREAD count 2 STREAMS user 0-0 
 1) 1) "user"
    2) 1) 1) "1615116774494-0"
          2) 1) "name"
             2) "chris"
             3) "age"
             4) "20"
       2) 1) "1615116813152-0"
          2) 1) "name"
             2) "tony"
             3) "age"
             4) "20"
# 从Stream尾部读取一条消息,毫无疑问,这里不会返回任何消息 127.0.0.1:6379> XREAD count 2 STREAMS user $
 (nil)

# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来,block 0表示永远阻塞,直到消息到来,block 1000表示阻塞1s,如果1s内没有任何消息到来,就返回nil
 127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS user $
 
# 我们从新打开一个窗口,在这个窗口往Stream里塞消息
# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容 # 而且还显示了一个等待时间,这里我们等待了147.10s 127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS user $
1) 1) "user"
    2) 1) 1) "1615122579688-0"
          2) 1) "name"
             2) "nana"
             3) "age"
             4) "10"
 (147.10s)

客户端如果想要使用xread进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息ID。下次继续调用xread时,将上次返回的最后一个消息ID作为参数传递进去,就可以继续消费后续的消息。

创建消费组

消费者组相关命令:

  • XGROUP CREATE – 创建消费者组
  • XREADGROUP GROUP – 读取消费者组中的消息
  • XACK – 将消息标记为”已处理”
  • XGROUP SETID – 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER – 删除消费者
  • XGROUP DESTROY – 删除消费者组
  • XPENDING – 显示待处理消息的相关信息
  • XCLAIM – 转移消息的归属权
  • XINFO – 查看流和消费者组的相关信息;
  • XINFO GROUPS – 打印消费者组的信息;
  • XINFO STREAM – 打印流信息

Stream通过 xgroup create指令创建消费组(Consumer Group),需要传递起始消息ID参数用来初始化last_delivered_id 变量。127.0.0.1:6379> XGROUP CREATE user group1 0-0  //从头开始消费
 OK
 127.0.0.1:6379> XGROUP CREATE user group2 $  //从尾部开始消费,只接受新消息,当前Stream消息会全部忽略 OK
127.0.0.1:6379> XINFO stream user
  1) "length"  //总消息长度 :3个消息
  2) (integer) 3
  3) "radix-tree-keys"
  4) (integer) 1
  5) "radix-tree-nodes"
  6) (integer) 2
  7) "last-generated-id"
  8) "1615122579688-0"
  9) "groups" // 两个消费者
 10) (integer) 2
 11) "first-entry"
 12) 1) "1615116774494-0"
     2) 1) "name"
        2) "chris"
        3) "age"
        4) "20"
 13) "last-entry"
 14) 1) "1615122579688-0"
     2) 1) "name"
        2) "nana"
        3) "age"
        4) "10"
127.0.0.1:6379> XINFO groups user
 1) 1) "name"
    2) "group1"
    3) "consumers" //消费者
    4) (integer) 0
    5) "pending" //处理的消息
    6) (integer) 0
    7) "last-delivered-id" //消费游标id
    8) "0-0"
 2) 1) "name"
    2) "group2"
    3) "consumers" //消费者
    4) (integer) 0
    5) "pending" //处理的消息
    6) (integer) 0
    7) "last-delivered-id" //消费游标id
    8) "1615122579688-0"

消费

Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。

# >号表示从当前消费组的last_delivered_id后面开始读
 # 每当消费者读取一条消息,last_delivered_id变量就会前进
127.0.0.1:6379> XREADGROUP GROUP group1 g1 COUNT 1 STREAMS user >
 1) 1) "user"
    2) 1) 1) "1615116774494-0"
          2) 1) "name"
             2) "chris"
             3) "age"
             4) "20"
 127.0.0.1:6379> XREADGROUP GROUP group1 g1 COUNT 1 STREAMS user >
 1) 1) "user"
    2) 1) 1) "1615116813152-0"
          2) 1) "name"
             2) "tony"
             3) "age"
             4) "20"
127.0.0.1:6379> XINFO GROUPS user
 1) 1) "name"
    2) "group1" //我们前面消费了2条消息
    3) "consumers" //消费者1
    4) (integer) 1
    5) "pending" //消费消息
    6) (integer) 2
    7) "last-delivered-id" //消费游标ID
    8) "1615116813152-0"
 2) 1) "name"
    2) "group2"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1615122579688-0"
127.0.0.1:6379> XREADGROUP GROUP group1 g2 COUNT 1 STREAMS user > //加入一个消费者,也是从组的消费游标的位置开始的
 1) 1) "user"
    2) 1) 1) "1615122579688-0"
          2) 1) "name"
             2) "nana"
             3) "age"
             4) "10"
127.0.0.1:6379> XREADGROUP GROUP group2 g2 COUNT 1 BLOCK 0 STREAMS user > //阻塞等待

从另一个客户端写入消息

127.0.0.1:6379> XREADGROUP GROUP group2 g2 COUNT 1 BLOCK 0 STREAMS user > //收到
 1) 1) "user"
    2) 1) 1) "1615190777520-0"
          2) 1) "name"
             2) "tiantian"
             3) "age"
             4) "20"
 (204.47s)
127.0.0.1:6379> xinfo GROUPS user //查看消费组信息
 1) 1) "name"
    2) "group1"
    3) "consumers" //消费者2
    4) (integer) 2
    5) "pending"  //消费未确认3
    6) (integer) 3
    7) "last-delivered-id"
    8) "1615122579688-0"
 2) 1) "name"  
    2) "group2"  //这个是从尾部消费,也就是消费最新消息 ,我们用它来阻塞获取了tiantian
    3) "consumers" //消费者1
    4) (integer) 1
    5) "pending" //消费未确认
    6) (integer) 1
    7) "last-delivered-id"   //消费游标
    8) "1615190777520-0"
127.0.0.1:6379> XINFO CONSUMERS user group1  //查看消费组的消费者信息
 1) 1) "name"
    2) "g1" //消费者1
    3) "pending"
    4) (integer) 2  //消费2条,未确认
    5) "idle"
    6) (integer) 926443 //空闲时间
 2) 1) "name"
    2) "g2"  //消费者2 
    3) "pending"
    4) (integer) 1  //消费1条,未确认
    5) "idle"
    6) (integer) 74567 //空闲时间
127.0.0.1:6379> XACK user group1 1615122579688-0//确认消息 ACK ,nana
 (integer) 1
127.0.0.1:6379> XINFO CONSUMERS user group1  //消费者2的 nana 已经确认
 1) 1) "name"
    2) "g1"
    3) "pending"
    4) (integer) 2
    5) "idle"
    6) (integer) 1101325
 2) 1) "name"
    2) "g2"
    3) "pending"  //为0
    4) (integer) 0 
    5) "idle"
    6) (integer) 920559
127.0.0.1:6379> XACK user group1 1615116774494-0  //确认chris
 (integer) 1
 127.0.0.1:6379> XINFO CONSUMERS user group1
 1) 1) "name"
    2) "g1"
    3) "pending"
    4) (integer) 1
    5) "idle"
    6) (integer) 1246351
 2) 1) "name"
    2) "g2"
    3) "pending"
    4) (integer) 0
    5) "idle"
    6) (integer) 1065585

Stream消息太多怎么办

Xdel 6.0是有效的删除消息,5.0版本删除标记,用xlen查还存在,如果是xdel无法真正删除情况下,我们可以在xadd的指令提供一个定长长度maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

消息如果忘记ACK会怎样

Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息处理完了但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。

PEL如何避免消息丢失

在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是PEL里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到PEL中的消息ID列表。不过此时xreadgroup的起始消息ID不能为参数>,而必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的PEL消息以及自last_delivered_id之后的新消息。

结论

​ 1.如果使用xrange和xrevrange命令,则Stream和list功能类同

​ 2.如果使用xread命令,则有其非常独特的地方

​ 2.1与redis的pub/sub不同,pub/sub多个客户端是收到相同的数据,而stream的多个客户端是竞争关系,每个客户端收到的数据是不相同的;
2.2pub/sub中一旦触发数据获取,不会记录下上一次拿的位置,意味着客户端无法重复去拿以前的数据,而blpop方式一旦pop,数据就会永久的删除,也无法重复去拿以前的数据。而Stream会永久的存放数据,并且客户端会保留上一次拿的id,甚至通过修改id可以拿回以前的数据。和kafka的机制类似;

2.3.Stream提供了消费者组(kafka也有),不同组接收到的数据完全一样(前提是条件一样),但是组内的消费者则是竞争关系(还是和kafka一样);

​ 2.4.可以设置为阻塞与非阻塞模式;

2.5.多客户端时,遵循FIFO特性。

理论内容学习来源 :Redis官方, codehole_,菜鸟教程,知乎等 。