基于Kubernetes的数据平台搭建与机器学习自动化实践
立即解锁
发布时间: 2025-08-30 01:43:11 阅读量: 6 订阅数: 18 AIGC 

# 基于Kubernetes的数据平台搭建与机器学习自动化实践
## 1. 平台应用与环境搭建
首先创建了一个名为 hc1 的新 Kubernetes 集群,并将所有配置清单组织在 `cluster-apk8s-hc1` 文件夹下。以下是所需的配置和应用及其目录组织:
| 资源 | 目录组织 |
| --- | --- |
| Ingress、Cert Manager、Storage、Monitoring | 000-cluster/00-ingress-nginx、000-cluster/10-cert-manager、000-cluster/20-rook-ceph、000-cluster/30-monitoring |
| Namespace、Mosquitto、Zookeeper | 003-data/000-namespace、003-data/050-mqtt、003-data/010-zookeeper |
| Keycloak、JupyterHub、Elasticsearch、Kibana | 003-data/005-keycloak、005-data-lab/000-namespace、003-data/100-jupyterhub、003-data/030-elasticsearch、003-data/034-kibana |
| MinIO | 000-cluster/22-minio、003-data/070-minio |
| NiFi | 003-data/060-nifi |
使用的应用包括:
- Mosquitto(MQTT):用于从树莓派设备(物联网)传输传感器数据。
- Apache NiFi:监听 MQTT 事件,将数据进行转换和加载(ETL)到 Elasticsearch 中。
- JupyterHub:在 GPU 节点上处理数据、开发和训练机器学习(AIML)模型。
- MinIO:存储机器学习模型和工件。
使用的入口域名如下:
- Apache Nifi:nifi.hc1.apk8s.dev
- JupyterHub:hub.hc1.apk8s.dev
- Kibana:kib.hc1.apk8s.dev
- Keycloak:iam.hc1.apk8s.dev
- MinIO:minio.hc1.apk8s.dev
## 2. 数据收集
### 2.1 MQTT IoT 客户端
借助 Kubernetes 的容器化特性,为物联网应用开发、部署、网络、监控和控制开辟了新途径。以下是具体操作步骤:
1. 创建目录 `cluster-apk8s-hc1/020-data/220-smon`,并在其中创建 `20-configmap.yml` 文件,内容如下:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: smon
namespace: data
labels:
app: smon
data:
collect.sh: |-
for (( ; ; ))
do
load=$(cat /proc/loadavg | cut -d\ -f 1)
temp=$(cat /sensor/temp)
mosquitto_pub \
-h mqtt.data \
-i $DEVICE \
-t sensor/$DEVICE \
-m "{\"device\":\"$DEVICE\",\"temp\":$temp, \"load\":$load}"
echo "device: $DEVICE, temp: $temp, load: $load"
sleep 15
done
```
2. 应用 ConfigMap:
```bash
$ kubectl apply -f 20-configmap.yml
```
3. 创建信号监控的 DaemonSet,在 `40-daemonset.yml` 文件中编写如下内容:
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: smon
namespace: data
labels:
app: smon
component: sensor
spec:
selector:
matchLabels:
name: smon
template:
metadata:
labels:
name: smon
component: sensor
spec:
nodeSelector:
kubernetes.io/role: sensor
tolerations:
- key: dedicated
value: pi
effect: NoSchedule
containers:
- name: smon
image: apk8s/mosquitto-clients:1.5.7_1
command: ["/bin/bash",
"/scripts/collect.sh"]
env:
- name: DEVICE
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
privileged: true
volumeMounts:
- name: scripts
mountPath: /scripts
- name: sensor
mountPath: /sensor
resources:
limits:
memory: 200Mi
requests:
cpu: 50m
memory: 200Mi
volumes:
- name: sensor
hostPath:
path: /sys/class/thermal/thermal_zone0/
- name: scripts
configMap:
name: smon
```
4. 应用 DaemonSet:
```bash
$ kubectl apply -f 40-daemonset.yml
```
此时,三个树莓派设备每 15 秒会将其 CPU 负载和温度报告给集群中运行的 MQTT 代理。
### 2.2 ETL 操作
#### 2.2.1 Apache NiFi
使用 Apache NiFi 进行数据提取、转换和加载,具体步骤如下:
1. 创建 ConsumeMQTT 处理器,设置属性:
- Broker URI:tcp://mqtt:1883
- Topic Filter:sensor/+
- Max Queue Size:1000
2. 使用高级按钮,用以下 Jolt 转换语言配置处理器:
```json
[
{
"operation": "default",
"spec": {
"collected": "${now():toNumber()}"
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"collected": "=toLong"
}
}
]
```
3. 测试转换,将从 MQTT 获取的示例数据粘贴到 JSON 输入字段进行测试。
4. 添加 PutElasticsearchHttp 处理器,连接 JoltTransformJSON 处理器的输出,并配置属性:
- Elasticsearch URL:http://elasticsearch:9200
- Index:sensor-${now():format('yyyy-MM')}
- Type:_doc
#### 2.2.2 Python CronJob
使用 Kubernetes CronJob 进行 ETL 活动,步骤如下:
1. 创建 Python 脚本,将其添加到 Kubernetes ConfigMap 中。在 `cluster-apk8s-hc1/020-data/500-jobs` 目录下创建 `01-configmap-hrdump.yml` 文件,内容如下:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: hrdump
namespace: data
labels:
app: hrdump
data:
hrdump.py: |-
import requests
import pandas as pd
import boto3
import datetime
import os
d = datetime.datetime.now()
idx = f"sensor-{d.year}-{d.month:02d}"
query = {
"size": 10000,
"query": {
"range" : {
"collected" : {
"gte" : "now-1h"
}
}
}
}
r = requests.post(
f'http://elasticsearch.data:9200/{idx}/_search',
json=query
)
df = pd.DataFrame()
for rec in r.json()['hits']['hits']:
df = df.append(rec['_source'], ignore_index=True)
csv_data = df.to_csv(index_label="idx")
s3 = boto3.client('s3',
endpoint_url='http://minio-internal-service.data:9000',
aws_access_key_id=os.environ['ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['ACCESS_KEY'],
region_name='')
try:
s3.create_bucket(Bucket='sensor-hourly')
except s3.exc
```
0
0
复制全文
相关推荐









