Valkey: Using with Python

How to connect Valkey with Python

👋 Welcome to the Stackhero documentation!

Stackhero offers a ready-to-use Valkey cloud solution that provides a host of benefits, including:

  • Redis Commander web UI included.
  • Unlimited message size and transfers.
  • Effortless updates with just a click.
  • Optimal performance and robust security powered by a private and dedicated VM.

Save time and simplify your life: it only takes 5 minutes to try Stackhero's Valkey cloud hosting solution!

To connect your application to Valkey, you can use the redis library for Python (redis-py). Valkey is an open-source fork of Redis, so this client works with it. Install the library and record it in your requirements file by running:

pip install redis
pip freeze > requirements.txt

The following example connects a Python application to Valkey over TLS with a few connection options set. Replace the placeholders with your instance's password, hostname, and TLS port:

import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)
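Once the client is created, a quick round trip can confirm that the URL, password, and TLS port are correct. The sketch below is one possible check and is not part of the Stackhero example: the helper name `check_connection` and the key `stackhero:healthcheck` are hypothetical, and it relies only on the redis-py calls `ping()`, `set()` (with the `ex` expiry argument), and `get()`.

```python
def check_connection(client, key="stackhero:healthcheck"):
    """Return True if a PING and a SET/GET round trip both succeed.

    `client` is expected to be the object returned by redis.from_url().
    The key name is an arbitrary, hypothetical choice.
    """
    if not client.ping():
        return False
    # Expire the test key after 10 seconds so it does not linger.
    client.set(key, "ok", ex=10)
    value = client.get(key)
    # redis-py returns bytes unless decode_responses=True was passed.
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    return value == "ok"
```

With the client from the example above, `check_connection(r)` should return `True`. A wrong password, hostname, or port typically surfaces as a redis-py exception on the first command instead.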

To keep credentials out of your source code, read the connection URL from an environment variable. Here is how you can do that:

import os
import redis

r = redis.from_url(
  os.environ["STACKHERO_VALKEY_URL_TLS"],  # raises KeyError if the variable is not set
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Ensure your environment variables include an entry like this: STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.
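If the variable is missing or malformed, the failure can be hard to read when it only shows up inside the client. The following sketch checks the value up front. The helper name `load_valkey_url` and its error messages are hypothetical, and the checks assume the URL shape shown above (`rediss://` scheme, host, and numeric port).

```python
import os
from urllib.parse import urlparse


def load_valkey_url(env=None, name="STACKHERO_VALKEY_URL_TLS"):
    """Return the connection URL from the environment, or raise a clear error."""
    env = os.environ if env is None else env
    url = env.get(name)
    if not url:
        raise RuntimeError(f"Environment variable {name} is not set")
    parsed = urlparse(url)
    # "rediss" (double s) is the URL scheme redis-py uses for TLS connections.
    if parsed.scheme != "rediss":
        raise RuntimeError(f"{name} should start with rediss:// to use TLS")
    if not parsed.hostname or parsed.port is None:
        raise RuntimeError(f"{name} must include a host and a port")
    return url
```

The returned string can then be passed to `redis.from_url()` in place of the direct environment lookup.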

Valkey's Publish/Subscribe (Pub/Sub) functionality is available from Python through the same client. Here is an example:

import redis

# Connect to Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  socket_keepalive=True,
  retry_on_timeout=True
)

# Create a PubSub instance
p = r.pubsub()

# Subscribe to the channel "test"
p.subscribe('test')

# Publish a message to the channel "test"
r.publish('test', 'This is a test message')

# Get the next pending message, or None if nothing is pending.
# The first call usually returns the subscribe confirmation, not the published message.
p.get_message()

# Unsubscribe from channel "test"
p.unsubscribe('test')
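Because `get_message()` returns `None` when nothing is pending and also returns subscription confirmations, a small polling helper can make the example above easier to work with. This is a minimal sketch: the name `wait_for_message` and its `timeout` and `poll` parameters are hypothetical, and it assumes only that `get_message()` accepts a `timeout` argument and returns a dict with a `type` key, as redis-py's PubSub does.

```python
import time


def wait_for_message(pubsub, timeout=5.0, poll=0.5):
    """Poll `pubsub` until a published message arrives or `timeout` elapses.

    Returns the message dict, or None if nothing arrived in time.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # get_message() waits for at most `poll` seconds and returns None
        # when nothing is pending.
        message = pubsub.get_message(timeout=poll)
        # Skip subscribe/unsubscribe confirmations; keep real payloads.
        if message is not None and message["type"] in ("message", "pmessage"):
            return message
    return None
```

Called as `wait_for_message(p)` after `r.publish(...)`, it should return the dict for the published message, whose `data` field is bytes unless the client was created with `decode_responses=True`.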

The PubSub object also supports multiple channels and pattern subscriptions, as these examples show:

# Create a PubSub instance and ignore subscribe messages
p = r.pubsub(ignore_subscribe_messages=True)

# Subscribe to multiple channels (pass as many channel names as needed)
p.subscribe('test-1', 'test-2')

# Unsubscribe from multiple channels
p.unsubscribe('test-1', 'test-2')

# You can also call "unsubscribe" with no arguments to unsubscribe from all channels
p.unsubscribe()

# Subscribe to channels using a pattern
p.psubscribe('my-*')

# Unsubscribe from channels using a pattern
p.punsubscribe('my-*')
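With several channels and patterns in play, it helps to tell the message types apart. The sketch below formats the dicts returned by `get_message()`. The helper names `describe_message` and `_text` are hypothetical, and the code assumes the `type`, `pattern`, `channel`, and `data` keys that redis-py documents for Pub/Sub messages.

```python
def _text(value):
    """Decode bytes to str; leave other values (str, int, None) unchanged."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def describe_message(message):
    """Return a short, readable summary of a dict returned by get_message()."""
    kind = message["type"]
    channel = _text(message["channel"])
    data = _text(message["data"])
    if kind == "message":
        # Delivered through a plain channel subscription.
        return f"{channel}: {data}"
    if kind == "pmessage":
        # Delivered through a pattern subscription; `pattern` is the match.
        pattern = _text(message["pattern"])
        return f"{channel} (matched {pattern}): {data}"
    # subscribe, unsubscribe, psubscribe, punsubscribe confirmations:
    # `data` holds the number of currently active subscriptions.
    return f"{kind} {channel} ({data} active subscriptions)"
```

Creating the PubSub instance with `ignore_subscribe_messages=True`, as shown above, means only the first two branches are normally reached.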

The error redis.exceptions.ConnectionError: Connection closed by server can occur when your application has not exchanged data with Valkey for a while and the idle connection has been closed. To prevent this, set the health_check_interval parameter, which tells redis-py to verify an idle connection before reusing it, as shown below:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

When using Valkey's Pub/Sub feature, the redis-py library expects functions like get_message() or listen() to be called more frequently than the health_check_interval. In our example, we have set this interval to 10 seconds, so make sure to call get_message() or listen() at least once every 10 seconds (refer to the redis-py official documentation).

If this is not done, you might encounter the same connection error. To avoid this, consider regularly calling check_health().

Here is how you can implement it:

import redis
import threading

# Connect to Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Create a PubSub instance
p = r.pubsub()

# Subscribe to the channel "test"
p.subscribe('test')

# Create a function that will call `check_health` every 5 seconds
def valkey_auto_check(p):
  t = threading.Timer(5, valkey_auto_check, [p])
  t.start()
  p.check_health()

# Call the valkey_auto_check function
valkey_auto_check(p)
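The timer-based function above keeps rescheduling itself and offers no way to stop. As a possible variant, the sketch below runs the same `check_health()` call in a single daemon thread that can be stopped with a `threading.Event`. The name `start_health_checks` and the `interval` parameter are hypothetical, and unlike the example above the first check runs after one interval rather than immediately.

```python
import threading


def start_health_checks(pubsub, interval=5.0):
    """Call pubsub.check_health() every `interval` seconds in a daemon thread.

    Returns a threading.Event; call .set() on it to stop the loop.
    """
    stop = threading.Event()

    def loop():
        # Event.wait() returns False on timeout and True once `stop` is set,
        # so the loop ends promptly when the caller asks it to.
        while not stop.wait(interval):
            pubsub.check_health()

    # A daemon thread does not keep the process alive on its own.
    threading.Thread(target=loop, daemon=True).start()
    return stop
```

A typical use would be `stop = start_health_checks(p)` right after subscribing, followed by `stop.set()` before closing the connection. Keep `interval` below the `health_check_interval` passed to `from_url()`, as in the 5-second versus 10-second values used above.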