大数据处理:Azure上的Spark实践与Twitter标签统计
立即解锁
发布时间: 2025-08-29 10:28:03 阅读量: 5 订阅数: 14 

### 大数据处理:Azure 上的 Spark 实践与 Twitter 标签统计
#### 1. 在 Microsoft Azure HDInsight Spark 集群上实现 Spark 词频统计
在 Azure 上实现 Spark 词频统计,可按以下步骤操作:
1. **创建 Apache Spark 集群**:通过 Azure 门户创建,参考链接:https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-jupyter-spark-sql-use-portal 。创建时注意选择集群类型为 Spark,默认配置资源较多,可将工作节点数改为 2,并配置工作节点和头节点使用 D3 v2 计算机。点击创建后,配置和部署集群需 20 - 30 分钟。
2. **安装库到集群**:若 Spark 代码需要未安装的库,可使用以下命令查看默认安装的库:
```bash
/usr/bin/anaconda/envs/py35/bin/conda list
```
使用提供的安装脚本安装所需库,步骤如下:
1. 在 Azure 门户中选择集群。
2. 点击集群搜索框下的“Script Actions”。
3. 点击“Submit new”,脚本类型选“Custom”,名称指定为“libraries”,Bash 脚本 URI 使用:http://deitel.com/bookresources/IntroToPython/install_libraries.sh 。
4. 勾选“Head”和“Worker”确保脚本在所有节点安装库。
5. 点击“Create”。执行成功会在脚本操作列表中显示绿色对勾,否则 Azure 会通知错误。
3. **复制 RomeoAndJuliet.txt 到 HDInsight 集群**:
1. 使用 `scp` 命令上传文件,命令如下:
```bash
scp RomeoAndJuliet.txt [email protected]:
```
2. 使用 `ssh` 登录集群:
```bash
ssh [email protected]
```
3. 将文件复制到集群的 Hadoop 文件系统:
```bash
hadoop fs -copyFromLocal RomeoAndJuliet.txt /example/data/RomeoAndJuliet.txt
```
4. **访问 HDInsight 中的 Jupyter 笔记本**:HDInsight 使用旧版 Jupyter Notebook 界面,可参考:https://jupyter-notebook.readthedocs.io/en/stable/examples/Notebook/Notebook%20Basics.html 。在 Azure 门户选择所有资源,再选集群,在“Overview”标签下选择“Jupyter notebook”,使用集群设置时的用户名和密码登录。
5. **上传 RomeoAndJulietCounter.ipynb 笔记本**:点击“Upload”按钮,选择 ch16 示例文件夹的 SparkWordCount 文件夹中的 RomeoAndJulietCounter.ipynb 文件,点击“Open”,再点击右侧“Upload”将笔记本放入当前文件夹,点击笔记本名称在新标签页打开,选择“PySpark3”内核。
6. **修改笔记本以适配 Azure**:
1. 修改 `nltk.download('stopwords')` 为:
```python
nltk.download('stopwords', download_dir='.')
```
2. 在第二个单元格的导入语句后添加:
```python
nltk.data.path.append('.')
```
3. 删除原笔记本的第三和第四个单元格。
4. 将 `'RomeoAndJuliet.txt'` 替换为 `'wasb:///example/data/RomeoAndJuliet.txt'`。
5. 将最后一个单元格的 f - 字符串替换为:
```python
print('{:>{width}}: {}'.format(word, count, width=max_len))
```
#### 2. 使用 pyspark - notebook Docker 栈统计 Twitter 标签
此部分将创建并运行 Spark 流式应用,接收指定主题的推文流,并每 10 秒更新一次前 20 个标签的柱状图。
1. **启动 Docker 容器并安装 Tweepy**:按照相关说明启动容器并安装 Python 库,使用以下命令安装 Tweepy:
```bash
pip install tweepy
```
2. **将推文流式传输到套接字**:
- **执行脚本**:在 JupyterLab 的终端窗口执行 starttweetstream.py 脚本。先导航到 SparkHashtagSummarizer 文件夹:
```bash
cd work/SparkHashtagSummarizer
```
然后执行脚本,例如获取 1000 条关于足球的推文:
```bash
ipython starttweetstream.py 1000 football
```
此时脚本会显示“Waiting for connection”,等待 Spark 连接。
- **starttweetstream.py 脚本分析**:
```python
# starttweetstream.py
"""Script to get tweets on topic(s) specified as script argument(s)
and send tweet text to a socket for processing by Spark."""
import keys
import socket
import sys
import tweepy
class TweetListener(tweepy.StreamListener):
"""Handles incoming Tweet stream."""
def __init__(self, api, connection, limit=10000):
"""Create instance variables for tracking number of tweets."""
self.connection = connection
self.tweet_count = 0
self.TWEET_LIMIT = limit # 10,000 by default
super().__init__(api) # call superclass's init
def on_connect(self):
"""Called when your connection attempt is successful, enabling
you to perform appropriate application tasks at that point."""
print('Successfully connected to Twitter\n')
def on_status(self, status):
"""Called when Twitter pushes a new tweet to you."""
# get the hashtags
hashtags = []
for hashtag_dict in status.entities['hashtags']:
hashtags.append(hashtag_dict['text'].lower())
hashtags_string = ' '.join(hashtags) + '\n'
print(f'Screen name: {status.use
```
0
0
复制全文
相关推荐









