package main import ( "fmt" "github.com/coreos/etcd/client" "github.com/gin-gonic/gin" "golang.org/x/net/context" "k8s.io/apimachinery/pkg/util/json" "strings" "time" ) /*type etcdClient struct { client client.Client // etcd client kapi client.KeysAPI //serviceDb map[string]*etcdServiceState }*/ type ServiceInfo struct { ServiceName string // Name of the service Role string // Role of the service. (leader, follower etc) Version string // Version string for the service TTL int // TTL for this service HostAddr string // Host name or IP address where its running Port int // Port number where its listening Hostname string // Host name where its running } const ( WatchServiceEventAdd = iota // New Service endpoint added WatchServiceEventDel // A service endpoint was deleted WatchServiceEventError // Error occurred while watching for service ) type WatchServiceEvent struct { EventType uint // event type ServiceInfo ServiceInfo // Information about the service } var NodeInfo ServiceInfo var SumWatch bool //定义内存存储node数据 var srvMap = make(map[string]ServiceInfo) //定义etcd客户端,并拿基础值 func server(key string) (uint64, []ServiceInfo, error) { var srvcList []ServiceInfo var serinfo ServiceInfo newclient,err := client.New(client.Config{ //qa 1 //Endpoints: []string{"http://10.110.14.66:2379", "http://10.110.14.70:2379", "http://10.110.14.79:2379"}, //qa 3 //Endpoints: []string{"http://10.110.13.122:2381", "http://10.110.13.154:2381", "http://10.110.13.156:2381"}, //qa 2 Endpoints: []string{"http://10.110.13.158:2379", "http://10.110.13.140:2379"}, //Endpoints: []string{"http://10.110.13.47:2379", "http://10.110.13.158:2379", "http://10.110.13.140:2379"}, //dev 1 //Endpoints: []string{"http://10.110.13.98:2303", "http://10.110.13.209:2303"}, //,"http://10.110.13.211:2303"}, //dev liangzeyu //Endpoints: []string{"http://10.110.13.12:2303", "http://10.110.13.108:2303", "http://10.110.13.99:2303"}, //Transport: nil, //CheckRedirect: nil, //Username: "", //Password: "", //HeaderTimeoutPerRequest: 0, //SelectionMode: 0, }) if err != nil { fmt.Println(err) } //读取数据 ss := client.NewKeysAPI(newclient) //fmt.Println(key) resp, err := ss.Get(context.Background(), key, &client.GetOptions{Recursive: true, Sort: true}) if err != nil { fmt.Println(err) } waindex := resp.Node.Nodes //打印读取到的数据 //fmt.Println(waindex) //遍历 for _, node := range waindex { err := json.Unmarshal([]byte(node.Value), &serinfo) if err != nil{ fmt.Println(err) } //fmt.Println(node.Value) //fmt.Println(serinfo.HostAddr) srvcList = append(srvcList, serinfo) } watchIndex := resp.Index fmt.Println(watchIndex) return watchIndex, srvcList, nil } //获取所有的node func getallNodeOne(eventCh chan WatchServiceEvent) (uint64, error) { var key string key = "/contiv.io/service/" + "netplugin" + "/" Mindex, srvList, err := server(key) if err != nil { fmt.Println("err",err) } for pp,OldNode := range srvMap { var EtcdNodeInfoDiff bool fmt.Println("打印内存数据..",OldNode.HostAddr) for _, srvInfo := range srvList { if OldNode.HostAddr == srvInfo.HostAddr { NodeInfo = srvInfo EtcdNodeInfoDiff = true fmt.Println(OldNode.HostAddr,srvInfo.HostAddr) break } } if EtcdNodeInfoDiff == false { fmt.Println("delete",OldNode.HostAddr) delete(srvMap,pp) eventCh <- WatchServiceEvent{ EventType: WatchServiceEventDel, ServiceInfo: NodeInfo, } } } for _, srvInfo := range srvList { //fmt.Printf("Sending service add event: %+v \n", srvInfo) eventCh <- WatchServiceEvent{ EventType: WatchServiceEventAdd, ServiceInfo: srvInfo, } fmt.Println(<-eventCh) //<- eventCh } TimeApp() fmt.Println("one 索引值",Mindex) SumWatch = true Mindex, _, err = server(key) return Mindex, err } func TimeApp() { time.Sleep(time.Second * 150 ) } //watch数据 func watchnodeall() { watchCh := make(chan *client.Response, 1) stopCh := make(chan bool, 1) var key string fmt.Println("重新开始") key = "/contiv.io/service/" + "netplugin" + "/" watchCtx, watchCancel := context.WithCancel(context.Background()) newclient, err := client.New(client.Config{ //qa 1 //Endpoints: []string{"http://10.110.14.66:2379", "http://10.110.14.70:2379", "http://10.110.14.79:2379"}, //qa 3 //Endpoints: []string{"http://10.110.13.122:2381", "http://10.110.13.154:2381", "http://10.110.13.156:2381"}, //qa 2 Endpoints: []string{"http://10.110.13.158:2379", "http://10.110.13.140:2379"}, //Endpoints: []string{"http://10.110.13.47:2379", "http://10.110.13.158:2379", "http://10.110.13.140:2379"}, //dev 1 //Endpoints: []string{"http://10.110.13.98:2303", "http://10.110.13.209:2303"}, //,"http://10.110.13.211:2303"}, //dev liangzeyu //Endpoints: []string{"http://10.110.13.12:2303", "http://10.110.13.108:2303", "http://10.110.13.99:2303"}, //Transport: nil, //CheckRedirect: nil, //Username: "", //Password: "", //HeaderTimeoutPerRequest: 0, //SelectionMode: 0, }) if err != nil { fmt.Println(err) } go func() { netpluginWatch := make(chan WatchServiceEvent, 1) Indenx, INdexerror := getallNodeOne(netpluginWatch) fmt.Println(INdexerror) fmt.Println("go 并发的索引号:",Indenx) /* Indenx = 50065153 if SumWatch == 3 { Indenx, err = getallNode(netpluginWatch) }*/ fmt.Printf("索引号", Indenx) fmt.Println("开启watch1") ss := client.NewKeysAPI(newclient) watcher := ss.Watcher(key, &client.WatcherOptions{AfterIndex: Indenx, Recursive: true}) if watcher == nil { fmt.Println("Etcd returned invalid watcher err",err) } for { nodeWatcher, err := watcher.Next(watchCtx) if err != nil { fmt.Println("Stopping watch on key" , err) time.Sleep(time.Second * 10) } watchCh <- nodeWatcher } }() go func() { //var srvMap = make(map[string]ServiceInfo) for { select { case watchCase := <-watchCh: fmt.Println(watchCase) var srvInfo ServiceInfo srvKey := strings.TrimPrefix(watchCase.Node.Key, "/contiv.io/service/") //qa 3 if watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.80:9002" || watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.80:9003" || watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.79:9002" || watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.79:9003"{ //qa 2 if watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.74:9002" || watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.74:9003" { //if watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.127:9002" || watchCase.Node.Key == "/contiv.io/service/netplugin/10.110.12.127:9003" { //fmt.Println(watchCase.Node.Key, watchCase.Action,time.Now().Second()) if _, ok := srvMap[srvKey]; !ok && watchCase.Action == "set" { err := json.Unmarshal([]byte(watchCase.Node.Value), &srvInfo) if err != nil { fmt.Println("err") } time.Sleep(time.Second * 1) fmt.Println(srvInfo) srvMap[srvKey] = srvInfo SumWatch = true //} } case stopReq := <-stopCh: //ep.WatchService(name , eventCh , stopCh ) if stopReq { // Stop watch and return fmt.Println("Stopping watch on") watchCancel() return } case <-time.After(time.Second * 10): fmt.Println("Watch Timeout for 10 second ") //SumWatch ++ //if SumWatch < 20 { if SumWatch == true { SumWatch = false fmt.Printf("Watch umber of retries: %d\n", SumWatch) watchCancel() watchnodeall() return }else { fmt.Println("SumWatch不等于1,忽略重启") } //} } } }() time.Sleep(time.Second *3) } func main() { r := gin.Default() r.GET("/ping", func(i *gin.Context) { i.JSON(200, gin.H{ "message": "pong", }) }) r.Run(":8888") watchnodeall() time.Sleep(time.Second * 100000) }
有问题请加博主微信进行沟通!
全部评论