RabbitMQ: Pradžia

Kaip naudoti Stackhero RabbitMQ

👋 Sveiki atvykę į Stackhero dokumentaciją!

Stackhero siūlo paruoštą naudoti RabbitMQ cloud sprendimą, kuris suteikia daugybę privalumų, įskaitant:

  • Visišką prieigą prie RabbitMQ web UI vartotojų, vhostų ir leidimų valdymui.
  • Neribotas eiles be saugojimo laiko apribojimų.
  • Palaiko AMQP, MQTT, STOMP ir WebSocket protokolus.
  • Daugybė įtrauktų įskiepių, tokių kaip Delayed Message Exchange, Message Deduplication ir Consistent-hash Exchange.
  • Lengvi atnaujinimai vienu paspaudimu.
  • Optimali veikla ir tvirta sauga, užtikrinama privačia ir dedikuota VM.

Taupykite laiką ir supaprastinkite savo gyvenimą: tereikia 5 minučių, kad išbandytumėte Stackhero RabbitMQ cloud hosting sprendimą!

Šis pavyzdys parodo, kaip naudoti Aio Pika biblioteką, kad Python prisijungtų prie RabbitMQ. Daugeliu atvejų pakanka nurodyti AMQPS URL:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

Žemiau pateiktas pilnas Python pavyzdys, kuris užmezga saugų ryšį su RabbitMQ. Galite sekti šiuos veiksmus, kad patikrintumėte savo ryšį ir nustatytumėte pagrindinę eilę:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # Atkomentuokite šią eilutę, kad įjungtumėte derinimo žurnalus
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("Ryšys veikia!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

Jei matote klaidą

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

prisijungiant prie RabbitMQ iš Python, tai dažniausiai reiškia, kad jūsų sistemoje trūksta Let's Encrypt sertifikato. Norėdami tai išspręsti, įdiekite bendrus CA sertifikatus:

  1. Ubuntu/Debian naudokite šią komandą:

    sudo apt install ca-certificates
    
  2. Alpine Linux naudokite šią komandą:

    apk add ca-certificates
    

Jei negalite naudoti šių komandų, galite įdiegti CA sertifikatą rankiniu būdu:

  1. Atsisiųskite Let's Encrypt CA sertifikatą iš https://letsencrypt.org/certs/isrgrootx1.pem.

  2. Tada prisijunkite prie RabbitMQ savo Python kode, nurodydami CA sertifikato failą:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

Žemiau pateiktas pilnas Python pavyzdys, kuris naudoja Let's Encrypt CA sertifikatą, kad užmegztų saugų ryšį:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # Atkomentuokite šią eilutę, kad įjungtumėte derinimo žurnalus
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # Rankiniu būdu įkelkite Let's Encrypt CA sertifikatą
    # Atsisiųskite jį naudodami: wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("Ryšys veikia!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)


if __name__ == "__main__":
    asyncio.run(main())

Žemiau pateiktas paprastas pavyzdys, kuris parodo, kaip prisijungti prie RabbitMQ iš GoLang programos naudojant oficialią Go RabbitMQ Client Library. Sekite šiuos veiksmus, kad nustatytumėte savo projektą:

  1. Sukurkite naują katalogą ir inicijuokite modulį:
go mod init rabbitmq-example
  1. Pridėkite RabbitMQ biblioteką:
go get github.com/rabbitmq/amqp091-go
  1. Sukurkite naują failą pavadinimu main.go ir pridėkite šį turinį:

    package main
    
    import (
      "fmt"
      amqp "github.com/rabbitmq/amqp091-go"
    )
    
    func main() {
      connection, err := amqp.Dial("amqps://<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>")
      if err != nil {
        panic(err)
      }
      defer connection.Close()
    
      fmt.Println("Sėkmingai prisijungta prie RabbitMQ instancijos")
    }
    
  2. Paleiskite savo kodą:

go run main.go

Turėtumėte matyti "Sėkmingai prisijungta prie RabbitMQ instancijos", kas rodo, kad jūsų kodas prisijungė saugiai naudojant autentifikaciją ir TLS šifravimą.

Daugiau pavyzdžių galite rasti oficialiame RabbitMQ repozitoriume: https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/go.

Žemiau pateiktas pavyzdys, kuris parodo, kaip prisijungti prie RabbitMQ iš PHP naudojant php-amqplib biblioteką. Kadangi Stackhero instancijos naudoja TLS šifravimą (SSL), ryšys turi būti užmegztas su AMQPSSLConnection:

use PhpAmqpLib\Connection\AMQPSSLConnection;

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  array()
);

