Flink01:编译源码并在IDEA中运行Standalone模式
简述
本系列是个人以参与Flink开源、为社区贡献代码为目标进行研究的笔记心得
Flink版本以及后续更新
- 记录此笔记时,Flink社区的最新版本是v.1.17-SNAPSHOT
- 本项目将紧跟社区代码的更新
源码下载和环境搭建
fork和clone代码
为了方便后续为社区贡献代码,需要将flink fork到个人github仓库,从个人仓库clone代码到本地,通过idea加载项目。
解决maven依赖问题:
最好同时使用阿里云maven镜像和中央仓库镜像,两者同时使用能够解决大部分依赖问题,遇到不能解决的可以通过手工下载放入.m2目录、编译依赖项目对应版本的源码等方式解决。
官方要求的IDEA插件
官方要求在idea中安装Scala、Save Actions、Checkstyle-IDEA、google-java-format插件。其中google-java-format有对应的版本要求,目前(1.17-SNAPSHOT)要求v1.7.0.6,需要到文档中指定的url下载,并通过磁盘安装的方式安装到idea,确保不要更新该插件。
如果要修改PyFlink模块,需要python环境
代码风格检查
Flink采用spotless和google-java-format来格式化代码,对于Scala则采用Spotless with scalafmt。
需要设置google-java-format的代码风格,设置Actions on Save的行为和自动格式化的目标文件后缀名。
重点设置Checkstyle,具体操作详见参考资料Importing Flink into an IDE v1.17-SNAPSHOT中的
Code Formatting和Checkstyle For Java两部分内容
源码编译
maven版本:
3.8.4编译过程和注意事项
整个编译过程分为两步,package和assemble,assemble放在flink-dist中单独进行,所以整个流程是这样的:
1
2
3
4
5
6在flink目录中
mvn clean install -DskipTests -Dmaven.javadoc.skip=false -T 1C
进入flink-dist目录
cd flink-dist
打包
mvn install -DskipTests为了编译速度快一些,可以用
-T 1C命令来让1个cpu执行一个项目的编译;为了阅读源码方便,可以加入
-Dmaven.javadoc.skip=false生成javadoc;如果报Too many files with unapproved license, 需要加入-Drat.skip=true 跳过许可证发行检查,或者根据Importing Flink into an IDE v1.17-SNAPSHOT中的Copyright Profile章节,对许可证进行配置。
打包结果
第一步执行完成后会在各个模块生成jar包,进入flink-dist进行assemble后会把jar包收集到flink-dist模块下的target中,生成flink-*-bin目录
在IDEA中运行Standalone模式
为了修改和调试方便,我们需要在IDEA中直接运行flink,而不是采用remote方式远程调试。所以需要在idea中单独运行JobManager和TaskManager
JobManager
目标函数
Flink提供了Standalone模式的入库,在flink-runtime模块下的StandaloneSessionClusterEntrypoint类里。直接执行这个Main方法会提示以下信息:
1
2
3
4
5
6
7
8
9
10usage: StandaloneSessionClusterEntrypoint -c <configuration directory> [-D
<property=value>] [-h <hostname>] [-r <rest port>] [-x <execution mode>]
-c,--configDir <configuration directory> Directory which contains the
configuration file
flink-conf.yml.
-D <property=value> use value for given property
-h,--host <hostname> Hostname for the RPC service.
-r,--webui-port <rest port> Port for the rest endpoint and
the web UI.
-x,--executionMode <execution mode> Deprecated option这是一个help信息,要求我们用-c参数输入一个配置文件所在目录。
这个配置文件目录就是flink编译后的conf目录。所以我们在idea的application执行界面的program arguments中填入编译后的conf目录:
1
-c C:\document\codes\flink\flink-dist\target\flink-1.17-SNAPSHOT-bin\flink-1.17-SNAPSHOT\conf

解决主类和依赖的问题
配置完成后执行StandaloneSessionClusterEntrypoint,会出现找不到主类的问题,我们需要依赖其他的jar包。在IDEA的File -> project structure->Modules中给flink-runtime-添加依赖,把我们编译得到的
flink-*-bin/lib目录下的所有jar包都添加进去。日志配置
处理完依赖问题后,已经能够启动项目了但是无法看到日志,需要在jvm启动命令中指定log4j配置文件
1
-Dlog4j.configurationFile=file:/C:/document/codes/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/conf/log4j-console.properties
在
log4j-console.properties文件中,日志目录被配置为$sys:log.file$,即从系统环境变量中取log.file,如果没有配置的话仍然会无法显示日志或者报错,所以需要提前配置好。我这边不打算采用系统环境变量的方式,而是用自定义的环境变量
log.file=target/log/flink.log,并把log4j-console.properties中的$sys:log.file$都改成$env:log.file$,这样的好处是可以把JobManager和TaskManager的日志分开。验证:打开localhost:8081可以出现flink界面
TaskManager
基础配置
TaskManager的运行配置跟JobManager大体相似,运行模块和日志配置方式都是一样的,只是主类不一样,所以可以直接把JobManager的配置复制一份,修改主类名称为:
org.apache.flink.runtime.taskexecutor.TaskManagerRunner。由于都是runtime模块下的类,在JM阶段已经解决了依赖问题,这里就不需要再做处理了。为了避免日志写入到同一个文件里,可以在环境变量中把日志文件换个名字:
log.file=target/log/taskmanager.log(如果日志目录用的环境变量就不能用这种方式了)启动taskmanager报错处理
创建临时目录出错
启动时会创建一个临时目录,这个目录的名字格式是:tm_localhost:63324-413063,其中冒号在windows里是不被允许的,需要修改TaskManagerRunner类中构造TaskManagerResourceID的方法getTaskManagerResourceID(),把ip和端口的连接符号从冒号改成下划线
配置项检查不通过
taskmanager在启动时会检查TaskManager的必要配置项,这些配置项在flink-conf.yml文件中默认是没有配置的,有可能是在shell脚本中设置的,为了避免麻烦我们可以直接在flink-conf.yml文件中手动添加:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# ================== 单独启动时添加 ===================
taskmanager.cpu.cores: 2
taskmanager.memory.task.heap.size: 750mb
taskmanager.memory.task.off-heap.size: 0mb
taskmanager.memory.managed.size: 2048mb
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 128mb
taskmanager.memory.framework.heap.size: 128mb
taskmanager.memory.framework.off-heap.size: 128mb
taskmanager.log.path: target/log/task.log
# jvm
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.max: 256mb
taskmanager.memory.jvm-overhead.min: 256mb
# ==================================================注意:
- 其中
taskmanager.log.path是task的日志文件,stdout文件会与日志同名并放到相同目录。如果配置文件里没有,会到系统环境变量里取log.file,如果都没有在点击Task的日志或者stdout时JobManager后台会报错。 taskmanager.memory.network.min、taskmanager.memory.network.min要求保持相等、taskmanager.memory.jvm-overhead.max、taskmanager.memory.jvm-overhead.min要求保持相等
- 其中
验证:Taskmaanger在启动后会自动注册到JobManager,所以在flink界面能够看到TaskManager的数量从0变成了1