singleflight.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package syncx
  2. import "sync"
  // SingleFlight lets concurrent calls that use the same key share one call result.
  // For example, if A calls F and, before that call has finished, B calls F with
  // the same key, B does not execute F: it waits and receives the result of the
  // call started by A.
  //
  //	A ------->calls F with key<------------------->returns val
  //	B --------------------->calls F with key------>returns val
  type SingleFlight interface {
  	// Do returns the result of fn for key, running fn only when no call
  	// with the same key is already in flight.
  	Do(key string, fn func() (interface{}, error)) (interface{}, error)
  	// DoEx is like Do and additionally reports whether this caller ran fn
  	// itself (true) or received the result of another caller's run (false).
  	DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
  }

  // SharedCalls is an alias of SingleFlight.
  //
  // Deprecated: use SingleFlight.
  type SharedCalls = SingleFlight

  // call tracks a single in-flight invocation. wg is released once the
  // invocation has finished; val and err hold what it returned.
  type call struct {
  	wg  sync.WaitGroup
  	val interface{}
  	err error
  }

  // flightGroup is the SingleFlight implementation. calls maps each key to its
  // in-flight call and is only accessed while lock is held.
  type flightGroup struct {
  	calls map[string]*call
  	lock  sync.Mutex
  }
  27. // NewSingleFlight returns a SingleFlight.
  28. func NewSingleFlight() SingleFlight {
  29. return &flightGroup{
  30. calls: make(map[string]*call),
  31. }
  32. }
  33. // NewSharedCalls returns a SingleFlight.
  34. // Deprecated: use NewSingleFlight.
  35. func NewSharedCalls() SingleFlight {
  36. return NewSingleFlight()
  37. }
  38. func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  39. c, done := g.createCall(key)
  40. if done {
  41. return c.val, c.err
  42. }
  43. g.makeCall(c, key, fn)
  44. return c.val, c.err
  45. }
  46. func (g *flightGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
  47. c, done := g.createCall(key)
  48. if done {
  49. return c.val, false, c.err
  50. }
  51. g.makeCall(c, key, fn)
  52. return c.val, true, c.err
  53. }
  54. func (g *flightGroup) createCall(key string) (c *call, done bool) {
  55. g.lock.Lock()
  56. if c, ok := g.calls[key]; ok {
  57. g.lock.Unlock()
  58. c.wg.Wait()
  59. return c, true
  60. }
  61. c = new(call)
  62. c.wg.Add(1)
  63. g.calls[key] = c
  64. g.lock.Unlock()
  65. return c, false
  66. }
  67. func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
  68. defer func() {
  69. g.lock.Lock()
  70. delete(g.calls, key)
  71. g.lock.Unlock()
  72. c.wg.Done()
  73. }()
  74. c.val, c.err = fn()
  75. }