1. Introduction
Redis 5 introduces a key new feature: the stream.
A stream is a log-like storage structure to which you can append data. Redis generates a timestamp-based ID for each entry, and streams also offer a convenient model for reading data.
This makes streams suitable for message queues and time-series storage.
2. Installation
We need Redis 5.0, and here we run it in a Docker container:
docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3
The redis client:
docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379
After starting, it enters the interactive command line:
redis:6379>
3. Usage
3.1 Adding elements to stream
A stream element consists of one or more key-value pairs. Let’s add an element to the stream:
redis:6379> XADD mystream * sensor-id 1234 temperature 19.8
1531989605376-0
From the above we can see that:
- mystream is the key of the stream;
- the parameter in the position of * is the element ID, and * indicates that the system generates the element ID automatically;
- the added element contains 2 key-value pairs, sensor-id 1234 and temperature 19.8;
- the returned value is the ID of the newly added element, consisting of a timestamp and an incrementing number.
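To make the structure of the ID concrete, here is a minimal Python sketch that splits an ID such as the one returned above into its two parts. The helper names parse_stream_id and id_to_datetime are hypothetical (they are not part of Redis or of any client library), and the sketch assumes the first part is a Unix timestamp in milliseconds, as it is for automatically generated IDs.

```python
from datetime import datetime, timedelta, timezone


def parse_stream_id(stream_id):
    """Split a stream ID like '1531989605376-0' into (milliseconds, sequence)."""
    ms_text, seq_text = stream_id.split("-")
    return int(ms_text), int(seq_text)


def id_to_datetime(stream_id):
    """Convert the millisecond part of an auto-generated ID to a UTC datetime."""
    ms, _ = parse_stream_id(stream_id)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=ms)


print(parse_stream_id("1531989605376-0"))
print(id_to_datetime("1531989605376-0").isoformat())
```

The incrementing number distinguishes elements that were added within the same millisecond.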
You can also get the number of elements in the Stream:
redis:6379> XLEN mystream
(integer) 1
3.2 Range query
A range query takes a start ID and an end ID, which is equivalent to giving a time range:
redis:6379> XRANGE mystream 1531989605376 1531989605377
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
You can use - for the smallest ID and + for the biggest ID:
redis:6379> XRANGE mystream - +
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
When too many elements are returned, you can limit the number of results with the COUNT parameter, much like paging in a database query:
redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
You can also use the XREVRANGE command to query in reverse order. Its usage is the same as XRANGE, except that the end ID is given before the start ID (for example, XREVRANGE mystream + -).
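The range semantics used in this section can be sketched in a few lines of Python. This is only an illustrative model, not how Redis implements it: the helper names parse_bound and in_range are hypothetical, and the sketch assumes that a bound given without a sequence number covers the whole millisecond (sequence 0 for the start, the largest sequence for the end) and that both bounds are inclusive.

```python
MAX_U64 = 2**64 - 1  # largest value of either ID part (64-bit unsigned)


def parse_bound(text, is_end):
    """Turn an XRANGE-style bound into a comparable (ms, seq) tuple."""
    if text == "-":
        return (0, 0)
    if text == "+":
        return (MAX_U64, MAX_U64)
    if "-" in text:
        ms, seq = text.split("-")
        return (int(ms), int(seq))
    # Incomplete ID: only the millisecond part was given.
    return (int(text), MAX_U64 if is_end else 0)


def in_range(entry_id, start, end):
    """Return True if entry_id lies within [start, end], bounds included."""
    ms, seq = entry_id.split("-")
    key = (int(ms), int(seq))
    return parse_bound(start, False) <= key <= parse_bound(end, True)


print(in_range("1531989605376-0", "1531989605376", "1531989605377"))
print(in_range("1531989605376-0", "-", "+"))
```

Comparing IDs as (milliseconds, sequence) pairs is what makes a range of IDs behave like a time range.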
3.3 Listening for new elements of stream
redis:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1531989605376-0
         2) 1) "sensor-id"
            2) "1234"
            3) "temperature"
            4) "19.8"
The mystream after STREAMS is the key of the target stream; 0 is the smallest ID, and XREAD returns the elements of that stream whose IDs are greater than the specified ID; COUNT is the maximum number of elements to return.
Multiple streams can be specified together, such as STREAMS mystream otherstream 0 0.
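The "greater than the specified ID" rule is what lets a reader walk through a stream in batches: each call passes the last ID it has seen. Below is a minimal Python model of that loop. It does not talk to Redis; read_after and id_key are hypothetical helpers, and the second entry in the sample data is made up for illustration.

```python
def id_key(entry_id):
    """Make an ID comparable as a (milliseconds, sequence) tuple."""
    ms, seq = entry_id.split("-")
    return (int(ms), int(seq))


def read_after(entries, last_id, count):
    """Return up to `count` entries whose ID is strictly greater than last_id."""
    newer = [e for e in entries if id_key(e[0]) > id_key(last_id)]
    return newer[:count]


# Entries as (id, fields) pairs in insertion order.
# The second entry is invented sample data, not taken from the session above.
stream = [
    ("1531989605376-0", {"sensor-id": "1234", "temperature": "19.8"}),
    ("1531989605400-0", {"sensor-id": "1234", "temperature": "20.1"}),
]

last_id = "0-0"
while True:
    batch = read_after(stream, last_id, count=1)
    if not batch:
        break
    for entry_id, fields in batch:
        print(entry_id, fields)
    last_id = batch[-1][0]  # the next call resumes after the last ID seen
```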
3.3.1 Blocking listener
If you execute the following in client 1:
redis:6379> XREAD BLOCK 0 STREAMS mystream $
it will enter the waiting state.
If you then add an element from client 2:
redis:6379> XADD mystream * test 1
the elements just added will be displayed in client 1:
1) 1) "mystream"
   2) 1) 1) 1531994510562-0
         2) 1) "test"
            2) "1"
The 0 after BLOCK is the timeout in milliseconds, and 0 means never time out; $ stands for the maximum ID currently in the stream, so only elements added afterwards are returned.
3.4 Consumer Group
When a stream carries a large volume of data, or when processing each message is time consuming, a single consumer comes under heavy pressure. Redis streams therefore provide consumer groups, which allow multiple consumers to process the same stream and share the load.
For example, if there are 3 consumers C1, C2, and C3, and there are 7 message elements in the stream, then the allocation for the consumers is:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
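The allocation listed above is a plain round-robin, which can be reproduced with a short Python sketch. The function name allocate_round_robin is hypothetical, and the strict rotation is an idealization: in practice a message goes to whichever consumer in the group asks for new data, so the real distribution depends on when each consumer reads.

```python
from itertools import cycle


def allocate_round_robin(messages, consumers):
    """Pair each message with the next consumer in turn."""
    return list(zip(messages, cycle(consumers)))


for message, consumer in allocate_round_robin(range(1, 8), ["C1", "C2", "C3"]):
    print(f"{message} -> {consumer}")
```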
3.4.1 Create a Consumer Group
redis:6379> XGROUP CREATE mystream mygroup01 $
OK
Here a consumer group named mygroup01 is created for the stream mystream; $ means that the group starts reading from the elements added after the current maximum ID.
3.4.2 Add test data
Add some new data:
redis:6379> XADD mystream * message apple
1531999977149-0
redis:6379> XADD mystream * message orange
1531999980272-0
redis:6379> XADD mystream * message strawberry
1531999983493-0
redis:6379> XADD mystream * message apricot
1531999988458-0
redis:6379> XADD mystream * message banana
1531999991782-0
3.4.3 Read data through the consumer group
redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1531999977149-0
         2) 1) "message"
            2) "apple"
Alice is the name of the group member (the consumer), and > requests data that has not yet been delivered to any member of the group.
As you can see, you don’t need to create group members in advance, as they will be created automatically the first time they are used.
Then let’s create another member to read the data:
redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1531999980272-0
         2) 1) "message"
            2) "orange"
3.4.4 Consumption history
redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1531999977149-0
         2) 1) "message"
            2) "apple"
The ID specified at the end is 0, so you get the pending history: the data that has been delivered to this consumer but not yet confirmed. This helps a consumer resume its work after recovering from a failure.
3.4.5 Consumption confirmation
redis:6379> XACK mystream mygroup01 1531999977149-0
(integer) 1
1531999977149-0 is the ID of the apple message consumed by Alice. Let’s check Alice’s consumption history now:
redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)
It’s already empty.
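The read, history, and confirm cycle shown in sections 3.4.3 to 3.4.5 can be summarized with a toy Python model. It does not talk to Redis and is only a sketch of the bookkeeping: the class ToyGroup and its methods are hypothetical names, and it assumes each delivered message stays in a per-group pending table until it is confirmed.

```python
class ToyGroup:
    """Toy model of one consumer group's delivery and confirmation state."""

    def __init__(self, entries):
        self.entries = list(entries)  # (id, value) pairs not yet delivered
        self.pending = {}             # id -> (consumer, value)

    def read_new(self, consumer):
        """Like reading with '>': deliver the next undelivered entry, if any."""
        if not self.entries:
            return None
        entry_id, value = self.entries.pop(0)
        self.pending[entry_id] = (consumer, value)
        return entry_id, value

    def read_history(self, consumer):
        """Like reading with ID 0: entries delivered to consumer, unconfirmed."""
        return [(i, v) for i, (c, v) in self.pending.items() if c == consumer]

    def ack(self, entry_id):
        """Like XACK: return 1 if the entry was pending, else 0."""
        return 1 if self.pending.pop(entry_id, None) is not None else 0


group = ToyGroup([("1531999977149-0", "apple"), ("1531999980272-0", "orange")])
group.read_new("Alice")
group.read_new("Bob")
print(group.read_history("Alice"))   # apple is pending for Alice
print(group.ack("1531999977149-0"))  # confirm it
print(group.read_history("Alice"))   # nothing left for Alice
```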
3.4.6 Failure Processing
As shown above, when a consumer fails and later recovers, it can retrieve the messages it has not confirmed yet. This is a safety mechanism, but what if the failed consumer cannot be recovered? Is there any way to deal with the messages it has not confirmed?
Redis streams provide a solution for this situation:
- Find all the messages that have been delivered but not confirmed;
- Change the owner of these messages.
This allows a new consumer to process the data.
List the unprocessed data:
redis:6379> XPENDING mystream mygroup01 - + 10
1) 1) 1531999980272-0
   2) "Bob"
   3) (integer) 45126376
   4) (integer) 2
2) 1) 1531999983493-0
   2) "Tom"
   3) (integer) 867475
   4) (integer) 1
You can see that 2 messages are unprocessed. For each one, the output lists its ID, its owner, its idle time (in milliseconds), and the number of times it has been delivered.
Change the owner:
redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-0
1) 1) 1531999980272-0
   2) 1) "message"
      2) "orange"
2) 1) 1531999983493-0
   2) 1) "message"
      2) "strawberry"
This transfers the messages with the 2 specified IDs to Gates. The 3600 is the minimum idle time in milliseconds: only the specified messages that have been idle for longer than that are assigned to Gates. Note that Gates is a new consumer that has not been declared before.
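The effect of the minimum idle time can be illustrated with a small Python model of the pending table. This is a sketch, not the Redis implementation: the function claim and the dictionary layout are hypothetical, the idle times are copied from the XPENDING output above, and resetting the idle time on a successful claim is an assumption of this model.

```python
def claim(pending, new_owner, min_idle_ms, ids):
    """Reassign the listed pending entries that have been idle long enough.

    pending maps id -> {"owner": str, "idle_ms": int}; returns the claimed IDs.
    """
    claimed = []
    for entry_id in ids:
        info = pending.get(entry_id)
        if info is None or info["idle_ms"] < min_idle_ms:
            continue  # not pending, or not idle long enough
        info["owner"] = new_owner
        info["idle_ms"] = 0  # assumption: a claim resets the idle time
        claimed.append(entry_id)
    return claimed


pending = {
    "1531999980272-0": {"owner": "Bob", "idle_ms": 45126376},
    "1531999983493-0": {"owner": "Tom", "idle_ms": 867475},
}
print(claim(pending, "Gates", 3600, list(pending)))
```

The idle-time check guards against two recovering consumers claiming the same message at once: after the first claim resets the idle time, a second claim with the same threshold skips that message.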
Check the data that Gates has not processed so far:
redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1531999980272-0
         2) 1) "message"
            2) "orange"
      2) 1) 1531999983493-0
         2) 1) "message"
            2) "strawberry"
There are 2 pieces of data newly allocated.
3.5 View related information
View the basic information:
redis:6379> XINFO STREAM mystream
 1) length
 2) (integer) 15
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1531989605376-0
    2) 1) "sensor-id"
       2) "1234"
       3) "temperature"
       4) "19.8"
11) last-entry
12) 1) 1531999991782-0
    2) 1) "message"
       2)
View the consumer group information:
redis:6379> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 3
   5) pending
   6) (integer) 5
2) 1) name
   2) "mygroup01"
   3) consumers
   4) (integer) 4
   5) pending
   6) (integer) 2
View the information about consumers in a group:
redis:6379> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 3
   5) idle
   6) (integer) 2483388
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 2
   5) idle
   6) (integer) 48453755
3) 1) name
   2) "Gates"
   3) pending
   4) (integer) 0
   5) idle
   6) (integer) 2385114
3.7 Delete the message data
First let’s check the existing data:
redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1531994510562-0
   2) 1) "test"
      2) "1"
Delete the first piece of data:
redis:6379> XDEL mystream 1531989605376-0
(integer) 1
Check again, and you can see that the former first element is gone:
redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531994510562-0
   2) 1) "test"
      2) "1"
2) 1) 1531994516257-0
   2) 1) "test"
      2) "2"
Note: XDEL does not actually remove the data from memory right away. It only marks the entry as deleted, so the memory is not reclaimed immediately.
3.8 Set the maximum length of stream
Add data and specify the maximum length as 2:
redis:6379> XADD mystream MAXLEN 2 * value 1
1532049865028-0
redis:6379> XADD mystream MAXLEN 2 * value 2
1532049872075-0
redis:6379> XADD mystream MAXLEN 2 * value 3
1532049877554-0
We added 3 pieces of data. Now let’s take a look at the length of the stream and the current content:
redis:6379> XLEN mystream
(integer) 2
redis:6379> XRANGE mystream - +
1) 1) 1532049872075-0
   2) 1) "value"
      2) "2"
2) 1) 1532049877554-0
   2) 1) "value"
      2)
You can see that there are only 2 pieces of data.
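The capping behaviour is the same idea as a bounded queue: once the limit is reached, each append evicts the oldest entry. A minimal Python analogy using the standard library's deque is shown below. It is only a model of the effect seen above, not of how Redis stores or trims a stream.

```python
from collections import deque

stream = deque(maxlen=2)  # keeps only the 2 most recent entries
for value in ("1", "2", "3"):
    stream.append({"value": value})  # the oldest entry is dropped when full

print(len(stream))
print(list(stream))
```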