Source code for jupyterhub_traefik_proxy.redis

"""Redis backend"""

import asyncio
from urllib.parse import urlparse

from traitlets import Any, Dict, Unicode, default

from .kv_proxy import TKvProxy
from .traefik_utils import deep_merge


[docs] class TraefikRedisProxy(TKvProxy): """JupyterHub Proxy implementation using traefik and redis""" provider_name = "redis" redis_url = Unicode( 'redis://localhost:6379', config=True, help="""The URL for the redis endpoint""" ) redis_username = Unicode(config=True, help="The redis username") redis_password = Unicode(config=True, help="The redis password") redis_client_kwargs = Dict( config=True, help="Additional keyword arguments to pass through to the `redis.asyncio.Redis` constructor", ) redis = Any() @default("redis") def _connect_redis(self): try: from redis.asyncio import Redis except ImportError: raise ImportError( "Please install `redis` package to use traefik-proxy with redis" ) url = urlparse(self.redis_url) if url.port: port = url.port else: # default port port = 6379 kwargs = dict( host=url.hostname, port=port, decode_responses=True, ) if self.redis_password: kwargs["password"] = self.redis_password if self.redis_username: kwargs["username"] = self.redis_username kwargs.update(self.redis_client_kwargs) return Redis(**kwargs) async def _cleanup(self): f = super()._cleanup() if f is not None: await f await self.redis.close() def _setup_traefik_static_config(self): self.log.debug("Setting up the redis provider in the traefik static config") url = urlparse(self.redis_url) redis_config = { "endpoints": [url.netloc], "rootKey": self.kv_traefik_prefix, } if self.redis_username: redis_config["username"] = self.redis_username if self.redis_password: redis_config["password"] = self.redis_password self.static_config = deep_merge( self.static_config, {"providers": {"redis": redis_config}} ) return super()._setup_traefik_static_config() async def _kv_atomic_set(self, to_set: dict): """Set a collection of keys and values Should be done atomically (i.e. in a transaction), setting nothing on failure. Args: to_set (dict): key/value pairs to set Will always be a flattened dict of single key-value pairs, not a nested structure. """ self.log.debug("Setting redis keys %s", to_set.keys()) await self.redis.mset(to_set) _delete_script = Any() @default("_delete_script") def _register_delete_script(self): """Register LUA script for deleting all keys matching in a prefix Doing the scan & delete from Python is _extremely_ slow for some reason """ _delete_lua = """ local all_keys = {}; local cursor = ""; repeat local result = redis.call("SCAN", cursor, "match", ARGV[1], "count", ARGV[2]) cursor = result[1]; for i, key in ipairs(result[2]) do table.insert(all_keys, key); end until cursor == "0" for i, key in ipairs(all_keys) do redis.call("DEL", key); end return #all_keys; """ return self.redis.register_script(_delete_lua) async def _kv_atomic_delete(self, *keys): """Delete one or more keys If a key ends with `self.kv_separator`, it should be a recursive delete """ to_delete = [] futures = [] for key in keys: if key.endswith(self.kv_separator): prefix = key + "*" self.log.debug("Deleting redis tree %s", prefix) f = asyncio.ensure_future(self._delete_script(args=[prefix, 100])) f.add_done_callback( lambda f: self.log.debug( "Deleted %i keys in %s", f.result(), prefix ) ) futures.append(f) else: to_delete.append(key) if to_delete: self.log.debug("Deleting redis keys %s", to_delete) futures.append(self.redis.delete(*to_delete)) await asyncio.gather(*futures) async def _kv_get_tree(self, prefix): """Return all data under prefix as a dict""" if not prefix.endswith(self.kv_separator): prefix = prefix + self.kv_separator keys = [] # is there a possibility this could get too big? # should we batch? async for key in self.redis.scan_iter(match=prefix + "*"): keys.append(key) self.log.debug("Getting redis keys %s", keys) values = list(await self.redis.mget(keys)) kv_list = zip(keys, values) return self.unflatten_dict_from_kv(kv_list, root_key=prefix)