Flink任务开发:从代码编写到集群提交
2025-06-24 11:39:51 | 来源:人民网

Flink任务开发:从代码编写到集群提交
一、引言
Apache Flink是一个分布式流批一体化处理引擎,在大数据处理领域应用广泛。本文将详细介绍如何开发Flink任务,包括使用DataStream API进行编码、打包并提交到集群上运行,以及提交任务的两种方式。
二、Flink编码步骤/模型
Flink任务的开发通常遵循以下几个步骤:
- env - 准备环境:创建
StreamExecutionEnvironment
对象,这是Flink程序的基础环境,用于设置运行时参数、获取数据源等操作。例如:
StreamExecutionEnvironmentenv =StreamExecutionEnvironment.getExecutionEnvironment();
可以设置运行模式,如STREAMING
(流处理,默认模式)、BATCH
(批处理)或AUTOMATIC
(根据数据自动判断)。
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
还可以获取或设置任务的并行度,并行度决定了任务在集群中执行的并行程度。
// 设置并行度为2env.setParallelism(2);// 获取系统的并行度intparallelism =env.getParallelism();
- source - 加载数据:从各种数据源读取数据,如文件、Kafka、Socket等。这里以从元素列表读取数据为例:
DataStream<String>dataStream01 =env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");
也可以从文件读取数据,并且可以通过外部参数指定文件路径:
DataStream<String>dataStream =null;if(args.length!=0){ Stringpath;ParameterToolparameterTool =ParameterTool.fromArgs(args);if(parameterTool.has("input")){ path =parameterTool.get("input");}else{ path =args[0];}dataStream =env.readTextFile(path);}else{ dataStream =env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");}
- transformation - 数据处理转换:对读取的数据进行各种转换操作,如
map
、flatMap
、filter
、keyBy
、sum
等。例如:
DataStream<String>flatMapStream =dataStream01.flatMap(newFlatMapFunction<String,String>(){ @OverridepublicvoidflatMap(Stringline,Collector<String>collector)throwsException{ String[]arr =line.split(" ");for(Stringword :arr){ collector.collect(word);}}});DataStream<Tuple2<String,Integer>>mapStream =flatMapStream.map(newMapFunction<String,Tuple2<String,Integer>>(){ @OverridepublicTuple2<String,Integer>map(Stringword)throwsException{ returnTuple2.of(word,1);}});DataStream<Tuple2<String,Integer>>sumResult =mapStream.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){ @OverridepublicStringgetKey(Tuple2<String,Integer>tuple2)throwsException{ returntuple2.f0;}}).sum(1);
也可以使用Lambda表达式进行简洁的函数式编程,但使用Lambda表达式后需要添加returns
指定返回类型,否则可能报错。例如:
DataStream<String>wordsDS =dataStream.flatMap((Stringvalue,Collector<String>out)->{ String[]words =value.split(" ");for(Stringword :words){ out.collect(word);}}).returns(Types.STRING);DataStream<Tuple2<String,Integer>>wordAndOneDS =wordsDS.map((Stringvalue)->Tuple2.of(value,1)).returns(Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String>keyedDS =wordAndOneDS.keyBy((Tuple2<String,Integer>value)->value.f0);SingleOutputStreamOperator<Tuple2<String,Integer>>result =keyedDS.sum(1);
- sink - 数据输出:将处理后的数据输出到各种目标,如文件、Kafka、控制台等。这里以打印到控制台为例:
sumResult.print();
- execute - 执行:启动Flink任务的执行。
env.execute();
三、DataStream API开发
- 添加依赖:在Maven项目中添加以下依赖:
<properties><flink.version>1.13.6flink.version>properties><dependencies><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-streaming-java_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-javaartifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-clients_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-table-api-java-bridge_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-table-planner-blink_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-shaded-hadoop-2-uberartifactId><version>2.7.5-10.0version>dependency><dependency><groupId>log4jgroupId><artifactId>log4jartifactId><version>1.2.17version>dependency><dependency><groupId>org.projectlombokgroupId><artifactId>lombokartifactId><version>1.18.24version>dependency>dependencies><build><extensions><extension><groupId>org.apache.maven.wagongroupId><artifactId>wagon-sshartifactId><version>2.8version>extension>extensions><plugins><plugin><groupId>org.codehaus.mojogroupId><artifactId>wagon-maven-pluginartifactId><version>1.0version><configuration><fromFile>target/${ project.build.finalName}.jarfromFile><url>scp://root:root@bigdata01:/opt/appurl>configuration>plugin>plugins>build>
- 编写代码:按照上述编码步骤编写具体的业务逻辑代码,如WordCount示例代码。
四、打包、上传与提交任务
- 打包:使用Maven命令
mvn clean package
对项目进行打包,生成可执行的JAR文件。 - 上传:通过配置
wagon-maven-plugin
,可以在打包时将JAR文件上传到指定的远程服务器路径。 - 提交任务:
- 以UI的方式递交:在Flink集群的Web界面中,上传打包好的JAR文件,并配置相关参数,如主类名、运行参数等,然后启动任务。
- 以命令的方式递交:
- 带有自定义
input
参数的提交方式:
- 带有自定义
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input /home/wc.txt
- 带有运行模式的提交方式:
flink run -Dexecution.runtime-mode=BATCH -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input hdfs://bigdata01:9820/home/wc.txt
五、总结
通过本文的介绍,我们了解了Flink任务开发的基本流程,包括使用DataStream API进行编码、打包上传以及提交任务的两种方式。在实际应用中,可以根据具体的业务需求,灵活运用Flink的各种功能和特性,构建高效、可靠的大数据处理应用。同时,需要注意Flink版本的兼容性以及相关依赖的管理,以确保任务的顺利开发和运行。
(责编:人民网)
分享让更多人看到