详解Flink CEP的概念及功能
园陌我们在看直播的时候,不管对于主播还是用户来说,非常重要的一项就是弹幕文化。为了增加直播趣味性和互动性, 各大网络直播平台纷纷采用弹窗弹幕作为用户实时交流的方式,内容丰富且形式多样的弹幕数据中隐含着复杂的用户属性与用户行为, 研究并理解在线直播平台用户具有弹幕内容审核与监控、舆论热点预测、个性化摘要标注等多方面的应用价值。
本文不分析弹幕数据的应用价值,只通过弹幕内容审核与监控案例来了解下Flink CEP的概念及功能。
在用户发弹幕时,直播平台主要实时监控识别两类弹幕内容:一类是发布不友善弹幕的用户 ;一类是刷屏的用户。
我们先记住上述需要实时监控识别的两类用户,接下来介绍Flink CEP的API,然后使用CEP解决上述问题。
Flink CEPFlink CEP 是什么
Flink CEP是一个基于Flink的复杂事件处理库,可以从多个数据流中发现复杂事件,识别有意义的事件(例如机会或者威胁),并尽快的做出响应,而不是需要等待几天或则几个月相当长的时间,才发现问题。
Flink CEP API
CEP API的核心是Pattern(模式) API,它允许你快速定义复杂的事件模式。每个模式包含多个阶段(stage)或者我们也可称为状态(state)。从一个状态切换到另一个状态,用户可以指定条件,这些条件可以作用在邻近的事件或独立事件上。
介绍API之前先来理解几个概念:
1. 模式与模式序列
简单模式称为模式,将最终在数据流中进行搜索匹配的复杂模式序列称为模式序列,每个复杂模式序列是由多个简单模式组成。
匹配是一系列输入事件,这些事件通过一系列有效的模式转换,能够访问复杂模式图的所有模式。
每个模式必须具有唯一的名称,我们可以使用模式名称来标识该模式匹配到的事件。
2. 单个模式
一个模式既可以是单例的,也可以是循环的。单例模式接受单个事件,循环模式可以接受多个事件。
3. 模式示例:
有如下模式:a b+ c?d
其中a,b,c,d这些字母代表的是模式,+代表循环,b+就是循环模式;?代表可选,c?就是可选模式;
所以上述模式的意思就是:a后面可以跟一个或多个b,后面再可选的跟c,最后跟d。
其中a、c? 、d是单例模式,b+是循环模式。
一般情况下,模式都是单例模式,可以使用量词(Quantifiers)将其转换为循环模式。
每个模式可以带有一个或多个条件,这些条件是基于事件接收进行定义的。或者说,每个模式通过一个或多个条件来匹配和接收事件。
了解完上述概念后,接下来介绍下案例中需要用到的几个CEP API:
案例中用到的CEP API:
Begin:定义一个起始模式状态
用法:start = Pattern.<Event>begin("start");
Next:附加一个新的模式状态。匹配事件必须直接接续上一个匹配事件
用法:next = start.next("next");
Where:定义当前模式状态的过滤条件。仅当事件通过过滤器时,它才能与状态匹配
用法:patternState.where(_.message == "TMD");
Within: 定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间,则将其丢弃
用法:patternState.within(Time.seconds(10));
Times:一个给定类型的事件出现了指定次数
用法:patternState.times(5);
API 先介绍以上这几个,接下来我们解决下文章开头提到的案例:
监测用户弹幕行为案例
案例一:监测恶意用户
规则:用户如果在10s内,同时输入 TMD 超过5次,就认为用户为恶意攻击,识别出该用户。
使用 Flink CEP 检测恶意用户:
import org.apache.flink.api.scala._
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
object BarrageBehavior01 {
case class LoginEvent(userId:String, message:String, timestamp:Long){
override def toString: String = userId
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用IngestionTime作为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 用于观察测试数据处理顺序
env.setParallelism(1)
// 模拟数据源
val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
List(
LoginEvent("1", "TMD", 1618498576),
LoginEvent("1", "TMD", 1618498577),
LoginEvent("1", "TMD", 1618498579),
LoginEvent("1", "TMD", 1618498582),
LoginEvent("2", "TMD", 1618498583),
LoginEvent("1", "TMD", 1618498585)
)
).assignAscendingTimestamps(_.timestamp * 1000)
//定义模式
val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
.where(_.message == "TMD")
.times(5)
.within(Time.seconds(10))
//匹配模式
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
import scala.collection.Map
val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
val first = pattern.getOrElse("begin", null).iterator.next()
(first.userId, first.timestamp)
})
//恶意用户,实际处理可将按用户进行禁言等处理,为简化此处仅打印出该用户
1 2 下一页>