chunkexecutor_test.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package executors
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. )
  8. func TestChunkExecutor(t *testing.T) {
  9. var values []int
  10. var lock sync.Mutex
  11. executor := NewChunkExecutor(func(items []any) {
  12. lock.Lock()
  13. values = append(values, len(items))
  14. lock.Unlock()
  15. }, WithChunkBytes(10), WithFlushInterval(time.Minute))
  16. for i := 0; i < 50; i++ {
  17. executor.Add(1, 1)
  18. time.Sleep(time.Millisecond)
  19. }
  20. lock.Lock()
  21. assert.True(t, len(values) > 0)
  22. // ignore last value
  23. for i := 0; i < len(values); i++ {
  24. assert.Equal(t, 10, values[i])
  25. }
  26. lock.Unlock()
  27. }
  28. func TestChunkExecutorFlushInterval(t *testing.T) {
  29. const (
  30. caches = 10
  31. size = 5
  32. )
  33. var wait sync.WaitGroup
  34. wait.Add(1)
  35. executor := NewChunkExecutor(func(items []any) {
  36. assert.Equal(t, size, len(items))
  37. wait.Done()
  38. }, WithChunkBytes(caches), WithFlushInterval(time.Millisecond*100))
  39. for i := 0; i < size; i++ {
  40. executor.Add(1, 1)
  41. }
  42. wait.Wait()
  43. }
  44. func TestChunkExecutorEmpty(t *testing.T) {
  45. executor := NewChunkExecutor(func(items []any) {
  46. assert.Fail(t, "should not called")
  47. }, WithChunkBytes(10), WithFlushInterval(time.Millisecond))
  48. time.Sleep(time.Millisecond * 100)
  49. executor.Wait()
  50. }
  51. func TestChunkExecutorFlush(t *testing.T) {
  52. const (
  53. caches = 10
  54. tasks = 5
  55. )
  56. var wait sync.WaitGroup
  57. wait.Add(1)
  58. be := NewChunkExecutor(func(items []any) {
  59. assert.Equal(t, tasks, len(items))
  60. wait.Done()
  61. }, WithChunkBytes(caches), WithFlushInterval(time.Minute))
  62. for i := 0; i < tasks; i++ {
  63. be.Add(1, 1)
  64. }
  65. be.Flush()
  66. wait.Wait()
  67. }
  68. func BenchmarkChunkExecutor(b *testing.B) {
  69. b.ReportAllocs()
  70. be := NewChunkExecutor(func(tasks []any) {
  71. time.Sleep(time.Millisecond * time.Duration(len(tasks)))
  72. })
  73. for i := 0; i < b.N; i++ {
  74. time.Sleep(time.Microsecond * 200)
  75. be.Add(1, 1)
  76. }
  77. be.Flush()
  78. }