Interfaz de Publicador/Suscriptor Personalizada
Para agregar soporte para un publicador/suscriptor personalizado, es necesario implementar las interfaces message.Publisher
y message.Subscriber
.
Código fuente completo: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publish publica los mensajes proporcionados en el tema especificado.
//
// La publicación puede ejecutarse de forma síncrona o asíncrona, dependiendo de la implementación.
//
// La mayoría de las implementaciones de publicadores no admiten la publicación de mensajes de forma atómica.
// Esto significa que si un mensaje no se puede publicar, el siguiente mensaje no se publicará.
//
// Publish debe ser segura para subprocesos.
Publish(topic string, messages ...*Message) error
// Si el publicador es asíncrono, Close debe vaciar los mensajes no enviados.
Close() error
}
// Subscriber es la parte consumidora del publicador/suscriptor.
type Subscriber interface {
// Subscribe devuelve un canal de salida para los mensajes recibidos del tema proporcionado.
// El canal se cerrará cuando se llame a Close() en el suscriptor.
//
// Para recibir el siguiente mensaje, se debe llamar a Ack() en el mensaje recibido.
// Si el procesamiento del mensaje falla y se debe reintentar el envío del mensaje, se debe llamar a Nack().
//
// Cuando el ctx proporcionado se cancela, el suscriptor cerrará la suscripción y el canal de salida.
// El ctx proporcionado se establece en todos los mensajes generados.
// Cuando se llama a Nack o Ack en el mensaje, se cancela el contexto del mensaje.
Subscribe(ctx context.Context, topic string) (
Lista de Tareas
Aquí hay algunos puntos que no debes olvidar:
- Registro (buenos mensajes y niveles apropiados).
- Codificadores de mensajes reemplazables y configurables.
- La implementación de
Close()
para publicadores y suscriptores debería ser:- Idempotente
- Capaz de funcionar correctamente cuando el publicador o suscriptor está bloqueado (por ejemplo, esperando un Ack)
- Capaz de funcionar correctamente cuando el canal de salida del suscriptor está bloqueado (porque no hay nadie escuchándolo)
- Soporte para
Ack()
yNack()
en los mensajes consumidos. - Soporte para el reintento de mensajes consumidos usando
Nack()
. - Utilizar pruebas genéricas de publicador/suscriptor. Deberías consultar la guía de solución de problemas de pruebas para obtener consejos de depuración.
- Optimización del rendimiento.
- GoDocs, documentación en Markdown y ejemplos introductorios.