poller.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "time"
  8. "zero/core/discov"
  9. "zero/core/lang"
  10. "zero/core/logx"
  11. "zero/core/service"
  12. "zero/core/stores/redis"
  13. "zero/rq"
  14. )
  15. var (
  16. redisHost = flag.String("redis", "localhost:6379", "")
  17. redisType = flag.String("type", "node", "")
  18. redisKey = flag.String("key", "queue", "")
  19. producers = flag.Int("producers", 1, "")
  20. dropBefore = flag.Int64("drop", 0, "messages before seconds to drop")
  21. )
  22. type Consumer struct {
  23. lock sync.Mutex
  24. resources map[string]interface{}
  25. }
  26. func NewConsumer() *Consumer {
  27. return &Consumer{
  28. resources: make(map[string]interface{}),
  29. }
  30. }
  31. func (c *Consumer) Consume(msg string) error {
  32. fmt.Println("=>", msg)
  33. c.lock.Lock()
  34. defer c.lock.Unlock()
  35. c.resources[msg] = lang.Placeholder
  36. return nil
  37. }
  38. func (c *Consumer) OnEvent(event interface{}) {
  39. fmt.Printf("event: %+v\n", event)
  40. }
  41. func main() {
  42. flag.Parse()
  43. consumer := NewConsumer()
  44. q, err := rq.NewMessageQueue(rq.RmqConf{
  45. ServiceConf: service.ServiceConf{
  46. Name: "queue",
  47. Log: logx.LogConf{
  48. Path: "logs",
  49. KeepDays: 3,
  50. Compress: true,
  51. },
  52. },
  53. Redis: redis.RedisKeyConf{
  54. RedisConf: redis.RedisConf{
  55. Host: *redisHost,
  56. Type: *redisType,
  57. },
  58. Key: *redisKey,
  59. },
  60. Etcd: discov.EtcdConf{
  61. Hosts: []string{
  62. "localhost:2379",
  63. },
  64. Key: "queue",
  65. },
  66. DropBefore: *dropBefore,
  67. NumProducers: *producers,
  68. }, rq.WithHandler(consumer), rq.WithRenewId(time.Now().UnixNano()))
  69. if err != nil {
  70. log.Fatal(err)
  71. }
  72. defer q.Stop()
  73. q.Start()
  74. }