bulkinserter.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package mon
  2. import (
  3. "context"
  4. "time"
  5. "github.com/wuntsong-org/go-zero-plus/core/executors"
  6. "github.com/wuntsong-org/go-zero-plus/core/logx"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. )
  9. const (
  10. flushInterval = time.Second
  11. maxBulkRows = 1000
  12. )
  13. type (
  14. // ResultHandler is a handler that used to handle results.
  15. ResultHandler func(*mongo.InsertManyResult, error)
  16. // A BulkInserter is used to insert bulk of mongo records.
  17. BulkInserter struct {
  18. executor *executors.PeriodicalExecutor
  19. inserter *dbInserter
  20. }
  21. )
  22. // NewBulkInserter returns a BulkInserter.
  23. func NewBulkInserter(coll Collection, interval ...time.Duration) (*BulkInserter, error) {
  24. cloneColl, err := coll.Clone()
  25. if err != nil {
  26. return nil, err
  27. }
  28. inserter := &dbInserter{
  29. collection: cloneColl,
  30. }
  31. duration := flushInterval
  32. if len(interval) > 0 {
  33. duration = interval[0]
  34. }
  35. return &BulkInserter{
  36. executor: executors.NewPeriodicalExecutor(duration, inserter),
  37. inserter: inserter,
  38. }, nil
  39. }
  40. // Flush flushes the inserter, writes all pending records.
  41. func (bi *BulkInserter) Flush() {
  42. bi.executor.Flush()
  43. }
  44. // Insert inserts doc.
  45. func (bi *BulkInserter) Insert(doc any) {
  46. bi.executor.Add(doc)
  47. }
  48. // SetResultHandler sets the result handler.
  49. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
  50. bi.executor.Sync(func() {
  51. bi.inserter.resultHandler = handler
  52. })
  53. }
  54. type dbInserter struct {
  55. collection *mongo.Collection
  56. documents []any
  57. resultHandler ResultHandler
  58. }
  59. func (in *dbInserter) AddTask(doc any) bool {
  60. in.documents = append(in.documents, doc)
  61. return len(in.documents) >= maxBulkRows
  62. }
  63. func (in *dbInserter) Execute(objs any) {
  64. docs := objs.([]any)
  65. if len(docs) == 0 {
  66. return
  67. }
  68. result, err := in.collection.InsertMany(context.Background(), docs)
  69. if in.resultHandler != nil {
  70. in.resultHandler(result, err)
  71. } else if err != nil {
  72. logx.Error(err)
  73. }
  74. }
  75. func (in *dbInserter) RemoveAll() any {
  76. documents := in.documents
  77. in.documents = nil
  78. return documents
  79. }