간단한 고랭 RabbitMQ 큐 모드

Golang RabbitMQ

설명: P는 제작자, C는 소비자를 나타내며, 빨간색은 큐를 나타냅니다.

참고: RabbitMQ에 익숙하지 않은 경우 먼저 RabbitMQ 기본 개념 섹션을 읽어주십시오.

1. 종속성 설치

go get github.com/streadway/amqp

종속성 패키지 가져오기

import (
  "github.com/streadway/amqp"
)

2. 메시지 보내기

다음 단계에서는 메시지 제작자가 메시지 전달을 완료하는 방법을 보여줍니다.

2.1. RabbitMQ 서버에 연결

// RabbitMQ 서버에 연결
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

연결 주소 설명:

amqp://사용자이름:암호@RabbitMQ주소:포트/

2.2. 채널 생성

대부분의 작업은 채널에서 수행됩니다.

ch, err := conn.Channel()
defer ch.Close()

2.3. 큐 선언

읽거나 쓸 큐를 나타냅니다.

q, err := ch.QueueDeclare(
  "hello", // 큐 이름
  false,   // 메시지 지속성
  false,   // 사용하지 않을 때 큐 삭제
  false,   // 배타적
  false,   // 노-웨이트
  nil,     // 매개변수
)

2.4. 메시지 전송

// 메시지 내용
body := "안녕, 세상아!"

// 메시지 전송
err = ch.Publish(
  "",     // 교환 (여기서는 무시)
  q.Name, // 라우팅 매개변수, 큐 이름을 라우팅 매개변수로 사용
  false,  // 필수
  false,  // 즉시
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // 메시지 내용
  })

2.5. 메시지 송신을 위한 완전한 코드

package main

// 패키지 가져오기
import (
	"log"
	"github.com/streadway/amqp"
)

// 오류 처리
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ에 연결
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ에 연결하지 못했습니다")
	defer conn.Close()

	// 채널 생성
	ch, err := conn.Channel()
	failOnError(err, "채널 열기를 실패했습니다")
	defer ch.Close()

	// 조작할 큐 선언
	q, err := ch.QueueDeclare(
		"hello", // 이름
		false,   // 지속적
		false,   // 사용되지 않을 때 삭제
		false,   // 배타적
		false,   // 노-웨이트
		nil,     // 인수
	)
	failOnError(err, "큐 선언에 실패했습니다")

	// 보낼 메시지 내용
	body := "안녕, 세상아!"

	// 메시지 전송
	err = ch.Publish(
		"",     // 교환
		q.Name, // 라우팅 키
		false,  // 필수
		false,  // 즉시
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "메시지를 게시하지 못했습니다")
	log.Printf(" [x] 전송됨 %s", body)
}

3. 메시지 수신

메시지를 수신하는 첫 세 단계는 메시지를 보내는 것과 동일합니다. 각각 2.1, 2.2 및 2.3 절에 해당합니다. 메시지를 수신하는 완전한 코드는 다음과 같습니다:

package main

// 패키지 가져오기
import (
	"log"
	"github.com/streadway/amqp"
)

// 오류 처리
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ에 연결
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ에 연결 실패")
	defer conn.Close()

	// 채널 생성
	ch, err := conn.Channel()
	failOnError(err, "채널 열기 실패")
	defer ch.Close()
	
	// 조작할 대상 큐 선언
	q, err := ch.QueueDeclare(
		"hello", // 큐 이름은 메시지를 보내는 큐 이름과 일치해야 함
		false,   // durable
		false,   // 사용되지 않을 때 삭제
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "큐 선언 실패")

	// 메시지 소비자 생성
	msgs, err := ch.Consume(
		q.Name, // 큐 이름
		"",     // 소비자 이름을 지정하지 않으면 자동으로 고유한 ID가 생성됨
		true,   // 메시지를 자동으로 확인할지 여부, 즉, RabbitMQ에게 메시지가 성공적으로 처리되었음을 자동으로 알림
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "소비자 등록 실패")
	
	// 루프에서 큐에서 메시지 받아오기
	for d := range msgs {
		// 메시지 내용 출력
		log.Printf("메시지 받음: %s", d.Body)
	}
}