RabbitMQ: Pierwsze kroki

Jak używać Stackhero dla RabbitMQ

👋 Witamy w dokumentacji Stackhero!

Stackhero oferuje gotowe do użycia rozwiązanie RabbitMQ cloud, które zapewnia wiele korzyści, w tym:

  • Pełny dostęp do web UI RabbitMQ do zarządzania użytkownikami, vhostami i uprawnieniami.
  • Nieograniczone kolejki bez ograniczeń czasu przechowywania.
  • Obsługa protokołów AMQP, MQTT, STOMP i WebSocket.
  • Wiele wtyczek w zestawie, takich jak Delayed Message Exchange, Message Deduplication i Consistent-hash Exchange.
  • Bezproblemowe aktualizacje za pomocą jednego kliknięcia.
  • Optymalna wydajność i solidne zabezpieczenia dzięki prywatnej i dedykowanej VM.

Oszczędzaj czas i upraszczaj swoje życie: wystarczy 5 minut, aby wypróbować rozwiązanie hostingu w chmurze RabbitMQ Stackhero!

Ten przykład pokazuje, jak używać biblioteki Aio Pika do połączenia Pythona z RabbitMQ. W wielu przypadkach wystarczy podać URL AMQPS:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

Poniżej znajduje się kompletny przykład w Pythonie, który ustanawia bezpieczne połączenie z RabbitMQ. Możesz postępować zgodnie z tymi krokami, aby zweryfikować swoje połączenie i skonfigurować podstawową kolejkę:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # Odkomentuj poniższą linię, aby włączyć logi debugowania
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("Połączenie działa!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

Jeśli zobaczysz błąd

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

podczas łączenia się z RabbitMQ z Pythona, zazwyczaj oznacza to, że w systemie brakuje certyfikatu Let's Encrypt. Aby to rozwiązać, zainstaluj wspólne certyfikaty CA:

  1. Na Ubuntu/Debian, uruchom:

    sudo apt install ca-certificates
    
  2. Na Alpine Linux, uruchom:

    apk add ca-certificates
    

Jeśli nie możesz użyć tych poleceń, możesz zainstalować certyfikat CA ręcznie:

  1. Pobierz certyfikat CA Let's Encrypt z https://letsencrypt.org/certs/isrgrootx1.pem.

  2. Następnie połącz się z RabbitMQ w swoim kodzie Python, przekazując plik certyfikatu CA:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

Poniżej znajduje się kompletny przykład w Pythonie, który używa certyfikatu CA Let's Encrypt do ustanowienia bezpiecznego połączenia:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # Odkomentuj poniższą linię, aby włączyć logi debugowania
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # Ręcznie załaduj certyfikat CA Let's Encrypt
    # Pobierz go używając: wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("Połączenie działa!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)


if __name__ == "__main__":
    asyncio.run(main())

Poniżej znajduje się prosty przykład pokazujący, jak połączyć się z RabbitMQ z aplikacji GoLang, używając oficjalnej biblioteki Go RabbitMQ Client Library. Postępuj zgodnie z tymi krokami, aby skonfigurować swój projekt:

  1. Utwórz nowy katalog i zainicjuj moduł:
go mod init rabbitmq-example
  1. Dodaj bibliotekę RabbitMQ:
go get github.com/rabbitmq/amqp091-go
  1. Utwórz nowy plik o nazwie main.go i dodaj następującą zawartość:

    package main
    
    import (
      "fmt"
      amqp "github.com/rabbitmq/amqp091-go"
    )
    
    func main() {
      connection, err := amqp.Dial("amqps://<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>")
      if err != nil {
        panic(err)
      }
      defer connection.Close()
    
      fmt.Println("Pomyślnie połączono z instancją RabbitMQ")
    }
    
  2. Uruchom swój kod:

go run main.go

Powinieneś zobaczyć "Pomyślnie połączono z instancją RabbitMQ", co oznacza, że twój kod połączył się bezpiecznie, używając uwierzytelniania i szyfrowania TLS.

Dla więcej przykładów, możesz eksplorować przykłady Go w oficjalnym repozytorium RabbitMQ: https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/go.

Poniżej znajduje się przykład, który demonstruje, jak połączyć się z RabbitMQ z PHP, używając biblioteki php-amqplib. Ponieważ instancje Stackhero używają szyfrowania TLS (SSL), połączenie musi być ustanowione za pomocą AMQPSSLConnection:

use PhpAmqpLib\Connection\AMQPSSLConnection;

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  array()
);

