Flink1.17.1使用mysql-cdc连接器
时间: 2025-06-28 11:16:00 浏览: 20
### Flink 1.17.1 使用 MySQL CDC Connector 示例教程
#### 添加 Maven 依赖项
为了实现 Flink 和 MySQL 的 CDC 数据同步,项目中需引入特定的依赖项。具体来说,`flink-connector-mysql-cdc` 是用于连接 MySQL 并捕获变更数据的核心组件[^3]。
```xml
<dependencies>
<!-- cdc 依赖 -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
```
#### 创建 Flink 应用程序
下面是一个简单的 Java 程序示例,展示了如何配置并运行一个基于 Flink 的 MySQL CDC 流处理作业:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
public class MysqlCdcExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("test_db") // 可指定多个数据库名,逗号分隔
.tableList("test_db.test_table") // 或者指定具体的表列表
.username("root")
.password("your_password")
.deserializer(new SimpleStringDebeziumDeserializer()) // 自定义反序列化逻辑
.build();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.print(); // 输出到控制台或其他目标位置
env.execute("Flink MySQL CDC Example");
}
}
```
上述代码片段创建了一个名为 `MysqlCdcExample` 的类,在其中设置了 MySQL 源,并指定了要监控的目标库和表。通过调用 `.execute()` 方法来启动流处理任务,这里简单地将接收到的数据打印到了标准输出上[^1]。
#### 启动 Flink SQL 客户端执行查询语句
除了编程方式外,还可以利用 Flink 提供的强大SQL接口来进行更便捷的操作。先确保已经安装好必要的环境之后,可以通过命令行工具进入交互式的 Shell 来编写和提交复杂的ETL脚本[^2]:
```bash
/usr/lib/flink/bin/sql-client.sh embedded shell
```
接着可以尝试如下所示的一条 DDL 声明,它描述了来自 MySQL 表的变化事件作为输入源:
```sql
CREATE TABLE test_table (
id BIGINT,
name STRING,
age INT,
ts TIMESTAMP(3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'your_password',
'database-name' = 'test_db',
'table-name' = 'test_table'
);
```
以上就是关于如何使用 Apache Flink 1.17.1 结合 MySQL CDC Connector 构建实时数据管道的一个基本介绍。
阅读全文
相关推荐









