Redis®*: Używanie z Pythonem
Jak połączyć Redis z Pythonem
👋 Witamy w dokumentacji Stackhero!
Stackhero oferuje gotowe do użycia rozwiązanie Redis cloud, które zapewnia wiele korzyści, w tym:
- Włączony web UI
Redis Commander.- Nieograniczona wielkość i transfer wiadomości.
- Bezproblemowe aktualizacje za pomocą jednego kliknięcia.
- Optymalna wydajność i solidne zabezpieczenia dzięki prywatnej i dedykowanej VM.
Oszczędzaj czas i upraszczaj sobie życie: wystarczy 5 minut, aby wypróbować rozwiązanie hostingu Redis cloud Stackhero!
Wybór odpowiedniej biblioteki Python do połączenia z Redis
Aby płynnie połączyć aplikację Python z instancją Redis, biblioteka redis jest popularnym i efektywnym wyborem. Możesz zacząć od zainstalowania jej za pomocą następującej sugestii:
pip install redis
pip freeze > requirements.txt
Łączenie Pythona z Redis
Przyjrzyjmy się prostemu przykładowi używającemu wartości domyślnych, które powinny spełniać większość potrzeb:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Aby zwiększyć bezpieczeństwo, rozważ użycie zmiennych środowiskowych dla swoich danych uwierzytelniających. Oto jak można to zrobić:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Upewnij się, że Twoje zmienne środowiskowe zawierają definicję taką jak: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Używanie Pub/Sub z Redis i Python
Funkcjonalność Publish/Subscribe w Redis może być łatwo wykorzystana z Pythonem. Oto prosty przykład, który Cię poprowadzi:
import redis
# Połącz z Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Utwórz instancję PubSub
p = r.pubsub()
# Subskrybuj kanał "test"
p.subscribe('test')
# Opublikuj wiadomość na kanale "test"
r.publish('test', 'To jest wiadomość testowa')
# Pobierz pierwszą dostępną wiadomość z kanału "test"
p.get_message()
# Anuluj subskrypcję kanału "test"
p.unsubscribe('test')
Zaawansowane przykłady Redis Pub/Sub z Pythonem
Możesz chcieć zbadać bardziej zaawansowane techniki Pub/Sub za pomocą poniższych przykładów:
# Utwórz instancję PubSub i ignoruj wiadomości subskrypcyjne
p = r.pubsub(ignore_subscribe_messages=True)
# Subskrybuj wiele kanałów
p.subscribe('test-1', 'test-2', ...)
# Anuluj subskrypcję wielu kanałów
p.unsubscribe('test-1', 'test-2', ...)
# Możesz również użyć "unsubscribe" bez argumentów, aby odłączyć się od wszystkich subskrybowanych kanałów
p.unsubscribe()
# Subskrybuj kanały używając wzorca
p.psubscribe('my-*')
# Anuluj subskrypcję kanałów używając wzorca
p.punsubscribe('my-*')
Unikanie błędu "Connection Closed by Server" w Redis i Python
Możesz napotkać błąd redis.exceptions.ConnectionError: Connection closed by server, który może wystąpić z powodu bezczynności w Twojej aplikacji Python, powodując automatyczne zamknięcie połączenia. Gdy Twoja aplikacja próbuje się ponownie połączyć, może się nie udać, co skutkuje tym błędem.
Aby to złagodzić, rozważ ustawienie parametru health_check_interval w swoim połączeniu Redis w następujący sposób:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Podczas korzystania z funkcji Pub/Sub Redis, upewnij się, że Twoja aplikacja wywołuje get_message() lub listen() częściej niż określony health_check_interval (w tym przykładzie co 10 sekund). Możesz odnieść się do oficjalnej dokumentacji redis-py po więcej szczegółów.
Jeśli te wywołania nie są wykonywane w tym przedziale, nadal możesz napotkać błąd "Connection closed by server". Praktycznym rozwiązaniem jest regularne używanie funkcji check_health().
Oto jak można to zaimplementować:
import redis
import threading
# Połącz z Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Utwórz instancję PubSub
p = r.pubsub()
# Subskrybuj kanał "test"
p.subscribe('test')
# Utwórz funkcję, która będzie wywoływać `check_health` co 5 sekund
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Wywołaj funkcję redis_auto_check
redis_auto_check(p)