go语言watch连接etcd

851人浏览 / 0人评论
package main

import (
	"fmt"
	"github.com/coreos/etcd/client"
	"github.com/gin-gonic/gin"
	"golang.org/x/net/context"
	"k8s.io/apimachinery/pkg/util/json"
	"strings"
	"time"
)


/*type etcdClient struct {
	client client.Client // etcd client
	kapi   client.KeysAPI
	//serviceDb map[string]*etcdServiceState
}*/

// ServiceInfo describes one registered service endpoint. In this file it is
// the target type that the JSON values of etcd nodes are decoded into, and the
// payload carried by WatchServiceEvent.
type ServiceInfo struct {
	ServiceName string // Name of the service
	Role        string // Role of the service. (leader, follower etc)
	Version     string // Version string for the service
	TTL         int    // TTL for this service
	HostAddr    string // Host name or IP address where its running
	Port        int    // Port number where its listening
	Hostname    string // Host name where its running
}

// Event kinds stored in WatchServiceEvent.EventType. The values are untyped
// integer constants numbered from 0 by iota.
const (
	WatchServiceEventAdd   = iota // New Service endpoint added
	WatchServiceEventDel          // A service endpoint was deleted
	WatchServiceEventError        // Error occurred while watching for service
)
// WatchServiceEvent is the message sent on an event channel to report a change
// to a service endpoint. EventType holds one of the WatchServiceEvent*
// constants.
type WatchServiceEvent struct {
	EventType   uint        // event type
	ServiceInfo ServiceInfo // Information about the service
}

// NodeInfo is overwritten by getallNodeOne with the etcd entry whose HostAddr
// matched a cached entry during its comparison pass.
var NodeInfo ServiceInfo

