Files
gangyan/langchain-chat/tests/test_llm_bench_qa.py

163 lines
7.2 KiB
Python
Raw Normal View History

import aiohttp
import asyncio
import time
from tqdm import tqdm
import random
import sys
# 配置
# LLM_MODEL = "Qwen2-72B-Instruct"
# LLM_ENDPOINT = "http://192.168.56.123:8822/v1"
# LLM_MODEL = "deepseek-chat"
# LLM_ENDPOINT = "https://api.deepseek.com/v1"
LLM_MODEL = "qwen-max-2025-01-25"
LLM_ENDPOINT = "https://dashscope.aliyuncs.com/compatible-mode/v1"
TEMPERATURE = 0.7 # 确保每次返回的都不同
MAX_TOKENS = 8192
# 问题列表
questions = [
"为什么鸟儿会唱歌?", "为什么我们有季节?", "为什么星星会闪烁?", "为什么我们会打哈欠?",
"为什么太阳是热的?", "为什么猫会咕噜咕噜叫?", "为什么狗会吠?", "为什么鱼会游泳?",
"为什么我们有指纹?", "为什么我们会打喷嚏?", "为什么我们有眉毛?", "为什么我们有头发?",
"为什么我们有指甲?", "为什么我们有牙齿?", "为什么我们有骨头?", "为什么我们有肌肉?",
"为什么我们有血液?", "为什么我们有心脏?", "为什么我们有肺?", "为什么我们有大脑?",
"为什么我们有皮肤?", "为什么我们有耳朵?", "为什么我们有眼睛?", "为什么我们有鼻子?",
"为什么我们有嘴巴?", "为什么我们有舌头?", "为什么我们有胃?", "为什么我们有肠子?",
"为什么我们有肝脏?", "为什么我们有肾脏?", "为什么我们有膀胱?", "为什么我们有胰腺?",
"为什么我们有脾脏?", "为什么我们有胆囊?", "为什么我们有甲状腺?", "为什么我们有肾上腺?",
"为什么我们有垂体?", "为什么我们有下丘脑?", "为什么我们有胸腺?", "为什么我们有淋巴结?",
"为什么我们有脊髓?", "为什么我们有神经?", "为什么我们有循环系统?", "为什么我们有呼吸系统?",
"为什么我们有消化系统?", "为什么我们有免疫系统?"
]
def log_to_file(file, message):
"""将消息追加写入指定的文件"""
with open(file, 'a', encoding='utf-8') as f:
f.write(message + '\n')
async def fetch(session, url, file=None):
start_time = time.time()
question = random.choice(questions)
json_payload = {
"model": LLM_MODEL,
"messages": [
# {"role": "system", "content": "你的任务是学习和理解用户输入的文段分析其中的实体关系然后根据关系逻辑重新拟定你份合同合同字数要在4000字以上。"},
# {"role": "system", "content": "你的使命是翻译用户输入的文段。注意,一定要翻译完整。"},
{"role": "system", "content": "你的任务是用专业学术语言严谨的科学态度全面的数据支持完整详实地回答user的问题。"},
{"role": "user", "content": question}
],
"temperature": TEMPERATURE,
"max_tokens": MAX_TOKENS,
"stream": False
}
headers = {
"Content-Type": "application/json",
# "Authorization": "Bearer sk-dba93353b0cc447ba55245e4f048c779" # deepseek
"Authorization": "sk-8b498c0de2dc437aab8efa490d4021ba" # qwen
}
try:
async with session.post(url, json=json_payload, headers=headers) as response:
if response.status != 200:
print(f"错误: 收到响应码 {response.status}")
return 0, 0, 0
response_json = await response.json()
end_time = time.time()
request_time = end_time - start_time
completion_tokens = 0
if 'usage' in response_json:
usage = response_json['usage']
completion_tokens = usage.get('completion_tokens', 0)
prompt_tokens = usage.get('prompt_tokens', 0)
else:
print("警告: 响应中缺少 'usage' 字段。")
answer = ""
if 'choices' in response_json and len(response_json['choices']) > 0:
answer = response_json['choices'][0]['message']['content']
# completion_tokens = len(answer) / 1.5 # qwen
# completion_tokens = len(answer) * 0.6 # deepseek
else:
print("警告: 响应中缺少 'choices' 字段或内容为空。")
completion_tokens = 0
# 将输入输出写入文件(保持原有日志格式不变)
if file:
log_to_file(file, f"Q: {question}\nA: {answer}\n")
# 打印输入和输出的token数
# print(f"输入token数: {input_tokens}, 输出token数: {output_tokens}")
return prompt_tokens, completion_tokens, request_time
except Exception as e:
print(f"请求过程中发生异常: {e}")
return 0, 0, 0
async def bound_fetch(sem, session, url, pbar, file=None):
async with sem:
result = await fetch(session, url, file=file)
pbar.update(1)
return result
async def run(load_url, max_concurrent_requests, total_requests, output_file):
sem = asyncio.Semaphore(max_concurrent_requests)
timeout = aiohttp.ClientTimeout(total=6000, connect=6000, sock_read=6000, sock_connect=6000)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
with tqdm(total=total_requests) as pbar:
for _ in range(total_requests):
task = asyncio.create_task(bound_fetch(sem, session, load_url, pbar, file=output_file))
tasks.append(task)
results = await asyncio.gather(*tasks)
# 聚合token数和响应时间
total_input_tokens = sum(result[0] for result in results)
total_output_tokens = sum(result[1] for result in results)
response_times = [result[2] for result in results]
return total_input_tokens, total_output_tokens, response_times
if __name__ == '__main__':
if len(sys.argv) != 3:
print("用法: python llm_test.py <C> <N>")
sys.exit(1)
try:
C = int(sys.argv[1]) # 最大并发数
N = int(sys.argv[2]) # 请求总数
except ValueError:
print("错误: C 和 N 必须是整数。")
sys.exit(1)
url = f'{LLM_ENDPOINT}/chat/completions'
output_file = 'A800_Qwen2.5-72Bint8_bench_.txt'
with open(output_file, 'w', encoding='utf-8') as f:
f.write('') # 清空文件内容
start_time = time.time()
total_input_tokens, total_output_tokens, response_times = asyncio.run(run(url, C, N, output_file))
end_time = time.time()
total_time = end_time - start_time
avg_time_per_request = sum(response_times) / len(response_times) if response_times else 0
tokens_per_second = (total_output_tokens) / total_time if total_time > 0 else 0
final_output = (
"最终表现:\n"
f" 输入token数 : {total_input_tokens:.2f}\n"
f" 输出token数 : {total_output_tokens:.2f}\n"
f" 并发数 : {C}\n"
f" 总请求数 : {N}\n"
f" 总耗时 : {total_time:.2f}\n"
f" 平均耗时 : {avg_time_per_request:.2f}\n"
f" 吞吐(QPS) : {tokens_per_second:.2f} tokens/s"
)
print(final_output)
log_to_file(output_file, final_output) # 将最终的表现也写入文件