serverchange.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package update
  2. import (
  3. "crypto/md5"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "sort"
  8. "zero/core/hash"
  9. "zero/core/jsonx"
  10. "zero/rq/internal"
  11. )
  12. var ErrInvalidServerChange = errors.New("not a server change message")
  13. type (
  14. weightedKey struct {
  15. Key string
  16. Weight int
  17. }
  18. Snapshot struct {
  19. Keys []string
  20. WeightedKeys []weightedKey
  21. }
  22. ServerChange struct {
  23. Previous Snapshot
  24. Current Snapshot
  25. Servers []string
  26. }
  27. )
  28. func (s Snapshot) GetCode() string {
  29. keys := append([]string(nil), s.Keys...)
  30. sort.Strings(keys)
  31. weightedKeys := append([]weightedKey(nil), s.WeightedKeys...)
  32. sort.SliceStable(weightedKeys, func(i, j int) bool {
  33. return weightedKeys[i].Key < weightedKeys[j].Key
  34. })
  35. digest := md5.New()
  36. for _, key := range keys {
  37. io.WriteString(digest, fmt.Sprintf("%s\n", key))
  38. }
  39. for _, wkey := range weightedKeys {
  40. io.WriteString(digest, fmt.Sprintf("%s:%d\n", wkey.Key, wkey.Weight))
  41. }
  42. return fmt.Sprintf("%x", digest.Sum(nil))
  43. }
  44. func (sc ServerChange) CreateCurrentHash() *hash.ConsistentHash {
  45. curHash := hash.NewConsistentHash()
  46. for _, key := range sc.Current.Keys {
  47. curHash.Add(key)
  48. }
  49. for _, wkey := range sc.Current.WeightedKeys {
  50. curHash.AddWithWeight(wkey.Key, wkey.Weight)
  51. }
  52. return curHash
  53. }
  54. func (sc ServerChange) CreatePrevHash() *hash.ConsistentHash {
  55. prevHash := hash.NewConsistentHash()
  56. for _, key := range sc.Previous.Keys {
  57. prevHash.Add(key)
  58. }
  59. for _, wkey := range sc.Previous.WeightedKeys {
  60. prevHash.AddWithWeight(wkey.Key, wkey.Weight)
  61. }
  62. return prevHash
  63. }
  64. func (sc ServerChange) GetCode() string {
  65. return sc.Current.GetCode()
  66. }
  67. func IsServerChange(message string) bool {
  68. return len(message) > 0 && message[0] == internal.ServerSensitivePrefix
  69. }
  70. func (sc ServerChange) Marshal() (string, error) {
  71. body, err := jsonx.Marshal(sc)
  72. if err != nil {
  73. return "", err
  74. }
  75. return string(append([]byte{internal.ServerSensitivePrefix}, body...)), nil
  76. }
  77. func UnmarshalServerChange(body string) (ServerChange, error) {
  78. if len(body) == 0 {
  79. return ServerChange{}, ErrInvalidServerChange
  80. }
  81. var change ServerChange
  82. err := jsonx.UnmarshalFromString(body[1:], &change)
  83. return change, err
  84. }