Valkey: Utilizar com Python

Como conectar Valkey com Python

👋 Bem-vindo à documentação do Stackhero!

A Stackhero oferece uma solução Valkey cloud pronta a usar que proporciona uma série de benefícios, incluindo:

  • Interface web UI Redis Commander incluída.
  • Tamanho e transferências de mensagens ilimitados.
  • Atualizações simplificadas com apenas um clique.
  • Desempenho ótimo e segurança robusta alimentados por uma VM privada e dedicada.

Poupe tempo e simplifique a sua vida: são necessários apenas 5 minutos para experimentar a solução Valkey cloud hosting da Stackhero!

Para integrar a sua aplicação com Valkey de forma fluida, considere usar a biblioteca redis para Python. Esta biblioteca é adequada para conectar-se ao Valkey, um fork open-source do Redis. Pode instalá-la facilmente executando:

pip install redis
pip freeze > requirements.txt

Vamos explorar como pode conectar a sua aplicação Python ao Valkey. Aqui está um exemplo simples usando as configurações padrão:

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Para maior segurança, é aconselhável gerir as credenciais usando variáveis de ambiente. Veja como pode fazer isso:

import os
import redis

r = redis.from_url(
  os.environ.get("STACKHERO_VALKEY_URL_TLS"),
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Certifique-se de que as suas variáveis de ambiente incluam uma entrada como esta: STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.

Aproveitar a funcionalidade Publish/Subscribe (Pub/Sub) em Python com Valkey é simples. Aqui está um exemplo:

import redis

# Conectar ao Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# Criar uma instância PubSub
p = r.pubsub()

# Subscrever ao canal "test"
p.subscribe('test')

# Publicar uma mensagem no canal "test"
r.publish('test', 'Esta é uma mensagem de teste')

# Obter a primeira mensagem disponível do canal "test"
p.get_message()

# Cancelar a subscrição do canal "test"
p.unsubscribe('test')

Expanda as suas capacidades de Pub/Sub com estes exemplos avançados:

# Criar uma instância PubSub e ignorar mensagens de subscrição
p = r.pubsub(ignore_subscribe_messages=True)

# Subscrever a múltiplos canais
p.subscribe('test-1', 'test-2', ...)

# Cancelar a subscrição de múltiplos canais
p.unsubscribe('test-1', 'test-2', ...)

# Também pode usar "unsubscribe" sem argumentos para se desconectar de todos os canais subscritos
p.unsubscribe()

# Subscrever a canais usando um padrão
p.psubscribe('my-*')

# Cancelar a subscrição de canais usando um padrão
p.punsubscribe('my-*')

O erro redis.exceptions.ConnectionError: Connection closed by server pode ocorrer se a sua aplicação não interagir com o Valkey por um período, levando a uma desconexão automática. Para evitar isso, pode definir o parâmetro health_check_interval como mostrado abaixo:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Ao usar a funcionalidade Pub/Sub do Valkey, a biblioteca redis-py espera que funções como get_message() ou listen() sejam chamadas mais frequentemente do que o intervalo health_check_interval. No nosso exemplo, definimos este intervalo para 10 segundos, por isso, certifique-se de chamar get_message() ou listen() pelo menos uma vez a cada 10 segundos (consulte a documentação oficial do redis-py).

Se isso não for feito, pode encontrar o mesmo erro de conexão. Para evitar isso, considere chamar regularmente check_health().

Aqui está como pode implementá-lo:

import redis
import threading

# Conectar ao Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Criar uma instância PubSub
p = r.pubsub()

# Subscrever ao canal "test"
p.subscribe('test')

# Criar uma função que chamará `check_health` a cada 5 segundos
def valkey_auto_check(p):
  t = threading.Timer(5, valkey_auto_check, [p])
  t.start()
  p.check_health()

# Chamar a função valkey_auto_check
valkey_auto_check(p)