Redis®*: Uso con Python
Cómo conectar Redis con Python
👋 ¡Bienvenido a la documentación de Stackhero!
Stackhero ofrece una solución Redis cloud lista para usar que proporciona una serie de beneficios, incluyendo:
- Interfaz web
Redis Commanderincluida.- Tamaño y transferencias de mensajes ilimitados.
- Actualizaciones sin esfuerzo con solo un clic.
- Rendimiento óptimo y seguridad robusta gracias a una VM privada y dedicada.
Ahorra tiempo y simplifica tu vida: ¡solo toma 5 minutos probar la solución de alojamiento Redis cloud de Stackhero!
Elegir la biblioteca de Python adecuada para conectar con Redis
Para conectar su aplicación de Python a una instancia de Redis de manera fluida, la biblioteca redis es una opción popular y eficiente. Puede comenzar instalándola con la siguiente sugerencia:
pip install redis
pip freeze > requirements.txt
Conectar Python a Redis
Exploremos un ejemplo sencillo utilizando valores predeterminados que deberían satisfacer la mayoría de las necesidades:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Para mejorar la seguridad, considere usar variables de entorno para sus credenciales. Aquí le mostramos cómo hacerlo:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Asegúrese de que sus variables de entorno incluyan una definición como: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Usar Pub/Sub con Redis y Python
La funcionalidad de Publicar/Suscribirse de Redis se puede utilizar fácilmente con Python. Aquí tiene un ejemplo sencillo para guiarle:
import redis
# Conectar a Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Crear una instancia de PubSub
p = r.pubsub()
# Suscribirse al canal "test"
p.subscribe('test')
# Publicar un mensaje en el canal "test"
r.publish('test', 'Este es un mensaje de prueba')
# Obtener el primer mensaje disponible del canal "test"
p.get_message()
# Cancelar la suscripción del canal "test"
p.unsubscribe('test')
Ejemplos avanzados de Redis Pub/Sub con Python
Puede que desee explorar técnicas más avanzadas de Pub/Sub con los siguientes ejemplos:
# Crear una instancia de PubSub e ignorar los mensajes de suscripción
p = r.pubsub(ignore_subscribe_messages=True)
# Suscribirse a múltiples canales
p.subscribe('test-1', 'test-2', ...)
# Cancelar la suscripción de múltiples canales
p.unsubscribe('test-1', 'test-2', ...)
# También puede usar "unsubscribe" sin argumentos para desconectarse de todos los canales suscritos
p.unsubscribe()
# Suscribirse a canales usando un patrón
p.psubscribe('my-*')
# Cancelar la suscripción de canales usando un patrón
p.punsubscribe('my-*')
Evitar el error "Connection Closed by Server" en Redis y Python
Puede encontrarse con el error redis.exceptions.ConnectionError: Connection closed by server, que puede ocurrir debido a la inactividad en su aplicación de Python, causando que la conexión se cierre automáticamente. Cuando su aplicación intenta reconectarse, puede fallar, resultando en este error.
Para mitigar esto, considere establecer el parámetro health_check_interval en su conexión Redis de la siguiente manera:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Al usar la funcionalidad Pub/Sub de Redis, asegúrese de que su aplicación llame a get_message() o listen() con más frecuencia que el health_check_interval especificado (en este ejemplo, cada 10 segundos). Puede consultar la documentación oficial de redis-py para más detalles.
Si estas llamadas no se realizan dentro del intervalo, aún puede encontrarse con el error "Connection closed by server". Una solución práctica es usar la función check_health() regularmente.
Aquí le mostramos cómo puede implementarlo:
import redis
import threading
# Conectar a Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Crear una instancia de PubSub
p = r.pubsub()
# Suscribirse al canal "test"
p.subscribe('test')
# Crear una función que llame a `check_health` cada 5 segundos
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Llamar a la función redis_auto_check
redis_auto_check(p)