sonic.client
module
Source code
from enum import Enum
import socket
import re
from queue import Queue
import itertools
class SonicServerError(Exception):
"""Generic Sonic Server exception"""
pass
class ChannelError(Exception):
"""Sonic Channel specific exception"""
pass
# Commands available on all channels + START that's available on the uninitialized channel
COMMON_CMDS = [
'START',
'PING',
'HELP',
'QUIT'
]
# Channels commands
ALL_CMDS = {
# FIXME: unintialized entry isn't needed anymore.
'UNINITIALIZED': [
*COMMON_CMDS,
],
'ingest': [
*COMMON_CMDS,
# PUSH <collection> <bucket> <object> "<text>" [LANG(<locale>)]?
'PUSH',
'POP', # POP <collection> <bucket> <object> "<text>"
'COUNT', # COUNT <collection> [<bucket> [<object>]?]?
'FLUSHC', # FLUSHC <collection>
'FLUSHB', # FLUSHB <collection> <bucket>
'FLUSHO', # FLUSHO <collection> <bucket> <object>
],
'search': [
*COMMON_CMDS,
# QUERY <collection> <bucket> "<terms>" [LIMIT(<count>)]? [OFFSET(<count>)]? [LANG(<locale>)]?
'QUERY',
'SUGGEST', # SUGGEST <collection> <bucket> "<word>" [LIMIT(<count>)]?
],
'control': [
*COMMON_CMDS,
'TRIGGER', # TRIGGER [<action>]?
]
}
# snippet from asonic code.
def quote_text(text):
"""Quote text and normalize it in sonic protocol context.
Arguments:
text str -- text to quote/escape
Returns:
str -- quoted text
"""
if text is None:
return ""
return '"' + text.replace('"', '\\"').replace('\r\n', ' ') + '"'
def is_error(response):
"""Check if the response is Error or not in sonic context.
Errors start with `ERR`
Arguments:
response {str} -- response string
Returns:
[bool] -- true if response is an error.
"""
if response.startswith('ERR '):
return True
return False
def raise_for_error(response):
"""Raise SonicServerError in case of error response.
Arguments:
response {str} -- message to check if it's error or not.
Raises:
SonicServerError --
Returns:
str -- the response message
"""
if is_error(response):
raise SonicServerError(response)
return response
def _parse_protocol_version(text):
"""Extracts protocol version from response message
Arguments:
text {str} -- text that may contain protocol version info (e.g STARTED search protocol(1) buffer(20000) )
Raises:
ValueError -- Raised when s doesn't have protocol information
Returns:
str -- protocol version.
"""
matches = re.findall("protocol\((\w+)\)", text)
if not matches:
raise ValueError("{} doesn't contain protocol(NUMBER)".format(text))
return matches[0]
def _parse_buffer_size(text):
"""Extracts buffering from response message
Arguments:
text {str} -- text that may contain buffering info (e.g STARTED search protocol(1) buffer(20000) )
Raises:
ValueError -- Raised when s doesn't have buffering information
Returns:
str -- buffering.
"""
matches = re.findall("buffer\((\w+)\)", text)
if not matches:
raise ValueError("{} doesn't contain buffer(NUMBER)".format(text))
return matches[0]
def _get_async_response_id(text):
"""Extract async response message id.
Arguments:
text {str} -- text that may contain async response id (e.g PENDING gn4RLF8M )
Raises:
ValueError -- [description]
Returns:
str -- async response id
"""
text = text.strip()
matches = re.findall("PENDING (\w+)", text)
if not matches:
raise ValueError("{} doesn't contain async response id".format(text))
return matches[0]
def pythonify_result(resp):
if resp in ["OK", "PONG"]:
return True
if resp.startswith("EVENT QUERY") or resp.startswith("EVENT SUGGEST"):
return resp.split()[3:]
if resp.startswith("RESULT"):
return int(resp.split()[-1])
return resp
# Channels names
INGEST = 'ingest'
SEARCH = 'search'
CONTROL = 'control'
class SonicConnection:
def __init__(self, host: str, port: int, password: str, channel: str, keepalive: bool=True, timeout: int=60):
"""Base for sonic connections
bufsize: indicates the buffer size to be used while communicating with the server.
protocol: sonic protocol version
Arguments:
host {str} -- sonic server host
port {int} -- sonic server port
password {str} -- user password defined in `config.cfg` file on the server side.
channel {str} -- channel name one of (ingest, search, control)
Keyword Arguments:
keepalive {bool} -- sets keepalive socket option (default: {True})
timeout {int} -- sets socket timeout (default: {60})
"""
self.host = host
self.port = port
self._password = password
self.channel = channel
self.raw = False
self.address = self.host, self.port
self.keepalive = keepalive
self.timeout = timeout
self.socket_connect_timeout = 10
self.__socket = None
self.__reader = None
self.__writer = None
self.bufize = None
self.protocol = None
def connect(self):
"""Connects to sonic server endpoint
Returns:
bool: True when connection happens and successfully switched to a channel.
"""
resp = self._reader.readline()
if 'CONNECTED' in resp:
self.connected = True
resp = self._execute_command("START", self.channel, self._password)
self.protocol = _parse_protocol_version(resp)
self.bufsize = _parse_buffer_size(resp)
return self.ping()
def ping(self):
return self._execute_command("PING") == "PONG"
def __create_connection(self, address):
"Create a TCP socket connection"
# we want to mimic what socket.create_connection does to support
# ipv4/ipv6, but we want to set options prior to calling
# socket.connect()
# snippet taken from redis client code.
err = None
for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP_KEEPALIVE
if self.keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# set the socket_connect_timeout before we connect
if self.socket_connect_timeout:
sock.settimeout(self.timeout)
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
if self.timeout:
sock.settimeout(self.timeout)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise socket.error("socket.getaddrinfo returned an empty list")
@property
def _socket(self):
if not self.__socket:
# socket.create_connection(self.address)
self.__socket = self.__create_connection(self.address)
return self.__socket
@property
def _reader(self):
if not self.__reader:
self.__reader = self._socket.makefile('r')
return self.__reader
@property
def _writer(self):
if not self.__writer:
self.__writer = self._socket.makefile('w')
return self.__writer
def close(self):
"""
Closes the connection and its resources.
"""
resources = (self.__reader, self.__writer, self.__socket)
for rc in resources:
if rc is not None:
rc.close()
self.__reader = None
self.__writer = None
self.__socket = None
def _format_command(self, cmd, *args):
"""Format command according to sonic protocol
Arguments:
cmd {str} -- a valid sonic command
Returns:
str -- formatted command string to be sent on the wire.
"""
cmd_str = cmd + " "
cmd_str += " ".join(args)
cmd_str += "\n" # specs says \n, asonic does \r\n
return cmd_str
def _execute_command(self, cmd, *args):
"""Formats and sends command with suitable arguments on the wire to sonic server
Arguments:
cmd {str} -- valid command
Raises:
ChannelError -- Raised for unsupported channel commands
Returns:
object|str -- depends on the `self.raw` mode
if mode is raw: result is always a string
else the result is converted to suitable python response (e.g boolean, int, list)
"""
if cmd not in ALL_CMDS[self.channel]:
raise ChannelError(
"command {} isn't allowed in channel {}".format(cmd, self.channel))
cmd_str = self._format_command(cmd, *args)
self._writer.write(cmd_str)
self._writer.flush()
resp = self._get_response()
return resp
def _get_response(self):
"""Gets a response string from sonic server.
Returns:
object|str -- depends on the `self.raw` mode
if mode is raw: result is always a string
else the result is converted to suitable python response (e.g boolean, int, list)
"""
resp = raise_for_error(self._reader.readline()).strip()
if not self.raw:
return pythonify_result(resp)
return resp
class ConnectionPool:
def __init__(self, **create_kwargs):
"""ConnectionPool for Sonic connections.
create_kwargs: SonicConnection create kwargs (passed to the connection constructor.)
"""
self._inuse_connections = set()
self._available_connections = Queue()
self._create_kwargs = create_kwargs
def get_connection(self) -> SonicConnection:
"""Gets a connection from the pool or creates one.
Returns:
SonicConnection -- Sonic connection.
"""
conn = None
if not self._available_connections.empty():
conn = self._available_connections.get()
else:
# make connection and add to active connections
conn = self._make_connection()
self._inuse_connections.add(conn)
return conn
def release(self, conn:SonicConnection) -> None:
"""Releases connection `conn` to the pool
Arguments:
conn {SonicConnection} -- Connection to release back to the pool.
"""
self._inuse_connections.remove(conn)
if conn.ping():
self._available_connections.put_nowait(conn)
def _make_connection(self) -> SonicConnection:
"""Creates SonicConnection object and returns it.
Returns:
SonicConnection -- newly created sonic connection.
"""
con = SonicConnection(**self._create_kwargs)
con.connect()
return con
def close(self) -> None:
"""Closes the pool and all of the connections.
"""
for con in itertools.chain(self._inuse_connections, self._available_connections):
con.close()
class SonicClient:
def __init__(self, host: str, port: int, password: str, channel: str, pool: ConnectionPool=None):
"""Base for sonic clients
bufsize: indicates the buffer size to be used while communicating with the server.
protocol: sonic protocol version
Arguments:
host {str} -- sonic server host
port {int} -- sonic server port
password {str} -- user password defined in `config.cfg` file on the server side.
channel {str} -- channel name one of (ingest, search, control)
"""
self.host = host
self.port = port
self._password = password
self.channel = channel
self.bufsize = 0
self.protocol = 1
self.raw = False
self.address = self.host, self.port
if not pool:
self.pool = ConnectionPool(
host=host, port=port, password=password, channel=channel)
def close(self):
"""close the connection and clean up open resources.
"""
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def get_active_connection(self) -> SonicConnection:
"""Gets a connection from the pool
Returns:
SonicConnection -- connection from the pool
"""
active = self.pool.get_connection()
active.raw = self.raw
return active
def _execute_command(self, cmd, *args):
"""Executes command `cmd` with arguments `args`
Arguments:
cmd {str} -- command to execute
*args -- `cmd`'s arguments
Returns:
str|object -- result of execution
"""
active = self.get_active_connection()
try:
res = active._execute_command(cmd, *args)
finally:
self.pool.release(active)
return res
def _execute_command_async(self, cmd, *args):
"""Executes async command `cmd` with arguments `args` and awaits its result.
Arguments:
cmd {str} -- command to execute
*args -- `cmd`'s arguments
Returns:
str|object -- result of execution
"""
active = self.get_active_connection()
try:
active._execute_command(cmd, *args)
resp = active._get_response()
finally:
self.pool.release(active)
return resp
class CommonCommandsMixin:
"""Mixin of the commands used by all sonic channels."""
def ping(self):
"""Send ping command to the server
Returns:
bool -- True if successfully reaching the server.
"""
return self._execute_command("PING")
def quit(self):
"""Quit the channel and closes the connection.
"""
self._execute_command("QUIT")
self.close()
# TODO: check help.
def help(self, *args):
"""Sends Help query."""
return self._execute_command("HELP", *args)
class IngestClient(SonicClient, CommonCommandsMixin):
def __init__(self, host: str, port: str, password: str):
super().__init__(host, port, password, INGEST)
def push(self, collection: str, bucket: str, object: str, text: str, lang: str=None):
"""Push search data in the index
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
text {str} -- search text to be indexed can be a single word, or a longer text; within maximum length safety limits
Keyword Arguments:
lang {str} -- [description] (default: {None})
Returns:
bool -- True if search data are pushed in the index.
"""
lang = "LANG({})".format(lang) if lang else ''
text = quote_text(text)
return self._execute_command("PUSH", collection, bucket, object, text, lang)
def pop(self, collection: str, bucket: str, object: str, text: str):
"""Pop search data from the index
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
text {str} -- search text to be indexed can be a single word, or a longer text; within maximum length safety limits
Returns:
int
"""
text = quote_text(text)
return self._execute_command("POP", collection, bucket, object, text)
def count(self, collection: str, bucket: str=None, object: str=None):
"""Count indexed search data
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
Keyword Arguments:
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
Returns:
int -- count of index search data.
"""
bucket = bucket or ''
object = object or ''
return self._execute_command('COUNT', collection, bucket, object)
def flush_collection(self, collection: str):
"""Flush all indexed data from a collection
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
Returns:
int -- number of flushed data
"""
return self._execute_command('FLUSHC', collection)
def flush_bucket(self, collection: str, bucket: str):
"""Flush all indexed data from a bucket in a collection
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
Returns:
int -- number of flushed data
"""
return self._execute_command('FLUSHB', collection, bucket)
def flush_object(self, collection: str, bucket: str, object: str):
"""Flush all indexed data from an object in a bucket in collection
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
Returns:
int -- number of flushed data
"""
return self._execute_command('FLUSHO', collection, bucket, object)
def flush(self, collection: str, bucket: str=None, object: str=None):
"""Flush indexed data in a collection, bucket, or in an object.
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
Keyword Arguments:
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
Returns:
int -- number of flushed data
"""
if not bucket and not object:
return self.flush_collection(collection)
elif bucket and not object:
return self.flush_bucket(collection, bucket)
elif object and bucket:
return self.flush_object(collection, bucket, object)
class SearchClient(SonicClient, CommonCommandsMixin):
def __init__(self, host: str, port: int, password: str):
"""Create Sonic client that operates on the Search Channel
Arguments:
host {str} -- valid reachable host address
port {int} -- port number
password {str} -- password (defined in config.cfg file on the server side)
"""
super().__init__(host, port, password, SEARCH)
def query(self, collection: str, bucket: str, terms: str, limit: int=None, offset: int=None, lang: str=None):
"""Query the database
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
terms {str} -- text for search terms
Keyword Arguments:
limit {int} -- a positive integer number; set within allowed maximum & minimum limits
offset {int} -- a positive integer number; set within allowed maximum & minimum limits
lang {str} -- an ISO 639-3 locale code eg. eng for English (if set, the locale must be a valid ISO 639-3 code; if not set, the locale will be guessed from text).
Returns:
list -- list of objects ids.
"""
limit = "LIMIT({})".format(limit) if limit else ''
lang = "LANG({})".format(lang) if lang else ''
offset = "OFFSET({})".format(offset) if offset else ''
terms = quote_text(terms)
return self._execute_command_async(
'QUERY', collection, bucket, terms, limit, offset, lang)
def suggest(self, collection: str, bucket: str, word: str, limit: int=None):
"""auto-completes word.
Arguments:
collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.)
bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
word {str} -- word to autocomplete
Keyword Arguments:
limit {int} -- a positive integer number; set within allowed maximum & minimum limits (default: {None})
Returns:
list -- list of suggested words.
"""
limit = "LIMIT({})".format(limit) if limit else ''
word = quote_text(word)
return self._execute_command(
'SUGGEST', collection, bucket, word, limit)
class ControlClient(SonicClient, CommonCommandsMixin):
def __init__(self, host: str, port: int, password: str):
"""Create Sonic client that operates on the Control Channel
Arguments:
host {str} -- valid reachable host address
port {int} -- port number
password {str} -- password (defined in config.cfg file on the server side)
"""
super().__init__(host, port, password, CONTROL)
def trigger(self, action: str=''):
"""Trigger an action
Keyword Arguments:
action {str} -- text for action
"""
self._execute_command('TRIGGER', action)
def test_ingest():
with IngestClient("127.0.0.1", 1491, 'password') as ingestcl:
print(ingestcl.ping())
print(ingestcl.protocol)
print(ingestcl.bufsize)
ingestcl.push("wiki", "articles", "article-1",
"for the love of god hell")
ingestcl.push("wiki", "articles", "article-2",
"for the love of satan heaven")
ingestcl.push("wiki", "articles", "article-3",
"for the love of lorde hello")
ingestcl.push("wiki", "articles", "article-4",
"for the god of loaf helmet")
def test_search():
with SearchClient("127.0.0.1", 1491, 'password') as querycl:
print(querycl.ping())
print(querycl.query("wiki", "articles", "for"))
print(querycl.query("wiki", "articles", "love"))
def test_control():
with ControlClient("127.0.0.1", 1491, 'password') as controlcl:
print(controlcl.ping())
controlcl.trigger("consolidate")
if __name__ == "__main__":
test_ingest()
test_search()
test_control()
Functions
def is_error(response)
-
Check if the response is Error or not in sonic context.
Errors start with
ERR
Arguments
response {str} – response string
Returns
[bool] – true if response is an error.
Source code
def is_error(response): """Check if the response is Error or not in sonic context. Errors start with `ERR` Arguments: response {str} -- response string Returns: [bool] -- true if response is an error. """ if response.startswith('ERR '): return True return False
def pythonify_result(resp)
-
Source code
def pythonify_result(resp): if resp in ["OK", "PONG"]: return True if resp.startswith("EVENT QUERY") or resp.startswith("EVENT SUGGEST"): return resp.split()[3:] if resp.startswith("RESULT"): return int(resp.split()[-1]) return resp
def quote_text(text)
-
Quote text and normalize it in sonic protocol context.
Arguments
text str – text to quote/escape
Returns
str – quoted text
Source code
def quote_text(text): """Quote text and normalize it in sonic protocol context. Arguments: text str -- text to quote/escape Returns: str -- quoted text """ if text is None: return "" return '"' + text.replace('"', '\\"').replace('\r\n', ' ') + '"'
def raise_for_error(response)
-
Raise SonicServerError in case of error response.
Arguments
response {str} – message to check if it's error or not.
Raises
SonicServerError –
Returns
str – the response message
Source code
def raise_for_error(response): """Raise SonicServerError in case of error response. Arguments: response {str} -- message to check if it's error or not. Raises: SonicServerError -- Returns: str -- the response message """ if is_error(response): raise SonicServerError(response) return response
def test_control()
-
Source code
def test_control(): with ControlClient("127.0.0.1", 1491, 'password') as controlcl: print(controlcl.ping()) controlcl.trigger("consolidate")
def test_ingest()
-
Source code
def test_ingest(): with IngestClient("127.0.0.1", 1491, 'password') as ingestcl: print(ingestcl.ping()) print(ingestcl.protocol) print(ingestcl.bufsize) ingestcl.push("wiki", "articles", "article-1", "for the love of god hell") ingestcl.push("wiki", "articles", "article-2", "for the love of satan heaven") ingestcl.push("wiki", "articles", "article-3", "for the love of lorde hello") ingestcl.push("wiki", "articles", "article-4", "for the god of loaf helmet")
def test_search()
-
Source code
def test_search(): with SearchClient("127.0.0.1", 1491, 'password') as querycl: print(querycl.ping()) print(querycl.query("wiki", "articles", "for")) print(querycl.query("wiki", "articles", "love"))
Classes
class ChannelError (ancestors: builtins.Exception, builtins.BaseException)
-
Sonic Channel specific exception
Source code
class ChannelError(Exception): """Sonic Channel specific exception""" pass
class CommonCommandsMixin
-
Mixin of the commands used by all sonic channels.
Source code
class CommonCommandsMixin: """Mixin of the commands used by all sonic channels.""" def ping(self): """Send ping command to the server Returns: bool -- True if successfully reaching the server. """ return self._execute_command("PING") def quit(self): """Quit the channel and closes the connection. """ self._execute_command("QUIT") self.close() # TODO: check help. def help(self, *args): """Sends Help query.""" return self._execute_command("HELP", *args)
Subclasses
Methods
def help(self, *args)
-
Sends Help query.
Source code
def help(self, *args): """Sends Help query.""" return self._execute_command("HELP", *args)
def ping(self)
-
Send ping command to the server
Returns
bool – True if successfully reaching the server.
Source code
def ping(self): """Send ping command to the server Returns: bool -- True if successfully reaching the server. """ return self._execute_command("PING")
def quit(self)
-
Quit the channel and closes the connection.
Source code
def quit(self): """Quit the channel and closes the connection. """ self._execute_command("QUIT") self.close()
class ConnectionPool
-
Source code
class ConnectionPool: def __init__(self, **create_kwargs): """ConnectionPool for Sonic connections. create_kwargs: SonicConnection create kwargs (passed to the connection constructor.) """ self._inuse_connections = set() self._available_connections = Queue() self._create_kwargs = create_kwargs def get_connection(self) -> SonicConnection: """Gets a connection from the pool or creates one. Returns: SonicConnection -- Sonic connection. """ conn = None if not self._available_connections.empty(): conn = self._available_connections.get() else: # make connection and add to active connections conn = self._make_connection() self._inuse_connections.add(conn) return conn def release(self, conn:SonicConnection) -> None: """Releases connection `conn` to the pool Arguments: conn {SonicConnection} -- Connection to release back to the pool. """ self._inuse_connections.remove(conn) if conn.ping(): self._available_connections.put_nowait(conn) def _make_connection(self) -> SonicConnection: """Creates SonicConnection object and returns it. Returns: SonicConnection -- newly created sonic connection. """ con = SonicConnection(**self._create_kwargs) con.connect() return con def close(self) -> None: """Closes the pool and all of the connections. """ for con in itertools.chain(self._inuse_connections, self._available_connections): con.close()
Methods
def __init__(self, **create_kwargs)
-
ConnectionPool for Sonic connections.
create_kwargs
:SonicConnection
create
kwargs
(passed
to
the
connection
constructor.
)
Source code
def __init__(self, **create_kwargs): """ConnectionPool for Sonic connections. create_kwargs: SonicConnection create kwargs (passed to the connection constructor.) """ self._inuse_connections = set() self._available_connections = Queue() self._create_kwargs = create_kwargs
def close(self)
-
Closes the pool and all of the connections.
Source code
def close(self) -> None: """Closes the pool and all of the connections. """ for con in itertools.chain(self._inuse_connections, self._available_connections): con.close()
def get_connection(self)
-
Gets a connection from the pool or creates one.
Returns
SonicConnection – Sonic connection.
Source code
def get_connection(self) -> SonicConnection: """Gets a connection from the pool or creates one. Returns: SonicConnection -- Sonic connection. """ conn = None if not self._available_connections.empty(): conn = self._available_connections.get() else: # make connection and add to active connections conn = self._make_connection() self._inuse_connections.add(conn) return conn
def release(self, conn)
-
Releases connection
conn
to the poolArguments
conn {SonicConnection} – Connection to release back to the pool.
Source code
def release(self, conn:SonicConnection) -> None: """Releases connection `conn` to the pool Arguments: conn {SonicConnection} -- Connection to release back to the pool. """ self._inuse_connections.remove(conn) if conn.ping(): self._available_connections.put_nowait(conn)
class ControlClient (ancestors: SonicClient, CommonCommandsMixin)
-
Mixin of the commands used by all sonic channels.
Source code
class ControlClient(SonicClient, CommonCommandsMixin): def __init__(self, host: str, port: int, password: str): """Create Sonic client that operates on the Control Channel Arguments: host {str} -- valid reachable host address port {int} -- port number password {str} -- password (defined in config.cfg file on the server side) """ super().__init__(host, port, password, CONTROL) def trigger(self, action: str=''): """Trigger an action Keyword Arguments: action {str} -- text for action """ self._execute_command('TRIGGER', action)
Methods
def __init__(self, host, port, password)
-
Create Sonic client that operates on the Control Channel
Arguments
host {str} – valid reachable host address port {int} – port number password {str} – password (defined in config.cfg file on the server side)
Source code
def __init__(self, host: str, port: int, password: str): """Create Sonic client that operates on the Control Channel Arguments: host {str} -- valid reachable host address port {int} -- port number password {str} -- password (defined in config.cfg file on the server side) """ super().__init__(host, port, password, CONTROL)
def trigger(self, action='')
-
Trigger an action
Keyword Arguments: action {str} – text for action
Source code
def trigger(self, action: str=''): """Trigger an action Keyword Arguments: action {str} -- text for action """ self._execute_command('TRIGGER', action)
Inherited members
class IngestClient (ancestors: SonicClient, CommonCommandsMixin)
-
Mixin of the commands used by all sonic channels.
Source code
class IngestClient(SonicClient, CommonCommandsMixin): def __init__(self, host: str, port: str, password: str): super().__init__(host, port, password, INGEST) def push(self, collection: str, bucket: str, object: str, text: str, lang: str=None): """Push search data in the index Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) text {str} -- search text to be indexed can be a single word, or a longer text; within maximum length safety limits Keyword Arguments: lang {str} -- [description] (default: {None}) Returns: bool -- True if search data are pushed in the index. """ lang = "LANG({})".format(lang) if lang else '' text = quote_text(text) return self._execute_command("PUSH", collection, bucket, object, text, lang) def pop(self, collection: str, bucket: str, object: str, text: str): """Pop search data from the index Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) text {str} -- search text to be indexed can be a single word, or a longer text; within maximum length safety limits Returns: int """ text = quote_text(text) return self._execute_command("POP", collection, bucket, object, text) def count(self, collection: str, bucket: str=None, object: str=None): """Count indexed search data Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) Keyword Arguments: bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) Returns: int -- count of index search data. """ bucket = bucket or '' object = object or '' return self._execute_command('COUNT', collection, bucket, object) def flush_collection(self, collection: str): """Flush all indexed data from a collection Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) Returns: int -- number of flushed data """ return self._execute_command('FLUSHC', collection) def flush_bucket(self, collection: str, bucket: str): """Flush all indexed data from a bucket in a collection Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) Returns: int -- number of flushed data """ return self._execute_command('FLUSHB', collection, bucket) def flush_object(self, collection: str, bucket: str, object: str): """Flush all indexed data from an object in a bucket in collection Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) Returns: int -- number of flushed data """ return self._execute_command('FLUSHO', collection, bucket, object) def flush(self, collection: str, bucket: str=None, object: str=None): """Flush indexed data in a collection, bucket, or in an object. Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) Keyword Arguments: bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) Returns: int -- number of flushed data """ if not bucket and not object: return self.flush_collection(collection) elif bucket and not object: return self.flush_bucket(collection, bucket) elif object and bucket: return self.flush_object(collection, bucket, object)
Methods
def count(self, collection, bucket=None, object=None)
-
Count indexed search data
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) Keyword Arguments: bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} – object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
Returns
int – count of index search data.
Source code
def count(self, collection: str, bucket: str=None, object: str=None): """Count indexed search data Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) Keyword Arguments: bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) Returns: int -- count of index search data. """ bucket = bucket or '' object = object or '' return self._execute_command('COUNT', collection, bucket, object)
def flush(self, collection, bucket=None, object=None)
-
Flush indexed data in a collection, bucket, or in an object.
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) Keyword Arguments: bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} – object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
Returns
int – number of flushed data
Source code
def flush(self, collection: str, bucket: str=None, object: str=None): """Flush indexed data in a collection, bucket, or in an object. Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) Keyword Arguments: bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) Returns: int -- number of flushed data """ if not bucket and not object: return self.flush_collection(collection) elif bucket and not object: return self.flush_bucket(collection, bucket) elif object and bucket: return self.flush_object(collection, bucket, object)
def flush_bucket(self, collection, bucket)
-
Flush all indexed data from a bucket in a collection
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..)
Returns
int – number of flushed data
Source code
def flush_bucket(self, collection: str, bucket: str): """Flush all indexed data from a bucket in a collection Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) Returns: int -- number of flushed data """ return self._execute_command('FLUSHB', collection, bucket)
def flush_collection(self, collection)
-
Flush all indexed data from a collection
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.)
Returns
int – number of flushed data
Source code
def flush_collection(self, collection: str): """Flush all indexed data from a collection Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) Returns: int -- number of flushed data """ return self._execute_command('FLUSHC', collection)
def flush_object(self, collection, bucket, object)
-
Flush all indexed data from an object in a bucket in collection
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} – object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact)
Returns
int – number of flushed data
Source code
def flush_object(self, collection: str, bucket: str, object: str): """Flush all indexed data from an object in a bucket in collection Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) Returns: int -- number of flushed data """ return self._execute_command('FLUSHO', collection, bucket, object)
def pop(self, collection, bucket, object, text)
-
Pop search data from the index
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} – object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) text {str} – search text to be indexed can be a single word, or a longer text; within maximum length safety limits
Returns
int
Source code
def pop(self, collection: str, bucket: str, object: str, text: str): """Pop search data from the index Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) text {str} -- search text to be indexed can be a single word, or a longer text; within maximum length safety limits Returns: int """ text = quote_text(text) return self._execute_command("POP", collection, bucket, object, text)
def push(self, collection, bucket, object, text, lang=None)
-
Push search data in the index
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} – object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) text {str} – search text to be indexed can be a single word, or a longer text; within maximum length safety limits Keyword Arguments: lang {str} – [description] (default: {None})
Returns
bool – True if search data are pushed in the index.
Source code
def push(self, collection: str, bucket: str, object: str, text: str, lang: str=None): """Push search data in the index Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) object {str} -- object identifier that refers to an entity in an external database, where the searched object is stored (eg. you use Sonic to index CRM contacts by name; full CRM contact data is stored in a MySQL database; in this case the object identifier in Sonic will be the MySQL primary key for the CRM contact) text {str} -- search text to be indexed can be a single word, or a longer text; within maximum length safety limits Keyword Arguments: lang {str} -- [description] (default: {None}) Returns: bool -- True if search data are pushed in the index. """ lang = "LANG({})".format(lang) if lang else '' text = quote_text(text) return self._execute_command("PUSH", collection, bucket, object, text, lang)
Inherited members
class SearchClient (ancestors: SonicClient, CommonCommandsMixin)
-
Mixin of the commands used by all sonic channels.
Source code
class SearchClient(SonicClient, CommonCommandsMixin): def __init__(self, host: str, port: int, password: str): """Create Sonic client that operates on the Search Channel Arguments: host {str} -- valid reachable host address port {int} -- port number password {str} -- password (defined in config.cfg file on the server side) """ super().__init__(host, port, password, SEARCH) def query(self, collection: str, bucket: str, terms: str, limit: int=None, offset: int=None, lang: str=None): """Query the database Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) terms {str} -- text for search terms Keyword Arguments: limit {int} -- a positive integer number; set within allowed maximum & minimum limits offset {int} -- a positive integer number; set within allowed maximum & minimum limits lang {str} -- an ISO 639-3 locale code eg. eng for English (if set, the locale must be a valid ISO 639-3 code; if not set, the locale will be guessed from text). Returns: list -- list of objects ids. """ limit = "LIMIT({})".format(limit) if limit else '' lang = "LANG({})".format(lang) if lang else '' offset = "OFFSET({})".format(offset) if offset else '' terms = quote_text(terms) return self._execute_command_async( 'QUERY', collection, bucket, terms, limit, offset, lang) def suggest(self, collection: str, bucket: str, word: str, limit: int=None): """auto-completes word. Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) word {str} -- word to autocomplete Keyword Arguments: limit {int} -- a positive integer number; set within allowed maximum & minimum limits (default: {None}) Returns: list -- list of suggested words. """ limit = "LIMIT({})".format(limit) if limit else '' word = quote_text(word) return self._execute_command( 'SUGGEST', collection, bucket, word, limit)
Methods
def __init__(self, host, port, password)
-
Create Sonic client that operates on the Search Channel
Arguments
host {str} – valid reachable host address port {int} – port number password {str} – password (defined in config.cfg file on the server side)
Source code
def __init__(self, host: str, port: int, password: str): """Create Sonic client that operates on the Search Channel Arguments: host {str} -- valid reachable host address port {int} -- port number password {str} -- password (defined in config.cfg file on the server side) """ super().__init__(host, port, password, SEARCH)
def query(self, collection, bucket, terms, limit=None, offset=None, lang=None)
-
Query the database
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) terms {str} – text for search terms Keyword Arguments: limit {int} – a positive integer number; set within allowed maximum & minimum limits offset {int} – a positive integer number; set within allowed maximum & minimum limits lang {str} – an ISO 639-3 locale code eg. eng for English (if set, the locale must be a valid ISO 639-3 code; if not set, the locale will be guessed from text).
Returns
list – list of objects ids.
Source code
def query(self, collection: str, bucket: str, terms: str, limit: int=None, offset: int=None, lang: str=None): """Query the database Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) terms {str} -- text for search terms Keyword Arguments: limit {int} -- a positive integer number; set within allowed maximum & minimum limits offset {int} -- a positive integer number; set within allowed maximum & minimum limits lang {str} -- an ISO 639-3 locale code eg. eng for English (if set, the locale must be a valid ISO 639-3 code; if not set, the locale will be guessed from text). Returns: list -- list of objects ids. """ limit = "LIMIT({})".format(limit) if limit else '' lang = "LANG({})".format(lang) if lang else '' offset = "OFFSET({})".format(offset) if offset else '' terms = quote_text(terms) return self._execute_command_async( 'QUERY', collection, bucket, terms, limit, offset, lang)
def suggest(self, collection, bucket, word, limit=None)
-
auto-completes word.
Arguments
collection {str} – index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} – index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) word {str} – word to autocomplete Keyword Arguments: limit {int} – a positive integer number; set within allowed maximum & minimum limits (default: {None})
Returns
list – list of suggested words.
Source code
def suggest(self, collection: str, bucket: str, word: str, limit: int=None): """auto-completes word. Arguments: collection {str} -- index collection (ie. what you search in, eg. messages, products, etc.) bucket {str} -- index bucket name (ie. user-specific search classifier in the collection if you have any eg. user-1, user-2, .., otherwise use a common bucket name eg. generic, default, common, ..) word {str} -- word to autocomplete Keyword Arguments: limit {int} -- a positive integer number; set within allowed maximum & minimum limits (default: {None}) Returns: list -- list of suggested words. """ limit = "LIMIT({})".format(limit) if limit else '' word = quote_text(word) return self._execute_command( 'SUGGEST', collection, bucket, word, limit)
Inherited members
class SonicClient
-
Source code
class SonicClient: def __init__(self, host: str, port: int, password: str, channel: str, pool: ConnectionPool=None): """Base for sonic clients bufsize: indicates the buffer size to be used while communicating with the server. protocol: sonic protocol version Arguments: host {str} -- sonic server host port {int} -- sonic server port password {str} -- user password defined in `config.cfg` file on the server side. channel {str} -- channel name one of (ingest, search, control) """ self.host = host self.port = port self._password = password self.channel = channel self.bufsize = 0 self.protocol = 1 self.raw = False self.address = self.host, self.port if not pool: self.pool = ConnectionPool( host=host, port=port, password=password, channel=channel) def close(self): """close the connection and clean up open resources. """ pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def get_active_connection(self) -> SonicConnection: """Gets a connection from the pool Returns: SonicConnection -- connection from the pool """ active = self.pool.get_connection() active.raw = self.raw return active def _execute_command(self, cmd, *args): """Executes command `cmd` with arguments `args` Arguments: cmd {str} -- command to execute *args -- `cmd`'s arguments Returns: str|object -- result of execution """ active = self.get_active_connection() try: res = active._execute_command(cmd, *args) finally: self.pool.release(active) return res def _execute_command_async(self, cmd, *args): """Executes async command `cmd` with arguments `args` and awaits its result. Arguments: cmd {str} -- command to execute *args -- `cmd`'s arguments Returns: str|object -- result of execution """ active = self.get_active_connection() try: active._execute_command(cmd, *args) resp = active._get_response() finally: self.pool.release(active) return resp
Subclasses
Methods
def __init__(self, host, port, password, channel, pool=None)
-
Base for sonic clients
- bufsize: indicates the buffer size to be used while communicating with the server.
protocol
:sonic
protocol
version
Arguments
host {str} – sonic server host port {int} – sonic server port password {str} – user password defined in
config.cfg
file on the server side. channel {str} – channel name one of (ingest, search, control)Source code
def __init__(self, host: str, port: int, password: str, channel: str, pool: ConnectionPool=None): """Base for sonic clients bufsize: indicates the buffer size to be used while communicating with the server. protocol: sonic protocol version Arguments: host {str} -- sonic server host port {int} -- sonic server port password {str} -- user password defined in `config.cfg` file on the server side. channel {str} -- channel name one of (ingest, search, control) """ self.host = host self.port = port self._password = password self.channel = channel self.bufsize = 0 self.protocol = 1 self.raw = False self.address = self.host, self.port if not pool: self.pool = ConnectionPool( host=host, port=port, password=password, channel=channel)
def close(self)
-
close the connection and clean up open resources.
Source code
def close(self): """close the connection and clean up open resources. """ pass
def get_active_connection(self)
-
Gets a connection from the pool
Returns
SonicConnection – connection from the pool
Source code
def get_active_connection(self) -> SonicConnection: """Gets a connection from the pool Returns: SonicConnection -- connection from the pool """ active = self.pool.get_connection() active.raw = self.raw return active
class SonicConnection
-
Source code
class SonicConnection: def __init__(self, host: str, port: int, password: str, channel: str, keepalive: bool=True, timeout: int=60): """Base for sonic connections bufsize: indicates the buffer size to be used while communicating with the server. protocol: sonic protocol version Arguments: host {str} -- sonic server host port {int} -- sonic server port password {str} -- user password defined in `config.cfg` file on the server side. channel {str} -- channel name one of (ingest, search, control) Keyword Arguments: keepalive {bool} -- sets keepalive socket option (default: {True}) timeout {int} -- sets socket timeout (default: {60}) """ self.host = host self.port = port self._password = password self.channel = channel self.raw = False self.address = self.host, self.port self.keepalive = keepalive self.timeout = timeout self.socket_connect_timeout = 10 self.__socket = None self.__reader = None self.__writer = None self.bufize = None self.protocol = None def connect(self): """Connects to sonic server endpoint Returns: bool: True when connection happens and successfully switched to a channel. """ resp = self._reader.readline() if 'CONNECTED' in resp: self.connected = True resp = self._execute_command("START", self.channel, self._password) self.protocol = _parse_protocol_version(resp) self.bufsize = _parse_buffer_size(resp) return self.ping() def ping(self): return self._execute_command("PING") == "PONG" def __create_connection(self, address): "Create a TCP socket connection" # we want to mimic what socket.create_connection does to support # ipv4/ipv6, but we want to set options prior to calling # socket.connect() # snippet taken from redis client code. err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # set the socket_connect_timeout before we connect if self.socket_connect_timeout: sock.settimeout(self.timeout) # connect sock.connect(socket_address) # set the socket_timeout now that we're connected if self.timeout: sock.settimeout(self.timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err raise socket.error("socket.getaddrinfo returned an empty list") @property def _socket(self): if not self.__socket: # socket.create_connection(self.address) self.__socket = self.__create_connection(self.address) return self.__socket @property def _reader(self): if not self.__reader: self.__reader = self._socket.makefile('r') return self.__reader @property def _writer(self): if not self.__writer: self.__writer = self._socket.makefile('w') return self.__writer def close(self): """ Closes the connection and its resources. """ resources = (self.__reader, self.__writer, self.__socket) for rc in resources: if rc is not None: rc.close() self.__reader = None self.__writer = None self.__socket = None def _format_command(self, cmd, *args): """Format command according to sonic protocol Arguments: cmd {str} -- a valid sonic command Returns: str -- formatted command string to be sent on the wire. """ cmd_str = cmd + " " cmd_str += " ".join(args) cmd_str += "\n" # specs says \n, asonic does \r\n return cmd_str def _execute_command(self, cmd, *args): """Formats and sends command with suitable arguments on the wire to sonic server Arguments: cmd {str} -- valid command Raises: ChannelError -- Raised for unsupported channel commands Returns: object|str -- depends on the `self.raw` mode if mode is raw: result is always a string else the result is converted to suitable python response (e.g boolean, int, list) """ if cmd not in ALL_CMDS[self.channel]: raise ChannelError( "command {} isn't allowed in channel {}".format(cmd, self.channel)) cmd_str = self._format_command(cmd, *args) self._writer.write(cmd_str) self._writer.flush() resp = self._get_response() return resp def _get_response(self): """Gets a response string from sonic server. Returns: object|str -- depends on the `self.raw` mode if mode is raw: result is always a string else the result is converted to suitable python response (e.g boolean, int, list) """ resp = raise_for_error(self._reader.readline()).strip() if not self.raw: return pythonify_result(resp) return resp
Methods
def __init__(self, host, port, password, channel, keepalive=True, timeout=60)
-
Base for sonic connections
- bufsize: indicates the buffer size to be used while communicating with the server.
protocol
:sonic
protocol
version
Arguments
host {str} – sonic server host port {int} – sonic server port password {str} – user password defined in
config.cfg
file on the server side. channel {str} – channel name one of (ingest, search, control) Keyword Arguments: keepalive {bool} – sets keepalive socket option (default: {True}) timeout {int} – sets socket timeout (default: {60})Source code
def __init__(self, host: str, port: int, password: str, channel: str, keepalive: bool=True, timeout: int=60): """Base for sonic connections bufsize: indicates the buffer size to be used while communicating with the server. protocol: sonic protocol version Arguments: host {str} -- sonic server host port {int} -- sonic server port password {str} -- user password defined in `config.cfg` file on the server side. channel {str} -- channel name one of (ingest, search, control) Keyword Arguments: keepalive {bool} -- sets keepalive socket option (default: {True}) timeout {int} -- sets socket timeout (default: {60}) """ self.host = host self.port = port self._password = password self.channel = channel self.raw = False self.address = self.host, self.port self.keepalive = keepalive self.timeout = timeout self.socket_connect_timeout = 10 self.__socket = None self.__reader = None self.__writer = None self.bufize = None self.protocol = None
def close(self)
-
Closes the connection and its resources.
Source code
def close(self): """ Closes the connection and its resources. """ resources = (self.__reader, self.__writer, self.__socket) for rc in resources: if rc is not None: rc.close() self.__reader = None self.__writer = None self.__socket = None
def connect(self)
-
Connects to sonic server endpoint
Returns
bool
- True when connection happens and successfully switched to a channel.
Source code
def connect(self): """Connects to sonic server endpoint Returns: bool: True when connection happens and successfully switched to a channel. """ resp = self._reader.readline() if 'CONNECTED' in resp: self.connected = True resp = self._execute_command("START", self.channel, self._password) self.protocol = _parse_protocol_version(resp) self.bufsize = _parse_buffer_size(resp) return self.ping()
def ping(self)
-
Source code
def ping(self): return self._execute_command("PING") == "PONG"
class SonicServerError (ancestors: builtins.Exception, builtins.BaseException)
-
Generic Sonic Server exception
Source code
class SonicServerError(Exception): """Generic Sonic Server exception""" pass