KubeEdge 部署简单应用

KubeEdge 部署简单应用

参考(主要来源)

Kubeedge 安装,配置,HelloWorld

非常感谢该文作者


架构

KubeEdge Web Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 Cloud
+-----------------------------+
| +-------------------+ |
| | KubeEdge Cloud | | User Message
| | +---------------+ | |
| | | Server :80 +<------------+
| | +---------------+ | |
| +-------------------+ |
+-----------------------------+
|
|
| Sync Message
|
Edge |
+------------v----------------------------+
| |
| +-----------------+ +------------+ |
| | KubeEdge Edge | | MQTT | |
| | +----> | |
| +-----------------+ +------------+ |
| |
+-----------------------------------------+

部署的 KubeEdge 由两个节点组成,一个节点为主节点,一个节点为边缘节点,如果部署参照前面的文章,以下为两个节点的情况

1
2
3
NAME     STATUS   ROLES        AGE    VERSION
edge1 Ready agent,edge 28h v1.17.1-kubeedge-v1.3.1
ubuntu Ready master 2d5h v1.17.0

服务运行在主节点上,该服务会根据用户下达的请求向边缘节点的 MQTT 同步消息。此程序运行于 80 端口


前置要求

KubeEdge 以部署完毕,在主节点上一个有 KubeEdge 源码并正确地将其加入到 GOPATH 中


主节点部署

clone 代码 kubeedge/examples 至本地,进入到目录 github.com/kubeedge/examples/kubeedge-web-demo/ 下。之后的讲解都以此为根目录

修改 utils/kubeclient.go 内容为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package utils

import (
"errors"
"fmt"
"io/ioutil"
"k8s.io/client-go/rest"
certutil "k8s.io/client-go/util/cert"
"net"
"os"
)

var KubeQPS = float32(5.000000)
var KubeBurst = 10
var KubeContentType = "application/vnd.kubernetes.protobuf"
var ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")

func InClusterConfig() (*rest.Config, error) {
const (
tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, ErrNotInCluster
}

token, err := ioutil.ReadFile(tokenFile)
if err != nil {
return nil, err
}

tlsClientConfig := rest.TLSClientConfig{}

if _, err := certutil.NewPool(rootCAFile); err != nil {
fmt.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
} else {
tlsClientConfig.CAFile = rootCAFile
}

return &rest.Config{
Host: "https://" + net.JoinHostPort(host, port),
TLSClientConfig: tlsClientConfig,
BearerToken: string(token),
}, nil
}

// KubeConfig from flags
func KubeConfig() (conf *rest.Config, err error) {
kubeConfig, err := InClusterConfig()
if err != nil {
return nil, err
}
kubeConfig.QPS = KubeQPS
kubeConfig.Burst = KubeBurst
kubeConfig.ContentType = KubeContentType
return kubeConfig, err
}

修改 deployments/kubeedge-speaker-instance.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
apiVersion: devices.kubeedge.io/v1alpha1
kind: Device
metadata:
name: speaker-01
labels:
description: "Speaker"
manufacturer: "test"
spec:
deviceModelRef:
name: speaker-model
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: ""
operator: In
values:
- edge1

注意,最后的 edge1边缘节点的名称

修改文件 deployments/kubeedge-web-app.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
k8s-app: kubeedge-web-app
name: kubeedge-web-app
namespace: default
spec:
selector:
matchLabels:
k8s-app: kubeedge-web-app
template:
metadata:
labels:
k8s-app: kubeedge-web-app
spec:
nodeSelector:
node-role.kubernetes.io/master: ""
hostNetwork: true
containers:
- name: kubeedge-web-app
image: kubeedge/kubeedge-web-app:v2.6
imagePullPolicy: IfNotPresent
restartPolicy: Always

修改 Dockerfile 为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
FROM centos:7.6.1810
LABEL maintainers="KubeEdge Authors"
LABEL description="KubeEdge Web App"

# Copy from build directory
COPY kubeedge-web-app /kubeedge-web-app
COPY static /static
COPY views /views

# Define default command
ENTRYPOINT ["/kubeedge-web-app"]

# Run the executable
CMD ["kubeedge-web-app"]

编译 Go 程序以及构造镜像

1
2
go build -o kubeedge-web-app main.go
docker build . -t kubeedge/kubeedge-web-app:v2.6

