간단한 고랭 RabbitMQ 큐 모드
설명: P는 제작자, C는 소비자를 나타내며, 빨간색은 큐를 나타냅니다.
참고: RabbitMQ에 익숙하지 않은 경우 먼저 RabbitMQ 기본 개념 섹션을 읽어주십시오.
1. 종속성 설치
go get github.com/streadway/amqp
종속성 패키지 가져오기
import (
"github.com/streadway/amqp"
)
2. 메시지 보내기
다음 단계에서는 메시지 제작자가 메시지 전달을 완료하는 방법을 보여줍니다.
2.1. RabbitMQ 서버에 연결
// RabbitMQ 서버에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
연결 주소 설명:
amqp://사용자이름:암호@RabbitMQ주소:포트/
2.2. 채널 생성
대부분의 작업은 채널에서 수행됩니다.
ch, err := conn.Channel()
defer ch.Close()
2.3. 큐 선언
읽거나 쓸 큐를 나타냅니다.
q, err := ch.QueueDeclare(
"hello", // 큐 이름
false, // 메시지 지속성
false, // 사용하지 않을 때 큐 삭제
false, // 배타적
false, // 노-웨이트
nil, // 매개변수
)
2.4. 메시지 전송
// 메시지 내용
body := "안녕, 세상아!"
// 메시지 전송
err = ch.Publish(
"", // 교환 (여기서는 무시)
q.Name, // 라우팅 매개변수, 큐 이름을 라우팅 매개변수로 사용
false, // 필수
false, // 즉시
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // 메시지 내용
})
2.5. 메시지 송신을 위한 완전한 코드
package main
// 패키지 가져오기
import (
"log"
"github.com/streadway/amqp"
)
// 오류 처리
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ에 연결하지 못했습니다")
defer conn.Close()
// 채널 생성
ch, err := conn.Channel()
failOnError(err, "채널 열기를 실패했습니다")
defer ch.Close()
// 조작할 큐 선언
q, err := ch.QueueDeclare(
"hello", // 이름
false, // 지속적
false, // 사용되지 않을 때 삭제
false, // 배타적
false, // 노-웨이트
nil, // 인수
)
failOnError(err, "큐 선언에 실패했습니다")
// 보낼 메시지 내용
body := "안녕, 세상아!"
// 메시지 전송
err = ch.Publish(
"", // 교환
q.Name, // 라우팅 키
false, // 필수
false, // 즉시
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "메시지를 게시하지 못했습니다")
log.Printf(" [x] 전송됨 %s", body)
}
3. 메시지 수신
메시지를 수신하는 첫 세 단계는 메시지를 보내는 것과 동일합니다. 각각 2.1, 2.2 및 2.3 절에 해당합니다. 메시지를 수신하는 완전한 코드는 다음과 같습니다:
package main
// 패키지 가져오기
import (
"log"
"github.com/streadway/amqp"
)
// 오류 처리
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ에 연결 실패")
defer conn.Close()
// 채널 생성
ch, err := conn.Channel()
failOnError(err, "채널 열기 실패")
defer ch.Close()
// 조작할 대상 큐 선언
q, err := ch.QueueDeclare(
"hello", // 큐 이름은 메시지를 보내는 큐 이름과 일치해야 함
false, // durable
false, // 사용되지 않을 때 삭제
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "큐 선언 실패")
// 메시지 소비자 생성
msgs, err := ch.Consume(
q.Name, // 큐 이름
"", // 소비자 이름을 지정하지 않으면 자동으로 고유한 ID가 생성됨
true, // 메시지를 자동으로 확인할지 여부, 즉, RabbitMQ에게 메시지가 성공적으로 처리되었음을 자동으로 알림
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "소비자 등록 실패")
// 루프에서 큐에서 메시지 받아오기
for d := range msgs {
// 메시지 내용 출력
log.Printf("메시지 받음: %s", d.Body)
}
}