Valkey: Utilizzo con Python

Come connettere Valkey con Python

👋 Benvenuti nella documentazione di Stackhero!

Stackhero offre una soluzione Valkey cloud pronta all'uso che fornisce numerosi vantaggi, tra cui:

  • Interfaccia web UI Redis Commander inclusa.
  • Dimensione e trasferimenti di messaggi illimitati.
  • Aggiornamenti semplificati con un solo clic.
  • Prestazioni ottimali e sicurezza robusta grazie a una VM privata e dedicata.

Risparmia tempo e semplifica la tua vita: bastano 5 minuti per provare la soluzione Valkey cloud hosting di Stackhero!

Per integrare senza problemi la tua applicazione con Valkey, considera l'uso della libreria redis per Python. Questa libreria è ben adatta per connettersi a Valkey, un fork open-source di Redis. Puoi installarla facilmente eseguendo:

pip install redis
pip freeze > requirements.txt

Esploriamo come puoi collegare la tua applicazione Python a Valkey. Ecco un esempio semplice utilizzando le impostazioni predefinite:

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Per una maggiore sicurezza, è consigliabile gestire le credenziali utilizzando variabili d'ambiente. Ecco come puoi farlo:

import os
import redis

r = redis.from_url(
  os.environ.get("STACKHERO_VALKEY_URL_TLS"),
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Assicurati che le tue variabili d'ambiente includano una voce come questa: STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.

Sfruttare la funzionalità Publish/Subscribe (Pub/Sub) in Python con Valkey è semplice. Ecco un esempio:

import redis

# Connettersi a Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# Creare un'istanza PubSub
p = r.pubsub()

# Iscriversi al canale "test"
p.subscribe('test')

# Pubblicare un messaggio sul canale "test"
r.publish('test', 'Questo è un messaggio di test')

# Ottenere il primo messaggio disponibile dal canale "test"
p.get_message()

# Disiscriversi dal canale "test"
p.unsubscribe('test')

Espandi le tue capacità Pub/Sub con questi esempi avanzati:

# Creare un'istanza PubSub e ignorare i messaggi di iscrizione
p = r.pubsub(ignore_subscribe_messages=True)

# Iscriversi a più canali
p.subscribe('test-1', 'test-2', ...)

# Disiscriversi da più canali
p.unsubscribe('test-1', 'test-2', ...)

# Puoi anche usare "unsubscribe" senza argomenti, per disconnetterti da tutti i canali sottoscritti
p.unsubscribe()

# Iscriversi a canali usando un pattern
p.psubscribe('my-*')

# Disiscriversi da canali usando un pattern
p.punsubscribe('my-*')

L'errore redis.exceptions.ConnectionError: Connection closed by server può verificarsi se la tua app non interagisce con Valkey per un periodo, portando a una disconnessione automatica. Per evitare ciò, puoi impostare il parametro health_check_interval come mostrato di seguito:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Quando si utilizza la funzionalità Pub/Sub di Valkey, la libreria redis-py si aspetta che funzioni come get_message() o listen() vengano chiamate più frequentemente dell'intervallo health_check_interval. Nel nostro esempio, abbiamo impostato questo intervallo a 10 secondi, quindi assicurati di chiamare get_message() o listen() almeno una volta ogni 10 secondi (consulta la documentazione ufficiale di redis-py).

Se ciò non viene fatto, potresti incontrare lo stesso errore di connessione. Per evitare ciò, considera di chiamare regolarmente check_health().

Ecco come puoi implementarlo:

import redis
import threading

# Connettersi a Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Creare un'istanza PubSub
p = r.pubsub()

# Iscriversi al canale "test"
p.subscribe('test')

# Creare una funzione che chiamerà `check_health` ogni 5 secondi
def valkey_auto_check(p):
  t = threading.Timer(5, valkey_auto_check, [p])
  t.start()
  p.check_health()

# Chiamare la funzione valkey_auto_check
valkey_auto_check(p)