用flume把txt文件(json格式)导入MySQL,现已建有数据库news与表general_news 且设有1) id # 自增型变量,主键 (2) title # 新闻标题, (3) release_time # 发布时间 (4) views_count # 浏览次数 (5) contents # 新闻内容 (6) authors # 作者/通讯员 (7) source # 来源 (8) save_time # 保存至数据库的时间
时间: 2025-06-11 11:39:24 AIGC 浏览: 19
为了实现将 JSON 格式的 txt 文件数据通过 Flume 导入 MySQL 数据库的目标表 `news.general_news`,需要配置 Flume 的 JDBC Sink 来完成这一任务。以下是完整的解决方案:
### 1. 准备工作
确保已安装并正确配置好以下组件:
- **Apache Flume** 版本兼容性需满足支持 JDBC Source/Sink 功能的要求。
- **MySQL Connector/J** 驱动程序(JAR 文件)被放置于 `$FLUME_HOME/lib` 路径下以供加载使用[^2]。
### 2. 创建目标表结构
在 MySQL 数据库中预先建立名为 `general_news` 的表格用于接收来自 Flume 的数据流输入。SQL 建表语句如下所示:
```sql
CREATE TABLE general_news (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255),
release_time DATETIME,
views_count INT,
contents TEXT,
authors VARCHAR(255),
source VARCHAR(255),
save_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
```
### 3. 配置 Flume Agent
编辑 Flume 的 agent 配置文件(通常是 flume-conf.properties 或其他指定名称)。下面是一个示例配置片段展示如何读取本地磁盘上的 JSON 文档并通过 JDBC 插件将其写入到远程 RDBMS 表格内。
#### 定义 Sources, Channels 和 Sinks
```properties
# Define the components
agent.sources = jsonFileSource
agent.channels = memoryChannel
agent.sinks = jdbcSink
# Configure the source to read from a file containing JSON objects.
agent.sources.jsonFileSource.type = exec
agent.sources.jsonFileSource.command = tail -F /path/to/json/input/file.txt
agent.sources.jsonFileSource.interceptors = interceptorParser
# Interceptor configuration for parsing each line as JSON object and extracting fields.
agent.sources.jsonFileSource.interceptors.interceptorParser.type = com.example.JsonInterceptor$Builder
agent.sources.jsonFileSource.interceptors.interceptorParser.preserveExisting = false
agent.sources.jsonFileSource.interceptors.interceptorParser.targetFields = title,release_time,views_count,contents,authors,source
# Memory channel setup between source & sink.
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 100
# JDBC Sink Configuration pointing at your database connection details plus SQL statement template.
agent.sinks.jdbcSink.channel = memoryChannel
agent.sinks.jdbcSink.type = org.apache.flume.sink.jdbc.JdbcSink
agent.sinks.jdbcSink.driver = com.mysql.cj.jdbc.Driver
agent.sinks.jdbcSink.connectURL = jdbc:mysql://localhost:3306/news?useSSL=false&serverTimezone=UTC
agent.sinks.jdbcSink.username = root
agent.sinks.jdbcSink.password = password
agent.sinks.jdbcSink.sqlStatement = INSERT INTO general_news(title, release_time, views_count, contents, authors, source) VALUES (?, ?, ?, ?, ?, ?)
# Bind sources/channels/sinks together via bindings.
agent.sources.jsonFileSource.channels = memoryChannel
agent.sinks.jdbcSink.channel = memoryChannel
```
注意:上述配置假定每条记录都是独立的一行纯文本形式表示的一个合法 JSON 对象;实际应用可能还需要额外处理嵌套属性等问题[^3]。
### 4. 开发自定义拦截器 (Optional)
如果默认提供的拦截机制不足以满足需求,则可考虑开发自己的 Java 类继承 AbstractInterceptor 实现更复杂的逻辑转换功能。例如解析非标准格式的数据源或是动态调整某些字段值等等[^4]。
### 5. 测试与验证
启动 Flume 进程之前先检查所有必要的外部资源是否可用(比如网络连通性和权限授予状况),接着按照常规流程开启服务端口监听等待客户端发送请求过来即可开始传输作业。最后可通过查询目的端数据库确认是否有新增加的记录存在从而判断整个链路运作状态良好与否[^5]。
---
###
阅读全文
相关推荐


















