spark3.13-dataframe
时间: 2025-02-07 19:07:49 浏览: 45
### Spark 3.1.3 DataFrame 使用教程
#### 创建DataFrame
创建DataFrame可以通过读取文件或现有RDD来完成。对于大多数情况,推荐使用`spark.read`接口加载数据集。
```python
from pyspark.sql import SparkSession
# 初始化Spark会话
spark = SparkSession.builder \
.appName("DataFrame Example") \
.getOrCreate()
# 加载CSV文件到DataFrame
df = spark.read.csv("/path/to/csv", header=True, inferSchema=True)
# 显示前几行数据
df.show()
```
#### 基本操作
一旦有了DataFrame对象之后,就可以对其进行各种变换和动作操作。常见的有选择特定列、过滤行以及添加新计算字段等。
```python
# 选取单个或多个列
selected_columns_df = df.select('column_name')
# 过滤条件
filtered_df = df.filter(df['age'] > 20)
# 添加一列基于已有列的新计算结果
new_column_added_df = df.withColumn('double_age', df['age']*2)
```
#### 聚合函数
支持多种内置聚合函数来进行统计分析工作,比如计数(count),求平均(mean),最大最小值(max/min)等等。
```python
# 计算年龄的最大值、最小值及均值
aggregated_stats_df = df.describe(['age'])
# 对某列进行分组并汇总其他列的信息
grouped_by_gender_df = df.groupBy('gender').agg({'salary': 'mean'})
```
#### Join操作
当需要连接两个表时,可以利用join方法实现内联接(inner join), 外联接(full outer join)等形式的数据关联查询。
```python
# 执行两表之间的左外连接
joined_df = table_a.join(table_b, on='id', how='left_outer')
```
#### 数据写入
最后还可以把处理后的DataFrame保存回存储系统中去,如HDFS,HBase,RDBMS等。
```python
# 将DataFrame写出为Parquet格式文件
df.write.parquet('/output/path/')
```
[^1]
阅读全文
相关推荐















