123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671 |
- from selenium import webdriver
- import threading
- import time
- from os.path import exists
- from os import mkdir
- import hashlib
- from time import sleep
- import bs4
- import re as regular
- import Information_storage
- data_base = Information_storage.DataBase_Home()
class URL_PAGE:
    """One URL waiting to be fetched, tagged with the request method to use."""

    def __init__(self, url, func='get'):
        # Page address plus the request style ('get' unless told otherwise).
        self.url, self.func = url, func

    def __str__(self):
        # The string form is just the address itself.
        return self.url
class url:  # URL manager
    """FIFO queue of URLs to fetch, with de-duplication, filters and log files.

    Every URL accepted by add_url() is appended to the history log file; every
    URL handed out by get_url() (or removed by del_url()) is appended to the
    "run" log file. Both files are opened in append mode when the manager is
    created and stay open until close() is called (or the ``with`` block the
    manager is used in exits).
    """

    num = 0  # number of url managers created so far (used in the file names)

    def __init__(self, dic='', dic_run=''):
        """Open the two log files inside the given directories.

        Args:
            dic: directory that receives ``url[<n>].cot_url`` (history log).
            dic_run: directory that receives ``url_run[<n>].cot_url``
                (log of URLs that were handed out or deleted).

        Raises:
            OSError: if either log file cannot be opened.
        """
        url.num += 1
        dic += f'/url[{url.num}].cot_url'
        dic_run += f'/url_run[{url.num}].cot_url'
        self.dir = dic
        self.dir_run = dic_run
        self.url_list = []  # pending URL_PAGE objects, oldest first
        self.url_history = []  # every URL string ever accepted
        self.filter = {}  # filter name -> predicate(url) -> bool
        # NOTE(review): both files use the platform default encoding; confirm
        # whatever reads them before pinning an explicit encoding.
        self.file = open(dic, 'a')  # history log
        try:
            self.file_run = open(dic_run, 'a')  # log of consumed URLs
        except Exception:
            # Do not leak the first handle when the second open fails.
            self.file.close()
            raise

    def close(self):
        """Close both log files; safe to call more than once."""
        for handle in (getattr(self, 'file', None),
                       getattr(self, 'file_run', None)):
            if handle is not None and not handle.closed:
                handle.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def filter_func(self, url):
        """Return True only when every registered filter accepts ``url``."""
        return all(accept(url) for accept in self.filter.values())

    def Add_func(self, func, name):
        """Register (or replace) the filter predicate stored under ``name``."""
        self.filter[name] = func

    def Del_func(self, index):
        """Remove the filter at position ``index`` (insertion order).

        Raises:
            IndexError: if there is no filter at that position.
        """
        del self.filter[list(self.filter)[index]]

    def return_func(self):
        """Return the names of the registered filters, in insertion order."""
        return list(self.filter)

    def add_url(self, url):
        """Queue ``url`` unless it was seen before or a filter rejects it.

        Returns:
            True when the URL was queued and logged, False otherwise.
        """
        if url in self.url_history or not self.filter_func(url):
            return False
        self.url_list.append(URL_PAGE(url, 'get'))
        self.url_history.append(url)
        self.__out_url(url)
        return True

    def del_url(self, index):
        """Drop the pending URL at ``index`` and record the deletion.

        Raises:
            IndexError: if ``index`` is out of range.
        """
        self.__out_url_run(f'DELETE {self.url_list[index]}')
        del self.url_list[index]

    def get_url(self) -> 'URL_PAGE':
        """Pop and return the oldest pending URL, logging it as consumed.

        Raises:
            IndexError: if the queue is empty.
        """
        url_page = self.url_list[0]
        # Log first so the entry stays queued if the write fails.
        self.__out_url_run(url_page.url)
        del self.url_list[0]
        return url_page

    def __out_url(self, url):
        """Append ``url`` to the history log and flush immediately."""
        self.file.write(f'{url}\n')
        self.file.flush()

    def __out_url_run(self, url):
        """Append ``url`` to the consumed-URL log and flush immediately."""
        self.file_run.write(f'{url}\n')
        self.file_run.flush()

    def return_url(self):
        """Return a shallow copy of the pending queue."""
        return self.url_list.copy()

    def return_url_history(self):
        """Return a copy of the list of every accepted URL."""
        return self.url_history.copy()
