Flink01:编译源码并在IDEA中运行Standalone模式

Posted by YI on 2022-11-20

Flink01:编译源码并在IDEA中运行Standalone模式

简述

本系列是个人以参与Flink开源、为社区贡献代码为目标进行研究的笔记心得

Flink版本以及后续更新

  1. 记录此笔记时,Flink社区的最新版本是v.1.17-SNAPSHOT
  2. 本项目将紧跟社区代码的更新

源码下载和环境搭建

  1. fork和clone代码

    为了方便后续为社区贡献代码,需要将flink fork到个人github仓库,从个人仓库clone代码到本地,通过idea加载项目。

  2. 解决maven依赖问题:

    最好同时使用阿里云maven镜像和中央仓库镜像,两者同时使用能够解决大部分依赖问题,遇到不能解决的可以通过手工下载放入.m2目录、编译依赖项目对应版本的源码等方式解决。

  3. 官方要求的IDEA插件

    参考资料:Importing Flink into an IDE v1.17-SNAPSHOT

    官方要求在idea中安装Scala、Save Actions、Checkstyle-IDEA、google-java-format插件。其中google-java-format有对应的版本要求,目前(1.17-SNAPSHOT)要求v1.7.0.6,需要到文档中指定的url下载,并通过磁盘安装的方式安装到idea,确保不要更新该插件。

    如果要修改PyFlink模块,需要python环境

  4. 代码风格检查

    Flink采用spotless和google-java-format来格式化代码,对于Scala则采用Spotless with scalafmt。

    需要设置google-java-format的代码风格,设置Actions on Save的行为和自动格式化的目标文件后缀名。

    重点设置Checkstyle,具体操作详见参考资料Importing Flink into an IDE v1.17-SNAPSHOT中的Code FormattingCheckstyle For Java两部分内容

源码编译

  1. maven版本:3.8.4

  2. 编译过程和注意事项

    • 整个编译过程分为两步,package和assemble,assemble放在flink-dist中单独进行,所以整个流程是这样的:

      1
      2
      3
      4
      5
      6
      # 在flink目录中
      mvn clean install -DskipTests -Dmaven.javadoc.skip=false -T 1C
      # 进入flink-dist目录
      cd flink-dist
      # 打包
      mvn install -DskipTests

      为了编译速度快一些,可以用-T 1C命令来让1个cpu执行一个项目的编译;

      为了阅读源码方便,可以加入-Dmaven.javadoc.skip=false生成javadoc;

      如果报Too many files with unapproved license, 需要加入-Drat.skip=true 跳过许可证发行检查,或者根据Importing Flink into an IDE v1.17-SNAPSHOT中的Copyright Profile章节,对许可证进行配置。

    • 打包结果

      第一步执行完成后会在各个模块生成jar包,进入flink-dist进行assemble后会把jar包收集到flink-dist模块下的target中,生成flink-*-bin目录

在IDEA中运行Standalone模式

为了修改和调试方便,我们需要在IDEA中直接运行flink,而不是采用remote方式远程调试。所以需要在idea中单独运行JobManager和TaskManager

