基于Kubernetes的平台应用与机器学习自动化实践
立即解锁
发布时间: 2025-08-30 00:12:53 阅读量: 5 订阅数: 16 AIGC 

### 基于Kubernetes的平台应用与机器学习自动化实践
#### 1. 平台应用搭建
首先创建了一个名为 hc1 的新 Kubernetes 集群。后续示例会利用之前安装的应用和集群配置,所有配置清单都组织在 `cluster-apk8s-hc1` 文件夹下。使用到的应用包括:
- Mosquitto(MQTT):用于从树莓派设备(物联网)传输传感器数据。
- Apache NiFi:监听 MQTT 事件,将数据进行转换和加载(ETL)到 Elasticsearch 中。
- JupyterHub:在 GPU 节点上处理数据、开发和训练机器学习(AIML)模型。
- MinIO:存储机器学习模型和工件。
使用的 ingress 域名如下:
| 应用 | 域名 |
| ---- | ---- |
| Apache Nifi | nifi.hc1.apk8s.dev |
| JupyterHub | hub.hc1.apk8s.dev |
| Kibana | kib.hc1.apk8s.dev |
| Keycloak | iam.hc1.apk8s.dev |
| MinIO | minio.hc1.apk8s.dev |
#### 2. 数据收集
利用 Kubernetes 的 DaemonSet 对象在树莓派(物联网)设备上部署和管理数据收集工作负载,使用 CronJob 对象部署执行数据聚合的 ETL 工作负载。
##### 2.1 MQTT IoT 客户端
传统的物联网应用开发通常针对特定平台编写代码,而借助 Kubernetes 和容器化技术,为物联网应用的开发、部署、网络、监控和控制带来了新的方法。
操作步骤如下:
1. 创建目录 `cluster-apk8s-hc1/020-data/220-smon`,并在其中创建 `20-configmap.yml` 文件,内容如下:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: smon
namespace: data
labels:
app: smon
data:
collect.sh: |-
for (( ; ; ))
do
load=$(cat /proc/loadavg | cut -d\ -f 1)
temp=$(cat /sensor/temp)
mosquitto_pub \
-h mqtt.data \
-i $DEVICE \
-t sensor/$DEVICE \
-m "{\"device\":\"$DEVICE\",\"temp\":$temp, \"load\":$load}"
echo "device: $DEVICE, temp: $temp, load: $load"
sleep 15
done
```
2. 应用 ConfigMap:
```bash
$ kubectl apply -f 20-configmap.yml
```
3. 创建名为 `40-daemonset.yml` 的 DaemonSet 文件,内容如下:
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: smon
namespace: data
labels:
app: smon
component: sensor
spec:
selector:
matchLabels:
name: smon
template:
metadata:
labels:
name: smon
component: sensor
spec:
nodeSelector:
kubernetes.io/role: sensor
tolerations:
- key: dedicated
value: pi
effect: NoSchedule
containers:
- name: smon
image: apk8s/mosquitto-clients:1.5.7_1
command: ["/bin/bash",
"/scripts/collect.sh"]
env:
- name: DEVICE
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
privileged: true
volumeMounts:
- name: scripts
mountPath: /scripts
- name: sensor
mountPath: /sensor
resources:
limits:
memory: 200Mi
requests:
cpu: 50m
memory: 200Mi
volumes:
- name: sensor
hostPath:
path: /sys/class/thermal/thermal_zone0/
- name: scripts
configMap:
name: smon
```
4. 应用 DaemonSet:
```bash
$ kubectl apply -f 40-daemonset.yml
```
此时,三个树莓派设备每 15 秒会将 CPU 负载和温度数据报告给集群中运行的 MQTT 代理。
#### 3. ETL 操作
前一节的 DaemonSet 每 15 秒从树莓派设备收集指标并将结果以 JSON 格式发布到 MQTT 主题 `/sensor/DEVICE_NAME`。接下来使用 Apache Nifi 提取 MQTT 消息、进行转换并加载到 Elasticsearch 进行索引,同时使用 Python 脚本和 Kubernetes CronJob 从收集的数据范围创建 CSV 对象进行长期存储。
##### 3.1 Apache NiFi
使用 NiFi 的 `ConsumeMQTT`、`JoltTransformJSON` 和 `PutElasticsearchHttp` 处理器。
操作步骤如下:
1. 创建 `ConsumeMQTT` 处理器,设置属性:
- Broker URI:`tcp://mqtt:1883`
- Topic Filter:`sensor/+`
- Max Queue Size:`1000`
2. 使用高级按钮配置处理器的 Jolt 转换语言,内容如下:
```json
[
{
"operation": "default",
"spec": {
"collected": "${now():toNumber()}"
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"collected": "=toLong"
}
}
]
```
3. 测试转换,将从 MQTT 获取的示例数据粘贴到 JSON 输入字段进行测试。
4. 添加 `PutElasticsearchHttp` 处理器并连接 `JoltTransformJSON` 处理器的输出,配置属性:
- Elasticsearch URL:`http://elasticsearch:9200`
- Index:`sensor-${now():format('yyyy-MM')}`
- Type:`_doc`
##### 3.2 Python CronJob
为了满足数据科学和机器学习研究对静态数据集的需求,使用 Kubernetes CronJob 进行 ETL 活动。
操作步骤如下:
1. 创建目录 `cluster-apk8s-hc1/020-data/500-jobs`,并在其中创建 `01-configmap-hrdump.yml` 文件,内容如下:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: hrdump
namespace: data
labels:
app: hrdump
data:
hrdump.py: |-
import requests
import pandas as pd
import boto3
import datetime
import os
d = datetime.datetime.now()
idx = f"sensor-{d.year}-{d.month:02d}"
query = {
"size": 10000,
"query": {
"range" : {
"collected" : {
"gte" : "now-1h"
}
}
}
}
r = requests.post(
f'http://elasticsearch.data:9200/{idx}/_search',
json=query
)
df = pd.DataFrame()
for rec in r.json()['hits']['hits']:
df = df.append(rec['_source'], ignore_index=True)
csv_data = df.to_csv(
```
0
0
复制全文
相关推荐










