Redis®*: Utilizzo con Python

Come connettere Redis con Python

👋 Benvenuti nella documentazione di Stackhero!

Stackhero offre una soluzione Redis cloud pronta all'uso che fornisce numerosi vantaggi, tra cui:

  • Interfaccia web Redis Commander inclusa.
  • Dimensione e trasferimenti di messaggi illimitati.
  • Aggiornamenti senza sforzo con un solo clic.
  • Prestazioni ottimali e sicurezza robusta grazie a una VM privata e dedicata.

Risparmia tempo e semplifica la tua vita: bastano solo 5 minuti per provare la soluzione di hosting Redis cloud di Stackhero!

Per connettere senza problemi la tua applicazione Python a un'istanza Redis, la libreria redis è una scelta popolare ed efficiente. Puoi iniziare installandola con il seguente suggerimento:

pip install redis
pip freeze > requirements.txt

Esploriamo un esempio semplice utilizzando valori predefiniti che dovrebbero soddisfare la maggior parte delle esigenze:

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Per migliorare la sicurezza, considera l'uso di variabili d'ambiente per le tue credenziali. Ecco come puoi farlo:

import os
import redis

r = redis.from_url(
  os.environ.get("STACKHERO_REDIS_URL_TLS"),
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Assicurati che le tue variabili d'ambiente includano una definizione come: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>

La funzionalità Publish/Subscribe di Redis può essere facilmente utilizzata con Python. Ecco un esempio semplice per guidarti:

import redis

# Connettersi a Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# Creare un'istanza PubSub
p = r.pubsub()

# Iscriversi al canale "test"
p.subscribe('test')

# Pubblicare un messaggio sul canale "test"
r.publish('test', 'Questo è un messaggio di test')

# Ottenere il primo messaggio disponibile dal canale "test"
p.get_message()

# Annullare l'iscrizione dal canale "test"
p.unsubscribe('test')

Potresti voler esplorare tecniche Pub/Sub più avanzate con i seguenti esempi:

# Creare un'istanza PubSub e ignorare i messaggi di iscrizione
p = r.pubsub(ignore_subscribe_messages=True)

# Iscriversi a più canali
p.subscribe('test-1', 'test-2', ...)

# Annullare l'iscrizione da più canali
p.unsubscribe('test-1', 'test-2', ...)

# Puoi anche usare "unsubscribe" senza argomenti per disconnetterti da tutti i canali sottoscritti
p.unsubscribe()

# Iscriversi a canali usando un pattern
p.psubscribe('my-*')

# Annullare l'iscrizione da canali usando un pattern
p.punsubscribe('my-*')

Potresti incontrare l'errore redis.exceptions.ConnectionError: Connection closed by server, che può verificarsi a causa dell'inattività della tua applicazione Python, causando la chiusura automatica della connessione. Quando la tua app tenta di riconnettersi, potrebbe fallire, risultando in questo errore.

Per mitigare questo, considera di impostare il parametro health_check_interval nella tua connessione Redis come segue:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Quando utilizzi la funzionalità Pub/Sub di Redis, assicurati che la tua applicazione chiami get_message() o listen() più frequentemente dell'intervallo specificato da health_check_interval (in questo esempio, ogni 10 secondi). Puoi fare riferimento alla documentazione ufficiale di redis-py per maggiori dettagli.

Se queste chiamate non vengono effettuate entro l'intervallo, potresti ancora incontrare l'errore "Connection closed by server". Una soluzione pratica è utilizzare regolarmente la funzione check_health().

Ecco come puoi implementarla:

import redis
import threading

# Connettersi a Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Creare un'istanza PubSub
p = r.pubsub()

# Iscriversi al canale "test"
p.subscribe('test')

# Creare una funzione che chiamerà `check_health` ogni 5 secondi
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Chiamare la funzione redis_auto_check
redis_auto_check(p)