JobManager

  1. 目标函数

    Flink提供了Standalone模式的入库,在flink-runtime模块下的StandaloneSessionClusterEntrypoint类里。直接执行这个Main方法会提示以下信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    usage: StandaloneSessionClusterEntrypoint -c <configuration directory> [-D
    <property=value>] [-h <hostname>] [-r <rest port>] [-x <execution mode>]
    -c,--configDir <configuration directory> Directory which contains the
    configuration file
    flink-conf.yml.
    -D <property=value> use value for given property
    -h,--host <hostname> Hostname for the RPC service.
    -r,--webui-port <rest port> Port for the rest endpoint and
    the web UI.
    -x,--executionMode <execution mode> Deprecated option

    这是一个help信息,要求我们用-c参数输入一个配置文件所在目录。

    这个配置文件目录就是flink编译后的conf目录。所以我们在idea的application执行界面的program arguments中填入编译后的conf目录:

    1
    -c C:\document\codes\flink\flink-dist\target\flink-1.17-SNAPSHOT-bin\flink-1.17-SNAPSHOT\conf

  2. 解决主类和依赖的问题

    配置完成后执行StandaloneSessionClusterEntrypoint,会出现找不到主类的问题,我们需要依赖其他的jar包。在IDEA的File -> project structure->Modules中给flink-runtime-添加依赖,把我们编译得到的flink-*-bin/lib目录下的所有jar包都添加进去。

  3. 日志配置

    处理完依赖问题后,已经能够启动项目了但是无法看到日志,需要在jvm启动命令中指定log4j配置文件

    1
    -Dlog4j.configurationFile=file:/C:/document/codes/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/conf/log4j-console.properties

    log4j-console.properties文件中,日志目录被配置为$sys:log.file$,即从系统环境变量中取log.file,如果没有配置的话仍然会无法显示日志或者报错,所以需要提前配置好。

    我这边不打算采用系统环境变量的方式,而是用自定义的环境变量log.file=target/log/flink.log,并把log4j-console.properties中的$sys:log.file$都改成$env:log.file$,这样的好处是可以把JobManager和TaskManager的日志分开。

  4. 验证:打开localhost:8081可以出现flink界面

TaskManager

  1. 基础配置

    TaskManager的运行配置跟JobManager大体相似,运行模块和日志配置方式都是一样的,只是主类不一样,所以可以直接把JobManager的配置复制一份,修改主类名称为:org.apache.flink.runtime.taskexecutor.TaskManagerRunner。由于都是runtime模块下的类,在JM阶段已经解决了依赖问题,这里就不需要再做处理了。

    为了避免日志写入到同一个文件里,可以在环境变量中把日志文件换个名字:log.file=target/log/taskmanager.log(如果日志目录用的环境变量就不能用这种方式了)

  2. 启动taskmanager报错处理

    • 创建临时目录出错

      启动时会创建一个临时目录,这个目录的名字格式是:tm_localhost:63324-413063,其中冒号在windows里是不被允许的,需要修改TaskManagerRunner类中构造TaskManagerResourceID的方法getTaskManagerResourceID(),把ip和端口的连接符号从冒号改成下划线

    • 配置项检查不通过

      taskmanager在启动时会检查TaskManager的必要配置项,这些配置项在flink-conf.yml文件中默认是没有配置的,有可能是在shell脚本中设置的,为了避免麻烦我们可以直接在flink-conf.yml文件中手动添加:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      # ================== 单独启动时添加 ===================
      taskmanager.cpu.cores: 2

      taskmanager.memory.task.heap.size: 750mb
      taskmanager.memory.task.off-heap.size: 0mb

      taskmanager.memory.managed.size: 2048mb

      taskmanager.memory.network.min: 128mb
      taskmanager.memory.network.max: 128mb

      taskmanager.memory.framework.heap.size: 128mb
      taskmanager.memory.framework.off-heap.size: 128mb
      taskmanager.log.path: target/log/task.log

      # jvm
      taskmanager.memory.jvm-metaspace.size: 256mb
      taskmanager.memory.jvm-overhead.max: 256mb
      taskmanager.memory.jvm-overhead.min: 256mb
      # ==================================================

      注意:

      1. 其中taskmanager.log.path是task的日志文件,stdout文件会与日志同名并放到相同目录。如果配置文件里没有,会到系统环境变量里取log.file,如果都没有在点击Task的日志或者stdout时JobManager后台会报错。
      2. taskmanager.memory.network.mintaskmanager.memory.network.min要求保持相等、taskmanager.memory.jvm-overhead.maxtaskmanager.memory.jvm-overhead.min要求保持相等
  3. 验证:Taskmaanger在启动后会自动注册到JobManager,所以在flink界面能够看到TaskManager的数量从0变成了1