NSQ
A realtime distributed messaging platform
优势
- 基于golang
- 分布式
- 水平扩展
- 自带UI,操作友好
- 多语言client
组件
组件 |
功能 |
nsqd |
接收、排队和向客户端传递消息的守护进程 |
nsqlookupd |
管理拓扑信息的守护进程 |
nsqadmin |
Web UI,用于实时查看聚合的集群统计信息并执行各种管理任务 |
utilities |
常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq |
nsqd
它可以独立运行,但通常与nsqlookupd 实例一起配置在集群中(在这种情况下,它将宣布主题和频道以供发现)。
它侦听两个 TCP 端口,一个用于客户端,另一个用于 HTTP API。它可以选择在第三个端口上侦听 HTTPS。
nsqlookupd
有两个接口:nsqd用于广播的TCP 接口和用于客户端执行发现和管理操作的 HTTP 接口。
MAC安装
brew install nsq
nsqlookupd
nsqd –lookupd-tcp-address=127.0.0.1:4160 –broadcast-address=127.0.0.1
nsqadmin –lookupd-http-address=127.0.0.1:4161
go-nsq
package nsq
import ( "context" "fmt" "time"
"github.com/nsqio/go-nsq" "github.com/spf13/cast" )
func Send(ctx context.Context, cancel context.CancelFunc, topic string) { defer cancel() str := "127.0.0.1:4150" fmt.Println("address: ", str) producer, err := nsq.NewProducer(str, nsq.NewConfig()) if err != nil { panic(err) }
producer.SetLogger(nil, 0)
for i := 0; i < 5; i++ { msg := "puresai, " + cast.ToString(i) fmt.Println("publish", msg, producer.Publish(topic, []byte(msg))) time.Sleep(time.Second * 1) }
<-ctx.Done() producer.Stop() fmt.Println("producer exit") }
|
package nsq
import ( "context" "fmt" "time"
"github.com/nsqio/go-nsq" )
type Consumer struct{}
func Receive(ctx context.Context, cancel context.CancelFunc, topic string) { defer cancel()
channel := topic + "-channel" cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second * 2 c, err := nsq.NewConsumer(topic, channel, cfg) if err != nil { panic(err) } c.SetLogger(nil, 0) c.AddConcurrentHandlers(&Consumer{}, 3)
if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150"}); err != nil { panic(err) } <-ctx.Done() c.Stop() fmt.Println("consumer exit") }
func (*Consumer) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil }
|
package nsq
import ( "context" "os" "os/signal" "syscall" "testing" "time" )
func TestReceive(t *testing.T) { topic := "sai0556" ctx, cancel := context.WithCancel(context.Background()) defer cancel() go Send(ctx, cancel, topic) go Receive(ctx, cancel, topic)
sig := make(chan os.Signal) signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) t.Log("开始监听")
select { case <-ctx.Done(): t.Log("ctx done") return case <-sig: t.Log("signal exit...") cancel() time.Sleep(2 * time.Second) return }
}
|
测试走一波,

对于NSQ,自己也是刚刚使用,给我的感觉是相当好上手,之前有用过RabbitMQ,nsq相比来说更简单,可能是go-client相对好用一些吧。
后续有其他值得分享的点再继续补充。如有需要交流,可联系我email/qq。
参考