/**
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($connection)
{
  $connection->close();
}

register_shutdown_function('shutdown', $connection);

Połączenie TLS może wymagać certyfikatu Urzędu Certyfikacji (CA). Chociaż wiele serwerów ma to już zainstalowane, może być konieczne pobranie go ręcznie. Postępuj zgodnie z tymi krokami:

  1. Pobierz certyfikat z https://letsencrypt.org/certs/isrgrootx1.pem i zapisz go na swoim serwerze.
  2. Użyj poniższego kodu PHP, aby połączyć się, używając pobranego certyfikatu:
$sslOptions = array(
  'cafile' => realpath(__DIR__ . '/isrgrootx1.pem'),
);

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  $sslOptions
);

Symfony może używać RabbitMQ jako brokera wiadomości, ustawiając zmienną środowiskową MESSENGER_TRANSPORT_DSN. Aby to skonfigurować, edytuj plik .env i ustaw zmienną w następujący sposób:

MESSENGER_TRANSPORT_DSN=amqps://<USER>:<PASSWORD>@<HOST>:<PORT>/%2f/messages?cacert=%2Fetc%2Fssl%2Fcerts%2Fca-certificates.crt

Zamień <USER>, <PASSWORD>, <HOST> i <PORT> na swoje dane RabbitMQ.

Upewnij się również, że plik config/packages/messenger.yaml używa zmiennej MESSENGER_TRANSPORT_DSN. Konfiguracja powinna wyglądać tak:

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

Poniżej znajduje się przykład, jak skonfigurować Spring Boot do bezpiecznego połączenia z instancją RabbitMQ Stackhero. Zaktualizuj właściwości aplikacji tymi ustawieniami:

spring.rabbitmq.host=<XXXXXX>.stackhero-network.com
spring.rabbitmq.port=<AMQP_PORT_TLS>
spring.rabbitmq.username=admin
spring.rabbitmq.password=<PASSWORD>
spring.rabbitmq.ssl.enabled=true
spring.rabbitmq.ssl.algorithm=TLSv1.2

Oto przykład pokazujący, jak połączyć się z Stackhero RabbitMQ, używając .NET i MassTransit. Ten przykład konfiguruje hosta z niezbędnymi ustawieniami dla szyfrowania TLS:

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

public class Program
{
  public static void Main(string[] args)
  {
    var host = Host.CreateDefaultBuilder(args)
      .ConfigureServices((context, services) =>
      {
        services.AddMassTransit(x =>
        {
          x.UsingRabbitMq((context, cfg) =>
          {
            cfg.Host(new Uri("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"), h =>
            {
              h.UseSsl(s =>
              {
                s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
              });
            });
          });
        });

        services.AddMassTransitHostedService(true);
      })
      .Build();

    host.Run();
  }
}

Dzięki wtyczce "Wiadomości opóźnione" możesz opóźniać lub planować wiadomości, ustawiając opóźnienie w milisekundach. Wtyczkę można aktywować bezpośrednio z panelu Stackhero. Po aktywacji wtyczki, utwórz opóźnioną wymianę, albo przez panel administracyjny RabbitMQ, albo bezpośrednio w swoim kodzie.

Dla więcej szczegółów, zapoznaj się z oficjalnym repozytorium: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.

Jeśli wyłączysz wtyczkę, wszystkie opóźnione wiadomości, które nie zostały jeszcze dostarczone, zostaną utracone.

Po aktywacji wtyczki na panelu Stackhero, przejdź do panelu administracyjnego RabbitMQ i utwórz wymianę typu "x-delayed-message". Następnie dodaj argument z kluczem x-delayed-type i wartością "direct". Ta konfiguracja jest zilustrowana na poniższym zrzucie ekranu.

Tworzenie wymianyTworzenie wymiany

Jeśli napotkasz błąd "Invalid argument, 'x-delayed-type' must be an existing exchange type", upewnij się, że poprawnie ustawiłeś argument x-delayed-type.

Podczas używania Elixir do połączenia z RabbitMQ, możesz napotkać błąd

CLIENT ALERT: Fatal - Handshake Failure

Ten problem jest związany z błędem w obsłudze TLS 1.3 w bibliotece AMQP dla Elixir. Praktycznym rozwiązaniem jest wymuszenie użycia TLS 1.2. Możesz to osiągnąć, dodając następującą opcję podczas otwierania połączenia:

AMQP.Connection.open("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", :undefined, ssl_options: [ versions: [ :"tlsv1.2" ] ])

Błąd Error: Socket closed abruptly during opening handshake może wystąpić podczas używania wersji biblioteki amqplib Node.js starszej niż 0.10.7 podczas łączenia się z RabbitMQ 4.1.0 lub nowszym. Ten problem jest związany ze zmianą w ustawieniu frame_max wprowadzoną w RabbitMQ 4.1.0.

Aby rozwiązać błąd, zaktualizuj swoją bibliotekę amqplib do wersji 0.10.7 lub nowszej.