golang,go,博客,开源,编程
今天简单介绍一个消息队列库 nsqio/go-nsq
NSQ 是一个实时分布式消息平台,旨在处理大规模、分布式系统中的消息流转。它采用去中心化的设计,并且具备高可用性和容错能力。
而 go-nsq 则是 NSQ 官方提供的 Golang 客户端库,用于实现消息的生产(Producer)与消费(Consumer)。它封装了网络连接、重试、心跳检测等底层细节,使得开发者能够专注于业务逻辑的实现。
下面的示例展示了如何使用 go-nsq 发布消息:
package main
import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
// 创建配置对象
config := nsq.NewConfig()
// 创建一个生产者,连接到 NSQD 的 TCP 端口(默认4150)
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatalf("创建生产者失败: %v", err)
}
// 发布消息到指定的 topic
topic := "sample_topic"
messageBody := []byte("Hello NSQ!")
if err := producer.Publish(topic, messageBody); err != nil {
log.Fatalf("消息发布失败: %v", err)
} else {
fmt.Println("消息发布成功!")
}
// 停止生产者,释放连接
producer.Stop()
}
说明:
nsq.NewProducer
创建生产者对象,并指定 NSQD 地址。Publish
方法将消息投递到指定 topic 中。Stop
方法关闭连接。下面的示例展示了如何使用 go-nsq 消费消息:
package main
import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
// MyHandler 实现了 nsq.Handler 接口,用于处理接收到的消息
type MyHandler struct{}
func (h *MyHandler) HandleMessage(m *nsq.Message) error {
// 处理消息,业务逻辑写在这里
fmt.Printf("收到消息: %s\n", string(m.Body))
return nil
}
func main() {
config := nsq.NewConfig()
// 创建消费者,指定 topic 和 channel,channel 用于消费者分组
consumer, err := nsq.NewConsumer("sample_topic", "sample_channel", config)
if err != nil {
log.Fatalf("创建消费者失败: %v", err)
}
// 添加消息处理器
consumer.AddHandler(&MyHandler{})
// 连接到 NSQD 实例(或通过 nsqlookupd 发现节点)
if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
log.Fatalf("连接 NSQD 失败: %v", err)
}
// 阻塞等待退出信号
<-consumer.StopChan
}
说明:
nsq.NewConsumer
用于创建消费者对象,传入 topic 和 channel 参数。MyHandler
实现了 nsq.Handler
接口,HandleMessage
方法负责处理每条消息。ConnectToNSQD
方法连接到 NSQD 节点,或者可以使用 ConnectToNSQLookupd
通过 lookup 服务连接集群。StopChan
阻塞等待消费者退出。在实际生产环境中,可以通过 nsqlookupd 动态获取 NSQD 节点列表,实现更灵活的负载均衡与故障切换:
package main
import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("sample_topic", "sample_channel", config)
if err != nil {
log.Fatalf("创建消费者失败: %v", err)
}
consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
fmt.Printf("收到消息: %s\n", string(m.Body))
return nil
}))
// 连接到 nsqlookupd,动态发现 NSQD 节点(默认端口4161)
if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
log.Fatalf("连接 nsqlookupd 失败: %v", err)
}
<-consumer.StopChan
}
说明:
ConnectToNSQLookupd
方法连接到 nsqlookupd 服务,自动获取可用的 NSQD 节点列表。go-nsq 提供了许多配置项,下面列举几个常用的配置参数:
通过调整这些参数,可以针对不同的业务场景实现更高效、更稳定的消息传输。