Redis®*: Using with Python
How to connect Redis with Python
👋 Welcome to the Stackhero documentation!
Stackhero offers a ready-to-use Redis cloud solution that provides a host of benefits, including:
Redis Commanderweb UI included.- Unlimited message size and transfers.
- Effortless updates with just a click.
- Optimal performance and robust security powered by a private and dedicated VM.
Save time and simplify your life: it only takes 5 minutes to try Stackhero's Redis cloud hosting solution!
Choosing the right Python library to connect to Redis
To seamlessly connect your Python application to a Redis instance, the redis library is a popular and efficient choice. You can get started by installing it with the following suggestion:
pip install redis
pip freeze > requirements.txt
Connecting Python to Redis
Let us explore a straightforward example using default values that should cater to most needs:
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
To enhance security, consider using environment variables for your credentials. Here is how you can do it:
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Ensure your environment variables include a definition like: STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Using Pub/Sub with Redis and Python
Redis' Publish/Subscribe feature can be easily utilized with Python. Here is a simple example to guide you:
import redis
# Connect to Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Create a PubSub instance
p = r.pubsub()
# Subscribe to the channel "test"
p.subscribe('test')
# Publish a message to the channel "test"
r.publish('test', 'This is a test message')
# Get the first available message from channel "test"
p.get_message()
# Unsubscribe from channel "test"
p.unsubscribe('test')
Advanced Redis Pub/Sub examples with Python
You might want to explore more advanced Pub/Sub techniques with the following examples:
# Create a PubSub instance and ignore subscribe messages
p = r.pubsub(ignore_subscribe_messages=True)
# Subscribe to multiple channels
p.subscribe('test-1', 'test-2', ...)
# Unsubscribe from multiple channels
p.unsubscribe('test-1', 'test-2', ...)
# You can also use "unsubscribe" with no arguments to disconnect from all subscribed channels
p.unsubscribe()
# Subscribe to channels using a pattern
p.psubscribe('my-*')
# Unsubscribe from channels using a pattern
p.punsubscribe('my-*')
Avoiding the "Connection Closed by Server" error in Redis and Python
You might encounter the redis.exceptions.ConnectionError: Connection closed by server error, which can occur due to inactivity in your Python application, causing the connection to close automatically. When your app tries to reconnect, it may fail, resulting in this error.
To mitigate this, consider setting the health_check_interval parameter in your Redis connection like this:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
When using Redis' Pub/Sub feature, ensure that your application calls get_message() or listen() more frequently than the specified health_check_interval (in this example, every 10 seconds). You can refer to the redis-py official documentation for more details.
If these calls are not made within the interval, you may still encounter the "Connection closed by server" error. A practical solution is to use the check_health() function regularly.
Here is how you can implement it:
import redis
import threading
# Connect to Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Create a PubSub instance
p = r.pubsub()
# Subscribe to the channel "test"
p.subscribe('test')
# Create a function that will call `check_health` every 5 seconds
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Call the redis_auto_check function
redis_auto_check(p)