123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package executors
- import (
- "time"
- )
- const defaultBulkTasks = 1000
- type (
- BulkOption func(options *bulkOptions)
- BulkExecutor struct {
- executor *PeriodicalExecutor
- container *bulkContainer
- }
- bulkOptions struct {
- cachedTasks int
- flushInterval time.Duration
- }
- )
- func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
- options := newBulkOptions()
- for _, opt := range opts {
- opt(&options)
- }
- container := &bulkContainer{
- execute: execute,
- maxTasks: options.cachedTasks,
- }
- executor := &BulkExecutor{
- executor: NewPeriodicalExecutor(options.flushInterval, container),
- container: container,
- }
- return executor
- }
- func (be *BulkExecutor) Add(task interface{}) error {
- be.executor.Add(task)
- return nil
- }
- func (be *BulkExecutor) Flush() {
- be.executor.Flush()
- }
- func (be *BulkExecutor) Wait() {
- be.executor.Wait()
- }
- func WithBulkTasks(tasks int) BulkOption {
- return func(options *bulkOptions) {
- options.cachedTasks = tasks
- }
- }
- func WithBulkInterval(duration time.Duration) BulkOption {
- return func(options *bulkOptions) {
- options.flushInterval = duration
- }
- }
- func newBulkOptions() bulkOptions {
- return bulkOptions{
- cachedTasks: defaultBulkTasks,
- flushInterval: defaultFlushInterval,
- }
- }
- type bulkContainer struct {
- tasks []interface{}
- execute Execute
- maxTasks int
- }
- func (bc *bulkContainer) AddTask(task interface{}) bool {
- bc.tasks = append(bc.tasks, task)
- return len(bc.tasks) >= bc.maxTasks
- }
- func (bc *bulkContainer) Execute(tasks interface{}) {
- vals := tasks.([]interface{})
- bc.execute(vals)
- }
- func (bc *bulkContainer) RemoveAll() interface{} {
- tasks := bc.tasks
- bc.tasks = nil
- return tasks
- }
|