bulkinserter.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package mon
  2. import (
  3. "context"
  4. "time"
  5. "github.com/zeromicro/go-zero/core/executors"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. )
  9. const (
  10. flushInterval = time.Second
  11. maxBulkRows = 1000
  12. )
  13. type (
  14. // ResultHandler is a handler that used to handle results.
  15. ResultHandler func(*mongo.InsertManyResult, error)
  16. // A BulkInserter is used to insert bulk of mongo records.
  17. BulkInserter struct {
  18. executor *executors.PeriodicalExecutor
  19. inserter *dbInserter
  20. }
  21. )
  22. // NewBulkInserter returns a BulkInserter.
  23. func NewBulkInserter(coll *mongo.Collection, interval ...time.Duration) *BulkInserter {
  24. inserter := &dbInserter{
  25. collection: coll,
  26. }
  27. duration := flushInterval
  28. if len(interval) > 0 {
  29. duration = interval[0]
  30. }
  31. return &BulkInserter{
  32. executor: executors.NewPeriodicalExecutor(duration, inserter),
  33. inserter: inserter,
  34. }
  35. }
  36. // Flush flushes the inserter, writes all pending records.
  37. func (bi *BulkInserter) Flush() {
  38. bi.executor.Flush()
  39. }
  40. // Insert inserts doc.
  41. func (bi *BulkInserter) Insert(doc interface{}) {
  42. bi.executor.Add(doc)
  43. }
  44. // SetResultHandler sets the result handler.
  45. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
  46. bi.executor.Sync(func() {
  47. bi.inserter.resultHandler = handler
  48. })
  49. }
  50. type dbInserter struct {
  51. collection *mongo.Collection
  52. documents []interface{}
  53. resultHandler ResultHandler
  54. }
  55. func (in *dbInserter) AddTask(doc interface{}) bool {
  56. in.documents = append(in.documents, doc)
  57. return len(in.documents) >= maxBulkRows
  58. }
  59. func (in *dbInserter) Execute(objs interface{}) {
  60. docs := objs.([]interface{})
  61. if len(docs) == 0 {
  62. return
  63. }
  64. result, err := in.collection.InsertMany(context.Background(), docs)
  65. if in.resultHandler != nil {
  66. in.resultHandler(result, err)
  67. } else if err != nil {
  68. logx.Error(err)
  69. }
  70. }
  71. func (in *dbInserter) RemoveAll() interface{} {
  72. documents := in.documents
  73. in.documents = nil
  74. return documents
  75. }