class Page_Downloader:
    """Opens the next queued URL in Chrome and manages the browser's cookies."""

    num = 0  # number of downloaders created so far

    def __init__(self, url: 'url', dic=''):
        """Remember the URL manager and the output directory.

        Args:
            url: URL manager whose get_url() supplies the pages to open.
            dic: directory handed on to the attached parser for its output.
        """
        self.url = url
        self.dir = dic
        Page_Downloader.num += 1
        self.page_source_dict = {}  # saved page information
        self.cookie_Thread = None  # background thread reporting cookies
        self.browser = None  # current webdriver, set by strat_urlGet()
        self.Parser = None  # attached through set_Page_Parser()
        self.break_ = False  # True while the cookie poller should keep running
        self.nowurl = None  # URL_PAGE currently loaded

    def __seeting(self, *args):
        """Build ChromeOptions from the given command-line switches.

        Empty strings in ``args`` are skipped.
        """
        options = webdriver.ChromeOptions()
        options.add_argument('disable-infobars')  # hide the info bar
        for argument in args:
            if argument == '':
                continue
            options.add_argument(argument)
        return options

    def strat_urlGet(self, *args, func_cookie):
        """Open the next queued URL in a new Chrome window.

        Args:
            *args: Chrome command-line switches (empty strings are ignored).
            func_cookie: callback that receives the browser's cookie list
                about every half second (used to show cookies in the GUI).

        Returns:
            The webdriver instance that loaded the page.

        Raises:
            IndexError: if the URL manager has nothing queued.
        """
        self.break_ = False
        self.page_source_dict = {}
        self.nowurl = self.url.get_url()
        target = self.nowurl.url
        # `options=` replaces the deprecated `chrome_options=` keyword.
        browser = webdriver.Chrome(options=self.__seeting(*args))
        self.browser = browser
        browser.get(target)
        self.break_ = True

        def update_cookie():
            # Stop when asked to (break_) or when a later strat_urlGet() call
            # replaced the browser, so old pollers do not pile up.
            while self.break_ and self.browser is browser:
                try:
                    func_cookie(browser.get_cookies())
                except Exception:
                    # Best effort: a failing callback or a closed browser must
                    # not kill the poller.
                    pass
                # Sleep outside the try so a persistent failure cannot turn
                # this loop into a busy spin.
                time.sleep(.5)

        self.cookie_Thread = threading.Thread(target=update_cookie)
        self.cookie_Thread.start()
        parser = getattr(self, 'Parser', None)
        if parser is not None:
            parser.browser = browser
            parser.init(target)
        return browser

    def Del_cookies(self, name):
        """Delete the cookie called ``name`` from the current browser."""
        self.browser.delete_cookie(name)

    def Tra_cookies(self):
        """Delete every cookie of the current browser."""
        self.browser.delete_all_cookies()

    def Add_cookies(self, cookies: dict):
        """Add the cookie described by ``cookies`` to the current browser."""
        self.browser.add_cookie(cookies)

    def update_cookies(self, name, cookies: dict, ):
        """Overwrite fields of the existing cookie called ``name``.

        The cookie is deleted and re-added with ``cookies`` merged over its
        current fields.

        Raises:
            LookupError: if the browser has no cookie with that name.
        """
        browser = self.browser
        for cookie in browser.get_cookies():
            if cookie.get('name', None) == name:
                browser.delete_cookie(name)  # drop the old version first
                cookie.update(cookies)
                browser.add_cookie(cookie)
                return
        raise LookupError(f'no cookie named {name!r}')

    def set_Page_Parser(self, Parser):
        """Attach ``Parser`` and share browser, URL manager and output dir."""
        self.Parser = Parser
        self.Parser.browser = self.browser
        self.Parser.url = self.url
        self.Parser.dir = self.dir
