Golang RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)

RabbitMQ의 발행/구독 패턴은 생성자에 의해 전송된 메시지가 다중 소비자에 의해 처리됨을 의미합니다.

팬아웃 모드

설명:

  • P는 생성자를, C1과 C2는 소비자를 나타내며, 빨간색으로 표시된 것은 대기열을 나타내고, X는 교환을 나타냅니다.
  • 교환은 교환에 바인딩된 모든 대기열로 메시지를 전달하는 역할을 합니다.
  • 여러 대기열을 정의할 수 있으며, 각각은 동일한 교환에 바인딩됩니다.
  • 각 대기열은 하나 이상의 소비자를 가질 수 있습니다.

참고: RabbitMQ에 익숙하지 않다면 먼저 RabbitMQ 기본 개념 섹션을 읽어주십시오.

1. 종속 패키지 설치

go get github.com/streadway/amqp

2. 메시지 전송

다음 단계에서는 메시지 생성자가 메시지를 전송하는 방법을 보여줍니다.

2.1. RabbitMQ 서버에 연결

// RabbitMQ 서버에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

연결 주소 설명:

amqp://사용자명:비밀번호@RabbitMQ주소:포트/

2.2. 채널 생성

대부분의 작업은 채널에서 수행됩니다.

ch, err := conn.Channel()
defer ch.Close()

2.3. 교환 선언

메시지는 먼저 교환으로 전송됩니다. 교환은 자신의 전략에 따라 메시지를 대기열로 전달합니다.

err = ch.ExchangeDeclare(
	"tizi365",   // 교환 이름
	"fanout", // 교환 유형, 여기서는 팬아웃 유형을 사용하여 발행/구독 패턴을 사용합니다
	true,     // 지속성
	false,    // 자동 삭제
	false,    // 내부
	false,    // No-wait
	nil,      // 인수
)

2.4. 메시지 발행

// 메시지 내용
body := "안녕하세요, Tizi365.com!"

// 메시지 발행
err = ch.Publish(
  "tizi365",     // Exchange (이전 선언과 일치하는 교환 이름)
  "", // 라우팅 키, 팬아웃 유형 교환의 경우 자동으로 무시되므로 제공할 필요가 없습니다.
  false,  // 필수값
  false,  // 즉시 전달
  amqp.Publishing {
    ContentType: "text/plain", // 메시지 콘텐츠 유형, 여기서는 일반 텍스트입니다
    Body:        []byte(body),  // 메시지 내용
  })

2.5. 메시지 전송 코드 완성

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ에 연결
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ에 연결하는 데 실패했습니다.")
	defer conn.Close()

	// 채널 생성
	ch, err := conn.Channel()
	failOnError(err, "채널 열기에 실패했습니다.")
	defer ch.Close()

	// 교환 선언
	err = ch.ExchangeDeclare(
		"tizi365",   // 교환 이름
		"fanout", // 교환 유형, 발행/구독 모드에는 팬아웃 사용
		true,     // 지속성
		false,    // 자동 삭제
		false,    // 내부
		false,    // No-wait
		nil,      // Arguments
	)
	failOnError(err, "교환 선언에 실패했습니다.")

	// 메시지 내용
	body := "안녕하세요, Tizi365.com!"

	// 메시지 전송
	err = ch.Publish(
		"tizi365",     // Exchange (위의 선언과 일치)
		"", // 라우팅 키, 팬아웃 유형 교환의 경우 자동으로 무시됨
		false,  // 필수값
		false,  // 즉시 전달
		amqp.Publishing {
			ContentType: "text/plain", // 메시지 콘텐츠 유형, 여기서는 일반 텍스트입니다
			Body:        []byte(body),  // 메시지 내용
		})

	log.Printf("보낸 내용: %s", body)
}

3. 메시지 수신

메시지를 수신하기 위한 처음 세 단계 - RabbitMQ에 연결, 채널 생성, 교환 선언 -은 메시지 전송과 동일합니다. 이전 섹션 2.1, 2.2 및 2.3을 참조하십시오.

3.1. 큐 선언

작업할 큐를 선언합니다.

q, err := ch.QueueDeclare(
		"",    // 큐 이름 (지정하지 않으면 무작위로 생성됨)
		false, // 내구성
		false, // 미사용 시 삭제
		true,  // 배타적
		false, // No-wait
		nil,   // 인수
	)

3.2. 큐를 교환과 바인딩

메시지를 수신하기 위해 큐를 교환에 바인딩해야 합니다.

err = ch.QueueBind(
		q.Name, // 큐 이름
		"",     // 라우팅 키, 팬아웃 타입의 교환에 대해 라우팅 키는 자동으로 무시됨
		"tizi365", // 교환 이름, 메시지 발신자가 정의한 것과 일치해야 함
		false,
		nil)

참고: 실제 응용 프로그램에서는 교환에 바인딩된 N개의 큐를 정의하여 교환에서 전달된 메시지를 수신하는 용도로 사용할 수 있습니다. 이는 발행-구독 패턴이 반영된 곳입니다.

3.3. 소비자 생성

msgs, err := ch.Consume(
		q.Name, // 위에서 참조한 큐 이름
		"",     // 소비자 이름 (지정하지 않으면 자동으로 생성됨)
		true,   // 메시지 처리 자동 확인
		false,  // 배타적
		false,  // No-local
		false,  // No-wait
		nil,    // 인수
	)
	
// 메시지 처리를 위한 루프
for d := range msgs {
	log.Printf("받은 메시지=%s", d.Body)
}

3.4. 소비자 코드 완성

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ에 연결
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ 연결 실패")
	defer conn.Close()

	// 보통 1개의 소비자 당 1개의 채널을 생성함
	ch, err := conn.Channel()
	failOnError(err, "채널 열기 실패")
	defer ch.Close()

	// 교환을 선언함
	err = ch.ExchangeDeclare(
		"tizi365",   // 교환 이름, 메시지 발신자가 사용한 것과 일치해야 함
		"fanout", // 교환 타입
		true,     // 내구성
		false,    // 자동 삭제
		false,    // 내부
		false,    // No-wait
		nil,      // 인수
	)
	failOnError(err, "교환 선언 실패")

	// 작업할 큐를 선언함
	q, err := ch.QueueDeclare(
		"",    // 큐 이름 (빈 경우 무작위 이름이 생성됨)
		false, // 내구성
		false, // 미사용 시 삭제
		true,  // 배타적
		false, // No-wait
		nil,   // 인수
	)
	failOnError(err, "큐 선언 실패")

	// 큐를 지정된 교환에 바인딩함
	err = ch.QueueBind(
		q.Name, // 큐 이름
		"",     // 라우팅 키, 팬아웃 교환에는 무시됨
		"tizi365", // 교환 이름, 메시지 발신자가 정의한 것과 일치해야 함
		false,
		nil)
	failOnError(err, "큐 바인딩 실패")

	// 소비자 생성
	msgs, err := ch.Consume(
		q.Name, // 앞서 정의한 큐 이름 참조
		"",     // 소비자 이름 (비어 있으면 자동 생성됨)
		true,   // 자동 확인
		false,  // 배타적
		false,  // No-local
		false,  // No-wait
		nil,    // 인수
	)
	failOnError(err, "소비자 등록 실패")

	// 큐에서 메시지를 루프로 소비
	for d := range msgs {
		log.Printf("받은 메시지: %s", d.Body)
	}
}

3.5. 다중 소비자

작동 모드 섹션을 참조하여 고루틴을 사용하여 간단히 여러 소비자를 시작합니다.