Flink任务开发:从代码编写到集群提交
人民网>>社会·法治

Flink任务开发:从代码编写到集群提交

2025-06-24 11:39:51 | 来源:人民网
小字号

Flink任务开发:从代码编写到集群提交

一、引言

Apache Flink是一个分布式流批一体化处理引擎,在大数据处理领域应用广泛。本文将详细介绍如何开发Flink任务,包括使用DataStream API进行编码、打包并提交到集群上运行,以及提交任务的两种方式。

二、Flink编码步骤/模型

Flink任务的开发通常遵循以下几个步骤:

  1. env - 准备环境:创建StreamExecutionEnvironment对象,这是Flink程序的基础环境,用于设置运行时参数、获取数据源等操作。例如:
StreamExecutionEnvironmentenv =StreamExecutionEnvironment.getExecutionEnvironment();

可以设置运行模式,如STREAMING(流处理,默认模式)、BATCH(批处理)或AUTOMATIC(根据数据自动判断)。

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

还可以获取或设置任务的并行度,并行度决定了任务在集群中执行的并行程度。

// 设置并行度为2env.setParallelism(2);// 获取系统的并行度intparallelism =env.getParallelism();
  1. source - 加载数据:从各种数据源读取数据,如文件、Kafka、Socket等。这里以从元素列表读取数据为例:
DataStream<String>dataStream01 =env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");

也可以从文件读取数据,并且可以通过外部参数指定文件路径:

DataStream<String>dataStream =null;if(args.length!=0){ Stringpath;ParameterToolparameterTool =ParameterTool.fromArgs(args);if(parameterTool.has("input")){ path =parameterTool.get("input");}else{ path =args[0];}dataStream =env.readTextFile(path);}else{ dataStream =env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");}
  1. transformation - 数据处理转换:对读取的数据进行各种转换操作,如mapflatMapfilterkeyBysum等。例如:
DataStream<String>flatMapStream =dataStream01.flatMap(newFlatMapFunction<String,String>(){ @OverridepublicvoidflatMap(Stringline,Collector<String>collector)throwsException{ String[]arr =line.split(" ");for(Stringword :arr){ collector.collect(word);}}});DataStream<Tuple2<String,Integer>>mapStream =flatMapStream.map(newMapFunction<String,Tuple2<String,Integer>>(){ @OverridepublicTuple2<String,Integer>map(Stringword)throwsException{ returnTuple2.of(word,1);}});DataStream<Tuple2<String,Integer>>sumResult =mapStream.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){ @OverridepublicStringgetKey(Tuple2<String,Integer>tuple2)throwsException{ returntuple2.f0;}}).sum(1);

也可以使用Lambda表达式进行简洁的函数式编程,但使用Lambda表达式后需要添加returns指定返回类型,否则可能报错。例如:

DataStream<String>wordsDS =dataStream.flatMap((Stringvalue,Collector<String>out)->{ String[]words =value.split(" ");for(Stringword :words){ out.collect(word);}}).returns(Types.STRING);DataStream<Tuple2<String,Integer>>wordAndOneDS =wordsDS.map((Stringvalue)->Tuple2.of(value,1)).returns(Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String>keyedDS =wordAndOneDS.keyBy((Tuple2<String,Integer>value)->value.f0);SingleOutputStreamOperator<Tuple2<String,Integer>>result =keyedDS.sum(1);
  1. sink - 数据输出:将处理后的数据输出到各种目标,如文件、Kafka、控制台等。这里以打印到控制台为例:
sumResult.print();
  1. execute - 执行:启动Flink任务的执行。
env.execute();

三、DataStream API开发

  1. 添加依赖:在Maven项目中添加以下依赖:
<properties><flink.version>1.13.6flink.version>properties><dependencies><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-streaming-java_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-javaartifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-clients_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-table-api-java-bridge_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-table-planner-blink_2.11artifactId><version>${ flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-shaded-hadoop-2-uberartifactId><version>2.7.5-10.0version>dependency><dependency><groupId>log4jgroupId><artifactId>log4jartifactId><version>1.2.17version>dependency><dependency><groupId>org.projectlombokgroupId><artifactId>lombokartifactId><version>1.18.24version>dependency>dependencies><build><extensions><extension><groupId>org.apache.maven.wagongroupId><artifactId>wagon-sshartifactId><version>2.8version>extension>extensions><plugins><plugin><groupId>org.codehaus.mojogroupId><artifactId>wagon-maven-pluginartifactId><version>1.0version><configuration><fromFile>target/${ project.build.finalName}.jarfromFile><url>scp://root:root@bigdata01:/opt/appurl>configuration>plugin>plugins>build>
  1. 编写代码:按照上述编码步骤编写具体的业务逻辑代码,如WordCount示例代码。

四、打包、上传与提交任务

  1. 打包:使用Maven命令mvn clean package对项目进行打包,生成可执行的JAR文件。
  2. 上传:通过配置wagon-maven-plugin,可以在打包时将JAR文件上传到指定的远程服务器路径。
  3. 提交任务
    • 以UI的方式递交:在Flink集群的Web界面中,上传打包好的JAR文件,并配置相关参数,如主类名、运行参数等,然后启动任务。
    • 以命令的方式递交
      • 带有自定义input参数的提交方式:
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input /home/wc.txt
- 带有运行模式的提交方式:
flink run -Dexecution.runtime-mode=BATCH -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input hdfs://bigdata01:9820/home/wc.txt

五、总结

通过本文的介绍,我们了解了Flink任务开发的基本流程,包括使用DataStream API进行编码、打包上传以及提交任务的两种方式。在实际应用中,可以根据具体的业务需求,灵活运用Flink的各种功能和特性,构建高效、可靠的大数据处理应用。同时,需要注意Flink版本的兼容性以及相关依赖的管理,以确保任务的顺利开发和运行。

(责编:人民网)

分享让更多人看到