class Page_Parser:
    """Record a script of browser / parsing steps and replay it in order.

    The public "step" methods (find_*, click, make_bs, findAll, ...) do not act
    immediately. Each one registers a callable under the label
    ``'<description>[<n>]'`` (n = position in the script). Element_interaction()
    then runs the steps in order, calling step number i with ``name='var'`` and
    ``num='i'``; steps that produce a value store it (always as a list-like) in
    ``element_dict['var[i]']``. Later steps refer to such a value through their
    ``element_value`` argument and pick items with ``index``.

    ``browser``, ``url`` and ``dir`` are assigned by the downloader's
    set_Page_Parser().
    """

    def __init__(self, Downloader: 'Page_Downloader'):
        self.Downloader = Downloader
        self.Downloader.set_Page_Parser(self)
        self.func_list = []  # step labels in execution order
        self.func_dict = {}  # step label -> wrapped step callable
        self.init()

    def init(self, url=''):
        """Forget all stored step results and remember the page being parsed."""
        self.element_dict = {}  # result name ('var[i]') -> list of values
        self.now_url = url

    # ---------------------------------------------------------------- registry

    def add_base(self, func):  # decorator
        """Wrap a step so it reports success as a bool instead of raising.

        The wrapper always calls ``func`` with ``browser``, ``num`` and ``name``
        as keyword arguments and returns True, or False if the step raised.
        """
        def wrap(browser=None, num=None, name=None, *args, **kwargs) -> bool:
            try:
                func(browser=browser, num=num, name=name, *args, **kwargs)
            except Exception:
                # A failed step is reported through the return value;
                # KeyboardInterrupt / SystemExit are no longer swallowed.
                return False
            return True
        return wrap

    def add_func(self, name, func):
        """Append ``func`` to the script under the label ``'<name>[<n>]'``."""
        label = f'{name}[{len(self.func_list)}]'
        self.func_list.append(label)
        self.func_dict[label] = func

    def return_func(self, only=True):
        """Return the step labels; with ``only=False`` prefix each with the
        name of the variable the step writes (``'var[i]@ <label>'``)."""
        if only:
            return self.func_list.copy()
        return [f'var[{position}]@ {label}'
                for position, label in enumerate(self.func_list)]

    # ----------------------------------------------------------------- helpers

    def _add_finder(self, label, by, value, not_all):
        """Register a step that locates elements with the strategy ``by``.

        ``find_element(by, value)`` / ``find_elements(by, value)`` are used
        instead of the ``find_element(s)_by_*`` shortcuts, which newer Selenium
        releases no longer provide. The stored result is always a list.
        """
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            if not_all:
                found = [browser.find_element(by, value)]
            else:
                found = browser.find_elements(by, value)
            self.element_dict[f'{name}[{num}]'] = found
        self.add_func(label, find)

    def _add_element_step(self, label, element_value, index, operate):
        """Register a step that applies ``operate`` to one stored element."""
        @self.add_base
        def action(*args, **kwargs):
            operate(self.element_dict[element_value][index])
        self.add_func(label, action)

    def _add_select_step(self, label, element_value, index, operate):
        """Like _add_element_step, but hands ``operate`` a Select wrapper."""
        self._add_element_step(
            label, element_value, index,
            lambda element: operate(self._as_select(element)))

    def _add_browser_step(self, label, operate, store=False):
        """Register a step that calls ``operate(self.browser)``.

        With ``store=True`` the return value is saved as the step's result.
        """
        @self.add_base
        def action(num, name, *args, **kwargs):
            result = operate(self.browser)
            if store:
                self.element_dict[f'{name}[{num}]'] = result
        self.add_func(label, action)

    @staticmethod
    def _as_select(element):
        """Return an object offering the (de)select_by_* methods.

        Plain web elements do not have them, so they are wrapped in Selenium's
        ``Select`` helper; anything that already has them is used as is.
        """
        if hasattr(element, 'select_by_index'):
            return element
        from selenium.webdriver.support.ui import Select
        return Select(element)

    def _unique_stem(self):
        """Return an MD5 hex digest of the current time and page URL, used as
        a file name stem for saved pages and screenshots."""
        seed = f'{time.time()}_{self.now_url}'
        return hashlib.md5(seed.encode('utf-8')).hexdigest()

    # ----------------------------------------------------- locating elements

    def find_ID(self, id, not_all=False, **kwargs):
        """Step: find by id (only the first match when ``not_all``)."""
        self._add_finder(f'find_ID:{id}', 'id', id, not_all)

    def find_class(self, class_name, not_all=False, **kwargs):
        """Step: find by class name."""
        self._add_finder(f'find_class:{class_name}', 'class name',
                         class_name, not_all)

    def find_name(self, name_, not_all=False, **kwargs):
        """Step: find by the ``name`` attribute."""
        self._add_finder(f'find_name:{name_}', 'name', name_, not_all)

    def find_xpath(self, xpath, not_all=False, **kwargs):
        """Step: find by XPath expression."""
        self._add_finder(f'find_xpath:{xpath}', 'xpath', xpath, not_all)

    def find_css(self, css_selector, not_all=False, **kwargs):
        """Step: find by CSS selector."""
        self._add_finder(f'find_css:{css_selector}', 'css selector',
                         css_selector, not_all)

    def find_tag_name(self, tag_name, not_all=False, **kwargs):
        """Step: find by tag name."""
        self._add_finder(f'find_tagName:{tag_name}', 'tag name',
                         tag_name, not_all)

    def find_link_text(self, link_text, not_all=False, **kwargs):
        """Step: find links whose text equals ``link_text``."""
        self._add_finder(f'find_link_text:{link_text}', 'link text',
                         link_text, not_all)

    def find_partial_link_text(self, partial_link_text, not_all=False, **kwargs):
        """Step: find links whose text contains ``partial_link_text``.

        Unlike before, ``not_all=False`` now really returns every match.
        """
        self._add_finder(f'find_partial_link_text:{partial_link_text}',
                         'partial link text', partial_link_text, not_all)

    def find_switch_to_alert(self, *args, **kwargs):
        """Step: store the currently open alert box."""
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            # `switch_to.alert` is a property, not a method.
            self.element_dict[f'{name}[{num}]'] = [browser.switch_to.alert]
        self.add_func(f'find_alert', find)

    def find_switch_to_active_element(self, *args, **kwargs):
        """Step: store the element that currently has focus."""
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            # `switch_to.active_element` is a property, not a method.
            self.element_dict[f'{name}[{num}]'] = [
                browser.switch_to.active_element]
        self.add_func(f'active_element', find)

    def find_switch_to_frame(self, reference, is_id=False, *args, **kwargs):
        """Step: switch the browser into another frame.

        Args:
            reference: None returns to the main document, '' to the parent
                frame; anything else names the frame to enter.
            is_id: when true ``reference`` is converted to int and used as a
                frame index, otherwise it is passed as a string.
        """
        @self.add_base
        def find(browser, num, name, *args, **kwargs):
            if browser is None:
                browser = self.browser
            switcher = browser.switch_to
            if reference is None:
                outcome = switcher.default_content()  # back to main document
            elif reference == '':
                outcome = switcher.parent_frame()  # back to parent document
            else:
                # Keep the int so Selenium treats it as a frame index.
                target = int(reference) if is_id else str(reference)
                outcome = switcher.frame(target)
            self.element_dict[f'{name}[{num}]'] = [outcome]
        func_name = {None: '主文档', '': '父文档'}.get(reference, reference)
        self.add_func(f'find_frame:{func_name}', find)

    # ------------------------------------------------- acting on elements

    def send_keys(self, text, element_value, index=0, **kwargs):
        """Step: type ``text`` into the selected element."""
        self._add_element_step(
            f'sent_text:{text}>{element_value}[{index}]', element_value, index,
            lambda element: element.send_keys(text))

    def User_Passwd(self, User, Passwd, element_value, index=0, **kwargs):
        """Step: answer an authentication prompt with user name and password."""
        # NOTE(review): the step label embeds the password in clear text.
        self._add_element_step(
            f'User:Passwd:{User};{Passwd}>{element_value}[{index}]',
            element_value, index,
            lambda element: element.authenticate(User, Passwd))

    def clear(self, element_value, index=0, **kwargs):
        """Step: clear the text of the selected element."""
        self._add_element_step(f'clear_text>{element_value}[{index}]',
                               element_value, index,
                               lambda element: element.clear())

    def click(self, element_value, index=0, **kwargs):
        """Step: click the selected element."""
        self._add_element_step(f'click>{element_value}[{index}]',
                               element_value, index,
                               lambda element: element.click())

    def accept(self, element_value, index=0, **kwargs):
        """Step: confirm the selected alert box."""
        self._add_element_step(f'accept>{element_value}[{index}]',
                               element_value, index,
                               lambda element: element.accept())

    def dismiss(self, element_value, index=0, **kwargs):
        """Step: cancel the selected alert box."""
        self._add_element_step(f'dismiss>{element_value}[{index}]',
                               element_value, index,
                               lambda element: element.dismiss())

    def submit(self, element_value, index=0, **kwargs):
        """Step: submit the form the selected element belongs to."""
        self._add_element_step(f'submit>{element_value}[{index}]',
                               element_value, index,
                               lambda element: element.submit())

    def deselect_by_index(self, element_value, deselect, index=0, **kwargs):
        """Step: deselect the option at position ``deselect``."""
        self._add_select_step(
            f'deselect_by_index:{deselect}>{element_value}[{index}]',
            element_value, index,
            lambda select: select.deselect_by_index(int(deselect)))

    def deselect_by_text(self, element_value, deselect, index=0, **kwargs):
        """Step: deselect the option whose visible text is ``deselect``."""
        self._add_select_step(
            f'deselect_by_text:{deselect}>{element_value}[{index}]',
            element_value, index,
            lambda select: select.deselect_by_visible_text(deselect))

    def deselect_by_value(self, element_value, deselect, index=0, **kwargs):
        """Step: deselect the option whose value is ``deselect``."""
        self._add_select_step(
            f'deselect_by_value:{deselect}>{element_value}[{index}]',
            element_value, index,
            lambda select: select.deselect_by_value(deselect))

    def select_by_index(self, element_value, deselect, index=0, **kwargs):
        """Step: select the option at position ``deselect``."""
        self._add_select_step(
            f'select_by_index:{deselect}>{element_value}[{index}]',
            element_value, index,
            lambda select: select.select_by_index(int(deselect)))

    def select_by_text(self, element_value, deselect, index=0, **kwargs):
        """Step: select the option whose visible text is ``deselect``."""
        self._add_select_step(
            f'select_by_text:{deselect}>{element_value}[{index}]',
            element_value, index,
            lambda select: select.select_by_visible_text(deselect))

    def select_by_value(self, element_value, deselect, index=0, **kwargs):
        """Step: select the option whose value is ``deselect``."""
        self._add_select_step(
            f'select_by_value:{deselect}>{element_value}[{index}]',
            element_value, index,
            lambda select: select.select_by_value(deselect))

    # ------------------------------------------------ navigation and waiting

    def back(self, **kwargs):
        """Step: go back one page in the browser history."""
        self._add_browser_step(f'BACK', lambda browser: browser.back())

    def forward(self, **kwargs):
        """Step: go forward one page in the browser history."""
        self._add_browser_step(f'FORWARD', lambda browser: browser.forward())

    def refresh(self, **kwargs):
        """Step: reload the current page."""
        self._add_browser_step(f'REFRESH', lambda browser: browser.refresh())

    def wait_sleep(self, time: int = 2, **kwargs):
        """Step: block for ``time`` seconds (unconditional wait)."""
        self._add_browser_step(f'WAIT:{time}s', lambda browser: sleep(time))

    def set_wait(self, time: int = 2, **kwargs):
        """Step: set the browser's implicit wait to ``time`` seconds.

        This used to sleep exactly like wait_sleep(); it now configures the
        implicit wait the method was written for.
        """
        self._add_browser_step(f'Loading_wait:{time}s',
                               lambda browser: browser.implicitly_wait(time))

    def run_JS(self, JS, **kwargs):
        """Step: run JavaScript and store what it returns.

        A result that supports indexing is stored as is, anything else is
        wrapped in a one-item list.
        """
        def run(browser):
            outcome = browser.execute_script(JS)
            return outcome if hasattr(outcome, '__getitem__') else [outcome]
        self._add_browser_step(f'run_js:{JS}', run, store=True)

    # --------------------------------------------------- page source / files

    def to_text(self, **kwargs):
        """Step: store ``[page source, current url]``."""
        self._add_browser_step(
            f'get_page_source',
            lambda browser: [browser.page_source, self.now_url], store=True)

    def out_html(self, element_value, **kwargs):
        """Step: write a stored ``[source, url]`` pair to disk.

        The source goes to ``<dir>/<md5>.html`` and the url to the same path
        with ``.CoTanURL`` appended.
        """
        @self.add_base
        def action(*args, **kwargs):
            save_dir = self.dir + '/' + self._unique_stem() + '.html'
            print(save_dir)
            # NOTE(review): files use the platform default encoding; confirm
            # what reads them before pinning an explicit one.
            with open(save_dir, 'w') as f:
                f.write(self.element_dict[element_value][0])
            with open(save_dir + '.CoTanURL', 'w') as f:
                f.write(self.element_dict[element_value][1])
        self.add_func(f'write_html<{element_value}', action)

    def Webpage_snapshot(self, **kwargs):
        """Step: save a screenshot as ``<dir>/<md5>.png`` plus a ``.CoTanURL``
        companion file holding the page url, then pause for one second."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            target = self.dir + '/' + self._unique_stem() + '.png'
            with open(target + '.CoTanURL', 'w') as f:
                f.write(self.now_url)
            self.browser.save_screenshot(target)
            sleep(1)
        self.add_func(f'Webpage_snapshot', action)

    # ----------------------------------------------------------------- cookies

    def del_all_cookies(self, **kwargs):
        """Step: delete every cookie."""
        self._add_browser_step(f'del_all_cookies',
                               lambda browser: browser.delete_all_cookies())

    def del_cookies(self, cookies_name, **kwargs):
        """Step: delete the cookie called ``cookies_name``."""
        self._add_browser_step(
            f'del_cookies:{cookies_name}',
            lambda browser: browser.delete_cookie(cookies_name))

    def add_cookies(self, cookies, **kwargs):
        """Step: add the cookie described by ``cookies``."""
        self._add_browser_step(f'add_cookies:{cookies}',
                               lambda browser: browser.add_cookie(cookies))

    def update_cookies(self, cookies_name, cookies, **kwargs):
        """Step: merge ``cookies`` into the existing cookie ``cookies_name``.

        The step fails (and deletes nothing) when no such cookie exists.
        """
        def replace(browser):
            current = browser.get_cookie(cookies_name)
            if current is None:
                raise LookupError(f'no cookie named {cookies_name!r}')
            browser.delete_cookie(cookies_name)
            current.update(cookies)
            browser.add_cookie(current)
        # Label corrected: it used to read 'add_cookies:...'.
        self._add_browser_step(f'update_cookies:{cookies}', replace)

    def get_cookies(self, cookies_name, **kwargs):
        """Step: store ``[cookie]`` for the cookie called ``cookies_name``."""
        self._add_browser_step(
            f'get_cookies:{cookies_name}',
            lambda browser: [browser.get_cookie(cookies_name)], store=True)

    def get_all_cookies(self, **kwargs):
        """Step: store the list of all cookies."""
        # get_cookies() returns them all; get_cookie() requires a name.
        self._add_browser_step(f'get_all_cookies',
                               lambda browser: browser.get_cookies(),
                               store=True)

    # ---------------------------------------------------- BeautifulSoup steps

    def make_bs(self, element_value, **kwargs):
        """Step: parse item 0 of a stored value into a BeautifulSoup object."""
        @self.add_base
        def action(num, name, *args, **kwargs):
            markup = self.element_dict[element_value][0]
            self.element_dict[f'{name}[{num}]'] = [
                bs4.BeautifulSoup(markup, "html.parser")]
        self.add_func(f'Parsing:{element_value}', action)

    def listSlicing(self, index: 'slice | int', element_value):
        """Return the selected items of a stored value, always as a list.

        Raises:
            KeyError: if ``element_value`` has not been stored.
            IndexError: if an int ``index`` is out of range.
        """
        stored = self.element_dict[element_value]
        if isinstance(index, int):
            return [stored[index]]
        return stored[index]

    def to_Database(self, element_value, index, data: 'str | list',
                    dataBase_name: str, **kwargs):
        """Step: add one row per selected node to the database.

        Each entry of ``data`` yields one column: ``'$name&'`` the tag name,
        ``'$self&'`` the node's markup, ``'$string$'`` its string (both with
        newlines removed); any other entry is read as an attribute name
        ('' when the attribute is missing).
        """
        @self.add_base
        def action(*args, **kwargs):
            for bs in self.listSlicing(index, element_value):
                row = []
                for field in data:
                    if field == '$name&':
                        row.append(bs.name)
                    elif field == '$self&':
                        row.append(str(bs).replace('\n', ''))
                    elif field == '$string$':
                        row.append(str(bs.string).replace('\n', ''))
                    else:
                        row.append(bs.attrs.get(field, ''))
                data_base.add_DataBase(dataBase_name, row)
        self.add_func(
            f'DataBase:{data}<{element_value}[{index}]>{dataBase_name}', action)

    def to_Database_by_re(self, element_value, index, data: str,
                          dataBase_name: str, **kwargs):
        """Step: add, per selected node, all regex matches of ``data`` in the
        node's markup as one database row.

        Raises:
            re.error: immediately, if ``data`` is not a valid pattern.
        """
        data = regular.compile(data)

        @self.add_base
        def action(*args, **kwargs):
            for bs in self.listSlicing(index, element_value):
                data_base.add_DataBase(dataBase_name,
                                       regular.findall(data, str(bs)))
        self.add_func(
            f'DataBase:{data}<{element_value}[{index}]>{dataBase_name}', action)

    @staticmethod
    def _to_limit(limit):
        """Convert ``limit`` to int; anything unconvertible means no limit."""
        try:
            return int(limit)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _node_matches(bs, tag, attribute):
        """Fallback for nodes that cannot be searched: does the node itself
        have a listed tag name and all requested attribute values?"""
        try:
            if str(bs.name) not in tag:
                return False
            for attr_name, wanted in attribute.items():
                actual = bs.attrs[attr_name]
                if isinstance(wanted, str):
                    if actual != wanted:
                        return False
                elif not regular.match(wanted, actual):  # compiled pattern
                    return False
            return True
        except Exception:
            return False

    @staticmethod
    def _text_matches(bs, text):
        """Fallback for nodes that cannot be searched: does the node's own
        string equal (str) or match (pattern) ``text``?"""
        try:
            if isinstance(text, str):
                return str(bs.string) == text
            return bool(regular.match(text, str(bs.string)))
        except Exception:
            return False

    def findAll(self, element_value, tag: 'str | list', attribute: dict, limit,
                recursive, index: 'slice | int', **kwargs):
        """Step: search the selected nodes for tags.

        Args:
            element_value: name of the stored value to search in.
            tag: tag name(s); a string is split on ','.
            attribute: attribute name -> required value (str or pattern).
            limit: maximum number of hits per node; non-numeric = unlimited.
            recursive: passed through to ``find_all``.
            index: which items of the stored value to search.
        """
        if isinstance(tag, str):
            tag = str(tag).split(',')
        limit = self._to_limit(limit)

        @self.add_base
        def action(num, name, *args, **kwargs):
            hits = []
            for bs in self.listSlicing(index, element_value):
                try:
                    hits += bs.find_all(tag, attribute, limit=limit,
                                        recursive=recursive)
                except Exception:
                    # The node cannot be searched: test the node itself.
                    if self._node_matches(bs, tag, attribute):
                        hits.append(bs)
            self.element_dict[f'{name}[{num}]'] = hits
        self.add_func(f'findAll:{element_value}[{index}]', action)

    def findAll_by_text(self, element_value, text: 'str | re.Pattern', limit,
                        recursive, index: 'slice | int', **kwargs):
        """Step: search the selected nodes by text (string or pattern)."""
        limit = self._to_limit(limit)

        @self.add_base
        def action(num, name, *args, **kwargs):
            hits = []
            for bs in self.listSlicing(index, element_value):
                try:
                    hits += bs.find_all(text=text, limit=limit,
                                        recursive=recursive)
                except Exception:
                    # The node cannot be searched: test the node itself.
                    if self._text_matches(bs, text):
                        hits.append(bs)
            self.element_dict[f'{name}[{num}]'] = hits
        self.add_func(f'findAll_by_text:{element_value}[{index}]', action)

    def __get_other_base(self, element_value, index: 'slice | int',
                         who='children', **kwargs):
        """Shared implementation of the child / descendant / sibling steps.

        ``who`` is one of 'children', 'offspring', 'up', 'down' or 'brothers';
        unknown values behave like 'children'. The stored result is
        de-duplicated through a set, so its order is not defined.
        """
        relation = {'children': 'children', 'offspring': 'descendants',
                    'down': 'next_siblings', 'up': 'previous_siblings'}

        @self.add_base
        def action(num, name, *args, **kwargs):
            gathered = []
            for bs in self.listSlicing(index, element_value):
                if who == 'brothers':
                    gathered += bs.previous_siblings
                    gathered += bs.next_siblings
                else:
                    # Only the requested relation is read from the node.
                    gathered += getattr(bs, relation.get(who, 'children'))
            self.element_dict[f'{name}[{num}]'] = list(set(gathered))
        self.add_func(f'get_{who}:{element_value}[{index}]', action)

    def get_children(self, element_value, index: 'slice | int', **kwargs):
        """Step: collect the direct children of the selected nodes."""
        return self.__get_other_base(element_value, index)

    def get_offspring(self, element_value, index: 'slice | int', **kwargs):
        """Step: collect all descendants of the selected nodes."""
        return self.__get_other_base(element_value, index, 'offspring')

    def get_up(self, element_value, index: 'slice | int', **kwargs):
        """Step: collect the preceding siblings of the selected nodes."""
        return self.__get_other_base(element_value, index, 'up')

    def get_down(self, element_value, index: 'slice | int', **kwargs):
        """Step: collect the following siblings of the selected nodes."""
        return self.__get_other_base(element_value, index, 'down')

    def get_brothers(self, element_value, index: 'slice | int', **kwargs):
        """Step: collect all siblings of the selected nodes."""
        return self.__get_other_base(element_value, index, 'brothers')

    def get_by_path(self, element_value, index: 'slice | int', path, **kwargs):
        """Step: evaluate ``path`` (e.g. ``self.body.div``) on each selected
        node and store the non-None results.

        Nodes for which the expression fails or yields None are skipped.
        """
        @self.add_base
        def action(num, name, *args, **kwargs):
            reached = []
            for bs in self.listSlicing(index, element_value):
                try:
                    # SECURITY: `path` is executed with eval(); it must never
                    # come from an untrusted source.
                    outcome = eval(str(path), {'self': bs})
                except Exception:
                    continue
                if outcome is not None:
                    reached.append(outcome)
            self.element_dict[f'{name}[{num}]'] = reached
        self.add_func(f'get>{path}:{element_value}[{index}]', action)

    # --------------------------------------------------------------- execution

    def Element_interaction(self, update_func=lambda *args: None):
        """Run every registered step in order.

        ``update_func(step_label, status_text, values)`` is called once with
        'start', once before each step and once with 'Finish'. ``status_text``
        describes the previously executed step ('No status' before the first
        one); ``values`` lists every stored result as
        ``'<name>[<item count>] = <value>'``.
        """
        status = None

        def update(func_name):
            if status:
                success_code = 'Success to run'
            elif status is None:
                success_code = 'No status'
            else:
                success_code = 'Wrong to run'
            value_box = []
            for key, value in self.element_dict.items():
                try:
                    # Report the number of stored items, not len(key).
                    value_box.append(f'{key}[{len(value)}] = {value}')
                except TypeError:
                    value_box.append(f'{key} = {value}')
            update_func(func_name, success_code, value_box)

        update('start')
        for func_num, func_name in enumerate(self.func_list):
            update(func_name)
            status = self.func_dict[func_name](num=f'{func_num}', name='var')
        update('Finish')
|