// SumWatch is the "restart the watch" flag: getallNodeOne and the watch loop
// set it to true, and the watch loop's 10-second timeout branch restarts the
// watch and resets it to false when it finds it set.
// NOTE(review): read and written from several goroutines without
// synchronization.
var SumWatch bool
// srvMap is the in-memory store of node data. The watch loop keys it by the
// etcd node key with the "/contiv.io/service/" prefix removed.
// NOTE(review): accessed from several goroutines without synchronization.
var srvMap = make(map[string]ServiceInfo)
// server creates an etcd client, recursively reads every node below key and
// decodes the JSON value of each direct child into a ServiceInfo.
//
// It returns the etcd index reported by the Get response (callers use it as
// the AfterIndex of a later watch) together with the decoded services.
//
// A non-nil error is returned when the client cannot be created, the read
// fails, or the response carries no node; the other results are then zero
// values. A child whose value cannot be decoded is logged and skipped, so it
// never shows up in the result carrying data left over from another node.
func server(key string) (uint64, []ServiceInfo, error) {
	newclient, err := client.New(client.Config{
		//qa 1
		//Endpoints: []string{"http://10.110.14.66:2379", "http://10.110.14.70:2379", "http://10.110.14.79:2379"},
		//qa 3
		//Endpoints: []string{"http://10.110.13.122:2381", "http://10.110.13.154:2381", "http://10.110.13.156:2381"},
		//qa 2
		Endpoints: []string{"http://10.110.13.158:2379", "http://10.110.13.140:2379"},
		//Endpoints: []string{"http://10.110.13.47:2379", "http://10.110.13.158:2379", "http://10.110.13.140:2379"},
		//dev 1
		//Endpoints: []string{"http://10.110.13.98:2303", "http://10.110.13.209:2303"},
		//,"http://10.110.13.211:2303"},
		//dev liangzeyu
		//Endpoints: []string{"http://10.110.13.12:2303", "http://10.110.13.108:2303", "http://10.110.13.99:2303"},
	})
	if err != nil {
		return 0, nil, fmt.Errorf("creating etcd client: %v", err)
	}

	// Read the whole subtree in one request.
	keysAPI := client.NewKeysAPI(newclient)
	resp, err := keysAPI.Get(context.Background(), key, &client.GetOptions{Recursive: true, Sort: true})
	if err != nil {
		return 0, nil, fmt.Errorf("reading %q from etcd: %v", key, err)
	}
	if resp == nil || resp.Node == nil {
		return 0, nil, fmt.Errorf("reading %q from etcd: response has no node", key)
	}

	srvcList := make([]ServiceInfo, 0, len(resp.Node.Nodes))
	for _, node := range resp.Node.Nodes {
		// Decode into a fresh value each time so that fields missing from
		// this node's JSON are zero rather than inherited from the
		// previously decoded node.
		var serinfo ServiceInfo
		if err := json.Unmarshal([]byte(node.Value), &serinfo); err != nil {
			fmt.Println(err)
			continue
		}
		srvcList = append(srvcList, serinfo)
	}

	watchIndex := resp.Index
	fmt.Println(watchIndex)
	return watchIndex, srvcList, nil
}
// getallNodeOne fetches all nodes registered under
// /contiv.io/service/netplugin/ and reconciles the in-memory cache (srvMap)
// against them.
//
// Cached entries whose HostAddr is no longer present in etcd are removed from
// srvMap and reported as WatchServiceEventDel; every service currently in etcd
// is reported as WatchServiceEventAdd. Each event is sent on eventCh and then
// immediately received back and printed, so eventCh must have a buffer of at
// least one and must not have another receiver; the channel is empty again
// when the function returns.
//
// After reporting, the function blocks in TimeApp, sets SumWatch to true and
// reads etcd a second time. It returns the index from that second read and its
// error. If the first read fails, it returns 0 and that error without touching
// srvMap.
func getallNodeOne(eventCh chan WatchServiceEvent) (uint64, error) {
	key := "/contiv.io/service/" + "netplugin" + "/"
	Mindex, srvList, err := server(key)
	if err != nil {
		fmt.Println("err", err)
		// Without a listing every cached entry would look deleted, so stop
		// here instead of reconciling against an empty list.
		return 0, err
	}

	// Deleting from a map while ranging over it is allowed in Go.
	for cacheKey, OldNode := range srvMap {
		stillRegistered := false
		fmt.Println("打印内存数据..", OldNode.HostAddr)

		for _, srvInfo := range srvList {
			if OldNode.HostAddr == srvInfo.HostAddr {
				NodeInfo = srvInfo
				stillRegistered = true
				fmt.Println(OldNode.HostAddr, srvInfo.HostAddr)
				break
			}
		}
		if stillRegistered {
			continue
		}

		fmt.Println("delete", OldNode.HostAddr)
		delete(srvMap, cacheKey)
		// Report the entry that was actually removed, not the last match
		// remembered in NodeInfo.
		eventCh <- WatchServiceEvent{
			EventType:   WatchServiceEventDel,
			ServiceInfo: OldNode,
		}
		// Drain the event again, as the add loop below does; otherwise the
		// next send would block forever on a one-slot channel.
		fmt.Println(<-eventCh)
	}

	for _, srvInfo := range srvList {
		eventCh <- WatchServiceEvent{
			EventType:   WatchServiceEventAdd,
			ServiceInfo: srvInfo,
		}
		fmt.Println(<-eventCh)
	}

	TimeApp()
	fmt.Println("one 索引值", Mindex)
	SumWatch = true

	// Re-read after the pause so the caller starts watching from a fresh index.
	Mindex, _, err = server(key)
	return Mindex, err
}

