-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
254 lines (208 loc) · 10.2 KB
/
main.py
File metadata and controls
254 lines (208 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import threading
import requests
import time
import random
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
import warnings
import validators
print("************本软件仅用于Web服务器性能测试!*************\n",
"****测试效果与当地OLT、路由器、光猫以及电脑本身性能有关!****")
# ===== 配置参数 =====
def get_valid_urls():
"""循环获取用户输入的URL列表,支持重新输入和退出选项"""
while True:
# 获取用户输入
urls_input = input("请输入测试URL(多个URL用空格分隔,输入'exit'退出): ").strip()
# 退出检查
if urls_input.lower() == 'exit':
print("已退出URL输入")
return None
# 空输入处理
if not urls_input:
print("错误:必须提供至少一个URL!")
continue # 继续循环要求重新输入[1,4](@ref)
# 分割URL
url_list = [url.strip() for url in urls_input.split()]
valid_urls = []
invalid_found = False
# 验证每个URL
for url in url_list:
if validators.url(url):
valid_urls.append(url)
else:
print(f"无效URL: {url}")
invalid_found = True
# 全部有效则返回
if not invalid_found:
return valid_urls
# 部分无效时提供选项
choice = input("部分URL无效,是否重新输入?(y重新输入/n退出): ").strip().lower()
if choice == 'n':
print("已退出URL输入")
return None
TARGET_URLS = get_valid_urls()
if TARGET_URLS:
print(f"有效的目标URL列表: {TARGET_URLS}")
# 这里继续你的测试程序
else:
print("未提供有效URL,程序终止")
exit()
# 随机化User-Agent请求头
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 OPR/26.0.1656.60",
"Opera/8.0 (Windows NT 5.1; U; en)",
"Mozilla/5.0 (Windows NT 5.1; U; en; rv:1.8.1) Gecko/20061208 Firefox/2.0.0 Opera 9.50",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50",
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0",
"Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11",
"Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.133 Safari/534.16",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400)",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.84 Safari/535.11 SE 2.X MetaSr 1.0",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; SE 2.X MetaSr 1.0)",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/4.4.3.4000 Chrome/30.0.1599.101 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 UBrowser/4.0.3214.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X; zh-CN) AppleWebKit/537.51.1 (KHTML, like Gecko) Mobile/17D50 UCBrowser/12.8.2.1268 Mobile AliApp(TUnionSDK/0.1.20.3)"
]
# 获取用户输入
try:
TEST_DURATION = int(input("请输入测试时间(一小时3600秒,半天21600秒,一天43200秒):"))
NUM_THREADS = int(input("请输入并发线程数(建议50-200):"))
TASK_CHUNK_SIZE = int(input("请输入任务块大小(建议100-500):"))
PROGRESS_INTERVAL = float(input("请输入进度报告间隔(建议0.5-2秒):"))
except ValueError:
print("错误:请输入有效的数字!")
exit(1)
# 验证输入有效性
if NUM_THREADS <= 0 or TASK_CHUNK_SIZE <= 0 or TEST_DURATION <= 0 or PROGRESS_INTERVAL <= 0:
print("错误:所有参数必须大于0!")
exit(1)
# 全局状态
completed = threading.Event()
total_requests = 0
total_success = 0
total_errors = 0
start_time = time.perf_counter()
statistics_lock = threading.Lock()
def create_session():
"""创建高效复用的HTTP会话,添加重试策略和自定义超时"""
session = requests.Session()
adapter = HTTPAdapter(
pool_connections=50,
pool_maxsize=200,
pool_block=False,
max_retries=3 # 添加有限重试
)
session.mount('https://', adapter)
session.mount('http://', adapter)
# 设置合理的超时时间
session.timeout = (3.05, 9) # 连接3.05秒,读取9秒
return session
def send_request(session, url):
"""发送单个HTTP请求"""
global total_requests, total_success, total_errors
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Connection": "keep-alive"
}
try:
response = session.get(url, headers=headers, verify=False, stream=False)
response.close() # 立即关闭连接释放资源
with statistics_lock:
total_success += 1
return True
except Exception as e:
with statistics_lock:
total_errors += 1
return False
finally:
with statistics_lock:
total_requests += 1
def worker(thread_id):
"""高效线程工作函数"""
session = create_session()
url = TARGET_URLS[thread_id % len(TARGET_URLS)]
while not completed.is_set():
# 使用更细粒度的控制,避免大块任务导致延迟
for _ in range(min(50, TASK_CHUNK_SIZE)):
if completed.is_set():
break
send_request(session, url)
session.close() # 确保关闭会话释放资源
def monitor_progress():
"""实时监控测试进度,添加动态RPS计算"""
last_count = 0
last_time = start_time
while not completed.is_set():
time.sleep(PROGRESS_INTERVAL)
with statistics_lock:
current_count = total_requests
success_rate = (total_success / total_requests * 100) if total_requests > 0 else 0
current_time = time.perf_counter()
elapsed = current_time - start_time
# 计算实时RPS(最近时间段的)
time_diff = current_time - last_time
rps = (current_count - last_count) / time_diff if time_diff > 0 else 0
last_count = current_count
last_time = current_time
# 计算剩余时间
remaining = TEST_DURATION - elapsed
if remaining < 0:
remaining = 0
print(f"\r⏱️ 已运行: {elapsed:.1f}s | 剩余: {remaining:.1f}s | 📤 总请求: {total_requests} "
f"| ✅ 成功: {total_success} | ❌ 错误: {total_errors} | 成功率:({success_rate:.1f}%) "
f"| ⚡ 实时RPS: {rps:.1f}", end="")
def main():
"""主程序入口"""
global start_time
for URLS in TARGET_URLS:
print(f"🚀 开始压力测试: 目标URL", URLS)
print(f"⏱️ 持续时间: {TEST_DURATION}秒 | 🧵 线程数: {NUM_THREADS} | 📦 任务块: {TASK_CHUNK_SIZE}")
start_time = time.perf_counter()
# 启动监控线程
threading.Thread(target=monitor_progress, daemon=True).start()
try:
with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
# 提交所有工作线程
futures = [executor.submit(worker, i) for i in range(NUM_THREADS)]
# 等待测试时间结束
time.sleep(TEST_DURATION)
completed.set()
# 等待所有任务完成
for future in futures:
future.result(timeout=10) # 设置超时防止卡住
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断测试!")
completed.set()
except Exception as e:
print(f"\n\n❌ 发生错误: {str(e)}")
completed.set()
# 计算最终结果
total_time = time.perf_counter() - start_time
rps = total_requests / total_time if total_time > 0 else 0
success_rate = (total_success / total_requests * 100) if total_requests > 0 else 0
print(f"\n\n✅ 测试完成!")
print(f"⏱️ 总时长: {total_time:.2f}秒")
print(f"📊 总请求数: {total_requests} | ✅ 成功: {total_success} | ❌ 错误: {total_errors} | 成功率:({success_rate:.1f}%)")
print(f"⚡ 平均RPS: {rps:.2f}次/秒")
print(f"🧵 并发线程: {NUM_THREADS}")
print(f"🔁 平均延迟: {(total_time * 1000 / total_requests) if total_requests > 0 else 0:.2f}ms")
# 带宽估算 (假设平均响应大小50KB)
bandwidth = (total_success * 50 * 1024 * 8) / (total_time * 1000000) # Mbps
print(f"🌐 估计带宽使用: {bandwidth:.2f} Mbps")
if __name__ == "__main__":
# 禁用SSL验证警告
warnings.filterwarnings("ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
main()