|
- from selenium import webdriver
- import threading
- import time
- from os.path import exists
- from os import mkdir
- import hashlib
- from time import sleep
- import bs4
- import re as regular
- import Information_storage
- import requests
# Module-level storage object created once at import time; the parser's
# to_Database / to_Database_by_re steps write rows into it via add_DataBase().
- data_base = Information_storage.DataBase_Home()
class PAGE:
    """Base record for one page to fetch: its URL, user agent and fetch-mode tag."""

    def __init__(self):
        # 'func' names the fetch mode; it starts as the generic 'PAGE' tag.
        self.func = 'PAGE'
        self.UA = ''
        self.url = ''

    def __str__(self):
        """Return ``<func>-<url>:UA><UA>`` for display."""
        return '{0}-{1}:UA>{2}'.format(self.func, self.url, self.UA)
class REQUESTS_Base(PAGE):
    """Shared setup for pages that are fetched with ``requests`` instead of a browser."""

    def init(self, UA, url, cookies):
        """Store the request settings and build the HTTP headers.

        :param UA: value for the ``User-Agent`` header; ``''`` selects a
            built-in desktop browser string.
        :param url: address to request.
        :param cookies: stored unchanged as ``self.cookies``; the downloader
            uses it as a key into its saved cookies.
        """
        if UA == '':
            # A plain header value: the former default carried a Chrome
            # command-line prefix (``--user-agent ="..."``) that was sent
            # verbatim to the server.
            UA = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 '
                  'Safari/537.36 Edg/80.0.361.66')
        self.UA = UA
        self.headers = {
            'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
            'Accept-Encoding': 'gzip, deflate',  # was the malformed 'Accept - Encoding'
            'Accept-Language': 'zh-Hans-CN, zh-Hans; q=0.5',
            'Connection': 'Keep-Alive',
            'User-Agent': UA,
        }
        # Subclasses assign the real fetcher (requests.get / requests.post)
        # before calling init(); only install the do-nothing placeholder when
        # none is set, otherwise every request would silently be replaced.
        if not callable(getattr(self, 'requests', None)):
            self.requests = lambda *args, **kwargs: None
        self.url = url
        self.cookies = cookies
        self.new = True
class URL_POST(REQUESTS_Base):
    """Page fetched with an HTTP POST through ``requests``."""

    def __init__(self, url, data, UA='', cookies=None, **kwargs):
        """
        :param url: address to post to.
        :param data: request body, handed to ``requests.post`` by the downloader.
        :param UA: User-Agent override; ``''`` keeps the default.
        :param cookies: key of the saved cookies to send, if any.
        :param kwargs: accepted and ignored so callers can pass browser-only options.
        """
        super(URL_POST, self).__init__()
        self.func = 'post'
        self.data = data
        self.init(UA, url, cookies)
        # Assign the fetcher last: init() may install a placeholder callable,
        # which would otherwise overwrite requests.post.
        self.requests = requests.post

    def __str__(self):
        return super(URL_POST, self).__str__() + f';data>{self.data}'
class URL_GET(REQUESTS_Base):
    """Page fetched with an HTTP GET through ``requests`` (mode tag ``'simplify_get'``)."""

    def __init__(self, url, UA='', cookies=None, **kwargs):
        """
        :param url: address to request.
        :param UA: User-Agent override; ``''`` keeps the default.
        :param cookies: key of the saved cookies to send, if any.
        :param kwargs: accepted and ignored so callers can pass browser-only options.
        """
        super(URL_GET, self).__init__()
        self.func = 'simplify_get'
        self.init(UA, url, cookies)
        # Assign the fetcher last: init() may install a placeholder callable,
        # which would otherwise overwrite requests.get.
        self.requests = requests.get
