Redis®*: Gebruik met Python
Hoe Redis met Python te verbinden
👋 Welkom bij de Stackhero-documentatie!
Stackhero biedt een kant-en-klare Redis cloud oplossing die tal van voordelen biedt, waaronder:
Redis Commanderweb UI inbegrepen.- Onbeperkte berichtgrootte en overdrachten.
- Moeiteloze updates met slechts één klik.
- Optimale prestaties en robuuste beveiliging aangedreven door een privé en dedicated VM.
Bespaar tijd en vereenvoudig uw leven: het kost slechts 5 minuten om de Redis cloud hosting oplossing van Stackhero te proberen!
De juiste Python-bibliotheek kiezen om verbinding te maken met Redis
Om uw Python-applicatie naadloos te verbinden met een Redis-instantie, is de redis-bibliotheek een populaire en efficiënte keuze. U kunt beginnen door deze te installeren met de volgende suggestie:
pip install redis
pip freeze > requirements.txt
Python verbinden met Redis
Laten we een eenvoudig voorbeeld verkennen met standaardwaarden die aan de meeste behoeften zouden moeten voldoen:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Om de beveiliging te verbeteren, overweeg om omgevingsvariabelen te gebruiken voor uw inloggegevens. Hier is hoe u dat kunt doen:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Zorg ervoor dat uw omgevingsvariabelen een definitie bevatten zoals: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Pub/Sub gebruiken met Redis en Python
De Publish/Subscribe-functie van Redis kan eenvoudig worden gebruikt met Python. Hier is een eenvoudig voorbeeld om u te begeleiden:
import redis
# Verbinden met Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Maak een PubSub-instantie
p = r.pubsub()
# Abonneren op het kanaal "test"
p.subscribe('test')
# Een bericht publiceren naar het kanaal "test"
r.publish('test', 'Dit is een testbericht')
# Het eerste beschikbare bericht van kanaal "test" ophalen
p.get_message()
# Uitschrijven van kanaal "test"
p.unsubscribe('test')
Geavanceerde Redis Pub/Sub-voorbeelden met Python
U wilt misschien meer geavanceerde Pub/Sub-technieken verkennen met de volgende voorbeelden:
# Maak een PubSub-instantie en negeer abonnementsberichten
p = r.pubsub(ignore_subscribe_messages=True)
# Abonneren op meerdere kanalen
p.subscribe('test-1', 'test-2', ...)
# Uitschrijven van meerdere kanalen
p.unsubscribe('test-1', 'test-2', ...)
# U kunt ook "unsubscribe" gebruiken zonder argumenten om van alle geabonneerde kanalen los te koppelen
p.unsubscribe()
# Abonneren op kanalen met een patroon
p.psubscribe('my-*')
# Uitschrijven van kanalen met een patroon
p.punsubscribe('my-*')
Het vermijden van de "Connection Closed by Server"-fout in Redis en Python
U kunt de fout redis.exceptions.ConnectionError: Connection closed by server tegenkomen, die kan optreden vanwege inactiviteit in uw Python-applicatie, waardoor de verbinding automatisch wordt gesloten. Wanneer uw app probeert opnieuw verbinding te maken, kan dit mislukken, wat resulteert in deze fout.
Om dit te verminderen, overweeg om de parameter health_check_interval in uw Redis-verbinding als volgt in te stellen:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Bij het gebruik van de Pub/Sub-functie van Redis, zorg ervoor dat uw applicatie get_message() of listen() vaker aanroept dan het gespecificeerde health_check_interval (in dit voorbeeld elke 10 seconden). U kunt de officiële documentatie van redis-py raadplegen voor meer details.
Als deze oproepen niet binnen het interval worden gedaan, kunt u nog steeds de "Connection closed by server"-fout tegenkomen. Een praktische oplossing is om regelmatig de functie check_health() te gebruiken.
Hier is hoe u het kunt implementeren:
import redis
import threading
# Verbinden met Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Maak een PubSub-instantie
p = r.pubsub()
# Abonneren op het kanaal "test"
p.subscribe('test')
# Maak een functie die `check_health` elke 5 seconden aanroept
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Roep de functie redis_auto_check aan
redis_auto_check(p)