文章目录
- 01 引言
- 02 简介概述
- 03 基于文件读取数据
- 3.1 readTextFile(path)
- 3.2 readFile(fileInputFormat, path)
- 3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- 3.4 实现原理
- 3.5 注意事项
- 3.6 支持读取的文件形式
- 04 源码实战demo
- 4.1 pom.xml依赖
- 4.2 创建文件数据流作业
- 4.3 运行程序查看日志
01 引言
源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git 模块:aurora_flink 主类:FlinkFileSourceJob(文件)
02 简介概述
1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。 2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。 3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。
03 基于文件读取数据
3.1 readTextFile(path)
读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
3.2 readFile(fileInputFormat, path)
按照指定的文件输入格式读取(一次)文件。
3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。
3.4 实现原理
底层Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。
3.5 注意事项
(1)如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
(2)如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
3.6 支持读取的文件形式
1.本地文件
2.HDFS文件
3.文件夹
4.压缩文件
04 源码实战demo
4.1 pom.xml依赖
4.0.0 com.xsy aurora_flink1.0-SNAPSHOT 11 3.8.1 UTF-8 UTF-8 1.2.75 2.17.1 1.18.0 2.11 2.17.1 com.alibaba fastjson${fastjson.version} org.apache.flink flink-java${flink.version} org.apache.flink flink-streaming-scala_2.12${flink.version} org.apache.flink flink-clients${flink.version} org.apache.logging.log4j log4j-slf4j-impl${log4j.version} org.apache.logging.log4j log4j-api${log4j.version} org.apache.logging.log4j log4j-core${log4j.version} ${project.name} src/main/resources src/main/java **/*.xml org.apache.maven.plugins maven-shade-plugin3.1.1 package shade org.apache.flink:force-shading org.google.code.flindbugs:jar305 org.slf4j:* org.apache.logging.log4j:* *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA org.xsy.sevenhee.flink.TestStreamJob org.springframework.boot spring-boot-maven-plugin${spring.boot.version} true ${project.build.finalName} repackage maven-compiler-plugin ${maven.plugin.version} ${java.version} ${java.version} UTF-8 -parameters aliyun-repos https://maven.aliyun.com/nexus/content/groups/public/ false aliyun-plugin https://maven.aliyun.com/nexus/content/groups/public/ false 4.2 创建文件数据流作业
package com.aurora.source; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @description flink的文件source应用 * @author 浅夏的猫 * @datetime 23:03 2024/1/28 */ public class FlinkFileSourceJob { private static final Logger logger = LoggerFactory.getLogger(FlinkFileSourceJob.class); public static void main(String[] args) throws Exception { //1.创建Flink运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.设置Flink运行模式: //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式) env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //3.基于文件的source使用(本地/HDFS文件/文件夹/压缩文件) //3.1本地文件 DataStreamSource
dataStreamSourceFile = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties"); //3.2 HDFS文件,前提你已经搭建环境 // DataStreamSource dataStreamSourceHdfs = env.readTextFile("hdfs://localhost:8020//source/application.txt"); //3.3文件夹 DataStreamSource dataStreamSourceDir = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources"); //3.4压缩文件 DataStreamSource dataStreamSourceRar = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\test.rar"); //4.输出打印 dataStreamSourceFile.print(); // dataStreamSourceHdfs.print(); dataStreamSourceDir.print(); dataStreamSourceRar.print(); //5.启动运行 env.execute(); } } 4.3 运行程序查看日志
猜你喜欢
- 1月前(鄂尔多斯航空公司客服电话)架起“北方之路” ,中国联合航空带您飞向鄂尔多斯重回1倍速
- 1月前(江西启动“唱游江西”计划)江西启动“唱游江西”计划
- 1月前(上海文旅产业发展高峰论坛)《上海打造文旅元宇宙新赛道行动方案》发布
- 1月前(三亚太阳湾柏悦度假酒店)三亚太阳湾柏悦酒店携手ROSEONLY诺誓缔造浪漫七夕
- 1月前(曼谷丽思卡尔顿公寓价格)在曼谷丽思卡尔顿酒店CALEŌ 邂逅鸡尾酒的浪漫艺术
- 1月前(安岚度假村及酒店推出"山海之约"目的地婚礼计划)安岚度假村及酒店推出"山海之约"目的地婚礼计划
- 1月前(希尔顿集团2021年筹建的酒店)希尔顿集团两大重点项目亮相第四届上海旅游投资促进大会
- 1月前(大连aaaaa景区)辽宁大连A级旅游景区应急救护水平整体跃升
- 1月前(内蒙古交通旅游图)内蒙古着力提升交通与旅游服务水平
- 1月前(新西兰登陆《我的世界》!全球首个目的地游戏模组震撼上线)新西兰登陆《我的世界》!全球首个目的地游戏模组震撼上线
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章