# coding=utf-8
\"\"\"
执行DNS压测
\"\"\"
import time
import dns.resolver
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
# 指定dns服务器
myResolver = dns.resolver.Resolver()
myResolver.nameservers = [\'8.8.8.8\']
# 常数初始化
base_threads = 10
increment_threads = 10
threads_limit = 10
query_domain = \'www.baidu.com\'
expected_result = \'www.wshifen.com.\'
pressure_duration_per_round = 0.01
# 结果集初始化
concurrency = []
dig_result = []
response_time = []
def dig(threads_num):
start = time.time()
result = None
try:
result = myResolver.resolve(query_domain, \'A\')
except dns.exception.DNSException as e:
print(e)
end = time.time()
if result is None:
dig_result.append(False)
print(\"dig fail\\n\")
elif expected_result in str(result.rrset):
dig_result.append(True)
print(\"dig success\\n\")
else:
dig_result.append(False)
print(\"dig fail\\n\")
response_time.append(end - start)
concurrency.append(threads_num)
while True:
if base_threads > threads_limit:
break
with ThreadPoolExecutor(max_workers=base_threads) as thread_pool:
start_time = time.time()
while True:
cur_time = time.time()
if cur_time - start_time > pressure_duration_per_round:
break
thread_pool.submit(dig, base_threads)
thread_pool.shutdown(wait=True)
print(\"Threads:%d\" % base_threads)
base_threads += increment_threads
df = pd.DataFrame({
\"concurrency\": concurrency,
\"dig_result\": dig_result,
\"response_time\": response_time,
})
df.to_csv(\'/Users/wuhan/Desktop/python/threads/dns_overload/out/test_result.csv\', index=False)
print(\'Script Finished\')