Redis®*: Utilisation avec Python
Comment connecter Redis avec Python
👋 Bienvenue sur la documentation de Stackhero !
Stackhero propose une solution Redis cloud prête à l'emploi offrant de nombreux avantages, notamment :
- Interface web
Redis Commanderincluse.- Taille et transferts de messages illimités.
- Mises à jour simplifiées en un clic.
- Performance optimale et sécurité renforcée grâce à une VM privée et dédiée.
Gagnez du temps et simplifiez-vous la vie : il suffit de 5 minutes pour essayer la solution d'hébergement Redis cloud de Stackhero !
Choisir la bonne bibliothèque Python pour se connecter à Redis
Pour connecter votre application Python à une instance Redis de manière fluide, la bibliothèque redis est un choix populaire et efficace. Vous pouvez commencer par l'installer avec la suggestion suivante :
pip install redis
pip freeze > requirements.txt
Connecter Python à Redis
Explorons un exemple simple utilisant des valeurs par défaut qui devraient répondre à la plupart des besoins :
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Pour renforcer la sécurité, envisagez d'utiliser des variables d'environnement pour vos identifiants. Voici comment procéder :
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Assurez-vous que vos variables d'environnement incluent une définition comme : STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Utiliser Pub/Sub avec Redis et Python
La fonctionnalité Publish/Subscribe de Redis peut être facilement utilisée avec Python. Voici un exemple simple pour vous guider :
import redis
# Connecter à Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Créer une instance PubSub
p = r.pubsub()
# S'abonner au canal "test"
p.subscribe('test')
# Publier un message sur le canal "test"
r.publish('test', 'Ceci est un message de test')
# Obtenir le premier message disponible du canal "test"
p.get_message()
# Se désabonner du canal "test"
p.unsubscribe('test')
Exemples avancés de Redis Pub/Sub avec Python
Vous pourriez vouloir explorer des techniques Pub/Sub plus avancées avec les exemples suivants :
# Créer une instance PubSub et ignorer les messages d'abonnement
p = r.pubsub(ignore_subscribe_messages=True)
# S'abonner à plusieurs canaux
p.subscribe('test-1', 'test-2', ...)
# Se désabonner de plusieurs canaux
p.unsubscribe('test-1', 'test-2', ...)
# Vous pouvez également utiliser "unsubscribe" sans arguments pour se déconnecter de tous les canaux abonnés
p.unsubscribe()
# S'abonner à des canaux en utilisant un motif
p.psubscribe('my-*')
# Se désabonner de canaux en utilisant un motif
p.punsubscribe('my-*')
Éviter l'erreur "Connection Closed by Server" dans Redis et Python
Vous pourriez rencontrer l'erreur redis.exceptions.ConnectionError: Connection closed by server, qui peut survenir en raison de l'inactivité de votre application Python, entraînant la fermeture automatique de la connexion. Lorsque votre application tente de se reconnecter, elle peut échouer, entraînant cette erreur.
Pour atténuer cela, envisagez de définir le paramètre health_check_interval dans votre connexion Redis comme suit :
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Lors de l'utilisation de la fonctionnalité Pub/Sub de Redis, assurez-vous que votre application appelle get_message() ou listen() plus fréquemment que l'intervalle spécifié par health_check_interval (dans cet exemple, toutes les 10 secondes). Vous pouvez vous référer à la documentation officielle de redis-py pour plus de détails.
Si ces appels ne sont pas effectués dans l'intervalle, vous pourriez toujours rencontrer l'erreur "Connection closed by server". Une solution pratique est d'utiliser régulièrement la fonction check_health().
Voici comment vous pouvez l'implémenter :
import redis
import threading
# Connecter à Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Créer une instance PubSub
p = r.pubsub()
# S'abonner au canal "test"
p.subscribe('test')
# Créer une fonction qui appellera `check_health` toutes les 5 secondes
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Appeler la fonction redis_auto_check
redis_auto_check(p)