cluster.go 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package main
  2. import (
  3. "flag"
  4. "log"
  5. "zero/core/logx"
  6. "zero/core/queue"
  7. "zero/core/service"
  8. "zero/core/stores/redis"
  9. "zero/rq"
  10. )
  11. var (
  12. host = flag.String("s", "10.24.232.63:7002", "server address")
  13. mode = flag.String("m", "queue", "cluster test mode")
  14. )
  15. type bridgeHandler struct {
  16. pusher queue.QueuePusher
  17. }
  18. func newBridgeHandler() rq.ConsumeHandler {
  19. return bridgeHandler{}
  20. }
  21. func (h bridgeHandler) Consume(str string) error {
  22. logx.Info("=>", str)
  23. return nil
  24. }
  25. func main() {
  26. flag.Parse()
  27. if *mode == "queue" {
  28. mq, err := rq.NewMessageQueue(rq.RmqConf{
  29. ServiceConf: service.ServiceConf{
  30. Log: logx.LogConf{
  31. Path: "logs",
  32. },
  33. },
  34. Redis: redis.RedisKeyConf{
  35. RedisConf: redis.RedisConf{
  36. Host: *host,
  37. Type: "cluster",
  38. },
  39. Key: "notexist",
  40. },
  41. NumProducers: 1,
  42. }, rq.WithHandler(newBridgeHandler()))
  43. if err != nil {
  44. log.Fatal(err)
  45. }
  46. defer mq.Stop()
  47. mq.Start()
  48. } else {
  49. rds := redis.NewRedis(*host, "cluster")
  50. rds.Llen("notexist")
  51. select {}
  52. }
  53. }