/**
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($connection)
{
  $connection->close();
}

register_shutdown_function('shutdown', $connection);

TLS ryšiui gali prireikti Sertifikavimo institucijos (CA) sertifikato. Nors daugelis serverių jau turi jį įdiegtą, gali tekti jį atsisiųsti rankiniu būdu. Sekite šiuos veiksmus:

  1. Atsisiųskite sertifikatą iš https://letsencrypt.org/certs/isrgrootx1.pem ir išsaugokite jį savo serveryje.
  2. Naudokite šį PHP kodą, kad prisijungtumėte naudodami atsisiųstą sertifikatą:
$sslOptions = array(
  'cafile' => realpath(__DIR__ . '/isrgrootx1.pem'),
);

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  $sslOptions
);

Symfony gali naudoti RabbitMQ kaip pranešimų brokerį nustatant aplinkos kintamąjį MESSENGER_TRANSPORT_DSN. Norėdami tai sukonfigūruoti, redaguokite .env failą ir nustatykite kintamąjį taip:

MESSENGER_TRANSPORT_DSN=amqps://<USER>:<PASSWORD>@<HOST>:<PORT>/%2f/messages?cacert=%2Fetc%2Fssl%2Fcerts%2Fca-certificates.crt

Pakeiskite <USER>, <PASSWORD>, <HOST> ir <PORT> savo RabbitMQ duomenimis.

Taip pat įsitikinkite, kad failas config/packages/messenger.yaml naudoja kintamąjį MESSENGER_TRANSPORT_DSN. Konfigūracija turėtų atrodyti taip:

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

Žemiau pateiktas pavyzdys, kaip galite sukonfigūruoti Spring Boot, kad saugiai prisijungtumėte prie Stackhero RabbitMQ instancijos. Atnaujinkite savo programos savybes su šiais nustatymais:

spring.rabbitmq.host=<XXXXXX>.stackhero-network.com
spring.rabbitmq.port=<AMQP_PORT_TLS>
spring.rabbitmq.username=admin
spring.rabbitmq.password=<PASSWORD>
spring.rabbitmq.ssl.enabled=true
spring.rabbitmq.ssl.algorithm=TLSv1.2

Čia pateiktas pavyzdys, kaip prisijungti prie Stackhero RabbitMQ naudojant .NET ir MassTransit. Šis pavyzdys konfigūruoja hostą su reikalingais nustatymais TLS šifravimui:

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

public class Program
{
  public static void Main(string[] args)
  {
    var host = Host.CreateDefaultBuilder(args)
      .ConfigureServices((context, services) =>
      {
        services.AddMassTransit(x =>
        {
          x.UsingRabbitMq((context, cfg) =>
          {
            cfg.Host(new Uri("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"), h =>
            {
              h.UseSsl(s =>
              {
                s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
              });
            });
          });
        });

        services.AddMassTransitHostedService(true);
      })
      .Build();

    host.Run();
  }
}

Naudodami „Atidėtų pranešimų“ įskiepį, galite atidėti arba suplanuoti pranešimus nustatydami vėlavimą milisekundėmis. Įskiepis gali būti aktyvuotas tiesiogiai iš Stackhero prietaisų skydelio. Po įskiepio aktyvavimo, sukurkite atidėtą mainus arba per RabbitMQ administravimo skydelį, arba tiesiogiai savo kode.

Daugiau informacijos rasite oficialiame repozitoriume: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.

Jei išjungsite įskiepį, visi atidėti pranešimai, kurie dar nebuvo pristatyti, bus prarasti.

Po įskiepio aktyvavimo Stackhero prietaisų skydelyje, eikite į savo RabbitMQ administravimo skydelį ir sukurkite mainus su tipu "x-delayed-message". Tada pridėkite argumentą su raktu x-delayed-type ir verte "direct". Ši konfigūracija pavaizduota žemiau esančiame ekrano vaizde.

Mainų kūrimasMainų kūrimas

Jei susiduriate su klaida "Invalid argument, 'x-delayed-type' must be an existing exchange type", įsitikinkite, kad teisingai nustatėte x-delayed-type argumentą.

Naudojant Elixir prisijungti prie RabbitMQ, galite susidurti su klaida

CLIENT ALERT: Fatal - Handshake Failure

Ši problema susijusi su TLS 1.3 palaikymo klaida AMQP bibliotekoje Elixir. Praktinis sprendimas yra priversti naudoti TLS 1.2. Tai galite pasiekti įtraukdami šią parinktį atidarant ryšį:

AMQP.Connection.open("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", :undefined, ssl_options: [ versions: [ :"tlsv1.2" ] ])

Klaida Error: Socket closed abruptly during opening handshake gali atsirasti naudojant Node.js amqplib bibliotekos versiją senesnę nei 0.10.7, kai jungiatės prie RabbitMQ 4.1.0 ar naujesnės. Ši problema susijusi su frame_max nustatymo pakeitimu, įvestu RabbitMQ 4.1.0.

Norėdami išspręsti klaidą, atnaujinkite savo amqplib biblioteką į versiją 0.10.7 ar naujesnę.