connection.go 932 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package dq
  2. import (
  3. "sync"
  4. "github.com/beanstalkd/go-beanstalk"
  5. )
  6. type connection struct {
  7. lock sync.RWMutex
  8. endpoint string
  9. tube string
  10. conn *beanstalk.Conn
  11. }
  12. func newConnection(endpint, tube string) *connection {
  13. return &connection{
  14. endpoint: endpint,
  15. tube: tube,
  16. }
  17. }
  18. func (c *connection) Close() error {
  19. c.lock.Lock()
  20. conn := c.conn
  21. c.conn = nil
  22. defer c.lock.Unlock()
  23. if conn != nil {
  24. return conn.Close()
  25. }
  26. return nil
  27. }
  28. func (c *connection) get() (*beanstalk.Conn, error) {
  29. c.lock.RLock()
  30. conn := c.conn
  31. c.lock.RUnlock()
  32. if conn != nil {
  33. return conn, nil
  34. }
  35. c.lock.Lock()
  36. defer c.lock.Unlock()
  37. var err error
  38. c.conn, err = beanstalk.Dial("tcp", c.endpoint)
  39. if err != nil {
  40. return nil, err
  41. }
  42. c.conn.Tube.Name = c.tube
  43. return c.conn, err
  44. }
  45. func (c *connection) reset() {
  46. c.lock.Lock()
  47. defer c.lock.Unlock()
  48. if c.conn != nil {
  49. c.conn.Close()
  50. c.conn = nil
  51. }
  52. }