大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink 状态类型

案例1: Flink State 求平均值

Flink State状态描述

广播状态

基本概念

所有并行实例,这些实例将他们维持为状态,不广播另一个流事件,而是将其发送到同一运营的各个实例,并与广播流的事件一起处理。

新的广播状态非常适合需要加入低吞吐量和高吞吐量或需要动态更新其处理逻辑的应用程序,我们将使用后一个用例的具体示例来解释广播状态。

广播状态下的动态模式评估:想象一下,一个电子商务网站将所有用户的交互捕获为用户操作流,运营该网站的公司有兴趣分析交互以增加收入,改善用户体验,以及监测和防止恶意行为。

该网站实现了一个流应用程序,用于检测用户事件流上的模式,但是,公司希望每次模式更改时都避免修改和重新部署应用程序。相反,应用程序从模式流接收新模式时摄取第二个模式流并更新其活动模式。

在上下文中,我们将逐步讨论此应用程序,并展示它如何利用ApacheFlink的广播状态功能。

我们示例应用程序摄取两个数据流,第一个流在网站上提供用户操作,并在上图的左上方展示,用户交互事件包括操作的类型(用户登录、用户注销、添加购物车或者完成付款)和用户的ID,其由颜色编码。

图中所示的用户动作时间流包含用户1001的注销动作,其后是用户1003的支付完成事件,以及用户1002的添加购物车动作。

第二流提供应用将执行的动作模式,评估,模式由两个连续的动作组成。在上图上,模式流包含以下两个:


模式1:用户登录并立即注销而不无需浏览电子商务网站上的其他页面。

模式2:用户将商品添加到购物车并在不完成购买的情况下注销。

这些模式由助于企业更好的分析用户行为,检测恶意行为并改善网站体验。例如,如果项目被添加到购物车而没有后续购买,网站团队可以适当地采取措施来更好的了解用户未完成购买的原因,并启动特定的流程改善网站的转换(比如折扣、优惠卷等)。

在上图的右侧,该图显示了操作员的三个并行任务,即摄取模式和用户操作流,评估操作流上的模式,并在下游接收到新模式时,替换为当前活动模式。原则上,还可以实现运算符以同时评估更复杂的模式或多个模式,这些模式可以单独添加或者移除。


我们将描述模式匹配应用程序如何处理用户操作和模式流:

首先,将模式发送给操作员,该模式被广播到运营商的所有三个并行任务。任务将模式存储在其广播状态中,由于广播状态只应用使用广播数据进行更新,因此所有任务的状态始终预期相同。

接下来,第一个用户操作按用户ID分区并发送到操作员任务,分区可确保同一用户的所有操作都由同一个任务处理,上图显示了操作员任务消耗第一个模式和前三个操作事件后应用程序的状态。

当任务收到新的用户操作时,它会通过查看用户的最新和先前操作来评估当前活动模式,对于每个用户,操作员先前的操作存储在键控状态。由于上图中的任务到目前为止仅为每个用户收到了一个操作(我们刚启动应用程序),因此不需要评估该模式。最后,用户键控状态中的先前操作被更新为最新动作,以便能够在同一用户的下一个动作到达时查找它。

在处理前三个动作之后,下一个事件(用户1001的注销动作)被运送到处理用户1001的事件的任务,当任务接收到动作时,它从广播状态中查找到当前模式并且用户1001的先前操作。由于模式匹配的两个动作,因此任务发出模式匹配事件,最后,任务通过使用最新操作覆盖上一个事件来更新其键控状态。

当新模式到达模式流时,它被广播到所有任务,并且每个任务通过新模式替换当前模式来更新广播状态。

一旦用新模式更新广播状态,匹配逻辑就像之前一样继续,用户动作时间由密钥分区并负责任务评估。


如何使用

那么如何使用广播状态实现应用程序?

到目前为止,概念上讨论了该应用程序并解释了它如何使用广播状态来评估事件流上的动态模式,接下来,我们将展示如何使用 Flink的DataStream API和广播状态功能实现示例应用程序。


让我们从应用程序的输入数据开始,我们有两个数据流,操作和模式,在这一点上,我们并不关心流来来何处。

这些流可以从ApacheKafka或者Kinesis或任何其他系统中摄取。并与两个字段的POJO:

DataStream<Action> actions = ???`
`DataStream<Pattern> patterns = ???
Action``Pattern

Action:Long userId,String action

Pattern:String firstAction String secondAction

作为第一步,我们在属性上键入操作流。接下来,我们准备广播状态,广播状态始终为Flink提供的最通用的状态原语。由于我们的应用程序一次只评估和存储一个,我们将广播状态配置为具体有键类型和值类型。使用广播状态,我们在流上应用转换并接收,在我们获得了KeyedStream和广播流之后,我们都流式传输并应用了一个userId。


编写代码

package icu.wzk;


import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;


