bulkexecutor.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package executors
  2. import (
  3. "time"
  4. )
  5. const defaultBulkTasks = 1000
  6. type (
  7. BulkOption func(options *bulkOptions)
  8. BulkExecutor struct {
  9. executor *PeriodicalExecutor
  10. container *bulkContainer
  11. }
  12. bulkOptions struct {
  13. cachedTasks int
  14. flushInterval time.Duration
  15. }
  16. )
  17. func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
  18. options := newBulkOptions()
  19. for _, opt := range opts {
  20. opt(&options)
  21. }
  22. container := &bulkContainer{
  23. execute: execute,
  24. maxTasks: options.cachedTasks,
  25. }
  26. executor := &BulkExecutor{
  27. executor: NewPeriodicalExecutor(options.flushInterval, container),
  28. container: container,
  29. }
  30. return executor
  31. }
  32. func (be *BulkExecutor) Add(task interface{}) error {
  33. be.executor.Add(task)
  34. return nil
  35. }
  36. func (be *BulkExecutor) Flush() {
  37. be.executor.Flush()
  38. }
  39. func (be *BulkExecutor) Wait() {
  40. be.executor.Wait()
  41. }
  42. func WithBulkTasks(tasks int) BulkOption {
  43. return func(options *bulkOptions) {
  44. options.cachedTasks = tasks
  45. }
  46. }
  47. func WithBulkInterval(duration time.Duration) BulkOption {
  48. return func(options *bulkOptions) {
  49. options.flushInterval = duration
  50. }
  51. }
  52. func newBulkOptions() bulkOptions {
  53. return bulkOptions{
  54. cachedTasks: defaultBulkTasks,
  55. flushInterval: defaultFlushInterval,
  56. }
  57. }
  58. type bulkContainer struct {
  59. tasks []interface{}
  60. execute Execute
  61. maxTasks int
  62. }
  63. func (bc *bulkContainer) AddTask(task interface{}) bool {
  64. bc.tasks = append(bc.tasks, task)
  65. return len(bc.tasks) >= bc.maxTasks
  66. }
  67. func (bc *bulkContainer) Execute(tasks interface{}) {
  68. vals := tasks.([]interface{})
  69. bc.execute(vals)
  70. }
  71. func (bc *bulkContainer) RemoveAll() interface{} {
  72. tasks := bc.tasks
  73. bc.tasks = nil
  74. return tasks
  75. }