class URL_PAGE(PAGE):
    """Page opened in a Selenium-driven Chrome instance (mode tag ``'get'``)."""

    def __init__(self, url, first_run=False, head=False, no_plugins=True, no_js=False, no_java=False,
                 no_img=False, UA='', cookies=None, new=False, down_load_dir='', **kwargs):
        """
        :param url: address to open.
        :param first_run: add the ``-first run`` argument to Chrome.
        :param head: when true, run Chrome *headless* (the name is historical).
        :param no_plugins: add ``--disable-plugins``.
        :param no_js: add ``--disable-javascript``.
        :param no_java: add ``--disable-java``.
        :param no_img: disable image loading through Blink settings.
        :param UA: user-agent text recorded on the page object.
        :param cookies: key of the saved cookies to restore for this page.
        :param new: the downloader starts a fresh browser for this page when
            true and the previous page was also a browser page.
        :param down_load_dir: Chrome's default download directory.
        :param kwargs: accepted and ignored.
        """
        super(URL_PAGE, self).__init__()
        self.url = url
        self.func = 'get'
        self.options = webdriver.ChromeOptions()
        self.cookies = cookies
        self.new = new
        self.down_load_dir = down_load_dir
        self.init(first_run, head, no_plugins, no_js, no_java, no_img, UA)

    def init(self, first_run, head, no_plugins, no_js, no_java, no_img, UA):
        """Translate the boolean switches into Chrome options."""
        self.options.add_argument('disable-infobars')
        prefs = {'profile.default_content_settings.popups': 0,
                 'download.default_directory': self.down_load_dir}
        self.options.add_experimental_option('prefs', prefs)  # download settings
        if first_run:
            # NOTE(review): '-first run' does not look like a valid Chrome
            # switch; kept as-is until the intended flag is confirmed.
            self.options.add_argument('-first run')
        if head:
            # Headless mode (the stray debug print that used to be here is gone).
            self.options.add_argument('--headless')
            self.options.add_argument('--disable-gpu')
        if no_plugins:
            self.options.add_argument('--disable-plugins')
        if no_js:
            self.options.add_argument('--disable-javascript')
        if no_java:
            self.options.add_argument('--disable-java')
        if no_img:
            self.options.add_argument('blink-settings=imagesEnabled=false')
        if UA == '':
            UA = (f'user-agent ="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  f'Chrome/80.0.3987.132 Safari/537.36"')
        # The UA is only recorded for display; it is not passed to Chrome.
        self.UA = UA

    def __str__(self):
        return f'{self.func}-{self.url}:UA>{self.UA}'
class url:
    """URL manager: a filtered, de-duplicated queue of pages waiting to be fetched.

    Every accepted URL is appended to a history file and every URL handed out
    (or deleted) is appended to a second "run" file.
    """

    num = 0  # number of managers created; used to number the log files

    def __init__(self, dic=f'', dic_run=f''):
        """
        :param dic: directory that receives the history file; also used as the
            browser download directory for queued pages.
        :param dic_run: directory that receives the "already run" file.
        """
        url.num += 1
        self.save_dir = dic
        dic += f'/url[{url.num}].cot_url'
        dic_run += f'/url_run[{url.num}].cot_url'
        self.dir = dic          # path of the history file
        self.dir_run = dic_run  # path of the run-log file
        self.file = open(dic, 'a')
        try:
            self.file_run = open(dic_run, 'a')
        except Exception:
            # Do not leak the first handle when the second file cannot be opened.
            self.file.close()
            raise
        self.url_list = []     # pages waiting to be fetched
        self.url_history = []  # keys of every URL ever accepted
        self.filter = {}       # name -> predicate(url) -> bool

    def close(self):
        """Close both log files. Safe to call more than once."""
        self.file.close()
        self.file_run.close()

    def filter_func(self, url, **kwargs):
        """Return True when every registered filter accepts ``url``."""
        return all(check(url) for check in self.filter.values())

    def Add_func(self, func, name):
        """Register (or replace) the filter ``func`` under ``name``."""
        self.filter[name] = func

    def Del_func(self, index):
        """Remove the filter at position ``index`` in registration order."""
        del self.filter[list(self.filter)[index]]

    def return_func(self):
        """Return the names of the registered filters."""
        return list(self.filter)

    def add_url(self, url, func, data=None, **kwargs):
        """Queue ``url`` unless it was seen before or a filter rejects it.

        :param url: address to queue.
        :param func: ``'get'`` (browser), ``'simplify_get'`` or ``''``
            (requests GET); anything else queues a requests POST.
        :param data: POST body; also part of the history key for non-browser
            requests so the same address with different data is distinct.
        :param kwargs: forwarded to the page class.
        :return: True when the URL was queued, False otherwise.
        """
        if func == '':
            func = 'simplify_get'
        history_key = url if func == 'get' else url + str(data)
        if history_key in self.url_history or not self.filter_func(url, func=func):
            return False
        # Pages get the *directory* as download location; self.dir is the
        # path of the history file and is not usable as a directory.
        if func == 'get':
            page = URL_PAGE(url=url, **kwargs, down_load_dir=self.save_dir)
        elif func == 'simplify_get':
            page = URL_GET(url=url, **kwargs, down_load_dir=self.save_dir)
        else:
            page = URL_POST(url=url, data=data, **kwargs)
        self.url_list.append(page)
        self.url_history.append(history_key)
        self.__out_url(history_key)
        return True

    def del_url(self, index):
        """Drop the queued page at ``index`` and record the deletion in the run log."""
        self.__out_url_run(f'DELETE {self.url_list[index]}')
        del self.url_list[index]

    def get_url(self) -> 'URL_PAGE | URL_POST':
        """Pop and return the oldest queued page, recording it in the run log.

        :raises IndexError: when the queue is empty.
        """
        page = self.url_list[0]
        self.__out_url_run(page.url)
        del self.url_list[0]
        return page

    def __out_url(self, url):
        """Append one line to the history file and flush it."""
        self.file.write(f'{url}\n')
        self.file.flush()

    def __out_url_run(self, url):
        """Append one line to the run-log file and flush it."""
        self.file_run.write(f'{url}\n')
        self.file_run.flush()

    def return_url(self):
        """Return a copy of the queue."""
        return self.url_list.copy()

    def return_url_history(self):
        """Return a copy of the history keys."""
        return self.url_history.copy()
class Page_Downloader:
    """Fetches queued pages with Selenium Chrome or ``requests`` and tracks their cookies."""

    num = 0  # number of downloaders created so far

    def __init__(self, url: 'url', dic=''):
        """
        :param url: URL manager whose ``get_url()`` supplies the next page.
        :param dic: output directory handed on to the attached parser.
        """
        self.url = url
        self.dir = dic
        Page_Downloader.num += 1
        self.page_source_dict = {}
        self.cookie_Thread = None   # background cookie poller (browser mode)
        self.browser = None         # Chrome driver, or the requests response
        self.cookie_dict = {}       # url -> cookies last seen for that url
        self.cookie_dict_list = {}  # url -> Selenium cookie list, used to restore cookies
        self.lase_func = ''         # fetch mode of the previous page
        self.Parser = None          # set by set_Page_Parser()
        self.break_ = False         # True while the cookie poller should keep running
        self.nowurl = None

    def strat_urlGet(self, *args, func_cookie):
        """Fetch the next queued page and hand the result to the parser.

        :param func_cookie: callback that receives a list of cookies whenever
            they are read (used to refresh the GUI).
        :return: the Chrome driver (browser mode) or the ``requests`` response.
        """
        self.break_ = False  # ask a poller from the previous page to stop
        self.page_source_dict = {}
        self.nowurl = self.url.get_url()
        url = self.nowurl.url
        if self.nowurl.func == 'get':
            self._fetch_with_browser(url, func_cookie)
        else:
            self._fetch_with_requests(url, func_cookie)
        self.lase_func = self.nowurl.func
        if self.Parser is not None:
            self.Parser.browser = self.browser
            self.Parser.init(url)
        return self.browser

    def _fetch_with_browser(self, url, func_cookie):
        """Open ``url`` in Chrome, restore saved cookies and start the cookie poller."""
        page = self.nowurl
        if page.new == True and self.lase_func == 'get':
            # A fresh browser was requested and one is currently running.
            self.browser.quit()
            self.browser = webdriver.Chrome(chrome_options=page.options)
        try:
            self.browser.get(url)
        except Exception:
            # No usable driver yet (first page, previous page used requests,
            # or the window was closed): start one and retry once.
            self.browser = webdriver.Chrome(chrome_options=page.options)
            self.browser.get(url)
        if page.new == True:
            try:
                saved = self.cookie_dict_list[page.cookies]
                self.Tra_cookies()
                for cookie in saved:
                    self.Add_cookies(cookie)
            except Exception:
                pass  # restoring cookies is best-effort
        self.start_cookies(func_cookie, url)

    def _fetch_with_requests(self, url, func_cookie):
        """Request ``url`` through ``requests`` and remember the response cookies."""
        page = self.nowurl
        try:
            extra = {'cookies': self.cookie_dict[page.cookies]}
            func_cookie([extra['cookies']])
        except Exception:
            # No saved cookies under that key: send none.
            extra = {}
            func_cookie([])
        if page.func == 'post':
            extra['data'] = page.data
        self.browser = page.requests(url, headers=page.headers, **extra)
        self.cookie_dict[url] = requests.utils.dict_from_cookiejar(self.browser.cookies)
        func_cookie([self.cookie_dict[url]])

    def start_cookies(self, func_cookie, url):
        """Start a background thread that polls the browser's cookies twice a second.

        The loop runs until ``self.break_`` becomes false.
        """
        self.break_ = True

        def update_cookie():
            while self.break_:
                try:
                    cookies = self.browser.get_cookies()
                    func_cookie(cookies)  # show the cookies in the GUI
                    self.cookie_dict[url] = cookies
                    # Also keep them where the browser-mode restore looks them up.
                    self.cookie_dict_list[url] = cookies
                except Exception:
                    pass  # browser busy or gone; try again on the next tick
                # Sleep on failures too, otherwise a dead browser makes this
                # loop spin at full CPU.
                time.sleep(.5)

        # Daemon thread: the poller must not keep the process alive on exit.
        self.cookie_Thread = threading.Thread(target=update_cookie, daemon=True)
        self.cookie_Thread.start()

    def Del_cookies(self, name):
        """Delete the browser cookie called ``name``."""
        self.browser.delete_cookie(name)

    def Tra_cookies(self):
        """Delete every browser cookie."""
        self.browser.delete_all_cookies()

    def Add_cookies(self, cookies: dict):
        """Add one cookie (a Selenium cookie dict) to the browser."""
        self.browser.add_cookie(cookies)

    def update_cookies(self, name, cookies: dict):
        """Merge ``cookies`` into the existing browser cookie called ``name``.

        :raises LookupError: when the browser has no cookie with that name.
        """
        for cookie in self.browser.get_cookies():
            if cookie.get('name', None) == name:
                self.browser.delete_cookie(name)  # replace the old cookie
                cookie.update(cookies)
                self.browser.add_cookie(cookie)
                return
        raise LookupError(f'no cookie named {name!r}')

    def set_Page_Parser(self, Parser):
        """Attach ``Parser`` and give it the current browser, URL manager and output directory."""
        self.Parser = Parser
        self.Parser.browser = self.browser
        self.Parser.url = self.url
        self.Parser.dir = self.dir
class Page_Parser:
    """Records a list of named page steps and replays them in order.

    The public ``find_*`` and action methods only *register* a step (see
    :meth:`add_func`); nothing touches the page until
    :meth:`Element_interaction` runs the recorded steps. A step that produces
    a value stores it in ``element_dict`` under ``'<name>[<num>]'`` so later
    steps can refer to it through their ``element_value`` argument.
    """

    def __init__(self, Downloader: 'Page_Downloader'):
        """Attach to ``Downloader``, which supplies ``browser``, ``url`` and ``dir``."""
        self.Downloader = Downloader
        self.Downloader.set_Page_Parser(self)
        self.func_list = []   # step names in execution order
        self.func_dict = {}   # step name -> wrapped callable
        self.init()

    def init(self, url=''):
        """Forget all stored step results and remember the page address."""
        self.element_dict = {}
        self.now_url = url

    # ------------------------------------------------------------------ #
    # step registration helpers
    # ------------------------------------------------------------------ #
    def add_base(self, func):
        """Decorator: run ``func`` and report success as a bool instead of raising."""
        def wrap(browser=None, num=None, name=None, *args, **kwargs) -> bool:
            try:
                func(browser=browser, num=num, name=name, *args, **kwargs)
                return True
            except Exception:
                return False
        return wrap

    def add_func(self, name, func):
        """Append a step; its key is ``name`` plus its position, e.g. ``click>x[0][3]``."""
        key = f'{name}[{len(self.func_list)}]'
        self.func_list.append(key)
        self.func_dict[key] = func

    def return_func(self, only=True):
        """Return the step names, optionally prefixed with the variable each one fills."""
        if only:
            return self.func_list.copy()
        return [f'var[{position}]@ {step}' for position, step in enumerate(self.func_list)]

    def _register_finder(self, label, locator, target, not_all):
        """Register a lookup through Selenium's ``find_element(s)_by_<locator>``."""
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            if not_all:
                # results are always stored as a list
                found = [getattr(browser, f'find_element_by_{locator}')(target)]
            else:
                found = getattr(browser, f'find_elements_by_{locator}')(target)
            self.element_dict[f'{name}[{num}]'] = found
        self.add_func(label, find)

    def _register_element_call(self, label, element_value, index, method, *call_args):
        """Register a step that calls ``method`` on a previously stored element."""
        @self.add_base
        def action(*args, **kwargs):
            getattr(self.element_dict[element_value][index], method)(*call_args)
        self.add_func(label, action)

    def _register_select_call(self, label, element_value, index, method, option, convert=None):
        """Register a (de)select step on a stored ``<select>`` element.

        Plain elements have no ``select_by_*`` methods, so they are wrapped in
        Selenium's ``Select`` helper; objects that already provide the method
        are used directly.
        """
        @self.add_base
        def action(*args, **kwargs):
            target = self.element_dict[element_value][index]
            if not hasattr(target, method):
                from selenium.webdriver.support.ui import Select
                target = Select(target)
            getattr(target, method)(convert(option) if convert else option)
        self.add_func(label, action)

    def _register_browser_call(self, label, method, *call_args):
        """Register a step that calls ``method`` on the current browser."""
        @self.add_base
        def action(*args, **kwargs):
            getattr(self.browser, method)(*call_args)
        self.add_func(label, action)

    def _unique_name(self):
        """Return an MD5 hex digest of the current time and page address."""
        digest = hashlib.md5()
        digest.update(f'{time.time()}_{self.now_url}'.encode('utf-8'))
        return digest.hexdigest()

    @staticmethod
    def _read_field(node, key):
        """Read one value from a parsed node.

        ``'$name&'`` is the tag name, ``'$self&'`` the node's markup,
        ``'$string$'`` its text (both with newlines removed); any other key
        is looked up in the node's attributes and defaults to ``''``.
        """
        if key == '$name&':
            return node.name
        if key == '$self&':
            return str(node).replace('\n', '')
        if key == '$string$':
            return str(node.string).replace('\n', '')
        return node.attrs.get(key, '')

    # ------------------------------------------------------------------ #
    # element lookup (browser mode)
    # ------------------------------------------------------------------ #
    def find_ID(self, id, not_all=False, **kwargs):
        """Find by element id; ``not_all`` keeps only the first match."""
        self._register_finder(f'find_ID:{id}', 'id', id, not_all)

    def find_class(self, class_name, not_all=False, **kwargs):
        """Find by CSS class name."""
        self._register_finder(f'find_class:{class_name}', 'class_name', class_name, not_all)

    def find_name(self, name_, not_all=False, **kwargs):
        """Find by the ``name`` attribute."""
        self._register_finder(f'find_name:{name_}', 'name', name_, not_all)

    def find_xpath(self, xpath, not_all=False, **kwargs):
        """Find by XPath expression."""
        self._register_finder(f'find_xpath:{xpath}', 'xpath', xpath, not_all)

    def find_css(self, css_selector, not_all=False, **kwargs):
        """Find by CSS selector."""
        self._register_finder(f'find_css:{css_selector}', 'css_selector', css_selector, not_all)

    def find_tag_name(self, tag_name, not_all=False, **kwargs):
        """Find by tag name."""
        self._register_finder(f'find_tagName:{tag_name}', 'tag_name', tag_name, not_all)

    def find_link_text(self, link_text, not_all=False, **kwargs):
        """Find links by their exact text."""
        self._register_finder(f'find_link_text:{link_text}', 'link_text', link_text, not_all)

    def find_partial_link_text(self, partial_link_text, not_all=False, **kwargs):
        """Find links whose text contains ``partial_link_text``."""
        # The "all" path used to call the single-element lookup as well.
        self._register_finder(f'find_partial_link_text:{partial_link_text}',
                              'partial_link_text', partial_link_text, not_all)

    def find_switch_to_alert(self, *args, **kwargs):
        """Store the currently open alert box."""
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            # switch_to.alert is a property, not a method
            self.element_dict[f'{name}[{num}]'] = [browser.switch_to.alert]
        self.add_func(f'find_alert', find)

    def find_switch_to_active_element(self, *args, **kwargs):
        """Store the element that currently has focus."""
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            # switch_to.active_element is a property, not a method
            self.element_dict[f'{name}[{num}]'] = [browser.switch_to.active_element]
        self.add_func(f'active_element', find)

    def find_switch_to_frame(self, reference, is_id=False, *args, **kwargs):
        """Switch the browser into a frame.

        :param reference: ``None`` returns to the main document, ``''`` to the
            parent frame; anything else identifies the frame to enter.
        :param is_id: treat ``reference`` as a numeric frame index.
        """
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            switch = browser.switch_to
            if reference is None:
                result = switch.default_content()   # back to the main document
            elif reference == '':
                result = switch.parent_frame()      # back to the parent frame
            else:
                # An index must reach Selenium as an int; a string is treated
                # as a frame name/id.
                frame_ref = int(reference) if is_id else str(reference)
                result = switch.frame(frame_ref)
            self.element_dict[f'{name}[{num}]'] = [result]
        func_name = {None: '主文档', '': '父文档'}.get(reference, reference)
        self.add_func(f'find_frame:{func_name}', find)

    # ------------------------------------------------------------------ #
    # actions on stored elements
    # ------------------------------------------------------------------ #
    def send_keys(self, text, element_value, index=0, **kwargs):
        """Type ``text`` into a stored element."""
        self._register_element_call(f'sent_text:{text}>{element_value}[{index}]',
                                    element_value, index, 'send_keys', text)

    def User_Passwd(self, User, Passwd, element_value, index=0, **kwargs):
        """Answer an authentication prompt with user name and password."""
        self._register_element_call(f'User:Passwd:{User};{Passwd}>{element_value}[{index}]',
                                    element_value, index, 'authenticate', User, Passwd)

    def clear(self, element_value, index=0, **kwargs):
        """Clear the text of a stored element."""
        self._register_element_call(f'clear_text>{element_value}[{index}]', element_value, index, 'clear')

    def click(self, element_value, index=0, **kwargs):
        """Click a stored element."""
        self._register_element_call(f'click>{element_value}[{index}]', element_value, index, 'click')

    def accept(self, element_value, index=0, **kwargs):
        """Confirm a stored alert box."""
        self._register_element_call(f'accept>{element_value}[{index}]', element_value, index, 'accept')

    def dismiss(self, element_value, index=0, **kwargs):
        """Cancel a stored alert box."""
        self._register_element_call(f'dismiss>{element_value}[{index}]', element_value, index, 'dismiss')

    def submit(self, element_value, index=0, **kwargs):
        """Submit the form a stored element belongs to."""
        self._register_element_call(f'submit>{element_value}[{index}]', element_value, index, 'submit')

    def deselect_by_index(self, element_value, deselect, index=0, **kwargs):
        """Deselect the option at position ``deselect``."""
        self._register_select_call(f'deselect_by_index:{deselect}>{element_value}[{index}]',
                                   element_value, index, 'deselect_by_index', deselect, int)

    def deselect_by_text(self, element_value, deselect, index=0, **kwargs):
        """Deselect the option whose visible text is ``deselect``."""
        self._register_select_call(f'deselect_by_text:{deselect}>{element_value}[{index}]',
                                   element_value, index, 'deselect_by_visible_text', deselect)

    def deselect_by_value(self, element_value, deselect, index=0, **kwargs):
        """Deselect the option whose value is ``deselect``."""
        self._register_select_call(f'deselect_by_value:{deselect}>{element_value}[{index}]',
                                   element_value, index, 'deselect_by_value', deselect)

    def select_by_index(self, element_value, deselect, index=0, **kwargs):
        """Select the option at position ``deselect``."""
        self._register_select_call(f'select_by_index:{deselect}>{element_value}[{index}]',
                                   element_value, index, 'select_by_index', deselect, int)

    def select_by_text(self, element_value, deselect, index=0, **kwargs):
        """Select the option whose visible text is ``deselect``."""
        self._register_select_call(f'select_by_text:{deselect}>{element_value}[{index}]',
                                   element_value, index, 'select_by_visible_text', deselect)

    def select_by_value(self, element_value, deselect, index=0, **kwargs):
        """Select the option whose value is ``deselect``."""
        self._register_select_call(f'select_by_value:{deselect}>{element_value}[{index}]',
                                   element_value, index, 'select_by_value', deselect)

    # ------------------------------------------------------------------ #
    # navigation and waiting
    # ------------------------------------------------------------------ #
    def back(self, **kwargs):
        """Go back one page."""
        self._register_browser_call(f'BACK', 'back')

    def forward(self, **kwargs):
        """Go forward one page."""
        self._register_browser_call(f'FORWARD', 'forward')

    def refresh(self, **kwargs):
        """Reload the page."""
        self._register_browser_call(f'REFRESH', 'refresh')

    def wait_sleep(self, time: int = 2, **kwargs):
        """Unconditionally pause for ``time`` seconds."""
        @self.add_base
        def action(*args, **kwargs):
            sleep(time)
        self.add_func(f'WAIT:{time}s', action)

    def set_wait(self, time: int = 2, **kwargs):
        """Set the browser's implicit wait to ``time`` seconds."""
        # This step used to sleep exactly like wait_sleep(); it now sets the
        # implicit wait, which is what it is meant to do.
        self._register_browser_call(f'Loading_wait:{time}s', 'implicitly_wait', time)

    # ------------------------------------------------------------------ #
    # page content
    # ------------------------------------------------------------------ #
    def run_JS(self, JS, **kwargs):
        """Run a script in the browser and store what it returns."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            result = self.browser.execute_script(JS)
            if not hasattr(result, '__getitem__'):
                result = [result]  # wrap scalars so the stored value can be indexed
            self.element_dict[f'{name}[{num}]'] = result
        self.add_func(f'run_js:{JS}', action)

    def to_text(self, **kwargs):
        """Store ``[page source, page address]``."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            try:
                source = self.browser.page_source
            except Exception:
                source = self.browser.text  # requests response
            self.element_dict[f'{name}[{num}]'] = [source, self.now_url]
        self.add_func(f'get_page_source', action)

    def out_html(self, element_value, **kwargs):
        """Write a stored ``[source, address]`` pair to two files in the output directory."""
        @self.add_base
        def action(*args, **kwargs):
            save_dir = self.dir + '/' + self._unique_name() + '.cotan_source'
            print(save_dir)
            # NOTE(review): files are written with the platform default
            # encoding; confirm what the reader expects before changing it.
            with open(save_dir, 'w') as f:
                f.write(self.element_dict[element_value][0])
            with open(save_dir + '.CoTanURL', 'w') as f:
                f.write(self.element_dict[element_value][1])
        self.add_func(f'write_html<{element_value}', action)

    def to_json(self, **kwargs):
        """Store the response body decoded as JSON (requests mode)."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            self.element_dict[f'{name}[{num}]'] = [self.browser.json()]
        self.add_func(f'to_json', action)

    def Webpage_snapshot(self, **kwargs):
        """Save a screenshot plus a side file holding the page address."""
        @self.add_base
        def action(*args, **kwargs):
            name = self._unique_name()
            with open(self.dir + '/' + name + '.png.CoTanURL', 'w') as f:
                f.write(self.now_url)
            self.browser.save_screenshot(self.dir + '/' + name + '.png')
            sleep(1)
        self.add_func(f'Webpage_snapshot', action)

    # ------------------------------------------------------------------ #
    # cookies
    # ------------------------------------------------------------------ #
    def del_all_cookies(self, **kwargs):
        """Delete every browser cookie."""
        self._register_browser_call(f'del_all_cookies', 'delete_all_cookies')

    def del_cookies(self, cookies_name, **kwargs):
        """Delete one browser cookie by name."""
        self._register_browser_call(f'del_cookies:{cookies_name}', 'delete_cookie', cookies_name)

    def add_cookies(self, cookies, **kwargs):
        """Add one cookie dict to the browser."""
        self._register_browser_call(f'add_cookies:{cookies}', 'add_cookie', cookies)

    def update_cookies(self, cookies_name, cookies, **kwargs):
        """Merge ``cookies`` into the existing cookie called ``cookies_name``."""
        @self.add_base
        def action(*args, **kwargs):
            current = self.browser.get_cookie(cookies_name)
            if current is None:
                # Fail before deleting anything when there is nothing to update.
                raise LookupError(f'no cookie named {cookies_name!r}')
            merged = dict(current)
            merged.update(cookies)
            self.browser.delete_cookie(cookies_name)
            self.browser.add_cookie(merged)
        self.add_func(f'update_cookies:{cookies_name}>{cookies}', action)

    def get_cookies(self, cookies_name, **kwargs):
        """Store one cookie, looked up by name."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            self.element_dict[f'{name}[{num}]'] = [self.browser.get_cookie(cookies_name)]
        self.add_func(f'get_cookies:{cookies_name}', action)

    def get_all_cookies(self, **kwargs):
        """Store the list of all browser cookies."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            self.element_dict[f'{name}[{num}]'] = self.browser.get_cookies()
        self.add_func(f'get_all_cookies', action)

    # ------------------------------------------------------------------ #
    # BeautifulSoup parsing
    # ------------------------------------------------------------------ #
    def make_bs(self, element_value, **kwargs):
        """Parse stored page source into a BeautifulSoup document."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            self.element_dict[f'{name}[{num}]'] = [
                bs4.BeautifulSoup(self.element_dict[element_value][0], "html.parser")]
        self.add_func(f'Parsing:{element_value}', action)

    def listSlicing(self, index: (slice, int), element_value):
        """Return the selected stored items as a list, for an int or a slice ``index``."""
        stored = self.element_dict[element_value]
        if isinstance(index, int):
            return [stored[index]]
        return stored[index]

    def to_Database(self, element_value, index, data: (str, list), dataBase_name: str, **kwargs):
        """Write one row per selected node to the database ``dataBase_name``.

        ``data`` lists the fields of a row: ``'$name&'``, ``'$self&'``,
        ``'$string$'`` or an attribute name.
        """
        @self.add_base
        def action(*args, **kwargs):
            for bs in self.listSlicing(index, element_value):
                row = [self._read_field(bs, key) for key in data]
                data_base.add_DataBase(dataBase_name, row)
        self.add_func(f'DataBase:{data}<{element_value}[{index}]>{dataBase_name}', action)

    def to_Database_by_re(self, element_value, index, data: str, dataBase_name: str, **kwargs):
        """Write one row per selected node: all matches of the regex ``data`` in its markup."""
        data = regular.compile(data)

        @self.add_base
        def action(*args, **kwargs):
            for bs in self.listSlicing(index, element_value):
                data_base.add_DataBase(dataBase_name, data.findall(str(bs)))
        self.add_func(f'DataBase:{data}<{element_value}[{index}]>{dataBase_name}', action)

    @staticmethod
    def _matches_tag(node, tag, attribute):
        """True when ``node`` itself has one of the tag names and all given attributes."""
        try:
            if str(node.name) not in tag:
                return False
            for key, expected in attribute.items():
                actual = node.attrs[key]
                if type(expected) is str:
                    if actual != expected:
                        return False
                elif not regular.match(expected, actual):  # compiled pattern
                    return False
            return True
        except Exception:
            return False

    @staticmethod
    def _matches_text(node, text):
        """True when the text of ``node`` equals (str) or matches (pattern) ``text``."""
        try:
            if type(text) is str:
                return str(node.string) == text
            return bool(regular.match(text, str(node.string)))
        except Exception:
            return False

    def findAll(self, element_value, tag: (str, list), attribute: dict, limit, recursive,
                index: (slice, int), **kwargs):
        """Search the selected nodes for tags.

        :param tag: tag name(s); a string is split on commas.
        :param attribute: attribute name -> required value (str or compiled regex).
        :param limit: maximum number of hits per node; anything that is not an
            integer means "no limit".
        :param recursive: search all descendants instead of direct children only.
        """
        if type(tag) is str:
            tag = str(tag).split(',')
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            limit = None

        @self.add_base
        def action(num, name, *args, **kwargs):
            collected = []
            for bs in self.listSlicing(index, element_value):
                try:
                    hits = bs.find_all(tag, attribute, limit=limit, recursive=recursive)
                except Exception:
                    # Not searchable: test the node itself instead.
                    hits = [bs] if self._matches_tag(bs, tag, attribute) else []
                collected += hits
            self.element_dict[f'{name}[{num}]'] = collected
        self.add_func(f'findAll:{element_value}[{index}]', action)

    def findAll_by_text(self, element_value, text: (regular.compile, str), limit, recursive,
                        index: (slice, int), **kwargs):
        """Search the selected nodes for text equal to / matching ``text``."""
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            limit = None

        @self.add_base
        def action(num, name, *args, **kwargs):
            collected = []
            for bs in self.listSlicing(index, element_value):
                try:
                    hits = bs.find_all(text=text, limit=limit, recursive=recursive)
                except Exception:
                    # Not searchable: test the node itself instead.
                    hits = [bs] if self._matches_text(bs, text) else []
                collected += hits
            self.element_dict[f'{name}[{num}]'] = collected
        self.add_func(f'findAll_by_text:{element_value}[{index}]', action)

    def __get_other_base(self, element_value, index: (slice, int), who='children', **kwargs):
        """Shared implementation for collecting children, descendants or siblings."""
        relatives = {'children': 'children', 'offspring': 'descendants',
                     'down': 'next_siblings', 'up': 'previous_siblings'}

        @self.add_base
        def action(num, name, *args, **kwargs):
            collected = []
            for bs in self.listSlicing(index, element_value):
                if who == 'brothers':
                    collected += bs.previous_siblings
                    collected += bs.next_siblings
                else:
                    collected += getattr(bs, relatives.get(who, 'children'))
            # Drop duplicates but keep the order in which nodes were found.
            self.element_dict[f'{name}[{num}]'] = list(dict.fromkeys(collected))
        self.add_func(f'get_{who}:{element_value}[{index}]', action)

    def get_children(self, element_value, index: (slice, int), **kwargs):
        """Collect the direct children of the selected nodes."""
        return self.__get_other_base(element_value, index)

    def get_offspring(self, element_value, index: (slice, int), **kwargs):
        """Collect all descendants of the selected nodes."""
        return self.__get_other_base(element_value, index, 'offspring')

    def get_up(self, element_value, index: (slice, int), **kwargs):
        """Collect the preceding siblings of the selected nodes."""
        return self.__get_other_base(element_value, index, 'up')

    def get_down(self, element_value, index: (slice, int), **kwargs):
        """Collect the following siblings of the selected nodes."""
        return self.__get_other_base(element_value, index, 'down')

    def get_brothers(self, element_value, index: (slice, int), **kwargs):
        """Collect all siblings of the selected nodes."""
        return self.__get_other_base(element_value, index, 'brothers')

    def get_by_path(self, element_value, index: (slice, int), path, **kwargs):
        """Evaluate ``path`` (e.g. ``self.body.div``) against each selected node."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            collected = []
            for bs in self.listSlicing(index, element_value):
                try:
                    # SECURITY: eval() runs arbitrary Python; `path` must only
                    # ever come from a trusted operator, never from page content.
                    found = eval(str(path), {'self': bs})
                except Exception:
                    continue
                if found is not None:
                    collected.append(found)
            self.element_dict[f'{name}[{num}]'] = collected
        self.add_func(f'get>{path}:{element_value}[{index}]', action)

    def add_url(self, element_value, index: (slice, int), url_name, update_func, url_args: dict, **kwargs):
        """Queue a new URL from each selected node.

        :param url_name: which field of the node holds the address
            (``'$name&'``, ``'$self&'``, ``'$string$'`` or an attribute name).
        :param update_func: called without arguments afterwards (GUI refresh).
        :param url_args: keyword arguments for the URL manager's ``add_url``.
        """
        @self.add_base
        def action(*args, **kwargs):
            for bs in self.listSlicing(index, element_value):
                try:
                    # Use the attached URL manager; calling the class itself
                    # always failed and was silently ignored.
                    self.url.add_url(self._read_field(bs, url_name), **url_args)
                except Exception:
                    pass  # one unusable node must not stop the others
            update_func()
        self.add_func(f'add_URL<{element_value}[{index}]:{url_name}', action)

    # ------------------------------------------------------------------ #
    # execution
    # ------------------------------------------------------------------ #
    def Element_interaction(self, update_func=lambda *args: None):
        """Run every recorded step in order.

        ``update_func(step_name, status_text, values)`` is called before each
        step (with the outcome of the previous one) and once at the end;
        ``values`` describes every stored result.
        """
        status = None

        def update(func_name):
            if status:
                success_code = 'Success to run'
            elif status is None:
                success_code = 'No status'
            else:
                success_code = 'Wrong to run'
            value_box = []
            for key, value in self.element_dict.items():
                try:
                    # show how many items the stored value holds
                    value_box.append(f'{key}[{len(value)}] = {value}')
                except Exception:
                    value_box.append(f'{key} = {value}')
            update_func(func_name, success_code, value_box)

        update('start')
        for func_num, func_name in enumerate(self.func_list):
            update(func_name)
            status = self.func_dict[func_name](num=f'{func_num}', name='var')
        update('Finish')
|