Redis®*: Naudojimas su Python
Kaip sujungti Redis su Python
👋 Sveiki atvykę į Stackhero dokumentaciją!
Stackhero siūlo paruoštą naudoti Redis cloud sprendimą, kuris suteikia daugybę privalumų, įskaitant:
- Įtraukta
Redis Commanderweb UI.- Neribotas žinučių dydis ir perdavimai.
- Paprasti atnaujinimai vienu spustelėjimu.
- Optimali veikla ir stiprus saugumas, užtikrinamas privačiu ir dedikuotu VM.
Taupykite laiką ir supaprastinkite savo gyvenimą: tereikia 5 minučių, kad išbandytumėte Stackhero Redis cloud hosting sprendimą!
Tinkamos Python bibliotekos pasirinkimas jungiantis prie Redis
Norint sklandžiai sujungti jūsų Python aplikaciją su Redis instancija, redis biblioteka yra populiarus ir efektyvus pasirinkimas. Galite pradėti ją įdiegdami su šiuo pasiūlymu:
pip install redis
pip freeze > requirements.txt
Python sujungimas su Redis
Išnagrinėkime paprastą pavyzdį, naudojant numatytąsias reikšmes, kurios turėtų atitikti daugumą poreikių:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Norėdami padidinti saugumą, apsvarstykite galimybę naudoti aplinkos kintamuosius savo kredencialams. Štai kaip tai galite padaryti:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Įsitikinkite, kad jūsų aplinkos kintamieji apima tokią apibrėžtį: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Pub/Sub naudojimas su Redis ir Python
Redis Publish/Subscribe funkcija gali būti lengvai naudojama su Python. Štai paprastas pavyzdys, kuris padės jums:
import redis
# Prisijungti prie Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Sukurti PubSub instanciją
p = r.pubsub()
# Prenumeruoti kanalą "test"
p.subscribe('test')
# Paskelbti žinutę kanale "test"
r.publish('test', 'Tai yra testinė žinutė')
# Gauti pirmą prieinamą žinutę iš kanalo "test"
p.get_message()
# Atsisakyti prenumeratos kanale "test"
p.unsubscribe('test')
Pažangūs Redis Pub/Sub pavyzdžiai su Python
Galbūt norėsite išnagrinėti pažangesnes Pub/Sub technikas su šiais pavyzdžiais:
# Sukurti PubSub instanciją ir ignoruoti prenumeratos žinutes
p = r.pubsub(ignore_subscribe_messages=True)
# Prenumeruoti kelis kanalus
p.subscribe('test-1', 'test-2', ...)
# Atsisakyti prenumeratos keliuose kanaluose
p.unsubscribe('test-1', 'test-2', ...)
# Taip pat galite naudoti "unsubscribe" be argumentų, kad atsijungtumėte nuo visų prenumeruotų kanalų
p.unsubscribe()
# Prenumeruoti kanalus naudojant šabloną
p.psubscribe('my-*')
# Atsisakyti prenumeratos kanaluose naudojant šabloną
p.punsubscribe('my-*')
Kaip išvengti "Connection Closed by Server" klaidos Redis ir Python
Galite susidurti su redis.exceptions.ConnectionError: Connection closed by server klaida, kuri gali atsirasti dėl jūsų Python aplikacijos neveiklumo, dėl ko ryšys automatiškai uždaromas. Kai jūsų programa bando prisijungti iš naujo, ji gali nepavykti, sukeldama šią klaidą.
Norėdami tai sumažinti, apsvarstykite galimybę nustatyti health_check_interval parametrą jūsų Redis ryšyje taip:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Naudojant Redis Pub/Sub funkciją, įsitikinkite, kad jūsų programa kviečia get_message() arba listen() dažniau nei nurodytas health_check_interval (šiame pavyzdyje, kas 10 sekundžių). Galite kreiptis į redis-py oficialią dokumentaciją dėl daugiau detalių.
Jei šie kvietimai nėra atliekami per intervalą, vis tiek galite susidurti su "Connection closed by server" klaida. Praktinis sprendimas yra reguliariai naudoti check_health() funkciją.
Štai kaip galite tai įgyvendinti:
import redis
import threading
# Prisijungti prie Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Sukurti PubSub instanciją
p = r.pubsub()
# Prenumeruoti kanalą "test"
p.subscribe('test')
# Sukurti funkciją, kuri kvies `check_health` kas 5 sekundes
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Kviesti redis_auto_check funkciją
redis_auto_check(p)