Redis®*: Utilizar com Python
Como conectar o Redis com Python
👋 Bem-vindo à documentação da Stackhero!
A Stackhero oferece uma solução Redis cloud pronta a usar que proporciona uma série de benefícios, incluindo:
- Interface web
Redis Commanderincluída.- Tamanho e transferências de mensagens ilimitados.
- Atualizações simplificadas com apenas um clique.
- Desempenho ótimo e segurança robusta garantidos por uma VM privada e dedicada.
Poupe tempo e simplifique a sua vida: são necessários apenas 5 minutos para experimentar a solução de hospedagem Redis cloud da Stackhero!
Escolher a biblioteca Python certa para se conectar ao Redis
Para conectar a sua aplicação Python a uma instância Redis de forma fluida, a biblioteca redis é uma escolha popular e eficiente. Pode começar por instalá-la com a seguinte sugestão:
pip install redis
pip freeze > requirements.txt
Conectar Python ao Redis
Vamos explorar um exemplo simples usando valores padrão que devem atender à maioria das necessidades:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Para aumentar a segurança, considere usar variáveis de ambiente para as suas credenciais. Veja como pode fazer isso:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Certifique-se de que as suas variáveis de ambiente incluem uma definição como: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Usar Pub/Sub com Redis e Python
A funcionalidade Publish/Subscribe do Redis pode ser facilmente utilizada com Python. Aqui está um exemplo simples para o guiar:
import redis
# Conectar ao Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Criar uma instância PubSub
p = r.pubsub()
# Subscrever ao canal "test"
p.subscribe('test')
# Publicar uma mensagem no canal "test"
r.publish('test', 'Esta é uma mensagem de teste')
# Obter a primeira mensagem disponível do canal "test"
p.get_message()
# Cancelar a subscrição do canal "test"
p.unsubscribe('test')
Exemplos avançados de Redis Pub/Sub com Python
Pode querer explorar técnicas Pub/Sub mais avançadas com os seguintes exemplos:
# Criar uma instância PubSub e ignorar mensagens de subscrição
p = r.pubsub(ignore_subscribe_messages=True)
# Subscrever a múltiplos canais
p.subscribe('test-1', 'test-2', ...)
# Cancelar a subscrição de múltiplos canais
p.unsubscribe('test-1', 'test-2', ...)
# Também pode usar "unsubscribe" sem argumentos para se desconectar de todos os canais subscritos
p.unsubscribe()
# Subscrever a canais usando um padrão
p.psubscribe('my-*')
# Cancelar a subscrição de canais usando um padrão
p.punsubscribe('my-*')
Evitar o erro "Connection Closed by Server" no Redis e Python
Pode encontrar o erro redis.exceptions.ConnectionError: Connection closed by server, que pode ocorrer devido à inatividade na sua aplicação Python, causando o encerramento automático da conexão. Quando a sua aplicação tenta reconectar, pode falhar, resultando neste erro.
Para mitigar isso, considere definir o parâmetro health_check_interval na sua conexão Redis da seguinte forma:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Ao usar a funcionalidade Pub/Sub do Redis, certifique-se de que a sua aplicação chama get_message() ou listen() mais frequentemente do que o intervalo especificado por health_check_interval (neste exemplo, a cada 10 segundos). Pode consultar a documentação oficial do redis-py para mais detalhes.
Se estas chamadas não forem feitas dentro do intervalo, ainda pode encontrar o erro "Connection closed by server". Uma solução prática é usar regularmente a função check_health().
Aqui está como pode implementá-la:
import redis
import threading
# Conectar ao Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Criar uma instância PubSub
p = r.pubsub()
# Subscrever ao canal "test"
p.subscribe('test')
# Criar uma função que chamará `check_health` a cada 5 segundos
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Chamar a função redis_auto_check
redis_auto_check(p)