发布时间:2023-08-16 12:00
夏季学期的专业实践被分配到了如标题所示的这个任务。在这里把完成这个任务的整个流程记录一下。
首先拿到了一个有一堆ipv4段的文件(提示无法安全下载可以复制到浏览器):
总共有300多万个这样的IP段。最终的任务就是:给定一个IP地址,输出这个IP地址的信息(如国家、省份、城市、运营商)。
IP段总共有300多万个,为了保证效率我们只在每个IP段里取一个IP地址爬取其信息。先随便找到一个批量查询的网站(单个查询的实在是太慢了),我拿的是这个。首先分析一下这个网站爬取的格式:
可以看到这个结果在一个表格里:
HTTP请求方式是post,表单格式如下(F12-网络-在网页中再次提交一下表单):
下面我们开始写爬虫部分的代码。如果python没有装requests库和BeautifulSoup需要装一下:
pip install bs4
pip install requests
函数原型:
import requests
from bs4 import BeautifulSoup
def IPBatchQuery(ipList):
pass
其中ipList为ipv4字符串的列表,至多200个。
首先用requests库来获得查询结果:
data = {'txt_ip': '\n'.join(ipList)}
response = requests.post('http://www.jsons.cn/ipbatch/', data = data)
然后利用BeautifulSoup库对html文本进行处理,我们发现网页里只有这一个表格:
因此我们直接查找tr的位置。
for tr in soup.find_all('tr'):
tds = tr.find_all('td')
info = []
# 对于表格的每列
for td in tds:
s = td.string # td.string获取表格内容
if not s == None:
info.append(s.strip()) # 以防万一去除前后的空格
if len(info) > 0: # 为0的是第一行(第一行是,没有信息被加入到info中)
lst.append(tuple(info))
return lst
整个函数如下:
def IPBatchQuery(ipList):
lst = []
if(len(ipList) > 200):
print('不能查询超过200个!')
return lst
data = {'txt_ip': '\n'.join(ipList)}
response = requests.post('http://www.jsons.cn/ipbatch/', data = data)
soup = BeautifulSoup(response.text, 'html.parser')
for tr in soup.find_all('tr'):
tds = tr.find_all('td')
info = []
for td in tds:
s = td.string
if not s == None:
info.append(s.strip())
if len(info) > 0: # 为0的是第一行
lst.append(tuple(info))
return lst
检验一下结果:
if __name__ == '__main__':
print(IPBatchQuery(['1.1.1.1', '1.3.4.5']))
应该没什么问题。当时在做的时候打算先把这些IP段的查询结果写入文本文件中,再写到数据库,毕竟之前没学过用python操作数据库,想把这两部分分离开。我把查询部分放到了服务器上,代码做了微调。此外在查询时(1)每5秒查询200条(防止查询太快把自己的IP封了,可持续发展)(2)每查询一次输出一下(3)需要先在目录下新建一个ipv4_data.txt
,为了观察结果。服务器上的代码如下:
import time
import requests
from bs4 import BeautifulSoup
def IPBatchQuery(ipList):
lst = []
if(len(ipList) > 200):
print('不能查询超过200个!')
return
data = {'txt_ip': '\n'.join(ipList)}
response = requests.post('http://www.jsons.cn/ipbatch/', data = data)
f = open('txt.txt', 'w', encoding='utf-8')
soup = BeautifulSoup(response.text, 'html.parser')
for tr in soup.find_all('tr'):
tds = tr.find_all('td')
info = []
for td in tds:
s = td.string
if not s == None:
# 这里也调整了一下,为了防止有空的结果,用'XX'占一下位
if s.strip() == '':
info.append('XX')
else:
info.append(s.strip())
if len(info) > 0: # 为0的是第一行
lst.append(tuple(info))
return lst
ipv4_source = open('ipv4_source.txt', 'r')
# 先在目录下新建一个ipv4_data.txt,感觉w+和a+都不太方便
ipv4_data = open('ipv4_data.txt', 'r+', encoding='utf-8')
# 用来记录IP段源文件里的行数,忽略已经爬过的IP段
cnt = 0
# 已经存在的条数
lines = 0
# 这次爬的条数
output = 0
# IP列表,就是把IP段后面的掩码去掉的列表,如['1.1.0.0']
lst = []
# IP段的列表,如['1.1.0.0/16']
raw_lst = []
flag = False
# 获取一下之前爬的条数,从这里开始爬(防止爬的时候出现异常),顺便把文件指针指向最后
for line in ipv4_data:
if len(line.strip()) > 0:
lines += 1
# 打印一下有多少条
print('已经爬了' + str(lines) + '条')
for line in ipv4_source:
# 跳过已经查询过的IP段
if cnt < lines:
cnt += 1
continue
# 打印一下从哪个IP段开始爬
if flag == False:
flag = True
print('从' + line.strip() + '开始')
slash_pos = line.find('/')
ip = line[0:slash_pos]
lst.append(ip)
raw_lst.append(line)
output += 1
try:
# 攒够200条爬一次
if output%200 == 0:
result = IPBatchQuery(lst)
for i in range(200):
ipv4_data.write(raw_lst[i].strip() + ' ' + ' '.join(result[i]) + '\n')
lst = []
raw_lst = []
print('成功写入')
time.sleep(5)
except:
print(str(cnt) + '行处发生异常')
# 最后可能会不足200条,特殊处理一下
try:
if output > 0:
result = IPBatchQuery(lst)
for i in range(len(lst)):
ipv4_data.write(raw_lst[i].strip() + ' ' + ' '.join(result[i]) + '\n')
lst = []
except:
print('最后200条内发生了异常')
把数据写入数据库
现在我们把文本文件写入MySQL数据库中。我首先写了一个操作数据库的工具类DBUtils
备用(需要安装MySQL环境和python的MySQL库):
pip install mysql-connector
下面这个文件在根目录下的./Utils/DBUtils
下:
import mysql.connector
class DBUtils:
# 连接数据库
def __init__(self, host, usr, pwd):
self.host = host
self.usr = usr
self.pwd = pwd
self.db = mysql.connector.connect(host = host, user = usr, passwd = pwd, buffered = True)
# 检查数据库是否存在
def checkDB(self, name):
cursor = self.db.cursor()
cursor.execute('show databases')
for x in cursor:
if x[0] == name:
return True
return False
# 创建数据库
def createDB(self, name):
if(self.checkDB(name)):
print('数据库已存在')
return
self.execute('CREATE DATABASE ' + name)
print('已创建数据库' + name)
# 执行MySQL查询
def execute(self, sql):
cursor = self.db.cursor()
cursor.execute(sql)
return cursor
# 连接数据库
def connectDB(self, name):
self.db = mysql.connector.connect(host = self.host, user = self.usr, passwd = self.pwd, database = name, buffered = True)
print('已连接数据库' + name)
# 检查表是否存在
def checkTable(self, name):
cursor = self.db.cursor()
cursor.execute('show tables')
for table in cursor:
if table[0] == name:
return True
return False
# 提交修改
def commit(self):
self.db.commit()
然后我们把爬到的文件写入数据库:
创建数据库
from Utils.DBUtils import *
# 这里是自己数据库的账号和密码
db = DBUtils('localhost', 'username', 'password')
db.createDB('ip_info')
创建表
if not db.checkTable('ipv4'):
db.execute('''CREATE TABLE ipv4 (id int auto_increment primary key,
ipv4 varchar(20),
queried_country varchar(20), queried_province varchar(40), queried_city varchar(40),
queried_operator varchar(60))''')
创建的表有6列,分别是id、ipv4段、查询到的国家、省份、城市、运营商。
将数据写入数据库
cnt = 0
# 打开文件
f = open('ipv4_data.txt', 'r', encoding='utf-8')
for line in f:
# 用空格分隔
data = line.split(' ')
try:
db.execute(
'''INSERT INTO ipv4 (ipv4, queried_country, queried_province, queried_city, queried_operator)
VALUES (\'%s\', \'%s\', \'%s\', \'%s\', \'%s\')'''
% tuple(map(lambda x: x if x != 'XX' else '', data)))
except:
pass
cnt += 1
if cnt % 10000 == 0:
db.commit()
print('已写入%d条数据'%(cnt))
break
# 把最后的不足10000条部分也写入
db.commit()
tuple(map(lambda x: x if x != 'XX' else '', data))
这一句是把查询结果中的XX
变为空字符串,因为我们不希望数据库里出现XX
这种信息。每10000条commit一次,如果不commit执行结果不会反应在数据库中。程序运行完应该是这样的:
给定IP查询其IP段
现在还差一步任务就能完成了:给定一个IP,我们需要知道它属于哪个IP段。根据最长匹配原则,我们必须遍历整个数据库,来找到一个IP所属的最小的子网。在这里我采用01字典树(01Trie)来实现这个匹配的过程,思想如下:对于每个IP段,如224.0.0.0/3
,把它的公共部分(对于224.0.0.0/3
即前3位,实际上不可能有这么大的IP段,举这个例子是方便说明)插入01Trie中。插入了两个IP段224.0.0.0/3
和128.0.0.0/2
的01Trie如下图(双边的圆圈表示终止节点,它代表了一个IP段):
假如我们拿到了一个IP地址255.255.1.2
,想要知道它属于哪个IP段。于是我们拿着这个IP从01Trie的根节点出发,顺着该IP的二进制往下走。由于255.255.1.2
的前3位都是1,三步后就会走到224.0.0.0/3
这个节点,它是一个终止节点,因此我们记录下来这个结果。然后我们发现不能继续往下走了,就直接返回224.0.0.0/3
作为查询结果。当然,如果这棵树更大一点,我们在更深的地方可能会遇到更小的IP段,就要将更小的IP段作为查询结果了。(如,224.0.0.0/3
下面还有一个255.255.0.0/16
,就要返回255.255.0.0/16
)
数据结构的实现
python这种语言实在不是太适合写数据结构,我就直接用C++来实现了。下面是数据结构的成员变量:
class IPTree {
private:
// 把unsigned int(32位)作为存储IP的数据类型。如果需要ipv6可以用__uint128_t之类的数据类型
typedef unsigned ipsize_t;
struct node {
bool final; // 标记该节点是否为终止节点(是否为一个IP段)
node* left;
node* right;
node() {
final = false;
this->left = nullptr;
this->right = nullptr;
}
node(bool final) {
this->final = final;
this->left = left;
this->right = right;
}
};
int IP_LENGTH;// IP的长度,ipv4为32,ipv6为128.这里我们只实现ipv4
node* root;// 根节点
std::ofstream output;// 用于保存结果的输出流,一会再说明
public:
IPTree() {
this->IP_LENGTH = 32;// 默认为ipv4,ipv6的道理也是一样的,不过会麻烦一些
root = new node();
}
};
下面再编写我们两个主要的方法insert
(用于插入IP段)和query
(用于查询一个ipv4所在的IP段)。
insert
void insert(std::string IP_segment) {
// 找到'/'的下标
int slash_pos = IP_segment.find('/');
//去掉后面的掩码位,如"/24"
std::string IP = IP_segment.substr(0, slash_pos);
//取得掩码的位数,如24
int mask = atoi(IP_segment.substr(slash_pos + 1).c_str());
//把该IP段转化为一个位向量,用于01Trie的插入
ipsize_t bits = ipv4_to_int(IP);
node* now = root;
for (int i = 1; i <= mask; i++) {
char bit = (bits >> (IP_LENGTH - i)) & 1;// 取出该位二进制
if (!bit) {//向左
if (now->left == nullptr) now->left = new node();
now = now->left;
}
else {//向右
if (now->right == nullptr) now->right = new node();
now = now->right;
}
}
//最后标记为终止节点
now->final = true;
}
中间用到了一个辅助函数ipv4_to_int
,用于把ipv4地址(string类型)转化为位向量(其实就是unsigned类型),代码如下:
//例:给定ipv4="255.128.0.1",
//返回的unsigned数据类型二进制表示为:
//1111 1111 1000 0000 0000 0000 0000 0001
ipsize_t ipv4_to_int(std::string ipv4) {
ipsize_t ret = 0;
char *p = new char[20], *ip = new char[20];
strcpy(ip, ipv4.c_str());
//获取每一节(类似于Java String.split)
p = strtok(ip, ".");
int i = 3;
while (p) {
ret |= atoi(p) << (8 * i--);
p = strtok(NULL, ".");
}
return ret;
}
query
这个函数给定一个string类型的ipv4地址,返回其ip段。
std::string query(std::string IP) {
ipsize_t bits = ipv4_to_int(IP);
int i = 0, depth = 0, mask = 0;
node* now = root;
while (now != nullptr) {
char bit = (bits >> (IP_LENGTH - (++i))) & 1;// 取出该位二进制
//当前位为1向右走,否则向左走
now = bit ? now->right : now->left;
// 如果当前节点是一个终止节点,就记录一下最长的掩码位数(如24)
if (now && now->final) {
mask = i;
}
}
//没有查询到
if (!mask) return "";
bits &= (ipsize_t)(~0) << (IP_LENGTH - mask);// 把右边清成0。这里用到了一些简单的位运算性质
//再把bits转换成ipv4/ipv6格式
return int_to_ipv4(bits) + "/" + std::to_string(mask);
}
最后也有一个辅助函数int_to_ipv4
,是把一个位向量转化为ipv4的点分十进制字符串格式。
std::string int_to_ipv4(ipsize_t ipv4) {
std::string ret;
for (int i = 3; i >= 0; i--) {
unsigned char val = (ipv4 >> (i * 8)) & 0xFF;
ret += std::to_string(val) + ((i == 0) ? "" : ".");
}
return ret;
}
对数据进行保存、读取
我们再给这个IPTree数据结构增加两个用于读写的方法,来实现对IP段相关数据的存档。首先我们考虑保存问题:我们要做的就是把一棵01Trie一一映射到一个字符串,把这个字符串保存到文本文件里,以供下次使用。我们很自然地想到利用先序遍历,来生成一个唯一的字符串:
private:
//先序遍历
void dfs_write(node* now) {
if (now == nullptr) {
output << '#';
return;
}
else {
output << (int)now->final;
}
dfs_write(now->left);
dfs_write(now->right);
}
public:
void write(std::string file) {
output.open(file);
dfs_write(root);
}
其中output是一个输出流,在上面的成员变量部分提到过。这种先序遍历的方式能保证字符串和二叉树的一一对应,比如下图:
这棵二叉树对应的字符串就是"0#01##0#1##"
。我们可以再按照相同的规则把这个字符串转化为二叉树:
private:
node* dfs_read(FILE* file) {
char ch = fgetc(file);
node* now = nullptr;
if (ch == '0') {
now = new node(false);
}
else if (ch == '1') {
now = new node(true);
}
else return now; // 是#
now->left = dfs_read(file);
now->right = dfs_read(file);
return now;
}
public:
void read(std::string file) {
FILE* f = fopen(file.c_str(), "r");
root = dfs_read(f);
}
整个IPTree.h
如下:
#pragma once
#include
#include
#include
#include
class IPTree {
private:
typedef unsigned long ipsize_t;
struct node {
bool final; // 标记该节点是否为终止节点(是否为一个IP段)
node* left;
node* right;
node() {
final = false;
this->left = nullptr;
this->right = nullptr;
}
node(bool final) {
this->final = final;
this->left = left;
this->right = right;
}
};
int IP_LENGTH;// 32(ipv4)或128(ipv6)
node* root;// 根节点
std::ofstream output;
ipsize_t ipv4_to_int(std::string ipv4) {
ipsize_t ret = 0;
char* p = new char[20], * ip = new char[20];
strcpy(ip, ipv4.c_str());
//获取每一节(Java String.split)
p = strtok(ip, ".");
int i = 3;
while (p) {
ret |= atoi(p) << (8 * i--);
p = strtok(NULL, ".");
}
return ret;
}
std::string int_to_ipv4(ipsize_t ipv4) {
std::string ret;
for (int i = 3; i >= 0; i--) {
unsigned char val = (ipv4 >> (i * 8)) & 0xFF;
ret += std::to_string(val) + ((i == 0) ? "" : ".");
}
return ret;
}
node* dfs_read(FILE* file) {
char ch;
ch = fgetc(file);
node* now = nullptr;
if (ch == '0') {
now = new node(false);
}
else if (ch == '1') {
now = new node(true);
}
else return now; // 是#
now->left = dfs_read(file);
now->right = dfs_read(file);
return now;
}
//先序遍历
void dfs_write(node* now) {
if (now == nullptr) {
output << '#';
return;
}
else {
output << (int)now->final;
}
dfs_write(now->left);
dfs_write(now->right);
}
public:
//默认为IPV4
IPTree() {
this->IP_LENGTH = 32;
root = new node();
}
IPTree(int IP_LENGTH) {
this->IP_LENGTH = IP_LENGTH;
root = new node();
}
// 从文件中初始化IP树(先序)
void read(std::string file) {
FILE* f = fopen(file.c_str(), "r");
root = dfs_read(f);
}
//把IP树保存在文件中(先序)
void write(std::string file) {
output.open(file);
dfs_write(root);
}
//插入一个IP段,如1.0.0.0/24
void insert(std::string IP_segment) {
int slash_pos = IP_segment.find('/');
std::string IP = IP_segment.substr(0, slash_pos);
int mask = atoi(IP_segment.substr(slash_pos + 1).c_str());
ipsize_t bits = ipv4_to_int(IP);
node* now = root;
for (int i = 1; i <= mask; i++) {
char bit = (bits >> (IP_LENGTH - i)) & 1;// 取出该位二进制
if (!bit) {//向左
if (now->left == nullptr) now->left = new node();
now = now->left;
}
else {//向右
if (now->right == nullptr) now->right = new node();
now = now->right;
}
}
now->final = true;
}
//查询一个具体IP的IP段,若未查询到返回空串(ipv6返回大写的串)
std::string query(std::string IP) {
ipsize_t bits = ipv4_to_int(IP);
int i = 0, depth = 0, mask = 0;
node* now = root;
while (now != nullptr) {
char bit = (bits >> (IP_LENGTH - (++i))) & 1;// 取出该位二进制
now = bit ? now->right : now->left;
if (now && now->final) {
mask = i;
}
}
//没有查询到
if (!mask) return "";
bits &= (ipsize_t)(~0) << (IP_LENGTH - mask);// 把右边清成0
//再把bits转换成ipv4/ipv6格式
return int_to_ipv4(bits) + "/" + std::to_string(mask);
}
};
之后,我们就可以通过如下的调用来初始化01Trie:
int main() {
IPTree ipt;
std::string ips;
std::ifstream input;
//ipv4_source是最开始给定的那个文件
input.open("ipv4_source.txt");
while(input >> ips) {
ipt.insert(ips);
}
//把数据写入data.txt中
ipt.write("data.txt");
}
data.txt
应该是长这个样子的(这个相比原始的数据大小压缩了77%):
得到data.txt
后,我们就可以通过读取这个文件来初始化了:
int main() {
IPTree ipt;
ipt.read("data.txt");
std::cout << ipt.query("1.2.3.4");
}
综合一下
我们的主要功能部分已经大部分实现了,下面只需要把它们拼装一下留出来一个查询接口就可以了。由于我们的01Trie采用C++实现,我们最终的查询接口最好也是C++的。为此我们还需要配置一下C++的MySQL环境,可以参见这篇博客(Visual Studio下)。
我们新建一个IPQueryManager
类:
//下面两个为调用mysql必要的头文件(windows下)
#include
#include
#include
#include
//IP树的头文件
#include "iptree.h"
class IPQueryManager {
private:
//mysql相关
MYSQL mysql;
MYSQL_ROW row = NULL;
//IP树
IPTree ipt;
//ip段信息的文件名(带后缀),就是刚才得到的一大堆01#那个文件
const char* ZIPFILE_NAME = "data.txt";
public:
IPQueryManager(const char* host, const char* user, const char* password, const char* database) {
#ifdef _WIN64
SetConsoleOutputCP(65001); // 设置控制台编码utf-8(windows),否则可能会乱码
#endif
mysql_init(&mysql);
//API可自行搜索
mysql_real_connect(&mysql, host, user, password, database, 3306, NULL, 0);
mysql_set_character_set(&mysql, "utf8");
ipt.read(ZIPFILE_NAME);
std::cout << "Load finish!" << std::endl;
}
~IPQueryManager() {
mysql_close(&mysql);
}
};
下面完成最后的接口:
std::string IPQuery(std::string ipv4) {
std::string ret;
std::string q = "select * from ipv4 where ipv4=\'" + ipt.query(ipv4) + "\'";
if (mysql_real_query(&mysql, q.c_str(), (unsigned long)strlen(q.c_str()))) {
std::cout << "Query failure!" << std::endl;
return "";
}
MYSQL_RES* res = mysql_store_result(&mysql);
if (res == NULL) {
std::cout << "MySQL restore failure!" << std::endl;
return "";
}
while (row = mysql_fetch_row(res)) {
for (int i = 2; i <= 5; i++) {
std::string str;
if (row[i] != NULL) {
str = row[i];
ret += str + ((i == 5) ? "" : ",");
}
else str = "";
}
}
return ret;
}
运行一下试试:
int main()
{
// 改成自己的数据库信息
IPQueryManager ipqm("localhost", "username", "password", "ip_info");
std::cout << ipqm.IPQuery_true("1.0.4.0");
}