数据处理与区块链平台搭建
立即解锁
发布时间: 2025-08-30 01:44:05 阅读量: 7 订阅数: 18 AIGC 

# 数据处理与区块链平台搭建
## 1. 数据的分析与可视化
### 1.1 环境准备
JupyterHub 可提供 JupyterLab 环境,方便在集群中直接运行一个或多个 Jupyter Notebooks。以下将展示如何在 Elasticsearch 中进行简单的数据查询和可视化,以及如何通过编程方式开发 NiFi 数据流。
### 1.2 数据查询与可视化步骤
1. **安装 Elasticsearch 包**:在新的基于 Python 的 Jupyter Notebook 的第一个单元格中添加以下命令,安装 7.6.0 版本的 Elasticsearch 包。
```python
!pip install elasticsearch==7.6.0
```
2. **导入必要的库**:导入 elasticsearch、pandas 和 matplotlib。
```python
from elasticsearch import Elasticsearch
import pandas as pd
from matplotlib import pyplot
```
3. **创建 Elasticsearch 客户端**:连接到在 Kubernetes Namespace data 中运行的 elasticsearch 服务。
```python
es = Elasticsearch(["elasticsearch.data"])
```
4. **查询数据**:使用 Elasticsearch 客户端的 search 函数查询以 sentiment- 开头的索引模式,并将结果存储在变量 response 中。
```python
response = es.search(
index="sentiment-*",
body={
"size": 10000,
"query": {
"range": {
"Date": {
"gt": "now-1h"
}
}
},
"_source": [
"Date",
"polarity",
"subjectivity"
]
}
)
```
5. **数据转换**:将 Elasticsearch 的响应映射并转置为 Pandas DataFrame。
```python
df = pd.concat(map(pd.DataFrame.from_dict,
response['hits']['hits']),
axis=1)['_source'].T
```
6. **日期格式转换**:将 Date 列转换为 Python Datetime 数据类型。
```python
datefmt = '%a, %d %b %Y %H:%M:%S GMT'
df['Date'] = pd.to_datetime(df['Date'], format=datefmt)
```
7. **设置索引并转换数据类型**:将 Date 字段设置为 DataFrame 索引,并将所有数值转换为浮点数。
```python
df = df.set_index(['Date'])
df = df.astype(float)
```
8. **查看数据**:打印前五行记录。
```python
df.head()
```
9. **数据可视化**:调用 DataFrame 的 plot 函数绘制情感分析图,将极性分配给 y 轴。
```python
df.plot(y=["polarity"], figsize=(13,5))
```
### 1.3 数据处理的意义
上述示例是数据处理和可视化的基础示例。数据科学家或分析师通常会先进行类似的任务,以初步了解可用数据。机器学习等数据科学活动通常需要固定的数据集,以便进行可重复的实验。将集群内的 Jupyter Notebooks 与 MinIO 对象存储、事件队列、数据管理和 ETL 系统连接起来,为高效构建和共享这些有价值的数据集提供了很多机会。
### 1.4 NiFi 编程
#### 1.4.1 NiFi 简介
Apache NiFi 支持通过用 Java 编写的自定义控制器和处理器进行扩展。此外,还可以通过 API 扩展其功能,以实现自动化和监控。
#### 1.4.2 创建 NiFi 流程组和处理器步骤
1. **安装 NiPyApi 包**:在新的基于 Python 的 Jupyter Notebook 的第一个单元格中添加以下命令,安装 0.14.3 版本的 NiPyApi 包。
```python
!pip install nipyapi==0.14.3
```
2. **导入包**:导入 nipyapi 包。
```python
import nipyapi
```
3. **配置 NiFi 客户端**:使用 API 端点配置 NiFi 客户端,这里使用 Kubernetes Service nifi 在 data Namespace 中的地址。
```python
api_url = "http://nifi.data:8080/nifi-api"
nipyapi.utils.set_endpoint(api_url)
```
4. **测试客户端连接**:通过检索集群中第一个节点的信息来测试客户端连接。
```python
nodes = nipyapi.system.get_cluster().cluster.nodes
nodes[0]
```
5. **创建 NiFi 流程组**:创建一个 NiFi 流程组并将其放置在画布上。
```python
pg0id = nipyapi.canvas.get_process_group(
nipyapi.canvas.get_root_pg_id(),
'id'
)
pg0 = nipyapi.canvas.create_process_group(
pg0id,
"apk8s
```
0
0
复制全文
相关推荐