public class BroadCastDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // 两套数据流 1:用户行为 2:模式
        UserAction user1 = new UserAction(1001L, "login");
        UserAction user2 = new UserAction(1003L, "pay");
        UserAction user3 = new UserAction(1002L, "car");
        UserAction user4 = new UserAction(1001L, "logout");
        UserAction user5 = new UserAction(1003L, "car");
        UserAction user6 = new UserAction(1002L, "logout");
        DataStreamSource<UserAction> actions = env.fromElements(user1, user2, user3, user4, user5, user6);
        MyPattern myPattern1 = new MyPattern("login", "logout");
        MyPattern myPattern2 = new MyPattern("car", "logout");
        DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1, myPattern2);
        KeyedStream<UserAction, Long> keyed = actions
                .keyBy(new KeySelector<UserAction, Long>() {
                    @Override
                    public Long getKey(UserAction value) throws Exception {
                        return value.getUserId();
                    }
                });
        // 将模式流广播到下游的所有算子
        MapStateDescriptor<Void, MyPattern> broadcastStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(MyPattern.class));
        BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(broadcastStateDescriptor);

        SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyed
                .connect(broadcastPatterns)
                .process(new PatternEvaluator());

        // 匹配结果输出到控制台
        process.print();
        env.execute("BroadCastDemo");
    }

    // 用户行为类
    public static class UserAction {
        private Long userId;
        private String userAction;

        public UserAction(Long userId, String userAction) {
            this.userId = userId;
            this.userAction = userAction;
        }

        public Long getUserId() {
            return userId;
        }

        public String getUserAction() {
            return userAction;
        }
    }

    // 模式类
    public static class MyPattern {
        private String firstAction;
        private String secondAction;

        // 无参构造函数
        public MyPattern() {
        }

        public MyPattern(String firstAction, String secondAction) {
            this.firstAction = firstAction;
            this.secondAction = secondAction;
        }

        public String getFirstAction() {
            return firstAction;
        }

        public void setFirstAction(String firstAction) {
            this.firstAction = firstAction;
        }

        public String getSecondAction() {
            return secondAction;
        }

        public void setSecondAction(String secondAction) {
            this.secondAction = secondAction;
        }
    }

    public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> {

        private transient ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            prevActionState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        /**
         * 每个一个Action数据,触发一次执行
         * @author wzk
         * @date 11:21 2024/7/29
        **/
        @Override
        public void processElement(UserAction value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            // 把用户行为流和模式流中的模式进行匹配
            ReadOnlyBroadcastState<Void, MyPattern> patterns = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            MyPattern myPattern =  patterns.get(null);
            String prevAction = prevActionState.value();
            if (myPattern != null && prevAction != null) {
                if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) {
                    // 匹配成功
                    System.out.println("匹配成功: " + ctx.getCurrentKey());
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern));
                } else {
                    // 匹配失败
                    System.out.println("匹配失败: " + ctx.getCurrentKey());
                }
            }
            prevActionState.update(value.getUserAction());
        }

        /**
         * 每次来一个模式 Pattern 的时候触发
         * @author wzk
         * @date 11:29 2024/7/29
        **/
        @Override
        public void processBroadcastElement(MyPattern value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            BroadcastState<Void, MyPattern> broadcastState = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            broadcastState.put(null, value);
        }
    }

}

processBroadcastElement():为广播流的每个调用记录,在我们的函数中,我们只是使用键将接收到的记录放入广播状态(记住,我们只存储一个模式)。PatternEvaluator Pattern null MapState

processElement():为键控流的每个记录调用,它提供对广播状态的只读访问,以防止修改导致跨函数的并行示例的不同广播状态,从广播状态检索当前模式的方法和从键控状态检索用户的先前动作。如果两者都存在,则检查先后和当前操作是否与模式匹配,并且如果是这种情况则发出模式匹配记录。最后,它将键控状态更新为当前用户操作。processElement() ParrternEvaluator

onTimer():在先前注册的计时器出发时调用,定时器可以在任何处理方法中注册,并用于执行计算或将清理状态,我们在示例中没有实现此方法以保持代码简洁。但是,当用户在一段时间内未处于活动状态时,它可以用于删除用户的最后一个操作,以避免由于非活动用户而导致状态增长。

运行测试

数据结果为:

匹配失败: 1003
匹配成功: 1001
匹配失败: 1002
3> (1001,icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7)

控制台输出的结果为:

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
3月前
|
Cloud Native 大数据 Java
大数据新视界--大数据大厂之大数据时代的璀璨导航星:Eureka 原理与实践深度探秘
本文深入剖析 Eureka 在大数据时代分布式系统中的关键作用。涵盖其原理,包括服务注册、续约、发现及自我保护机制;详述搭建步骤、两面性;展示在大数据等多领域的应用场景、实战案例及代码演示。Eureka 如璀璨导航星,为分布式系统高效协作指引方向。
|
9月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
618 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
6月前
|
机器学习/深度学习 数据采集 分布式计算
大数据分析中的机器学习基础:从原理到实践
大数据分析中的机器学习基础:从原理到实践
284 3
|
7月前
|
SQL 存储 大数据
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
3299 32
|
7月前
|
存储 SQL 数据挖掘
深入理解 Flink 中的 State
Flink 的 State(状态)是其四大核心之一,为流处理和批处理任务提供强大支持。本文深入探讨 Flink 中的状态管理,涵盖 State 在 HDFS 中的存储格式、存在形式(如 ValueState、ListState 等)、使用方法、过期时间 TTL 和清除策略,并介绍 Table API 和 SQL 模块中的状态管理。通过实际案例,帮助读者理解如何在电商订单处理、实时日志统计等场景中有效利用状态管理功能。
612 16
zdl
|
9月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
389 56
|
8月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
348 16
|
9月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
247 1
|
10月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
224 0
|
10月前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
273 0

热门文章

最新文章