瀏覽代碼

try grpc lb interface

kevin 4 年之前
父節點
當前提交
353cb5000f
共有 4 個文件被更改,包括 80 次插入20 次删除
  1. 6 4
      rpcx/internal/balancer/roundrobin/roundrobin.go
  2. 36 0
      rpcx/internal/discovclient.go
  3. 21 16
      rpcx/internal/resolver/resolver.go
  4. 17 0
      rpcx/lb/main.go

+ 6 - 4
rpcx/internal/balancer/roundrobin.go → rpcx/internal/balancer/roundrobin/roundrobin.go

@@ -1,7 +1,8 @@
-package balancer
+package roundrobin
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"sync"
 	"time"
@@ -11,16 +12,16 @@ import (
 	"google.golang.org/grpc/resolver"
 )
 
-const Name = "roundrobin"
+const Name = "zero_rr"
 
 func init() {
-	balancer.Register(newBuilder())
+	balancer.Register(newRoundRobinBuilder())
 }
 
 type roundRobinPickerBuilder struct {
 }
 
-func newBuilder() balancer.Builder {
+func newRoundRobinBuilder() balancer.Builder {
 	return base.NewBalancerBuilder(Name, new(roundRobinPickerBuilder))
 }
 
@@ -48,6 +49,7 @@ type roundRobinPicker struct {
 
 func (p *roundRobinPicker) Pick(ctx context.Context, info balancer.PickInfo) (
 	conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
+	fmt.Println(p.conns)
 	p.lock.Lock()
 	defer p.lock.Unlock()
 

+ 36 - 0
rpcx/internal/discovclient.go

@@ -0,0 +1,36 @@
+package internal
+
+import (
+	"zero/core/discov"
+	"zero/rpcx/internal/balancer/roundrobin"
+	"zero/rpcx/internal/resolver"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
+)
+
+type DiscovClient struct {
+	conn *grpc.ClientConn
+}
+
+func NewDiscovClient(etcd discov.EtcdConf, opts ...ClientOption) (*DiscovClient, error) {
+	resolver.RegisterResolver(etcd)
+	opts = append(opts, WithDialOption(grpc.WithBalancerName(roundrobin.Name)))
+	conn, err := dial("discov:///", opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &DiscovClient{
+		conn: conn,
+	}, nil
+}
+
+func (c *DiscovClient) Next() (*grpc.ClientConn, bool) {
+	state := c.conn.GetState()
+	if state == connectivity.Ready {
+		return c.conn, true
+	} else {
+		return nil, false
+	}
+}

+ 21 - 16
rpcx/internal/resolver/resolver.go

@@ -6,15 +6,19 @@ import (
 	"google.golang.org/grpc/resolver"
 )
 
-type discovResolver struct {
-	scheme string
-	etcd   discov.EtcdConf
-	cc     resolver.ClientConn
+const discovScheme = "discov"
+
+type discovBuilder struct {
+	etcd discov.EtcdConf
 }
 
-func (r *discovResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
+func (b *discovBuilder) Scheme() string {
+	return discovScheme
+}
+
+func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
 	resolver.Resolver, error) {
-	sub, err := discov.NewSubscriber(r.etcd.Hosts, r.etcd.Key)
+	sub, err := discov.NewSubscriber(b.etcd.Hosts, b.etcd.Key)
 	if err != nil {
 		return nil, err
 	}
@@ -27,27 +31,28 @@ func (r *discovResolver) Build(target resolver.Target, cc resolver.ClientConn, o
 				Addr: val,
 			})
 		}
-		r.cc.UpdateState(resolver.State{
+		cc.UpdateState(resolver.State{
 			Addresses: addrs,
 		})
 	})
 
-	return r, nil
+	return &discovResolver{
+		cc: cc,
+	}, nil
 }
 
-func (r *discovResolver) Close() {
+type discovResolver struct {
+	cc resolver.ClientConn
 }
 
-func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) {
+func (r *discovResolver) Close() {
 }
 
-func (r *discovResolver) Scheme() string {
-	return r.scheme
+func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) {
 }
 
-func RegisterResolver(scheme string, etcd discov.EtcdConf) {
-	resolver.Register(&discovResolver{
-		scheme: scheme,
-		etcd:   etcd,
+func RegisterResolver(etcd discov.EtcdConf) {
+	resolver.Register(&discovBuilder{
+		etcd: etcd,
 	})
 }

+ 17 - 0
rpcx/lb/main.go

@@ -0,0 +1,17 @@
+package main
+
+import (
+	"zero/core/discov"
+	"zero/core/lang"
+	"zero/rpcx/internal"
+)
+
+func main() {
+	cli, err := internal.NewDiscovClient(discov.EtcdConf{
+		Hosts: []string{"localhost:2379"},
+		Key:   "rpcx",
+	})
+	lang.Must(err)
+
+	cli.Next()
+}