Golang RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)
RabbitMQ의 발행/구독 패턴은 생성자에 의해 전송된 메시지가 다중 소비자에 의해 처리됨을 의미합니다.
설명:
- P는 생성자를, C1과 C2는 소비자를 나타내며, 빨간색으로 표시된 것은 대기열을 나타내고, X는 교환을 나타냅니다.
- 교환은 교환에 바인딩된 모든 대기열로 메시지를 전달하는 역할을 합니다.
- 여러 대기열을 정의할 수 있으며, 각각은 동일한 교환에 바인딩됩니다.
- 각 대기열은 하나 이상의 소비자를 가질 수 있습니다.
참고: RabbitMQ에 익숙하지 않다면 먼저 RabbitMQ 기본 개념 섹션을 읽어주십시오.
1. 종속 패키지 설치
go get github.com/streadway/amqp
2. 메시지 전송
다음 단계에서는 메시지 생성자가 메시지를 전송하는 방법을 보여줍니다.
2.1. RabbitMQ 서버에 연결
// RabbitMQ 서버에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
연결 주소 설명:
amqp://사용자명:비밀번호@RabbitMQ주소:포트/
2.2. 채널 생성
대부분의 작업은 채널에서 수행됩니다.
ch, err := conn.Channel()
defer ch.Close()
2.3. 교환 선언
메시지는 먼저 교환으로 전송됩니다. 교환은 자신의 전략에 따라 메시지를 대기열로 전달합니다.
err = ch.ExchangeDeclare(
"tizi365", // 교환 이름
"fanout", // 교환 유형, 여기서는 팬아웃 유형을 사용하여 발행/구독 패턴을 사용합니다
true, // 지속성
false, // 자동 삭제
false, // 내부
false, // No-wait
nil, // 인수
)
2.4. 메시지 발행
// 메시지 내용
body := "안녕하세요, Tizi365.com!"
// 메시지 발행
err = ch.Publish(
"tizi365", // Exchange (이전 선언과 일치하는 교환 이름)
"", // 라우팅 키, 팬아웃 유형 교환의 경우 자동으로 무시되므로 제공할 필요가 없습니다.
false, // 필수값
false, // 즉시 전달
amqp.Publishing {
ContentType: "text/plain", // 메시지 콘텐츠 유형, 여기서는 일반 텍스트입니다
Body: []byte(body), // 메시지 내용
})
2.5. 메시지 전송 코드 완성
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ에 연결하는 데 실패했습니다.")
defer conn.Close()
// 채널 생성
ch, err := conn.Channel()
failOnError(err, "채널 열기에 실패했습니다.")
defer ch.Close()
// 교환 선언
err = ch.ExchangeDeclare(
"tizi365", // 교환 이름
"fanout", // 교환 유형, 발행/구독 모드에는 팬아웃 사용
true, // 지속성
false, // 자동 삭제
false, // 내부
false, // No-wait
nil, // Arguments
)
failOnError(err, "교환 선언에 실패했습니다.")
// 메시지 내용
body := "안녕하세요, Tizi365.com!"
// 메시지 전송
err = ch.Publish(
"tizi365", // Exchange (위의 선언과 일치)
"", // 라우팅 키, 팬아웃 유형 교환의 경우 자동으로 무시됨
false, // 필수값
false, // 즉시 전달
amqp.Publishing {
ContentType: "text/plain", // 메시지 콘텐츠 유형, 여기서는 일반 텍스트입니다
Body: []byte(body), // 메시지 내용
})
log.Printf("보낸 내용: %s", body)
}
3. 메시지 수신
메시지를 수신하기 위한 처음 세 단계 - RabbitMQ에 연결, 채널 생성, 교환 선언 -은 메시지 전송과 동일합니다. 이전 섹션 2.1, 2.2 및 2.3을 참조하십시오.
3.1. 큐 선언
작업할 큐를 선언합니다.
q, err := ch.QueueDeclare(
"", // 큐 이름 (지정하지 않으면 무작위로 생성됨)
false, // 내구성
false, // 미사용 시 삭제
true, // 배타적
false, // No-wait
nil, // 인수
)
3.2. 큐를 교환과 바인딩
메시지를 수신하기 위해 큐를 교환에 바인딩해야 합니다.
err = ch.QueueBind(
q.Name, // 큐 이름
"", // 라우팅 키, 팬아웃 타입의 교환에 대해 라우팅 키는 자동으로 무시됨
"tizi365", // 교환 이름, 메시지 발신자가 정의한 것과 일치해야 함
false,
nil)
참고: 실제 응용 프로그램에서는 교환에 바인딩된 N개의 큐를 정의하여 교환에서 전달된 메시지를 수신하는 용도로 사용할 수 있습니다. 이는 발행-구독 패턴이 반영된 곳입니다.
3.3. 소비자 생성
msgs, err := ch.Consume(
q.Name, // 위에서 참조한 큐 이름
"", // 소비자 이름 (지정하지 않으면 자동으로 생성됨)
true, // 메시지 처리 자동 확인
false, // 배타적
false, // No-local
false, // No-wait
nil, // 인수
)
// 메시지 처리를 위한 루프
for d := range msgs {
log.Printf("받은 메시지=%s", d.Body)
}
3.4. 소비자 코드 완성
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ 연결 실패")
defer conn.Close()
// 보통 1개의 소비자 당 1개의 채널을 생성함
ch, err := conn.Channel()
failOnError(err, "채널 열기 실패")
defer ch.Close()
// 교환을 선언함
err = ch.ExchangeDeclare(
"tizi365", // 교환 이름, 메시지 발신자가 사용한 것과 일치해야 함
"fanout", // 교환 타입
true, // 내구성
false, // 자동 삭제
false, // 내부
false, // No-wait
nil, // 인수
)
failOnError(err, "교환 선언 실패")
// 작업할 큐를 선언함
q, err := ch.QueueDeclare(
"", // 큐 이름 (빈 경우 무작위 이름이 생성됨)
false, // 내구성
false, // 미사용 시 삭제
true, // 배타적
false, // No-wait
nil, // 인수
)
failOnError(err, "큐 선언 실패")
// 큐를 지정된 교환에 바인딩함
err = ch.QueueBind(
q.Name, // 큐 이름
"", // 라우팅 키, 팬아웃 교환에는 무시됨
"tizi365", // 교환 이름, 메시지 발신자가 정의한 것과 일치해야 함
false,
nil)
failOnError(err, "큐 바인딩 실패")
// 소비자 생성
msgs, err := ch.Consume(
q.Name, // 앞서 정의한 큐 이름 참조
"", // 소비자 이름 (비어 있으면 자동 생성됨)
true, // 자동 확인
false, // 배타적
false, // No-local
false, // No-wait
nil, // 인수
)
failOnError(err, "소비자 등록 실패")
// 큐에서 메시지를 루프로 소비
for d := range msgs {
log.Printf("받은 메시지: %s", d.Body)
}
}
3.5. 다중 소비자
작동 모드 섹션을 참조하여 고루틴을 사용하여 간단히 여러 소비자를 시작합니다.