搞一波性能
- 各种类型性能测试理论
-
- Ⅰ、并发测试
-
- ⅰ性能并发分类
- ⅱ准备工作
- ⅲ需求分析:评估系统并提取性能指标
- ...待补充
- 使用工具
-
- Ⅰ、Jmeter
-
- ① 添加常用插件
- ② 测试前准备
- ③ 提取返回数据
- ④验证提取公式
- ⑤从文件中读取参数值
- Ⅱ、Python
-
- ①单线程执行
- ②多线程执行
- ③守护线程
- ④阻塞线程
- ⑤并发测试框架
- 数据分析
各种类型性能测试理论
Ⅰ、并发测试
说明: 并发测试不光可以做性能,也可用于其他业务目的
- 功能并发:单业务功能场景的并发、混合业务功能场景的并发,验证数据的正确性
- 稳定性并发:判断测试系统的长期稳定运行的能力
- 异常性并发:模拟系统在较差、异常资源配置下运行,评估被测对象在资源不足的情况下的工作状态
- 性能并发:满足性能指标的前提下,让被测对象承担不同的工作量,以评估北侧对象最大处理能力及是否存在缺陷
ⅰ性能并发分类
- 点层面并发:多用户在同一个时间点,对同一个事件进行操作
- 线层面并发:在一个时间段内,多用户进行不同事件的操作,但同时对服务器产生压力
ⅱ准备工作
- 性能测试环境尽量与生存环境保持一致
- 性能测试基础数据可以根据前些时间的业务数据预估获得
- 在确定某一测试场景后,对测试执行产生的数据要有预判,方便测试数据准备
- 性能指标设定,比如其中常用的三个指标:
- 响应时间(RT) >>> 首页打开速度 < 3s ,订单提交成功 < 5s
- 事物成功率 >>> 订单成功率达到99%以上
- 吞吐量(TPS计算公式:并发数/平均响应时间) >>> :在100个并发用户的高峰期,订单处理能力至少达到900TPS
ⅲ需求分析:评估系统并提取性能指标
- 引导相关的运维人员和需求人员给出具体的需求数据,并对这些数据进行二次分析,得出我们真实的性能需求
- 按系统分类:
- 初次上线系统:参考同行的系统数据,进行用户行为分析和商业数据结构的估算为前提,利用性能估算法推算。得到的负荷和响应时间数据可以被用于验证所计划的模型的能力,并帮助做出决策
- 已经上线系统:通过相关运维人员获取TPS和时间的比列分布图、用户数和时间的分布图、数据库ER关系图、容量数据等,直接精准得出目前的系统的用户行为和业务数据关系,进而而出我们需要的性能需求
…待补充
使用工具
Ⅰ、Jmeter
① 添加常用插件
一、梯度加压插件:Stepping Thread Group
1、访问网网站:https://jmeter-plugins.org/downloads/old/
2、下载插件:
3、下载后需要解压,然后将JMeterPlugins-Standard.jar包放在jmeter安装目录的jmeter-x.x\lib\ext路径下,重新启动jemter即可。**
使用方法:
1、添加线程组——jp@gc - Stepping Thread Group
2、Stepping Thread Group界面如下:
上图的各项意思:
This group will start 100 threads
:设置线程组启动的线程总数为100个;
First,wait for N seconds
:启动第一个线程之前,需要等待N秒;
Then start N threads
:设置最开始时启动N个线程;
Next,add 10
:每隔30秒
threads every 30seconds
:启动10个线程
using ramp-up 5 seconds
:10个线程在5秒内启动完成
Then hold load for 60 seconds
:启动的线程总数达到最大值之后,再持续运行60秒;
Finally,stop 5 threads every 1 seconds
:每秒停止5个线程;
3、在该Stepping Thread Group线程线下新建http请求等 :
二、响应时间图形+TPS图形插件:
1、访问网网站:https://jmeter-plugins.org/wiki/Start/
2、下载插件:
3、点击任意链接进入搜索页面,事实证明点击这2个链接搜索结果都一样,会下载相同的压缩包jpgc-graphs-basic-2.0
,其中包含了这2个插件:
- jmeter-plugins-cmn-jmeter-0.4.jar >>> 响应时间图形
- jmeter-plugins-graphs-basic-2.0.jar >>> TPS图形
- jmeter-plugins-manager-0.20.jar >>>jmeter插件管理器(版本有点老,上面网站可下载最新的)
三、jmeter插件管理器:
1、下载:
2、使用:
② 测试前准备
- HTTP请求默认值:设置默认协议、服务器地址、端口号(http-80、https-443)
- HTTP Cookie管理器:用于存放cookie,使每次请求都带上存放的cookie信息,比如小程序登陆后身份信息
- 用户定义的变量:自定义变量,用于参数化修改多个请求中的值
调用变量公式:${变量名}
,比如下方调用自定义变量mdid
,写入${mdid}
即可
③ 提取返回数据
场景1: 第2个请求需要用到第1个请求返回数据的某个值
解决: 返回数据一般格式是json,在请求中添加后置处理器:json提取器
补充: match no
设置参数意义:
- 0随机
- n取第几个匹配值
- -1匹配所有,后续引用用 变量名_N 取第N个值
方法:
说明:json串 [ ]表示对象组成的数组,{ }表示对象,对象里包含多个属性值,属性值可以是值,或数组,或对象
公式:$表示响应的根对象。取子对象或对象的属性用. 取数组里的对象用[],数组索引从0开始
某返回数据如下:
{“data”:[{“id”:10312,“name”:“限量秒杀”,“grade”:0,“parent”:null},{“id”:10111,“name”:“平价菜场”,“grade”:0,“parent”:null},{
“id”:10101,“name”:“新鲜水果”,“grade”:0,“parent”:null},{“id”:10122,“name”:“肉禽蛋”,“grade”:0,“parent”:null},{“id”:10205,“name”:“米面粮油”,“grade”:0,“parent”:null},{“id”:10304,“name”:“调味干货”,“grade”:0,“parent”:null},{“id”:10159,“name”:“酒水乳饮”,“grade”:0,“parent”:null},{“id”:10151,“name”:“休闲零食”,“grade”:0,“parent”:null},{“id”:10314,“name”:“方便速食”,“grade”:0,“parent”:null}],“message”:“成功!”,“status”:“OK”}
提取以上返回数据标黄id的值,先取返回数据$
中data
对应的值,由于data对应的值为数组
,我们需要提取的数据为数组中第二个对象data[1]
中id
的值,所以最后公式为$.data[1].id
验证提取结果: 添加调试取样器,运行后在结果树中查看
提取成功如下图,如果提取失败则展示之前在json提取器中设置的提示“提取失败”:
调用: 验证提取成功后,使用方法与使用用户自定义变量相同,用上方在json提取器设置的变量名${yno2}
场景2: 随机提取请求中某个键(有多个相同键)对应的值,用作后续调用
方法: 获取数组的所有值 $[*].属性
返回数据为:
{“data”:[{“id”:10312,“name”:“特价秒杀”,“grade”:0,“parent”:null},{“id”:10111,“name”:“平价菜场”,“grade”:0,“parent”:null},{“id”:10101,“name”:“新鲜水果”,“grade”:0,“parent”:null},{“id”:10122,“name”:“肉禽蛋”,“grade”:0,“parent”:null},{“id”:10303,“name”:“海鲜水产”,“grade”:0,“parent”:null},{“id”:10205,“name”:“米面粮油”,“grade”:0,“parent”:null},{“id”:10304,“name”:“调味干货”,“grade”:0,“parent”:null},{“id”:10159,“name”:“酒水乳饮”,“grade”:0,“parent”:null},{“id”:10151,“name”:“休闲零食”,“grade”:0,“parent”:null},{“id”:10314,“name”:“方便速食”,“grade”:0,“parent”:null},{“id”:10451,“name”:“日用百货”,“grade”:0,“parent”:null}],“message”:“成功!”,“status”:“OK”}
随机提取返回数据某分类的id值,公式:$.data[*].id
④验证提取公式
场景: json提取器验证提取公式
⑤从文件中读取参数值
方法1: 函数助手
方法2: 配置原件:CSV 数据文件设置
Ⅱ、Python
Python的内置模块提供了两个线程模块:thread
和 threading
,thread是原生模块,threading是扩展模块,在thread的基础上进行了封装和改进。所以只需要使用threading
这个模块就能完成并发测试。
语法:
变量 = threading.Thread(target=执行函数)
>>> 创建线程
变量.start()
>>> 启动线程,执行线程中的函数
①单线程执行
import threadingdef test():print("666")
t = threading.Thread(target=test) ↑ 通过target参数把test()函数放到线程中
t.start()
单线程执行的结果和单独执行某一个/组函数的结果是一样的,区别只在于用线程的方式执行函数,而线程是可以同时多个一起执行的,函数是不可以同时执行的。
②多线程执行
多线程只需要通过循环创建多个线程,并通过循环启动执行就可以了。
import threadingdef test():print("666")def thd():Threads = [] # 自定义一个空的数组for i in range(10):t = threading.Thread(target=test) # 通过循环创建线程Threads.append(t) # 通过循环把每一次创建的线程t装到Threads数组中for t in Threads:t.start() # 对10个线程进行循环启动if __name__ == '__main__':thd() # 执行thd()函数进行多线程并发
这样就通过10个线程执行了10次test()函数,但Python的并发并非绝对意义上的同时处理,因为启动线程是通过循环启动,还是有先后次序,存在细微的时间差异,可以小的忽略不计,当然如果线程较多就会扩大这种差异,所以实际使用中不推荐建立太多线程来执行并发。
优化:
需要并发执行500次test()函数,启动500个线程会很慢,也非常消耗资源,那就可以把500个并发拆成25个线程,每个线程再循环20次执行test()函数,这样在启动下一个线程的时候,上一个线程已经在循环执行了,以此类推当启动第25个线程时,可能已经执行了200次test()函数,这样就可以大大减少并发的时间差异
import threadingdef test():print("666")def looptest():for i in range(20):test()def thd():Threads = []for i in range(25):t = threading.Thread(target=looptest)Threads.append(t)for t in Threads:t.start()if __name__ == '__main__':thd()
③守护线程
说明:
之前创建的线程只是main()线程的子线程,即先启动主线程main(),然后执行thd()启动子线程。
守护线程:
多线程的一个重要概念,即当主线程执行完毕之后,所有的子线程也被关闭(无论子线程是否执行完成)。默认不设置的情况下是没有守护线程的,主线程执行完毕之后,会等待子线程全部执行完毕,才会关闭结束程序,没有守护线程的弊端是当子线程死循环或者一直处于等待中,则程序将不会被关闭。所以守护线程的意义在于处理主线程和子线程的关闭工作
设置守护线程:
守护线程通过子线程的setDaemon()
方法实现,默认情况下不设置,等同于setDaemon(Flase)
,如果需要设置守护线程则将其改成setDaemon(True)
import threadingdef test():x = 0while x == 0:print("666")def thd():Threads = []for i in range(10):t = threading.Thread(target=test)Threads.append(t)t.setDaemon(True) # 在每一个线程中加上守护线程,必须加在start()之前才有效果for t in Threads:t.start()if __name__ == '__main__':thd()print("end")
结果如下: 主线程执行完毕,子线程无线循环被终止,打印“end”
④阻塞线程
阻塞线程:
守护线程太霸道,可以通过子线程join()
方法阻塞线程,让主线程等待子线程完成之后再往下执行,等主线程执行完毕后再关闭所有子线程
import threadingdef test():x = 0while x == 0:print("666")def thd():threads = []for i in range(10):t = threading.Thread(target=test)threads.append(t)t.setDaemon(True) # 在每一个线程中加上守护线程,必须加在start()之前才有效果for t in threads:t.start()for t in thread: # 在每一个线程中加上阻塞线程,必须加在start()之后才有效果t.join()if __name__ == '__main__':thd()
结果如下: 主线程会因为等待子线程结束而不会往下执行,主线程无法执行完成,自然无法关闭子线程,所以是不会打印“end”
So: 对于死循环/一直等待的情况,也可以通过join()的timeout参数
来控制:修改上方代码t.join()
为t.join(2)
,这样就可以完美解决相互等待的情况,子线程告诉主线程让其等待2秒,2秒之内子线程完成,主线程就继续往下执行,2秒之后如果子线程还未完成,主线程也会继续往下执行,执行完成后关闭子线程。
运行代修改之后码: 运行20秒,而非2秒,这是超时机制决定的,完成第一个线程的超时之后才会开始计算第二个线程的超时,所以执行了10个线程超时时间就是20秒。一般来说不推荐使用timeout参数,最好的方式还是在函数内加上超时判断,如果超过设置时间就直接退出函数,这样也等于结束了子线程,那主线程会随着子线程的结束而继续执行了。阻塞线程的意义在于控制子线程与主线程的执行顺序。
⑤并发测试框架
综上所诉可编写一套多线程的框架用作并发测试,这样只需要套用不同的测试函数,修改不同的参数作为并发数即可,而不再需要改多线程并发的代码
import threadingTHREAD_NUM = 1 # 线程数量
ONE_WORKER_NUM = 1 # 每个线程循环次数
''' 这2个变量的乘积就是最终的并发数,如果只需要用线程来并发,把ONE_WORKER_NUM设置为1即可 '''def test():pass # 测试代码def working():global ONE_WORKER_NUMfor i in range(0, ONE_WORKER_NUM):test()def th():global THREAD_NUMthreads = []for i in range(THREAD_NUM):t = threading.Thread(target=working, name="T"+str(i))t.setDaemon(True)threads.append(t)for t in threads:t.start()for t in threads:t.join()if __name__ == '__main__':th()
优化以上代码,加上数据统计:
import threading
import requests
import json
import time
import datetimeTHREAD_NUM = 1 # 线程数量
ONE_WORKER_NUM = 1 # 每个线程循环次数
sum_time = 0.00 # 总响应时间
success_count = 0 # 总成功次数
c = {
"JSESSIONID": "U1220055d66e46-5826-5cb8-a48c-aee07630420b"}def test():global sum_timeglobal success_countglobal ct1 = time.time()url = "https://haohuo.jdsq360.com/api/wxapp/store/getBanner"r = requests.get(url=url, cookies=c)j = json.loads(r.text)assert j["message"] == "成功", "断言失败" # 断言失败时打印"断言失败"t2 = time.time()print("获取banner成功")r_time = t2 - t1print("成功响应时间:" + str(r_time))sum_time = sum_time + r_timesuccess_count = success_count + 1txt = open("./测试结果.txt", "a")txt.write("成功响应时间:" + str(r_time) + "\n")txt.close()def working():global ONE_WORKER_NUMfor i in range(0, ONE_WORKER_NUM):test()def th():global THREAD_NUMthreads = []for i in range(THREAD_NUM):t = threading.Thread(target=working, name="T"+str(i))t.setDaemon(True)threads.append(t)for t in threads:t.start()for t in threads:t.join(60)if __name__ == '__main__':txt = open("./测试结果.txt", "a")txt.write("\n" + "\n" + "测试开始时间:" + str(datetime.datetime.now()) + "\n")txt.write("----------------------------------------" + "\n")txt.close()th()txt = open("./测试结果.txt", "a")txt.write("----------------------------------------" + "\n")txt.write("总并发数量:" + str(THREAD_NUM * ONE_WORKER_NUM) + "\n")txt.write("并发成功数量:" + str(success_count) + "\n")txt.write("成功率:" + str((success_count/(THREAD_NUM * ONE_WORKER_NUM)) * 100) + "%" + "\n")txt.write("成功响应总时长:" + str(sum_time) + "\n")txt.write("成功响应平均时长:" + str(sum_time/success_count) + "\n")txt.write("TPS:" + str(success_count/(sum_time/success_count)) + "\n")txt.close()
数据分析