// TimeApp blocks the calling goroutine for 150 seconds.
func TimeApp() {
	const pause = 150 * time.Second
	time.Sleep(pause)
}
// watchnodeall watches /contiv.io/service/netplugin/ in etcd and keeps srvMap
// up to date with newly set service entries.
//
// It starts two goroutines and returns after three seconds:
//
//   - a producer that first runs getallNodeOne to obtain a starting index and
//     then forwards every watch response to an internal channel;
//   - a consumer that caches newly "set" services in srvMap and, whenever ten
//     seconds pass without a response while SumWatch is true, cancels the
//     current watch and calls watchnodeall again to start a fresh one.
//
// Both goroutines stop once the watch context has been cancelled. If the etcd
// client cannot be created the error is printed and nothing is started.
//
// NOTE(review): srvMap and SumWatch are shared with getallNodeOne and with
// goroutines from earlier invocations without any synchronization.
func watchnodeall() {
	watchCh := make(chan *client.Response, 1)
	stopCh := make(chan bool, 1)
	fmt.Println("重新开始")
	key := "/contiv.io/service/" + "netplugin" + "/"
	watchCtx, watchCancel := context.WithCancel(context.Background())
	newclient, err := client.New(client.Config{
		//qa 1
		//Endpoints: []string{"http://10.110.14.66:2379", "http://10.110.14.70:2379", "http://10.110.14.79:2379"},
		//qa 3
		//Endpoints: []string{"http://10.110.13.122:2381", "http://10.110.13.154:2381", "http://10.110.13.156:2381"},
		//qa 2
		Endpoints: []string{"http://10.110.13.158:2379", "http://10.110.13.140:2379"},
		//Endpoints: []string{"http://10.110.13.47:2379", "http://10.110.13.158:2379", "http://10.110.13.140:2379"},
		//dev 1
		//Endpoints: []string{"http://10.110.13.98:2303", "http://10.110.13.209:2303"},
		//,"http://10.110.13.211:2303"},
		//dev liangzeyu
		//Endpoints: []string{"http://10.110.13.12:2303", "http://10.110.13.108:2303", "http://10.110.13.99:2303"},
	})
	if err != nil {
		// A nil client would only panic later inside the goroutines.
		fmt.Println(err)
		watchCancel()
		return
	}

	// Producer: resolve the starting index, then forward watch responses.
	go func() {
		netpluginWatch := make(chan WatchServiceEvent, 1)
		Indenx, INdexerror := getallNodeOne(netpluginWatch)
		fmt.Println(INdexerror)
		fmt.Println("go 并发的索引号:", Indenx)
		fmt.Printf("索引号 %d\n", Indenx)
		fmt.Println("开启watch1")
		ss := client.NewKeysAPI(newclient)
		watcher := ss.Watcher(key, &client.WatcherOptions{AfterIndex: Indenx, Recursive: true})
		if watcher == nil {
			fmt.Println("Etcd returned invalid watcher err", err)
			return
		}

		for {
			nodeWatcher, err := watcher.Next(watchCtx)
			if err != nil {
				fmt.Println("Stopping watch on key", err)
				if watchCtx.Err() != nil {
					// The watch was cancelled; Next would fail forever.
					return
				}
				// Transient failure: back off and retry. The response is
				// not usable, so it must not be forwarded.
				time.Sleep(time.Second * 10)
				continue
			}
			select {
			case watchCh <- nodeWatcher:
			case <-watchCtx.Done():
				// The consumer is gone; do not block on a full channel.
				return
			}
		}
	}()

	// Consumer: cache new services and restart the watch when it goes quiet.
	go func() {
		for {
			select {
			case watchCase := <-watchCh:
				fmt.Println(watchCase)
				if watchCase == nil || watchCase.Node == nil {
					continue
				}
				srvKey := strings.TrimPrefix(watchCase.Node.Key, "/contiv.io/service/")
				if _, ok := srvMap[srvKey]; !ok && watchCase.Action == "set" {
					var srvInfo ServiceInfo
					if err := json.Unmarshal([]byte(watchCase.Node.Value), &srvInfo); err != nil {
						// Caching an empty entry would hide later valid
						// "set" events for the same key.
						fmt.Println("err", err)
						continue
					}
					time.Sleep(time.Second * 1)
					fmt.Println(srvInfo)
					srvMap[srvKey] = srvInfo
					SumWatch = true
				}

			case stopReq := <-stopCh:
				if stopReq {
					// Stop watch and return
					fmt.Println("Stopping watch on")
					watchCancel()
					return
				}

			case <-time.After(time.Second * 10):
				fmt.Println("Watch Timeout for 10 second ")
				if SumWatch {
					SumWatch = false
					fmt.Printf("Watch umber of retries: %t\n", SumWatch)
					watchCancel()
					watchnodeall()
					return
				}
				fmt.Println("SumWatch不等于1,忽略重启")
			}
		}
	}()
	time.Sleep(time.Second * 3)
}


// main starts the etcd watch and then serves GET /ping on port 8888.
func main() {
	r := gin.Default()
	r.GET("/ping", func(i *gin.Context) {
		i.JSON(200, gin.H{
			"message": "pong",
		})
	})

	// Engine.Run blocks until the HTTP server fails, so the watch has to be
	// started first; otherwise it would never run while the server is up.
	watchnodeall()

	if err := r.Run(":8888"); err != nil {
		fmt.Println(err)
	}

	// Only reached when the HTTP server has stopped: keep the process alive
	// so the watch goroutines can continue.
	time.Sleep(time.Second * 100000)
}

 

全部评论