kevin пре 4 година
родитељ
комит
f994e1df1a
3 измењених фајлова са 98 додато и 9 уклоњено
  1. 7 7
      core/breaker/breaker.go
  2. 89 0
      core/breaker/breaker_test.go
  3. 2 2
      core/mr/mapreduce.go

+ 7 - 7
core/breaker/breaker.go

@@ -5,12 +5,12 @@ import (
 	"fmt"
 	"strings"
 	"sync"
-	"time"
 
 	"github.com/tal-tech/go-zero/core/mathx"
 	"github.com/tal-tech/go-zero/core/proc"
 	"github.com/tal-tech/go-zero/core/stat"
 	"github.com/tal-tech/go-zero/core/stringx"
+	"github.com/tal-tech/go-zero/core/timex"
 )
 
 const (
@@ -195,23 +195,23 @@ type errorWindow struct {
 
 func (ew *errorWindow) add(reason string) {
 	ew.lock.Lock()
-	ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
+	ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
 	ew.index = (ew.index + 1) % numHistoryReasons
 	ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
 	ew.lock.Unlock()
 }
 
 func (ew *errorWindow) String() string {
-	var builder strings.Builder
+	var reasons []string
 
 	ew.lock.Lock()
-	for i := ew.index + ew.count - 1; i >= ew.index; i-- {
-		builder.WriteString(ew.reasons[i%numHistoryReasons])
-		builder.WriteByte('\n')
+	// reverse order
+	for i := ew.index - 1; i >= ew.index-ew.count; i-- {
+		reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
 	}
 	ew.lock.Unlock()
 
-	return builder.String()
+	return strings.Join(reasons, "\n")
 }
 
 type promiseWithReason struct {

+ 89 - 0
core/breaker/breaker_test.go

@@ -2,7 +2,9 @@ package breaker
 
 import (
 	"errors"
+	"fmt"
 	"strconv"
+	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -33,6 +35,84 @@ func TestLogReason(t *testing.T) {
 	assert.Equal(t, numHistoryReasons, errs.count)
 }
 
+func TestErrorWindow(t *testing.T) {
+	tests := []struct {
+		name    string
+		reasons []string
+	}{
+		{
+			name: "no error",
+		},
+		{
+			name:    "one error",
+			reasons: []string{"foo"},
+		},
+		{
+			name:    "two errors",
+			reasons: []string{"foo", "bar"},
+		},
+		{
+			name:    "five errors",
+			reasons: []string{"first", "second", "third", "fourth", "fifth"},
+		},
+		{
+			name:    "six errors",
+			reasons: []string{"first", "second", "third", "fourth", "fifth", "sixth"},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			var ew errorWindow
+			for _, reason := range test.reasons {
+				ew.add(reason)
+			}
+			var reasons []string
+			if len(test.reasons) > numHistoryReasons {
+				reasons = test.reasons[len(test.reasons)-numHistoryReasons:]
+			} else {
+				reasons = test.reasons
+			}
+			for _, reason := range reasons {
+				assert.True(t, strings.Contains(ew.String(), reason), fmt.Sprintf("actual: %s", ew.String()))
+			}
+		})
+	}
+}
+
+func TestPromiseWithReason(t *testing.T) {
+	tests := []struct {
+		name   string
+		reason string
+		expect string
+	}{
+		{
+			name: "success",
+		},
+		{
+			name:   "success",
+			reason: "fail",
+			expect: "fail",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			promise := promiseWithReason{
+				promise: new(mockedPromise),
+				errWin:  new(errorWindow),
+			}
+			if len(test.reason) == 0 {
+				promise.Accept()
+			} else {
+				promise.Reject(test.reason)
+			}
+
+			assert.True(t, strings.Contains(promise.errWin.String(), test.expect))
+		})
+	}
+}
+
 func BenchmarkGoogleBreaker(b *testing.B) {
 	br := NewBreaker()
 	for i := 0; i < b.N; i++ {
@@ -41,3 +121,12 @@ func BenchmarkGoogleBreaker(b *testing.B) {
 		})
 	}
 }
+
+type mockedPromise struct {
+}
+
+func (m *mockedPromise) Accept() {
+}
+
+func (m *mockedPromise) Reject() {
+}

+ 2 - 2
core/mr/mapreduce.go

@@ -127,8 +127,8 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
 		drain(collector)
 	}()
 
-	go executeMappers(func(item interface{}, writer Writer) {
-		mapper(item, writer, cancel)
+	go executeMappers(func(item interface{}, w Writer) {
+		mapper(item, w, cancel)
 	}, source, collector, done.Done(), options.workers)
 
 	value, ok := <-output