Valkey: Gebruik met Python
Hoe Valkey met Python te verbinden
👋 Welkom bij de Stackhero-documentatie!
Stackhero biedt een kant-en-klare Valkey cloud oplossing die tal van voordelen biedt, waaronder:
Redis Commanderweb UI inbegrepen.- Onbeperkte berichtgrootte en overdrachten.
- Moeiteloze updates met slechts één klik.
- Optimale prestaties en robuuste beveiliging aangedreven door een privé en dedicated VM.
Bespaar tijd en vereenvoudig uw leven: het kost slechts 5 minuten om de Valkey cloud hosting oplossing van Stackhero te proberen!
Een Python-bibliotheek kiezen voor Valkey-integratie
Om uw applicatie naadloos met Valkey te integreren, overweeg het gebruik van de redis-bibliotheek voor Python. Deze bibliotheek is goed geschikt voor verbinding met Valkey, een open-source fork van Redis. U kunt het eenvoudig installeren door het volgende uit te voeren:
pip install redis
pip freeze > requirements.txt
Python verbinden met Valkey
Laten we verkennen hoe u uw Python-applicatie met Valkey kunt verbinden. Hier is een eenvoudig voorbeeld met standaardinstellingen:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Voor verbeterde beveiliging wordt geadviseerd om inloggegevens te beheren met behulp van omgevingsvariabelen. Hier is hoe u dat kunt doen:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_VALKEY_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Zorg ervoor dat uw omgevingsvariabelen een invoer bevatten zoals deze: STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.
Pub/Sub gebruiken met Valkey en Python
Het benutten van de Publish/Subscribe (Pub/Sub) functionaliteit in Python met Valkey is eenvoudig. Hier is een voorbeeld:
import redis
# Verbinden met Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Maak een PubSub-instantie
p = r.pubsub()
# Abonneer op het kanaal "test"
p.subscribe('test')
# Publiceer een bericht naar het kanaal "test"
r.publish('test', 'Dit is een testbericht')
# Haal het eerste beschikbare bericht van kanaal "test" op
p.get_message()
# Uitschrijven van kanaal "test"
p.unsubscribe('test')
Geavanceerde voorbeelden van Valkey Pub/Sub met Python
Breid uw Pub/Sub-mogelijkheden uit met deze geavanceerde voorbeelden:
# Maak een PubSub-instantie en negeer abonnementsberichten
p = r.pubsub(ignore_subscribe_messages=True)
# Abonneer op meerdere kanalen
p.subscribe('test-1', 'test-2', ...)
# Uitschrijven van meerdere kanalen
p.unsubscribe('test-1', 'test-2', ...)
# U kunt ook "unsubscribe" zonder argumenten gebruiken om van alle geabonneerde kanalen los te koppelen
p.unsubscribe()
# Abonneer op kanalen met een patroon
p.psubscribe('my-*')
# Uitschrijven van kanalen met een patroon
p.punsubscribe('my-*')
Vermijden van "Connection Closed by Server" fouten
De fout redis.exceptions.ConnectionError: Connection closed by server kan optreden als uw app niet gedurende een bepaalde periode met Valkey communiceert, wat leidt tot een automatische ontkoppeling. Om dit te voorkomen, kunt u de parameter health_check_interval instellen zoals hieronder weergegeven:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Bij het gebruik van de Pub/Sub-functie van Valkey verwacht de redis-py-bibliotheek dat functies zoals get_message() of listen() vaker worden aangeroepen dan het health_check_interval. In ons voorbeeld hebben we dit interval ingesteld op 10 seconden, dus zorg ervoor dat u get_message() of listen() ten minste één keer per 10 seconden aanroept (raadpleeg de officiële documentatie van redis-py).
Als dit niet wordt gedaan, kunt u dezelfde verbindingsfout tegenkomen. Om dit te vermijden, overweeg regelmatig check_health() aan te roepen.
Hier is hoe u het kunt implementeren:
import redis
import threading
# Verbinden met Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Maak een PubSub-instantie
p = r.pubsub()
# Abonneer op het kanaal "test"
p.subscribe('test')
# Maak een functie die `check_health` elke 5 seconden aanroept
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Roep de functie valkey_auto_check aan
valkey_auto_check(p)