CaptureChangeMySQL的风险
这两周都在对Apache NiFi的CaptureChangeMySQL组件做优化,因为我们在用NiFi做数据同步的过程中,发现它的CDC组件存在一些OOM问题。
NiFi的CDC组件启动时有两个线程在运行:Binlog监听线程和NiFi组件线程。Binlog监听用的是开源工具mysql-binlog-connector-java,这个工具会启动一个监听线程不断接收来自MySQL的Binlog事件;NiFi组件线程则是NiFi调度系统控制的线程。一个CDC组件实例在运行时,Binlog监听线程在后面不断接收数据并存入一个阻塞队列中,NiFi组件线程通过调用onTrigger方法不断消费队列。组件停止时,NiFi组件线程通过一个AtomicBoolean字段通知Binlog监听线程停止工作,除此之外没有任何方法可以停止Binlog监听线程。
造成OOM的原因有几点:
- 内存队列没有设置上限。虽然是阻塞队列,但未设置上限意味着写入时不会阻塞,也就是说MySQL产生了binlog数据就会被存入阻塞队列,消费不及时就会占用大量内存;
- 消费数据时,会把一个事务所有Binlog事件的解析结果都缓存在内存,直到遇到commit事件才会输出到下游。这相当于保证了接收端的事务,如果事务出错能够回滚,但这也导致了内存的大量消耗,遇到业务线刷数据这种一个事务几百万事件的情况就会直接崩掉,比如update某个字段。
- NiFi背压机制不能控制Binlog监听线程停止运行。只有组件停止,才会通过AtomicBoolean字段关闭Binlog监听线程。而在背压时,NIFI并不会调用stop方法停止组件,而是调度系统不再调用onTrigger方法触发组件的工作,组件后台的Binlog监听线程仍然在运行。
为了解决这些问题,我们尝试基于原生cdc组件来开发一个不会OOM的自定义组件。
设计思路
- CDC组件不给内存队列设置上限是有原因的,我们当时实现了阻塞队列的CDC,运行时发现阻塞时间过长后再运行会导致获取的binlog事件无法解析,要了解具体原因需要知道MySQL发送binlog事件的整个过程,等查明白了再把原因放上来。
- 第二个设计思路是不缓存,binlog监听线程拿到数据就提交到下游队列里。由于binlog位点应该从一个事务开始处启动,所以在一个事件输出完后,会在组件状态里记录下一个要输出的事务的起始位点。这样如果组件关闭后重启,可能造成数据重复输出。另一个缺点是不受NiFi背压机制的控制,只要组件在运行,有binlog事件产生就会输出到队列里。不能保证精确一次,但速度是很快的,所以后面我们打算把它作为可配置功能加到组件里。
- 第三个思路是文件缓存,接收到一个事务的数据后缓存到文件里,当事务结束后再复制到另一个统一的日志文件里,然后组件线程通过ontrigger读取统一日志文件。这样就能控制一个事务未完成的时候,不写入日志文件,重启时不会造成重复。
我们最终选择了第三个思路,前面两个都有比较严重的缺陷,第三个文件方式,虽然要读写文件,慢是慢点,总比出问题强。
细节
缓存的内容
既然要写文件,首先考虑要存入的内容。一开始我考虑的是写原生的event事件,拿到就写进去,等组件线程读出来再做处理,这样的好处是改动不大,能尽量与原生组件的逻辑保持一致。但这直接导致了我需要考虑读和写两个场景的binlog位点、读写文件名和位置的记录,以及读写不同场景下的tableInfo、currentDatabase等信息的缓存,在组件可以灵活停止重启的情况下,保证这么多信息的正确性变得非常不容易。我在binlog位点不断出错和tableInfo日常丢失等各种痛苦里挣扎了好几天,直到同事看了我的代码,提醒我应该在拿到binlog事件时就解析好再写文件,我才开始考虑新的方法。
确实,如果拿到事件就处理好,写入文件的只是结果,那等组件消费时逻辑就非常简单了。这一点我在一开始就考虑过,但因为原生的cdc组件解析的结果不能序列化,并且非常复杂,就放弃了。后来我发现不能缓存结果的话可以缓存生成结果的参数,等消费时取参数生成就好了。于是自定义了一个事件类记录各种参数。变长数据写文件
采用的方法很朴实,就是先写长度,再写内容,读的时候先读出一个long类型8字节表示长度,再读这个长度的字节数解析成事件就可以。
加入gitd
gtid是下一版本1.12.0的特性,但是我们生产环境的mysql都是集群,连的智能节点,每次binlog位点一漂移就报错。模仿了新版本的实现,结合我们写文件的机制实现了gtid特性,因为底层框架已经实现了gtid,所以这个过程不是很麻烦。
从小文件复制到大文件的过程中,程序崩溃了怎么办
当这种流式数据的处理程序开始需要考虑容错和幂等性时,问题就会变得更加复杂了。如果一个事务文件写完后复制到统一日志文件,然后程序崩溃了,binlog位点没记录上,也就是说重启NiFi后state里存的还是上个事务的,但日志文件里已经有了这个事务的数据,这就数据重复了。为了避免这种情况发现,在复制文件前会记录一次统一日志文件的大小,等写完了再记一次。如果在这期间程序崩溃了第二次就没记到,所以还是上一次的。这时候我们只要重启时发现记录的大小比真实文件大小要小,就说明复制中断了,把文件恢复到上次大小就可以了,多余的内容删掉。
结果
目前已经能正常运行了,20万事件的大事务无压力,gtid特性也不错。但这种方式对高速磁盘的依赖更高了,本身NiFi对磁盘的依赖就非常高(所有flowfile的操作记录都会落地磁盘,用于快照恢复),所以要想达到生产环境的使用还需要进一步优化。
目前社区对这个问题也没有很好的解决方法,期待后续的优化。