Valkey: Utilisation avec Python

Comment connecter Valkey avec Python

👋 Bienvenue sur la documentation de Stackhero !

Stackhero propose une solution Valkey cloud prête à l'emploi offrant de nombreux avantages, notamment :

  • Interface web UI Redis Commander incluse.
  • Taille et transferts de messages illimités.
  • Mises à jour simplifiées en un clic.
  • Performance optimale et sécurité robuste grâce à une VM privée et dédiée.

Gagnez du temps et simplifiez-vous la vie : il suffit de 5 minutes pour essayer la solution Valkey cloud hosting de Stackhero !

Pour intégrer votre application avec Valkey de manière fluide, envisagez d'utiliser la bibliothèque redis pour Python. Cette bibliothèque est bien adaptée pour se connecter à Valkey, un fork open-source de Redis. Vous pouvez l'installer facilement en exécutant :

pip install redis
pip freeze > requirements.txt

Explorons comment vous pouvez connecter votre application Python à Valkey. Voici un exemple simple utilisant les paramètres par défaut :

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Pour une sécurité renforcée, il est conseillé de gérer les identifiants à l'aide de variables d'environnement. Voici comment procéder :

import os
import redis

r = redis.from_url(
  os.environ.get("STACKHERO_VALKEY_URL_TLS"),
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Assurez-vous que vos variables d'environnement incluent une entrée comme celle-ci : STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.

Exploiter la fonctionnalité Publish/Subscribe (Pub/Sub) en Python avec Valkey est simple. Voici un exemple :

import redis

# Connecter à Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# Créer une instance PubSub
p = r.pubsub()

# S'abonner au canal "test"
p.subscribe('test')

# Publier un message sur le canal "test"
r.publish('test', 'Ceci est un message de test')

# Obtenir le premier message disponible du canal "test"
p.get_message()

# Se désabonner du canal "test"
p.unsubscribe('test')

Étendez vos capacités Pub/Sub avec ces exemples avancés :

# Créer une instance PubSub et ignorer les messages d'abonnement
p = r.pubsub(ignore_subscribe_messages=True)

# S'abonner à plusieurs canaux
p.subscribe('test-1', 'test-2', ...)

# Se désabonner de plusieurs canaux
p.unsubscribe('test-1', 'test-2', ...)

# Vous pouvez également utiliser "unsubscribe" sans arguments pour se déconnecter de tous les canaux abonnés
p.unsubscribe()

# S'abonner à des canaux en utilisant un motif
p.psubscribe('my-*')

# Se désabonner de canaux en utilisant un motif
p.punsubscribe('my-*')

L'erreur redis.exceptions.ConnectionError: Connection closed by server peut survenir si votre application n'interagit pas avec Valkey pendant un certain temps, entraînant une déconnexion automatique. Pour éviter cela, vous pouvez définir le paramètre health_check_interval comme indiqué ci-dessous :

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Lors de l'utilisation de la fonctionnalité Pub/Sub de Valkey, la bibliothèque redis-py s'attend à ce que des fonctions comme get_message() ou listen() soient appelées plus fréquemment que l'intervalle health_check_interval. Dans notre exemple, nous avons défini cet intervalle à 10 secondes, assurez-vous donc d'appeler get_message() ou listen() au moins une fois toutes les 10 secondes (référez-vous à la documentation officielle de redis-py).

Si cela n'est pas fait, vous pourriez rencontrer la même erreur de connexion. Pour éviter cela, envisagez d'appeler régulièrement check_health().

Voici comment vous pouvez l'implémenter :

import redis
import threading

# Connecter à Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Créer une instance PubSub
p = r.pubsub()

# S'abonner au canal "test"
p.subscribe('test')

# Créer une fonction qui appellera `check_health` toutes les 5 secondes
def valkey_auto_check(p):
  t = threading.Timer(5, valkey_auto_check, [p])
  t.start()
  p.check_health()

# Appeler la fonction valkey_auto_check
valkey_auto_check(p)