前面的文章提到,在学会使用yappi之后,我们立即重新做了一轮profile。结果发现最占用CPU的果然是kafka写操作和redis读操作,这都是调用的第三方的库,基本上很难再进行优化。
那接下来怎么办呢?log_generator组件占用CPU资源严重的问题该如何解决呢?
既然单个组件不好优化,那么我们必须从系统的角度来分析下,为何该组件会大量占用系统资源。
首先,考虑该组件的角色。log_generator实际是一个消费者,它不停地监听redis的某些channel,当其他组件发布数据到redis的channel上时,log_generator就写log文件并且写kafka。通过观察发现,log_generator会在redis channel有数据的时候,大量占用系统资源,想想这非常合理,如果它突然收到20w条数据,当然会不停地读,处理,然后进行写操作。但是,当这些数据基本处理完后,log_generator就会进入空闲状态,是典型的忙时忙死,闲时闲死。虽说,一段时间内,系统的总负载是一定的,但是突发的这种大量占用CPU的情况,会导致整个系统不稳定,肯定没有将负载平均消耗到一段时间上来的可靠。所以,我们能不能想办法让redis数据的生产者使用慢速持续生产的方式代替突发生产的方式呢?
其次,我们提到过log_generator是一个运行在4核16G服务器上的程序,采用了多线程模型。那么由于GIL的存在,必然无法充分利用多核资源。Python建议在性能敏感的场景下使用多进程模型代替多线程模型,我们何不尝试一下呢?
说干就干,首先我们重构了redis数据的生产者程序-simulator,让它将一次性发送给redis的数据分批均匀地发送给redis。它的代码类似如下, 其中YourTaskToConsumeCPU()函数指代分批发送数据的任务。源码可以在github上找到https://github.com/jumper2014/Asgard/blob/master/tools/time_split.py
import math
import time
if __name__ == "__main__":
# task list
elements = range(1, 998)
#
section_count = 100
total_time = 10 # second
print("len:", len(elements))
elements_each_section = len(elements) / (section_count * 1.0)
elements_each_section = int(math.ceil(elements_each_section))
time_each_section = total_time / (section_count * 1.0)
print("elements_each_section:", elements_each_section)
print("section_count:", elements_each_section)
print("time_each_section:", time_each_section)
start_time = time.time()
for i in range(section_count):
start_index = i * elements_each_section
n = i + 1
end_index = n * elements_each_section
print(start_index, end_index)
print(elements[start_index: end_index])
#############
# TODO
# YourTaskToConsumeCPU()
#############
# time compensation
now = time.time()
if now - start_time < n * time_each_section:
duration = n * time_each_section - (now - start_time)
print("sleep:", duration)
time.sleep(duration)
end_time = time.time()
print(end_time - start_time)
接着,我们将原来的多线程程序-log_generator改成多进程程序。Python就是好,从多线程改成多进程,简单的不行。 原来代码
import threading
for reviewer in reviewers:
p = threading.Thread(target=reviewer.run, args=())
p.setDaemon(True)
p.start()
现在代码
from multiprocessing import Process
for reviewer in reviewers:
p = Process(target=reviewer.run, args=())
p.daemon = True
p.start()
改完后上机一运行,哇撒,log_generator的峰值神奇地消失了,最高时候的CPU负载,由120%降到了60%,运行十分稳定,轻轻松松就达到了模拟20w节点在线的要求。
20w节点在线搞定了,那我们是如何搞定模拟100w节点在线的呢?且听下回分解。