bulkinserter.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package mon
  2. import (
  3. "context"
  4. "time"
  5. "github.com/zeromicro/go-zero/core/executors"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. )
  9. const (
  10. flushInterval = time.Second
  11. maxBulkRows = 1000
  12. )
  13. type (
  14. // ResultHandler is a handler that used to handle results.
  15. ResultHandler func(*mongo.InsertManyResult, error)
  16. // A BulkInserter is used to insert bulk of mongo records.
  17. BulkInserter struct {
  18. executor *executors.PeriodicalExecutor
  19. inserter *dbInserter
  20. }
  21. )
  22. // Deprecated. Use NewBatchInserter instead.
  23. func NewBulkInserter(coll *mongo.Collection, interval ...time.Duration) *BulkInserter {
  24. return newBulkInserter(coll, interval...)
  25. }
  26. // NewBatchInserter returns a BulkInserter.
  27. func NewBatchInserter(coll Collection, interval ...time.Duration) (*BulkInserter, error) {
  28. cloneColl, err := coll.Clone()
  29. if err != nil {
  30. return nil, err
  31. }
  32. return newBulkInserter(cloneColl, interval...), nil
  33. }
  34. // newBulkInserter returns a BulkInserter.
  35. func newBulkInserter(coll *mongo.Collection, interval ...time.Duration) *BulkInserter {
  36. inserter := &dbInserter{
  37. collection: coll,
  38. }
  39. duration := flushInterval
  40. if len(interval) > 0 {
  41. duration = interval[0]
  42. }
  43. return &BulkInserter{
  44. executor: executors.NewPeriodicalExecutor(duration, inserter),
  45. inserter: inserter,
  46. }
  47. }
  48. // Flush flushes the inserter, writes all pending records.
  49. func (bi *BulkInserter) Flush() {
  50. bi.executor.Flush()
  51. }
  52. // Insert inserts doc.
  53. func (bi *BulkInserter) Insert(doc interface{}) {
  54. bi.executor.Add(doc)
  55. }
  56. // SetResultHandler sets the result handler.
  57. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
  58. bi.executor.Sync(func() {
  59. bi.inserter.resultHandler = handler
  60. })
  61. }
  62. type dbInserter struct {
  63. collection *mongo.Collection
  64. documents []interface{}
  65. resultHandler ResultHandler
  66. }
  67. func (in *dbInserter) AddTask(doc interface{}) bool {
  68. in.documents = append(in.documents, doc)
  69. return len(in.documents) >= maxBulkRows
  70. }
  71. func (in *dbInserter) Execute(objs interface{}) {
  72. docs := objs.([]interface{})
  73. if len(docs) == 0 {
  74. return
  75. }
  76. result, err := in.collection.InsertMany(context.Background(), docs)
  77. if in.resultHandler != nil {
  78. in.resultHandler(result, err)
  79. } else if err != nil {
  80. logx.Error(err)
  81. }
  82. }
  83. func (in *dbInserter) RemoveAll() interface{} {
  84. documents := in.documents
  85. in.documents = nil
  86. return documents
  87. }