Day 13: Implementing Redis Client
Today we will implement a Redis client for Nim. It builds on Day 12, where we created the Redis parser.
Redisclient
We want to create a client to communicate with Redis servers.
As library designers we should keep in mind how people are going to use our library, especially if it does IO operations. We need to decide which kind of API to support (blocking or nonblocking), or whether to duplicate the functionality for both interfaces. Luckily, Nim is pretty neat when it comes to providing both async and sync interfaces for your library.
What do we expect?
- Sync APIs: blocking APIs
let con = open("localhost", 6379.Port)
echo $con.execCommand("PING", @[])
echo $con.execCommand("SET", @["auser", "avalue"])
echo $con.execCommand("GET", @["auser"])
echo $con.execCommand("SCAN", @["0"])
- Async APIs: Nonblocking APIs around async/await
let con = await openAsync("localhost", 6379.Port)
echo await con.execCommand("PING", @[])
echo await con.execCommand("SET", @["auser", "avalue"])
echo await con.execCommand("GET", @["auser"])
echo await con.execCommand("SCAN", @["0"])
echo await con.execCommand("SET", @["auser", "avalue"])
echo await con.execCommand("GET", @["auser"])
echo await con.execCommand("SCAN", @["0"])
await con.enqueueCommand("PING", @[])
await con.enqueueCommand("PING", @[])
await con.enqueueCommand("PING", @[])
echo await con.commitCommands()
- Pipelining
con.enqueueCommand("PING", @[])
con.enqueueCommand("PING", @[])
con.enqueueCommand("PING", @[])
echo $con.commitCommands()
Implementation
Imports and constants
Let's start with the main imports
import redisparser, strformat, tables, json, strutils, sequtils, hashes, net, asyncdispatch, asyncnet, os, parseutils, deques, options
Mainly:
- redisparser: because we will be manipulating Redis values, so let's not decouple the parsing and the transport
- asyncnet, asyncdispatch: for the async socket APIs
- net: for SSL and the blocking APIs
Data types
Thinking of the expected APIs we talked about earlier we have some sort of client that has exactly the same operations with different blocking policies, so we can abstract it a bit
type
  RedisBase[TSocket] = ref object of RootObj
    socket: TSocket
    connected: bool
    timeout*: int
    pipeline*: seq[RedisValue]
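Base class parameterized on TSocket that has:
- socket: socket object that can be the blocking net.Socket or the nonblocking asyncnet.AsyncSocket
- connected: flag to indicate the connection status
- timeout: how long to wait before timing out (raising TimeoutError); the value is passed unchanged to the blocking recv in net, which interprets it in milliseconds, and 0 disables it
- pipeline: the commands queued for pipelining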
Redis* = ref object of RedisBase[net.Socket]
Here we say Redis is a subtype of RedisBase and the type of transport socket we are using is the blocking net.Socket
AsyncRedis* = ref object of RedisBase[asyncnet.AsyncSocket]
Same, but here we say the socket we use is non blocking of type asyncnet.AsyncSocket
Opening Connection
proc open*(host = "localhost", port = 6379.Port, ssl=false, timeout=0): Redis =
  result = Redis(
    socket: newSocket(buffered = true),
  )
  result.pipeline = @[]
  result.timeout = timeout
  ## .. code omitted for supporting SSL
  result.socket.connect(host, port)
  result.connected = true
Here we define the open proc, the entry point to get a sync Redis client. We do some initialization regarding the endpoint and the timeout and set that on our new Redis object.
proc openAsync*(host = "localhost", port = 6379.Port, ssl=false, timeout=0): Future[AsyncRedis] {.async.} =
  ## Open an asynchronous connection to a redis server.
  result = AsyncRedis(
    socket: newAsyncSocket(buffered = true),
  )
  ## .. code omitted for supporting SSL
  result.pipeline = @[]
  result.timeout = timeout
  await result.socket.connect(host, port)
  result.connected = true
Exactly the same thing for openAsync, but instead of returning Redis we return a Future of a potential AsyncRedis object.
Executing commands
Our APIs will be built around the execCommand proc, which sends a command with its arguments, formatted in the Redis protocol (using the redisparser library), to a server over our socket, and then reads a complete parsable RedisValue back for the user (using the readForm proc).
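To make the wire format concrete, here is a minimal, dependency-free sketch of what a command looks like once it is encoded as a RESP array of bulk strings. The encodeCommand helper is hypothetical: it only stands in for building the RedisValue array and calling redisparser's encode, and it assumes that encode emits standard RESP.

```nim
# Hypothetical helper (not part of redisparser): encode a command and its
# arguments as a RESP array of bulk strings, the shape Redis expects.
proc encodeCommand(command: string, args: seq[string]): string =
  let parts = @[command] & args
  # Array header: '*' followed by the number of elements.
  result = "*" & $parts.len & "\r\n"
  for part in parts:
    # Each element is a bulk string: '$', byte length, CRLF, payload, CRLF.
    result &= "$" & $part.len & "\r\n" & part & "\r\n"

doAssert encodeCommand("GET", @["auser"]) == "*2\r\n$3\r\nGET\r\n$5\r\nauser\r\n"
```

Note that the command name itself is just the first bulk string of the array, which is why execCommand concatenates the command and its arguments before encoding.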
- Sync version
proc execCommand*(this: Redis, command: string, args:seq[string]): RedisValue =
  # Sync-only version: takes the blocking client and returns the value directly
  let cmdArgs = concat(@[command], args)
  var cmdAsRedisValues = newSeq[RedisValue]()
  for cmd in cmdArgs:
    cmdAsRedisValues.add(RedisValue(kind:vkBulkStr, bs:cmd))
  var arr = RedisValue(kind:vkArray, l: cmdAsRedisValues)
  this.socket.send(encode(arr))
  let form = this.readForm()
  let val = decodeString(form)
  return val
- Async version
proc execCommandAsync*(this: AsyncRedis, command: string, args:seq[string]): Future[RedisValue] {.async.} =
  # Async-only version: needs the async pragma so that await can be used
  let cmdArgs = concat(@[command], args)
  var cmdAsRedisValues = newSeq[RedisValue]()
  for cmd in cmdArgs:
    cmdAsRedisValues.add(RedisValue(kind:vkBulkStr, bs:cmd))
  var arr = RedisValue(kind:vkArray, l: cmdAsRedisValues)
  await this.socket.send(encode(arr))
  let form = await this.readForm()
  let val = decodeString(form)
  return val
It would be very annoying to provide duplicate procs for every single API: get and asyncGet, etc.
Multisync FTW!
Nim provides a very neat feature, the multisync pragma, that allows us to use the async definition in sync scopes.
Here are the details from the Nim documentation:
Macro which processes async procedures into both asynchronous and synchronous procedures. The generated async procedures use the async macro, whereas the generated synchronous procedures simply strip off the await calls.
proc execCommand*(this: Redis|AsyncRedis, command: string, args:seq[string]): Future[RedisValue] {.multisync.} =
  let cmdArgs = concat(@[command], args)
  var cmdAsRedisValues = newSeq[RedisValue]()
  for cmd in cmdArgs:
    cmdAsRedisValues.add(RedisValue(kind:vkBulkStr, bs:cmd))
  var arr = RedisValue(kind:vkArray, l: cmdAsRedisValues)
  await this.socket.send(encode(arr))
  let form = await this.readForm()
  let val = decodeString(form)
  return val
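To try multisync without a running server, here is a small self-contained sketch. MemSocket, AsyncMemSocket, recvBytes and readUntilCrlf are hypothetical names invented for this illustration; they only mimic the blocking and nonblocking sockets with an in-memory string. As far as I understand the macro, it picks the async side of the `A | B` parameter by looking for "async" in the type name, which is why the naming matters here.

```nim
import asyncdispatch, strutils

type
  # Hypothetical in-memory transports standing in for net.Socket and
  # asyncnet.AsyncSocket, so the example needs no server.
  MemSocket = ref object
    data: string
    pos: int
  AsyncMemSocket = ref object
    data: string
    pos: int

proc recvBytes(s: MemSocket, size: int): string =
  ## Blocking read of up to `size` bytes.
  let stop = min(s.pos + size, s.data.len)
  result = s.data[s.pos ..< stop]
  s.pos = stop

proc recvBytes(s: AsyncMemSocket, size: int): Future[string] {.async.} =
  ## Same read, but the result is delivered through a Future.
  let stop = min(s.pos + size, s.data.len)
  result = s.data[s.pos ..< stop]
  s.pos = stop

proc readUntilCrlf(s: MemSocket | AsyncMemSocket): Future[string] {.multisync.} =
  ## Written once in async style; multisync also emits a sync overload
  ## in which the awaits are stripped and the return type is `string`.
  var line = ""
  while not line.endsWith("\r\n"):
    let chunk = await s.recvBytes(1)
    if chunk.len == 0:
      break  # input exhausted before a CRLF was seen
    line &= chunk
  return line

let syncLine = readUntilCrlf(MemSocket(data: "+PONG\r\n"))
let asyncLine = waitFor readUntilCrlf(AsyncMemSocket(data: "+PONG\r\n"))
doAssert syncLine == asyncLine
```

The sync call returns a plain string, while the async call returns a Future that has to be awaited (or driven with waitFor at the top level).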
Readers
readForm is the other main proc in our client. readForm is responsible for reading as many bytes as needed from the socket until we have a complete RedisValue object.
readMany
As the Redis protocol encodes some information about the lengths of values, we can make use of that, so let's build a primitive, readMany, that reads a given number of bytes from the socket.
proc readMany(this:Redis|AsyncRedis, count:int=1): Future[string] {.multisync.} =
  if count == 0:
    return ""
  let data = await this.receiveManaged(count)
  return data
Here again, to make sure our code works with both sync and async usage, we use multisync. If the count required is 0 we return an empty string without touching the socket; otherwise we delegate to the receiveManaged proc.
receiveManaged
A more detailed version of how we read the data from the socket (it could be combined into the readMany proc code).
proc receiveManaged*(this:Redis|AsyncRedis, size=1): Future[string] {.multisync.} =
  result = newString(size)
  when this is Redis:
    if this.timeout == 0:
      discard this.socket.recv(result, size)
    else:
      discard this.socket.recv(result, size, this.timeout)
  else:
    discard await this.socket.recvInto(addr result[0], size)
  return result
We check the type of the this object using the when/is combo to dispatch to the correct implementation (sync or async), with timeouts or not:
- recv has multiple versions; one of them takes a timeout, this.timeout, if the user wants to time out after a while
- recvInto is the async version and doesn't support timeouts
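The when/is combo is resolved at compile time, once per concrete type. A minimal sketch of the same dispatch pattern, using hypothetical stand-in types (BlockingConn, NonBlockingConn) instead of the real clients so that it runs without sockets:

```nim
type
  # Hypothetical stand-ins for Redis and AsyncRedis.
  BlockingConn = object
    timeout: int
  NonBlockingConn = object

proc readStrategy(conn: BlockingConn | NonBlockingConn): string =
  # `when` is evaluated at compile time for each instantiation, so the
  # NonBlockingConn version never even sees the `timeout` field access.
  when conn is BlockingConn:
    if conn.timeout == 0:
      result = "recv without timeout"
    else:
      result = "recv with timeout " & $conn.timeout
  else:
    result = "recvInto, no timeout support"

doAssert readStrategy(BlockingConn(timeout: 0)) == "recv without timeout"
doAssert readStrategy(NonBlockingConn()) == "recvInto, no timeout support"
```

Because the branch is chosen at compile time, code that is only valid for one of the types can live in the same proc body.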
readForm
readForm is used to retrieve a complete RedisValue from the server using the primitives we provided, like readMany or receiveManaged.
Remember how we decode strings into RedisValue objects?
echo decodeString("*3\r\n:1\r\n:2\r\n:3\r\n\r\n")
# @[1, 2, 3]
echo decodeString("+Hello, World\r\n")
# Hello, World
echo decodeString("-Not found\r\n")
# Not found
echo decodeString(":1512\r\n")
# 1512
echo $decodeString("$32\r\nHello, World THIS IS REALLY NICE\r\n")
# Hello, World THIS IS REALLY NICE
echo decodeString("*2\r\n+Hello World\r\n:23\r\n")
# @[Hello World, 23]
echo decodeString("*2\r\n*3\r\n:1\r\n:2\r\n:3\r\n\r\n*5\r\n:5\r\n:7\r\n+Hello Word\r\n-Err\r\n$6\r\nfoobar\r\n")
# @[@[1, 2, 3], @[5, 7, Hello Word, Err, foobar]]
echo $decodeString("*4\r\n:51231\r\n$3\r\nfoo\r\n$-1\r\n$3\r\nbar\r\n")
# @[51231, foo, , bar]
We will be doing exactly the same, but the only tricky part is we are reading from a socket and we can't move freely forward/backward without consuming data.
The way we were decoding strings into RedisValues was by peeking at the first character to see what type we are decoding: simple string, bulk string, error, int, or array.
proc readForm(this:Redis|AsyncRedis): Future[string] {.multisync.} =
  var form = ""
  ## code responsible of reading a complete parsable string representing RedisValue from the socket
  return form
- Setup the loop
  while true:
    let b = await this.receiveManaged()
    form &= b
    ## ...
As long as we haven't read a complete form yet, we read just 1 byte and append it to the form string we will be returning. At the beginning, that byte can be one of +, -, :, $, or *.
- Simple String
    if b == "+":
      form &= await this.readStream(CRLF)
      return form
If the character we are peeking at is +, we read until we consume the \r\n (the CRLF constant from the redisparser library), because simple strings in the Redis protocol are contained between + and CRLF.
But wait! What's readStream?
It's a small proc we need to consume bytes from the socket until we reach (and consume) a certain terminating sequence.
proc readStream(this:Redis|AsyncRedis, breakAfter:string): Future[string] {.multisync.} =
  var data = ""
  while true:
    if data.endsWith(breakAfter):
      break
    let strRead = await this.receiveManaged()
    data &= strRead
  return data
- Errors
    elif b == "-":
      form &= await this.readStream(CRLF)
      return form
Exactly the same as simple strings, but we check for - instead of +
- Ints
    elif b == ":":
      form &= await this.readStream(CRLF)
      return form
Same, serialized between : and CRLF
- Bulkstrings
    elif b == "$":
      let bulklenstr = await this.readStream(CRLF)
      let bulklenI = parseInt(bulklenstr.strip())
      form &= bulklenstr
      # A null bulk string ("$-1\r\n") has no payload and no second CRLF,
      # so there is nothing more to read or append in that case.
      if bulklenI != -1:
        form &= await this.readMany(bulklenI)
        form &= await this.readStream(CRLF)
      return form
From RESP page
Bulk Strings are used in order to represent a single binary safe string up to 512 MB in length.
Bulk Strings are encoded in the following way:
A "$" byte followed by the number of bytes composing the string (a prefixed length), terminated by CRLF.
The actual string data.
A final CRLF.
So the string "foobar" is encoded as follows:
"$6\r\nfoobar\r\n"
When an empty string is just:
"$0\r\n\r\n"
RESP Bulk Strings can also be used in order to signal non-existence of a value using a special format that is used to represent a Null value. In this special format the length is -1, and there is no data, so a Null is represented as:
"$-1\r\n"
So we can have:
1- 0 for empty strings ($0\r\n\r\n): read from $ until we consume CRLF twice
2- a number of bytes to read: read the length line after $, then N bytes, then consume CRLF
3- -1 for nils: read from $ until we consume CRLF
- Arrays
    elif b == "*":
      let lenstr = await this.readStream(CRLF)
      form &= lenstr
      let lenstrAsI = parseInt(lenstr.strip())
      for i in countup(1, lenstrAsI):
        form &= await this.readForm()
      return form
Arrays can be a bit tricky. To encode an array we write * followed by the array length, then \r\n, then each encoded element in turn. RESP has no closing terminator for arrays; the array simply ends after its last element.
As arrays encode their length, we know how many inner forms (items) we need to read from the socket while reading the array.
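Putting the branches together, the framing logic can be tried out without a socket. The following is only a sketch under stated assumptions: Reader, take and takeLine are hypothetical stand-ins for the socket, readMany and readStream, and this readForm is a synchronous simplification of the proc built above, not the library's code.

```nim
import strutils

type
  # Hypothetical stand-in for the socket: a string plus a read position.
  Reader = ref object
    data: string
    pos: int

proc take(r: Reader, n: int): string =
  ## Consume n bytes (fewer only if the input runs out).
  let stop = min(r.pos + n, r.data.len)
  result = r.data[r.pos ..< stop]
  r.pos = stop

proc takeLine(r: Reader): string =
  ## Consume up to and including the next CRLF.
  while not result.endsWith("\r\n") and r.pos < r.data.len:
    result &= r.take(1)

proc readForm(r: Reader): string =
  ## Return the raw text of exactly one RESP value and leave the rest unread.
  let kind = r.take(1)
  result = kind
  case kind
  of "+", "-", ":":
    result &= r.takeLine()
  of "$":
    let header = r.takeLine()
    result &= header
    let n = parseInt(header.strip())
    if n >= 0:
      result &= r.take(n + 2)  # payload plus its trailing CRLF
  of "*":
    let header = r.takeLine()
    result &= header
    # One nested form per declared element; arrays have no terminator.
    for _ in 1 .. parseInt(header.strip()):
      result &= r.readForm()
  else:
    raise newException(ValueError, "unexpected type byte: " & kind)

let r = Reader(data: "*2\r\n$3\r\nfoo\r\n$-1\r\n+OK\r\n")
doAssert r.readForm() == "*2\r\n$3\r\nfoo\r\n$-1\r\n"
doAssert r.readForm() == "+OK\r\n"
```

The second call shows why framing matters: the first form stops exactly at the end of the array, so the next reply on the same connection is still intact.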
Pipelining
From redis pipelining page
A Request/Response server can be implemented so that it is able to process new requests even if the client didn't already read the old responses. This way it is possible to send multiple commands to the server without waiting for the replies at all, and finally read the replies in a single step.
This is called pipelining, and is a technique widely in use since many decades. For instance many POP3 protocol implementations already supported this feature, dramatically speeding up the process of downloading new emails from the server.
Redis supports pipelining since the very early days, so whatever version you are running, you can use pipelining with Redis. This is an example using the raw netcat utility:
$ (printf "PING\r\nPING\r\nPING\r\n"; sleep 1) | nc localhost 6379
+PONG
+PONG
+PONG
So the idea is to maintain a sequence of commands to be executed (enqueueCommand), send them all (commitCommands), and reset the pipeline sequence afterwards.
proc enqueueCommand*(this:Redis|AsyncRedis, command:string, args: seq[string]): Future[void] {.multisync.} =
  let cmdArgs = concat(@[command], args)
  var cmdAsRedisValues = newSeq[RedisValue]()
  for cmd in cmdArgs:
    cmdAsRedisValues.add(RedisValue(kind:vkBulkStr, bs:cmd))
  var arr = RedisValue(kind:vkArray, l: cmdAsRedisValues)
  this.pipeline.add(arr)

proc commitCommands*(this:Redis|AsyncRedis) : Future[RedisValue] {.multisync.} =
  for cmd in this.pipeline:
    await this.socket.send(cmd.encode())
  var responses = newSeq[RedisValue]()
  for i in countup(0, len(this.pipeline)-1):
    responses.add(decodeString(await this.readForm()))
  this.pipeline = @[]
  return RedisValue(kind:vkArray, l:responses)
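The important invariant in commitCommands is that it sends everything first and then reads exactly one reply per queued command. Here is a toy model of that flow; FakeConn, enqueue and commit are hypothetical names for this sketch, with `sent` recording what went over the wire and `replies` playing the role of the server's answers.

```nim
type
  # Hypothetical miniature of the client, with no real socket involved.
  FakeConn = object
    pipeline: seq[string]
    sent: string
    replies: seq[string]

proc enqueue(c: var FakeConn, encoded: string) =
  ## Queue an already-encoded command without touching the wire.
  c.pipeline.add(encoded)

proc commit(c: var FakeConn): seq[string] =
  ## Send every queued command, then collect one reply per command.
  for cmd in c.pipeline:
    c.sent &= cmd
  for i in 0 ..< c.pipeline.len:
    result.add(c.replies[i])
  # Reset so the next batch starts from an empty queue.
  c.pipeline = @[]

var conn = FakeConn(replies: @["+PONG\r\n", "+PONG\r\n"])
conn.enqueue("*1\r\n$4\r\nPING\r\n")
conn.enqueue("*1\r\n$4\r\nPING\r\n")
let answers = conn.commit()
doAssert answers.len == 2
doAssert conn.pipeline.len == 0
```

Reading fewer replies than commands were sent would leave stale replies on the connection, and the next command would then receive an answer meant for an earlier one.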
Higher level APIs
These are basically procs around the execCommand proc, and by using the multisync pragma you can have them enabled for both sync and async execution.
proc del*(this: Redis | AsyncRedis, keys: seq[string]): Future[RedisValue] {.multisync.} =
  ## Delete a key or multiple keys
  return await this.execCommand("DEL", keys)

proc exists*(this: Redis | AsyncRedis, key: string): Future[bool] {.multisync.} =
  ## Determine if a key exists
  let val = await this.execCommand("EXISTS", @[key])
  result = val.i == 1
nim-redisclient
This day is based on the nim-redisclient project, which uses some higher-level API code from Nim/redis. Feel free to send PRs or open issues.