添加用户权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sudo tee deployments/fabric8-rbac.yaml <<-'EOF'
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: fabric8-rbac
subjects:
- kind: ServiceAccount
# Reference to upper's `metadata.name`
name: default
# Reference to upper's `metadata.namespace`
namespace: default
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
EOF

部署资源文件

1
2
3
4
kubectl create -f deployments/fabric8-rbac.yaml
kubectl create -f deployments/kubeedge-speaker-model.yaml
kubectl create -f deployments/kubeedge-speaker-instance.yaml
kubectl create -f deployments/kubeedge-web-app.yaml

查看主节点运行状态

执行

1
kubectl get nodes

结果为

1
2
3
NAME     STATUS   ROLES        AGE    VERSION
edge1 Ready agent,edge 28h v1.17.1-kubeedge-v1.3.1
ubuntu Ready master 2d5h v1.17.0

执行

1
kubectl get deployments

结果为

1
2
NAME               READY   UP-TO-DATE   AVAILABLE   AGE
kubeedge-web-app 1/1 1 1 160m

执行

1
kubectl get pods

结果为

1
2
NAME                                READY   STATUS    RESTARTS   AGE
kubeedge-web-app-76d4755f59-qbjpd 1/1 Running 0 160m

执行

1
kubectl describe pod kubeedge-web-app-76d4755f59-qbjpd | grep IP

结果为

1
2
3
IP:           10.211.55.52
IPs:
IP: 10.211.55.52

访问 http://10.211.55.52 即可看到页面


边缘端

此处不使用样例代码中运行在边缘端的代码,仅仅查看边缘端 MQTT 是否接受到来自于主节点的消息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"encoding/json"
"fmt"
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/types"
"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/client"
)

func main() {
fmt.Println("Get music list successfully")

cli := client.New(&client.Options{
// Define the processing of the error handler.
ErrorHandler: func(err error) {
fmt.Println(err)
},
})

fmt.Println("Create mqtt client successfully")

stopchan := make(chan int)
// Terminate the Client.
defer cli.Terminate()

// Connect to the MQTT Server.
err := cli.Connect(&client.ConnectOptions{
Network: "tcp",
Address: "localhost:1883",
ClientID: []byte("receive-client"),
})
if err != nil {
panic(err)
}
fmt.Println("Connect mqtt client successfully")

err = cli.Subscribe(&client.SubscribeOptions{
SubReqs: []*client.SubReq{
{
TopicFilter: []byte(`$hw/events/device/speaker-01/twin/update/document`),
QoS: mqtt.QoS0,
// Define the processing of the message handler.
Handler: func(topicName, message []byte) {
Update := &types.DeviceTwinDocument{}
err := json.Unmarshal(message, Update)
if err != nil {
fmt.Println("Unmarshal error", err)
fmt.Printf("Unmarshal error: %v\n", err)
}
fmt.Printf("%+v", Update)
},
},
},
})
fmt.Println("Subscribe mqtt topic successfully")

<-stopchan
if err != nil {
panic(err)
} else {
fmt.Println("Connection successfully")
}
}

注意,需要将 kubeedge 官方代码正确添加到 GOPATH 下,正确配置到文件的 GOPATH,同时使用 go get 下载好相关的依赖。不要使用 go mod ,会出问题。编译好程序直接运行并保持运行状态。


检验

主节点运行命令查看 pod 日志

1
kubectl logs -f kubeedge-web-app-76d4755f59-qbjpd

访问之前提到的页面,选择其中的一首歌点击播放,日志会有类似如下输出

1
2
2020/06/16 11:00:04 PlayTrack: 7
2020/06/16 11:00:04 Track [ 7 ] will be played on speaker speaker-01

而运行在边缘端的程序也会有类似如下输出

1
&{BaseMessage:{EventID:cd29c533-9c76-4c7f-b913-90310327526e Timestamp:1592305204859} Twin:map[track:0xc0000aa4c0]}

在边缘端安装 sqlite3 ,执行 sqlite3 /var/lib/kubeedge/edgecore.db 进入交互节点,执行

1
select * from device_twin;

会有类似如下输出

1
2|speaker-01|track||7||{"timestamp":1592305204859}||{"cloud":85047,"edge":0}||0|string|{}

至此,云边交互成功。