Valkey: Używanie z Pythonem
Jak połączyć Valkey z Pythonem
👋 Witamy w dokumentacji Stackhero!
Stackhero oferuje gotowe do użycia rozwiązanie Valkey cloud, które zapewnia wiele korzyści, w tym:
- Włączony web UI
Redis Commander.- Nieograniczony rozmiar wiadomości i transfery.
- Bezproblemowe aktualizacje za jednym kliknięciem.
- Optymalna wydajność i solidne zabezpieczenia dzięki prywatnej i dedykowanej VM.
Oszczędzaj czas i uprość sobie życie: wystarczy 5 minut, aby wypróbować rozwiązanie Valkey cloud hosting Stackhero!
Wybór biblioteki Python do integracji z Valkey
Aby płynnie zintegrować swoją aplikację z Valkey, rozważ użycie biblioteki redis dla Pythona. Ta biblioteka jest dobrze dostosowana do łączenia się z Valkey, otwartoźródłowym forkiem Redis. Możesz ją łatwo zainstalować, uruchamiając:
pip install redis
pip freeze > requirements.txt
Łączenie Pythona z Valkey
Przyjrzyjmy się, jak można połączyć aplikację Python z Valkey. Oto prosty przykład z użyciem domyślnych ustawień:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Dla zwiększenia bezpieczeństwa zaleca się zarządzanie poświadczeniami za pomocą zmiennych środowiskowych. Oto jak to zrobić:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_VALKEY_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Upewnij się, że Twoje zmienne środowiskowe zawierają wpis taki jak: STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.
Używanie Pub/Sub z Valkey i Pythonem
Wykorzystanie funkcjonalności Publish/Subscribe (Pub/Sub) w Pythonie z Valkey jest proste. Oto przykład:
import redis
# Połącz z Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Utwórz instancję PubSub
p = r.pubsub()
# Subskrybuj kanał "test"
p.subscribe('test')
# Opublikuj wiadomość na kanale "test"
r.publish('test', 'To jest wiadomość testowa')
# Pobierz pierwszą dostępną wiadomość z kanału "test"
p.get_message()
# Anuluj subskrypcję kanału "test"
p.unsubscribe('test')
Zaawansowane przykłady Valkey Pub/Sub z Pythonem
Rozszerz swoje możliwości Pub/Sub za pomocą tych zaawansowanych przykładów:
# Utwórz instancję PubSub i ignoruj wiadomości subskrypcyjne
p = r.pubsub(ignore_subscribe_messages=True)
# Subskrybuj wiele kanałów
p.subscribe('test-1', 'test-2', ...)
# Anuluj subskrypcję wielu kanałów
p.unsubscribe('test-1', 'test-2', ...)
# Możesz również użyć "unsubscribe" bez argumentów, aby rozłączyć się ze wszystkimi subskrybowanymi kanałami
p.unsubscribe()
# Subskrybuj kanały używając wzorca
p.psubscribe('my-*')
# Anuluj subskrypcję kanałów używając wzorca
p.punsubscribe('my-*')
Unikanie błędów "Connection Closed by Server"
Błąd redis.exceptions.ConnectionError: Connection closed by server może wystąpić, jeśli Twoja aplikacja nie wchodzi w interakcję z Valkey przez pewien czas, co prowadzi do automatycznego rozłączenia. Aby tego uniknąć, możesz ustawić parametr health_check_interval jak pokazano poniżej:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Podczas korzystania z funkcji Pub/Sub Valkey, biblioteka redis-py oczekuje, że funkcje takie jak get_message() lub listen() będą wywoływane częściej niż health_check_interval. W naszym przykładzie ustawiliśmy ten interwał na 10 sekund, więc upewnij się, że wywołujesz get_message() lub listen() co najmniej raz na 10 sekund (odwołaj się do oficjalnej dokumentacji redis-py).
Jeśli tego nie zrobisz, możesz napotkać ten sam błąd połączenia. Aby tego uniknąć, rozważ regularne wywoływanie check_health().
Oto jak można to zaimplementować:
import redis
import threading
# Połącz z Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Utwórz instancję PubSub
p = r.pubsub()
# Subskrybuj kanał "test"
p.subscribe('test')
# Utwórz funkcję, która będzie wywoływać `check_health` co 5 sekund
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Wywołaj funkcję valkey_auto_check
valkey_auto_check(p)