Redis®*: Verwendung mit Python
Wie man Redis mit Python verbindet
👋 Willkommen in der Stackhero-Dokumentation!
Stackhero bietet eine einsatzbereite Redis Cloud Lösung mit zahlreichen Vorteilen, darunter:
Redis CommanderWeb-UI inklusive.- Unbegrenzte Nachrichtengröße und Übertragungen.
- Mühelose Updates mit nur einem Klick.
- Optimale Performance und robuste Sicherheit durch eine private und dedizierte VM.
Sparen Sie Zeit und vereinfachen Sie Ihr Leben: Es dauert nur 5 Minuten, um die Redis Cloud Hosting Lösung von Stackhero auszuprobieren!
Die richtige Python-Bibliothek zur Verbindung mit Redis wählen
Um Ihre Python-Anwendung nahtlos mit einer Redis-Instanz zu verbinden, ist die redis-Bibliothek eine beliebte und effiziente Wahl. Sie können beginnen, indem Sie sie mit folgendem Vorschlag installieren:
pip install redis
pip freeze > requirements.txt
Python mit Redis verbinden
Lassen Sie uns ein einfaches Beispiel mit Standardwerten erkunden, das den meisten Anforderungen gerecht werden sollte:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Um die Sicherheit zu erhöhen, sollten Sie in Betracht ziehen, Umgebungsvariablen für Ihre Anmeldedaten zu verwenden. So können Sie es tun:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Stellen Sie sicher, dass Ihre Umgebungsvariablen eine Definition wie diese enthalten: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Verwendung von Pub/Sub mit Redis und Python
Die Publish/Subscribe-Funktion von Redis kann einfach mit Python genutzt werden. Hier ist ein einfaches Beispiel, um Sie zu leiten:
import redis
# Verbindung zu Redis herstellen
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Erstellen einer PubSub-Instanz
p = r.pubsub()
# Abonnieren des Kanals "test"
p.subscribe('test')
# Eine Nachricht an den Kanal "test" senden
r.publish('test', 'Dies ist eine Testnachricht')
# Die erste verfügbare Nachricht vom Kanal "test" abrufen
p.get_message()
# Vom Kanal "test" abmelden
p.unsubscribe('test')
Erweiterte Redis Pub/Sub-Beispiele mit Python
Sie möchten möglicherweise fortgeschrittenere Pub/Sub-Techniken mit den folgenden Beispielen erkunden:
# Erstellen einer PubSub-Instanz und Abonnementnachrichten ignorieren
p = r.pubsub(ignore_subscribe_messages=True)
# Abonnieren mehrerer Kanäle
p.subscribe('test-1', 'test-2', ...)
# Von mehreren Kanälen abmelden
p.unsubscribe('test-1', 'test-2', ...)
# Sie können auch "unsubscribe" ohne Argumente verwenden, um sich von allen abonnierten Kanälen abzumelden
p.unsubscribe()
# Kanäle mit einem Muster abonnieren
p.psubscribe('my-*')
# Von Kanälen mit einem Muster abmelden
p.punsubscribe('my-*')
Vermeidung des Fehlers "Connection Closed by Server" in Redis und Python
Sie könnten auf den Fehler redis.exceptions.ConnectionError: Connection closed by server stoßen, der aufgrund von Inaktivität in Ihrer Python-Anwendung auftreten kann, was dazu führt, dass die Verbindung automatisch geschlossen wird. Wenn Ihre App versucht, sich erneut zu verbinden, kann dies fehlschlagen und zu diesem Fehler führen.
Um dies zu mildern, sollten Sie den Parameter health_check_interval in Ihrer Redis-Verbindung wie folgt festlegen:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Bei der Verwendung der Pub/Sub-Funktion von Redis stellen Sie sicher, dass Ihre Anwendung get_message() oder listen() häufiger aufruft als das angegebene health_check_interval (in diesem Beispiel alle 10 Sekunden). Sie können die offizielle redis-py-Dokumentation für weitere Details konsultieren.
Wenn diese Aufrufe nicht innerhalb des Intervalls erfolgen, könnten Sie weiterhin auf den Fehler "Connection closed by server" stoßen. Eine praktische Lösung ist die regelmäßige Verwendung der Funktion check_health().
Hier ist, wie Sie es implementieren können:
import redis
import threading
# Verbindung zu Redis herstellen
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Erstellen einer PubSub-Instanz
p = r.pubsub()
# Abonnieren des Kanals "test"
p.subscribe('test')
# Erstellen einer Funktion, die `check_health` alle 5 Sekunden aufruft
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Die Funktion redis_auto_check aufrufen
redis_auto_check(p)