【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】
- 1)函数类(Function Classes)
- 2)富函数类(Rich Function Classes)
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
1)函数类(Function Classes)
Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:
方式一:实现 FilterFunction 接口
public class TransFunctionUDF { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource
stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); DataStream filter = stream.filter(new UserFilter()); filter.print(); env.execute(); } public static class UserFilter implements FilterFunction { @Override public boolean filter(WaterSensor e) throws Exception { return e.id.equals("sensor_1"); } } } 方式二:通过匿名类来实现 FilterFunction 接口
DataStream
stream = stream.filter(new FilterFunction< WaterSensor>() { @Override public boolean filter(WaterSensor e) throws Exception { return e.id.equals("sensor_1"); } }); 方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource
stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); DataStream stream = stream.filter(new FilterFunctionImpl("sensor_1")); public static class FilterFunctionImpl implements FilterFunction { private String id; FilterFunctionImpl(String id) { this.id = id; } @Override public boolean filter(WaterSensor value) throws Exception { return thid.id.equals(value.id); } } } 方式三:采用匿名函数(Lambda)
public class TransFunctionUDF { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource
stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); //map 函数使用 Lambda 表达式,不需要进行类型声明 SingleOutputStreamOperator filter = stream.filter(sensor -> "sensor_1".equals(sensor.id)); filter.print(); env.execute(); } } 2)富函数类(Rich Function Classes)
“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版 本 。 富函数类一般是以抽象类的形式出现的。例如:RichMapFunction 、RichFilterFunction、RichReduceFunction 等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function 有生命周期的概念。典型的生命周期方法有:
-
open() 方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。
-
close() 方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。
来看一个例子说明:
public class RichFunctionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env .fromElements(1, 2, 3, 4) .map(new RichMapFunction
() { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始"); } @Override public Integer map(Integer integer) throws Exception { return integer + 1; } @Override public void close() throws Exception { super.close(); System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束"); } }) .print(); env.execute(); } }
-
猜你喜欢
- 12天前(哈弗h9优惠9万是真的吗)热浪来袭,哈弗H9超值补贴火热加码
- 12天前(fender japan hybrid)Fender东京旗舰店盛大开幕在即,开售商品和店内服务提前揭晓
- 12天前(万达酒店及度假村连续五年荣获“中国饭店集团60强”)万达酒店及度假村连续五年荣获“中国饭店集团60强”
- 12天前(新西兰“空降”上海:新西兰旅游局邀请你来“玩真的”!)新西兰“空降”上海:新西兰旅游局邀请你来“玩真的”!
- 12天前(纳米比亚旅游报价)纳米比亚旅游局2024年中国推介会圆满落幕
- 12天前(世茂海峡大厦多高)巴西地产高管齐聚厦门世茂海峡大厦 共探超高层建筑锻造经验
- 12天前(大黄山景区高质量发展联盟成立多少年)大黄山景区高质量发展联盟成立
- 12天前(内蒙古交通旅游图)内蒙古着力提升交通与旅游服务水平
- 12天前(殷建祥简历)全国十大牛商解码:殷建祥如何用178天技术突围打造星空梦星空房
- 12天前(新西兰旅游局×时差岛:黄觉的“长白云之乡”奇遇)新西兰旅游局×时差岛:黄觉的“长白云之乡”奇遇
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章