Python dns 模块实现 DNS 压测

发布时间:2024-06-15 16:01

# coding=utf-8
"""
Run a DNS load (stress) test.
"""
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import dns.resolver
import pandas as pd

# Pin the DNS server that receives every query of the load test.
myResolver = dns.resolver.Resolver()
myResolver.nameservers = ['8.8.8.8']

# Load-test parameters.
base_threads = 10                   # worker threads used in the first round
increment_threads = 10              # threads added after each round
threads_limit = 10                  # rounds stop once base_threads exceeds this
query_domain = 'www.baidu.com'      # name resolved by every query
expected_result = 'www.wshifen.com.'  # substring the answer rrset text must contain
pressure_duration_per_round = 0.01  # seconds during which queries are submitted

# Result columns; each query appends one entry to every list.
concurrency = []
dig_result = []
response_time = []


# dig() runs concurrently on the pool's worker threads. The lock keeps the
# three appends of one sample together so that index i refers to the same
# query in every result list.
_results_lock = threading.Lock()


def dig(threads_num):
    """Resolve ``query_domain`` once and record the outcome.

    The query counts as a success only when the resolver returns an answer
    whose rrset text contains ``expected_result``; a DNS error or an
    unexpected answer counts as a failure. One sample is appended to each of
    the module-level lists ``dig_result``, ``response_time`` and
    ``concurrency``.

    Args:
        threads_num: Concurrency level of the current round; stored unchanged
            in ``concurrency``.
    """
    # perf_counter is monotonic, so the measured latency cannot be skewed by
    # wall-clock adjustments.
    start = time.perf_counter()
    try:
        answer = myResolver.resolve(query_domain, 'A')
    except dns.exception.DNSException as e:
        print(e)
        answer = None
    elapsed = time.perf_counter() - start

    success = answer is not None and expected_result in str(answer.rrset)
    print("dig success\n" if success else "dig fail\n")

    with _results_lock:
        dig_result.append(success)
        response_time.append(elapsed)
        concurrency.append(threads_num)


# Run one round per concurrency level, starting at base_threads and growing
# by increment_threads until threads_limit is exceeded.
while base_threads <= threads_limit:
    # Leaving the ``with`` block waits for every queued query to finish, so
    # no explicit shutdown() call is needed afterwards.
    with ThreadPoolExecutor(max_workers=base_threads) as thread_pool:
        # The monotonic clock keeps the submission window immune to
        # wall-clock adjustments.
        deadline = time.monotonic() + pressure_duration_per_round
        # NOTE(review): queries are enqueued as fast as possible for the whole
        # window, without waiting for workers; the backlog is unbounded and
        # the round lasts until it drains, not just
        # pressure_duration_per_round seconds.
        while time.monotonic() <= deadline:
            thread_pool.submit(dig, base_threads)
    print("Threads:%d" % base_threads)
    base_threads += increment_threads


# Build one row per query from the three result lists.
df = pd.DataFrame({
    "concurrency": concurrency,
    "dig_result": dig_result,
    "response_time": response_time,
})

output_path = Path('/Users/wuhan/Desktop/python/threads/dns_overload/out/test_result.csv')
# to_csv does not create missing directories; create the parent first so the
# collected results are not lost at the very last step.
output_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(output_path, index=False)
print('Script Finished')

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号