consumernode.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package dq
  2. import (
  3. "time"
  4. "zero/core/logx"
  5. "zero/core/syncx"
  6. "github.com/beanstalkd/go-beanstalk"
  7. )
  8. type (
  9. consumerNode struct {
  10. conn *connection
  11. tube string
  12. on *syncx.AtomicBool
  13. }
  14. consumeService struct {
  15. c *consumerNode
  16. consume Consume
  17. }
  18. )
  19. func newConsumerNode(endpoint, tube string) *consumerNode {
  20. return &consumerNode{
  21. conn: newConnection(endpoint, tube),
  22. tube: tube,
  23. on: syncx.ForAtomicBool(true),
  24. }
  25. }
  26. func (c *consumerNode) dispose() {
  27. c.on.Set(false)
  28. }
  29. func (c *consumerNode) consumeEvents(consume Consume) {
  30. for c.on.True() {
  31. conn, err := c.conn.get()
  32. if err != nil {
  33. logx.Error(err)
  34. time.Sleep(time.Second)
  35. continue
  36. }
  37. // because getting conn takes at most one second, reserve tasks at most 5 seconds,
  38. // if don't check on/off here, the conn might not be closed due to
  39. // graceful shutdon waits at most 5.5 seconds.
  40. if !c.on.True() {
  41. break
  42. }
  43. conn.Tube.Name = c.tube
  44. conn.TubeSet.Name[c.tube] = true
  45. id, body, err := conn.Reserve(reserveTimeout)
  46. if err == nil {
  47. conn.Delete(id)
  48. consume(body)
  49. continue
  50. }
  51. // the error can only be beanstalk.NameError or beanstalk.ConnError
  52. switch cerr := err.(type) {
  53. case beanstalk.ConnError:
  54. switch cerr.Err {
  55. case beanstalk.ErrTimeout:
  56. // timeout error on timeout, just continue the loop
  57. case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
  58. beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
  59. beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
  60. // won't reset
  61. logx.Error(err)
  62. default:
  63. // beanstalk.ErrOOM, beanstalk.ErrUnknown and other errors
  64. logx.Error(err)
  65. c.conn.reset()
  66. time.Sleep(time.Second)
  67. }
  68. default:
  69. logx.Error(err)
  70. }
  71. }
  72. if err := c.conn.Close(); err != nil {
  73. logx.Error(err)
  74. }
  75. }
  76. func (cs consumeService) Start() {
  77. cs.c.consumeEvents(cs.consume)
  78. }
  79. func (cs consumeService) Stop() {
  80. cs.c.dispose()
  81. }