nsq基础知识与简单demo
NSQ
A realtime distributed messaging platform
优势
- 基于golang
- 分布式
- 水平扩展
- 自带UI,操作友好
- 多语言client
组件
组件 | 功能 |
---|---|
nsqd | 接收、排队和向客户端传递消息的守护进程 |
nsqlookupd | 管理拓扑信息的守护进程 |
nsqadmin | Web UI,用于实时查看聚合的集群统计信息并执行各种管理任务 |
utilities | 常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq |
nsqd
它可以独立运行,但通常与nsqlookupd 实例一起配置在集群中(在这种情况下,它将宣布主题和频道以供发现)。
它侦听两个 TCP 端口,一个用于客户端,另一个用于 HTTP API。它可以选择在第三个端口上侦听 HTTPS。
nsqlookupd
有两个接口:nsqd用于广播的TCP 接口和用于客户端执行发现和管理操作的 HTTP 接口。
MAC安装
brew install nsq
nsqlookupd
nsqd –lookupd-tcp-address=127.0.0.1:4160 –broadcast-address=127.0.0.1
nsqadmin –lookupd-http-address=127.0.0.1:4161
go-nsq
// producer
package nsq
import (
"context"
"fmt"
"time"
"github.com/nsqio/go-nsq"
"github.com/spf13/cast"
)
// 主函数
func Send(ctx context.Context, cancel context.CancelFunc, topic string) {
defer cancel()
str := "127.0.0.1:4150"
fmt.Println("address: ", str)
producer, err := nsq.NewProducer(str, nsq.NewConfig())
if err != nil {
panic(err)
}
producer.SetLogger(nil, 0)
for i := 0; i < 5; i++ {
msg := "puresai, " + cast.ToString(i)
fmt.Println("publish", msg, producer.Publish(topic, []byte(msg)))
time.Sleep(time.Second * 1)
}
<-ctx.Done()
producer.Stop()
fmt.Println("producer exit")
}
// consumer
package nsq
import (
"context"
"fmt"
"time"
"github.com/nsqio/go-nsq"
)
// 消费者
type Consumer struct{}
// 主函数
func Receive(ctx context.Context, cancel context.CancelFunc, topic string) {
defer cancel()
// address := "127.0.0.1:4161"
channel := topic + "-channel"
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second * 2
c, err := nsq.NewConsumer(topic, channel, cfg)
if err != nil {
panic(err)
}
c.SetLogger(nil, 0) //屏蔽系统日志
c.AddConcurrentHandlers(&Consumer{}, 3)
//建立NSQLookupd连接
// if err := c.ConnectToNSQLookupd(address); err != nil {
// panic(err)
// }
//建立多个nsqd连接
if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150"}); err != nil {
panic(err)
}
<-ctx.Done()
c.Stop()
fmt.Println("consumer exit")
}
// 处理消息
func (*Consumer) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
// nsq_test
package nsq
import (
"context"
"os"
"os/signal"
"syscall"
"testing"
"time"
)
func TestReceive(t *testing.T) {
topic := "sai0556"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go Send(ctx, cancel, topic)
go Receive(ctx, cancel, topic)
sig := make(chan os.Signal)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
t.Log("开始监听")
select {
case <-ctx.Done():
t.Log("ctx done")
return
case <-sig:
t.Log("signal exit...")
cancel()
time.Sleep(2 * time.Second)
return
}
// send(topic)
}
测试走一波,
对于NSQ,自己也是刚刚使用,给我的感觉是相当好上手,之前有用过RabbitMQ,nsq相比来说更简单,可能是go-client相对好用一些吧。
后续有其他值得分享的点再继续补充。如有需要交流,可联系我email/qq。
参考
nsq基础知识与简单demo
https://blog.puresai.com/2021/07/31/360/