KubeEdge 部署简单应用
参考(主要来源) Kubeedge 安装,配置,HelloWorld
非常感谢该文作者
架构 KubeEdge Web Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Cloud +-----------------------------+ | +-------------------+ | | | KubeEdge Cloud | | User Message | | +---------------+ | | | | | Server :80 +<------------+ | | +---------------+ | | | +-------------------+ | +-----------------------------+ | | | Sync Message | Edge | +------------v----------------------------+ | | | +-----------------+ +------------+ | | | KubeEdge Edge | | MQTT | | | | +----> | | | +-----------------+ +------------+ | | | +-----------------------------------------+
部署的 KubeEdge 由两个节点组成,一个节点为主节点,一个节点为边缘节点,如果部署参照前面的文章,以下为两个节点的情况
1 2 3 NAME STATUS ROLES AGE VERSION edge1 Ready agent,edge 28h v1.17.1-kubeedge-v1.3.1 ubuntu Ready master 2d5h v1.17.0
服务运行在主节点上,该服务会根据用户下达的请求向边缘节点的 MQTT 同步消息。此程序运行于 80 端口
前置要求 KubeEdge 以部署完毕,在主节点上一个有 KubeEdge 源码并正确地将其加入到 GOPATH 中
主节点部署 clone 代码 kubeedge/examples 至本地,进入到目录 github.com/kubeedge/examples/kubeedge-web-demo/
下。之后的讲解都以此为根目录
修改 utils/kubeclient.go
内容为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package utilsimport ( "errors" "fmt" "io/ioutil" "k8s.io/client-go/rest" certutil "k8s.io/client-go/util/cert" "net" "os" ) var KubeQPS = float32 (5.000000 )var KubeBurst = 10 var KubeContentType = "application/vnd.kubernetes.protobuf" var ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined" )func InClusterConfig () (*rest.Config, error) { const ( tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" ) host, port := os.Getenv("KUBERNETES_SERVICE_HOST" ), os.Getenv("KUBERNETES_SERVICE_PORT" ) if len (host) == 0 || len (port) == 0 { return nil , ErrNotInCluster } token, err := ioutil.ReadFile(tokenFile) if err != nil { return nil , err } tlsClientConfig := rest.TLSClientConfig{} if _, err := certutil.NewPool(rootCAFile); err != nil { fmt.Errorf("Expected to load root CA config from %s, but got err: %v" , rootCAFile, err) } else { tlsClientConfig.CAFile = rootCAFile } return &rest.Config{ Host: "https://" + net.JoinHostPort(host, port), TLSClientConfig: tlsClientConfig, BearerToken: string (token), }, nil } func KubeConfig () (conf *rest.Config, err error) { kubeConfig, err := InClusterConfig() if err != nil { return nil , err } kubeConfig.QPS = KubeQPS kubeConfig.Burst = KubeBurst kubeConfig.ContentType = KubeContentType return kubeConfig, err }
修改 deployments/kubeedge-speaker-instance.yaml
为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 apiVersion: devices.kubeedge.io/v1alpha1 kind: Device metadata: name: speaker-01 labels: description: "Speaker" manufacturer: "test" spec: deviceModelRef: name: speaker-model nodeSelector: nodeSelectorTerms: - matchExpressions: - key: "" operator: In values: - edge1
注意,最后的 edge1
为边缘节点的名称
修改文件 deployments/kubeedge-web-app.yaml
为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 apiVersion: apps/v1 kind: Deployment metadata: labels: k8s-app: kubeedge-web-app name: kubeedge-web-app namespace: default spec: selector: matchLabels: k8s-app: kubeedge-web-app template: metadata: labels: k8s-app: kubeedge-web-app spec: nodeSelector: node-role.kubernetes.io/master: "" hostNetwork: true containers: - name: kubeedge-web-app image: kubeedge/kubeedge-web-app:v2.6 imagePullPolicy: IfNotPresent restartPolicy: Always
修改 Dockerfile 为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 FROM centos:7.6 .1810 LABEL maintainers="KubeEdge Authors" LABEL description="KubeEdge Web App" COPY kubeedge-web-app /kubeedge-web-app COPY static /static COPY views /views ENTRYPOINT ["/kubeedge-web-app" ] CMD ["kubeedge-web-app" ]
编译 Go 程序以及构造镜像
1 2 go build -o kubeedge-web-app main.go docker build . -t kubeedge/kubeedge-web-app:v2.6
添加用户权限
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 sudo tee deployments/fabric8-rbac.yaml <<-'EOF' apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: name: fabric8-rbac subjects: - kind: ServiceAccount name: default namespace: default roleRef: kind: ClusterRole name: cluster-admin apiGroup: rbac.authorization.k8s.io EOF
部署资源文件
1 2 3 4 kubectl create -f deployments/fabric8-rbac.yaml kubectl create -f deployments/kubeedge-speaker-model.yaml kubectl create -f deployments/kubeedge-speaker-instance.yaml kubectl create -f deployments/kubeedge-web-app.yaml
查看主节点运行状态 执行
结果为
1 2 3 NAME STATUS ROLES AGE VERSION edge1 Ready agent,edge 28h v1.17.1-kubeedge-v1.3.1 ubuntu Ready master 2d5h v1.17.0
执行
结果为
1 2 NAME READY UP-TO-DATE AVAILABLE AGE kubeedge-web-app 1/1 1 1 160m
执行
结果为
1 2 NAME READY STATUS RESTARTS AGE kubeedge-web-app-76d4755f59-qbjpd 1/1 Running 0 160m
执行
1 kubectl describe pod kubeedge-web-app-76d4755f59-qbjpd | grep IP
结果为
1 2 3 IP: 10.211.55.52 IPs: IP: 10.211.55.52
访问 http://10.211.55.52
即可看到页面
边缘端 此处不使用样例代码中运行在边缘端的代码,仅仅查看边缘端 MQTT 是否接受到来自于主节点的消息,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package mainimport ( "encoding/json" "fmt" "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/types" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" ) func main () { fmt.Println("Get music list successfully" ) cli := client.New(&client.Options{ ErrorHandler: func (err error) { fmt.Println(err) }, }) fmt.Println("Create mqtt client successfully" ) stopchan := make (chan int ) defer cli.Terminate() err := cli.Connect(&client.ConnectOptions{ Network: "tcp" , Address: "localhost:1883" , ClientID: []byte ("receive-client" ), }) if err != nil { panic (err) } fmt.Println("Connect mqtt client successfully" ) err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ { TopicFilter: []byte (`$hw/events/device/speaker-01/twin/update/document` ), QoS: mqtt.QoS0, Handler: func (topicName, message []byte ) { Update := &types.DeviceTwinDocument{} err := json.Unmarshal(message, Update) if err != nil { fmt.Println("Unmarshal error" , err) fmt.Printf("Unmarshal error: %v\n" , err) } fmt.Printf("%+v" , Update) }, }, }, }) fmt.Println("Subscribe mqtt topic successfully" ) <-stopchan if err != nil { panic (err) } else { fmt.Println("Connection successfully" ) } }
注意,需要将 kubeedge
官方代码正确添加到 GOPATH 下,正确配置到文件的 GOPATH,同时使用 go get
下载好相关的依赖。不要使用 go mod
,会出问题。编译好程序直接运行并保持运行状态。
检验 主节点运行命令查看 pod 日志
1 kubectl logs -f kubeedge-web-app-76d4755f59-qbjpd
访问之前提到的页面,选择其中的一首歌点击播放,日志会有类似如下输出
1 2 2020/06/16 11:00:04 PlayTrack: 7 2020/06/16 11:00:04 Track [ 7 ] will be played on speaker speaker-01
而运行在边缘端的程序也会有类似如下输出
1 &{BaseMessage:{EventID:cd29c533-9c76-4c7f-b913-90310327526e Timestamp:1592305204859} Twin:map[track:0xc0000aa4c0]}
在边缘端安装 sqlite3
,执行 sqlite3 /var/lib/kubeedge/edgecore.db
进入交互节点,执行
1 select * from device_twin;
会有类似如下输出
1 2|speaker-01|track||7||{"timestamp":1592305204859}||{"cloud":85047,"edge":0}||0|string|{}
至此,云边交互成功。