Valkey: Using with Python
How to connect Valkey with Python
👋 Welcome to the Stackhero documentation!
Stackhero offers a ready-to-use Valkey cloud solution that provides a host of benefits, including:
Redis Commanderweb UI included.- Unlimited message size and transfers.
- Effortless updates with just a click.
- Optimal performance and robust security powered by a private and dedicated VM.
Save time and simplify your life: it only takes 5 minutes to try Stackhero's Valkey cloud hosting solution!
Choosing a Python library for Valkey integration
To seamlessly integrate your application with Valkey, consider using the redis library for Python. This library is well-suited for connecting to Valkey, an open-source fork of Redis. You can easily install it by running:
pip install redis
pip freeze > requirements.txt
Connecting Python to Valkey
Let us explore how you can connect your Python application to Valkey. Here is a straightforward example using default settings:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
For enhanced security, it is advised to manage credentials using environment variables. Here is how you can do that:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_VALKEY_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Ensure your environment variables include an entry like this: STACKHERO_VALKEY_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>.
Using Pub/Sub with Valkey and Python
Leveraging the Publish/Subscribe (Pub/Sub) functionality in Python with Valkey is straightforward. Here is an example:
import redis
# Connect to Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Create a PubSub instance
p = r.pubsub()
# Subscribe to the channel "test"
p.subscribe('test')
# Publish a message to the channel "test"
r.publish('test', 'This is a test message')
# Get the first available message from channel "test"
p.get_message()
# Unsubscribe from channel "test"
p.unsubscribe('test')
Advanced examples of Valkey Pub/Sub with Python
Expand your Pub/Sub capabilities with these advanced examples:
# Create a PubSub instance and ignore subscribe messages
p = r.pubsub(ignore_subscribe_messages=True)
# Subscribe to multiple channels
p.subscribe('test-1', 'test-2', ...)
# Unsubscribe from multiple channels
p.unsubscribe('test-1', 'test-2', ...)
# You can also use "unsubscribe" with no arguments, to disconnect from all subscribed channels
p.unsubscribe()
# Subscribe to channels using a pattern
p.psubscribe('my-*')
# Unsubscribe from channels using a pattern
p.punsubscribe('my-*')
Avoiding "Connection Closed by Server" errors
The error redis.exceptions.ConnectionError: Connection closed by server may occur if your app does not interact with Valkey for a period, leading to an automatic disconnection. To prevent this, you can set the health_check_interval parameter as shown below:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
When using Valkey's Pub/Sub feature, the redis-py library expects functions like get_message() or listen() to be called more frequently than the health_check_interval. In our example, we have set this interval to 10 seconds, so make sure to call get_message() or listen() at least once every 10 seconds (refer to the redis-py official documentation).
If this is not done, you might encounter the same connection error. To avoid this, consider regularly calling check_health().
Here is how you can implement it:
import redis
import threading
# Connect to Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Create a PubSub instance
p = r.pubsub()
# Subscribe to the channel "test"
p.subscribe('test')
# Create a function that will call `check_health` every 5 seconds
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Call the valkey_auto_check function
valkey_auto_check(p)