1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633 |
- import bs4
- import hashlib
- import os
- import re as regular
- import threading
- import time
- from abc import ABCMeta, abstractmethod
- from time import sleep
- import logging
- from selenium import webdriver
- from selenium.webdriver import ActionChains
- from selenium.webdriver.common.keys import Keys
- from selenium.common.exceptions import InvalidSessionIdException, WebDriverException
- import requests
- from system import plugin_class_loading, get_path, basicConfig
- logging.basicConfig(**basicConfig)
- keys_name_dict = {
- "ctrl": Keys.CONTROL,
- "shift": Keys.SHIFT,
- "tab": Keys.TAB,
- "left_ctrl": Keys.LEFT_CONTROL,
- "left_shift": Keys.LEFT_SHIFT,
- "left_alt": Keys.LEFT_ALT,
- "ALT": Keys.ALT,
- "enter": Keys.ENTER,
- "return": Keys.RETURN,
- "backspace": Keys.BACKSPACE,
- "del": Keys.DELETE,
- "pgup": Keys.PAGE_UP,
- "pgdn": Keys.PAGE_DOWN,
- "home": Keys.HOME,
- "end": Keys.END,
- "esc": Keys.CANCEL,
- "insert": Keys.INSERT,
- "meta": Keys.META,
- "up": Keys.UP,
- "down": Keys.DOWN,
- "right": Keys.RIGHT,
- "left": Keys.LEFT,
- } # 键-值映射
- class PageParserError(Exception):
- pass
- class UrlError(Exception):
- pass
- class CookiesError(Exception):
- pass
- class Database(metaclass=ABCMeta):
- @abstractmethod
- def __str__(self):
- pass
- @abstractmethod
- def close(self):
- pass
- @abstractmethod
- def add_new(self, data):
- pass
- @abstractmethod
- def remove(self):
- pass
- @abstractmethod
- def out_file(self, out_dir):
- pass
- class CoTanDB(Database):
- def __init__(self, name):
- self.dir = rf"{os.getcwd()}{os.sep}Database_dir{os.sep}{name}.cotanDB" # 创建DB文件
- self.file = open(self.dir, "r+" if os.path.exists(self.dir) else "w+")
- self.id = 0
- self.name = name
- for _ in self.file.readlines():
- self.id += 1
- def __str__(self):
- return self.name
- def close(self):
- try:
- self.file.close()
- except IOError:
- pass
- def add_new(self, data):
- data_str = str(self.id)
- for i in data:
- data_str += "," + str(i)
- data_str += "\n"
- self.file.write(data_str)
- self.file.flush()
- self.id += 1
- def remove(self):
- self.file.close()
- os.remove(self.dir)
- def out_file(self, out_dir):
- with open(out_dir + fr"{os.sep}{self.name}.contanDB", "w") as f:
- with open(self.dir) as g:
- f.write(g.read())
- class DatabaseControllerBase:
- def __init__(self):
- self.database = {}
- class AddDatabase(DatabaseControllerBase):
- def add_database(self, name): # 添加数据表
- self.database[name] = CoTanDB(name)
- class DatabaseControllerCustom(metaclass=ABCMeta):
- @abstractmethod
- def close(self, name):
- pass
- @abstractmethod
- def close_all(self):
- pass
- @abstractmethod
- def rm_database(self, name):
- pass
- @abstractmethod
- def out(self, name, save_dir):
- pass
- @abstractmethod
- def return_database(self):
- pass
- class DatabaseController(AddDatabase, DatabaseControllerCustom): # data base控制器
- def add_new(self, name, data): # 添加新内容
- database = self.database.get(name)
- if database is None:
- self.add_database(name)
- database = self.database.get(name)
- database.add_new(data)
- def close(self, name): # 关闭数据表
- try:
- self.database[name].close()
- del self.database[name]
- except IndexError:
- pass
- def close_all(self): # 关闭所有数据表
- for i in self.database:
- self.database[i].close()
- self.database = {}
- def rm_database(self, name): # 删除数据表
- self.database[name].remove()
- del self.database[name]
- def out(self, name, save_dir): # 输出数据表
- self.database[name].out_file(save_dir)
- def return_database(self):
- return list(self.database.keys())
- class LogBase(metaclass=ABCMeta):
- @abstractmethod
- def write(self, data):
- pass
- @abstractmethod
- def close(self):
- pass
- class Log(LogBase):
- def __init__(self, log_dir):
- self.log_dir = log_dir
- self.log_file = open(
- log_dir + f"{os.sep}log.coTanLog",
- "r+" if os.path.exists(log_dir + "log.coTanLog") else "w+",
- )
- def write(self, data):
- self.log_file.write(
- f"[{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))}] "
- + data
- + "\n"
- )
- self.log_file.flush()
- def close(self):
- self.log_file.close()
- class PageBase:
- def __init__(self, time_out):
- self.url = ""
- self.user_agent = ""
- self.mode = "PAGE"
- self.time_out = time_out
- def __str__(self):
- return f"[{self.time_out}s]{self.mode}-{self.url}:UA>{self.user_agent}"
- @abstractmethod
- def init(self, *args, **kwargs):
- pass
- class __RequestsBase(PageBase):
- def init(self, user_agent, url, cookies):
- if user_agent == "":
- user_agent = (
- f'--user-agent ="Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
- f'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36 Edg/80.0.361.66"'
- )
- self.user_agent = user_agent
- self.headers = {
- "Accept": "text/html, application/xhtml+xml, image/jxr, */*",
- "Accept - Encoding": "gzip, deflate",
- "Accept-Language": "zh-Hans-CN, zh-Hans; q=0.5",
- "Connection": "Keep-Alive",
- "User-Agent": user_agent,
- }
- self.url = url
- self.cookies = cookies
- self.new = True
- @plugin_class_loading(get_path(r'template/crawler'))
- class UrlGet(__RequestsBase): # 通过requests的post请求
- def __init__(self, url, time_out, user_agent="", cookies=None, **kwargs):
- super(UrlGet, self).__init__(time_out)
- self.mode = "simplify_get"
- self.requests = requests.get
- self.init(user_agent, url, cookies)
- @plugin_class_loading(get_path(r'template/crawler'))
- class UrlPost(__RequestsBase): # 通过requests的post请求
- def __init__(self, url, data, time_out, user_agent="", cookies=None, **kwargs):
- super(UrlPost, self).__init__(time_out)
- self.mode = "post"
- self.data = data
- self.requests = requests.post
- self.init(user_agent, url, cookies)
- def __str__(self):
- return super(UrlPost, self).__str__() + f";data>{self.data}"
- @plugin_class_loading(get_path(r'template/crawler'))
- class UrlPage(PageBase):
- def __init__(
- self,
- url,
- time_out,
- first_run=False,
- head=False,
- no_plugins=True,
- no_js=False,
- no_java=False,
- no_img=False,
- user_agent="",
- cookies=None,
- new=False,
- down_load_dir="",
- **kwargs,
- ):
- super(UrlPage, self).__init__(time_out)
- self.url = url
- self.mode = "get"
- self.options = webdriver.ChromeOptions()
- self.cookies = cookies # cookies存储位置
- self.new = new # 新键页面or新键浏览器
- self.down_load_dir = down_load_dir
- self.init(first_run, head, no_plugins, no_js, no_java, no_img, user_agent)
- def init(self, first_run, head, no_plugins, no_js, no_java, no_img, user_agent):
- self.options.add_argument("disable-infobars") # 不显示
- prefs = {
- "profile.default_content_settings.popups": 0,
- "download.default_directory": self.down_load_dir,
- }
- self.options.add_experimental_option("prefs", prefs) # 下载设置
- if first_run:
- self.options.add_argument("-first run")
- if head: # 无头设置
- self.options.add_argument("--headless")
- self.options.add_argument("--disable-gpu")
- if no_plugins:
- self.options.add_argument("--disable-plugins")
- if no_js:
- self.options.add_argument("--disable-javascript")
- if no_java:
- self.options.add_argument("--disable-java")
- if no_img:
- self.options.add_argument("blink-settings=imagesEnabled=false")
- if user_agent == "":
- user_agent = (
- f'user-agent ="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
- f'Chrome/80.0.3987.132 Safari/537.36"'
- )
- # self.options.add_argument(f'--user-agent ="{UA}"')
- self.user_agent = user_agent
- def __str__(self):
- return f"{self.mode}-{self.url}:UA>{self.user_agent}"
- class Urlbase(metaclass=ABCMeta):
- url_count = 0 # url处理器个数
- def __init__(self, dic=f"", dic_run=f""):
- Urlbase.url_count += 1
- self.save_dir = dic
- dic += f"{os.sep}url[{Urlbase.url_count}].cot_url"
- dic_run += f"{os.sep}url_run[{Urlbase.url_count}].cot_url"
- self.dir = dic
- self.dir_run = dic_run
- self.file = open(dic, "a") # 写入url_history的文件
- self.file_run = open(dic_run, "a") # 写入已读url文件
- self.url_list = [] # 待读url
- self.url_history = [] # url历史
- self.filter = {} # 过滤函数
- @abstractmethod
- def filter_func(self, url, **kwargs):
- pass
- @abstractmethod
- def add_filter_func(self, func, name):
- pass
- @abstractmethod
- def del_filter_func(self, index):
- pass
- @abstractmethod
- def return_filter_func(self):
- pass
- @abstractmethod
- def close(self):
- pass
- @abstractmethod
- def out_url_history(self, url):
- pass
- @abstractmethod
- def out_url_run(self, url):
- pass
- @plugin_class_loading(get_path(r'template/crawler'))
- class UrlFile(Urlbase, metaclass=ABCMeta):
- def close(self):
- self.file.close()
- self.file_run.close()
- def out_url_history(self, url): # 输出url历史
- self.file.write(f"{url}\n")
- self.file.flush()
- def out_url_run(self, url): # 输出已经运行的url
- self.file_run.write(f"{url}\n")
- self.file_run.flush()
- @plugin_class_loading(get_path(r'template/crawler'))
- class UrlAdd(Urlbase, metaclass=ABCMeta):
- def filter_func(self, url, **kwargs): # url过滤系统
- for i in self.filter:
- if not self.filter[i](url):
- return False
- return True
- def add_filter_func(self, func, name): # 添加过滤函数
- self.filter[name] = func
- def del_filter_func(self, index): # 删除过滤函数
- del self.filter[list(self.filter.keys())[index]]
- def clean_filter_func(self):
- self.filter = {}
- def return_filter_func(self):
- return list(self.filter.keys())
- @plugin_class_loading(get_path(r'template/crawler'))
- class UrlReturn(Urlbase, metaclass=ABCMeta):
- def del_url(self, index): # 删除url
- self.out_url_run(f"DELETE {self.url_list[index]}")
- del self.url_list[index]
- def get_url(self): # 取得url
- url_page = self.url_list[0]
- self.out_url_run(url_page.url)
- del self.url_list[0]
- return url_page
- def is_finish(self):
- return len(self.url_list) == 0
- def add_url(self, url, func, data=None, must=False, **kwargs): # 添加url
- if func == "":
- func = "get"
- if func == "get":
- url_ = url
- else:
- url_ = url + str(data)
- if must or (url_ not in self.url_history and self.filter_func(url, func=func)): # 1.url不存在历史,2.url满足筛选条件
- if func == "get":
- self.url_list.append(
- UrlPage(url=url, **kwargs, down_load_dir=self.dir)
- ) # 添加到待取得url
- elif func == "simplify_get":
- self.url_list.append(
- UrlGet(url=url, **kwargs, down_load_dir=self.dir)
- ) # 添加到待取得url
- else:
- self.url_list.append(UrlPost(url=url, data=data, **kwargs)) # 添加到待取得url
- self.url_history.append(url_) # 添加到历史url
- self.out_url_history(url_) # 输出历史url
- return True # 写入成功
- return False # 写入失败
- class SeleniumBase(metaclass=ABCMeta):
- @abstractmethod
- def selenium_mode(self, func_cookie, url):
- pass
- class RequestsBase(metaclass=ABCMeta):
- @abstractmethod
- def requests_mode(self, func_cookie, url):
- pass
- class PagedownloaderBase(SeleniumBase, RequestsBase, metaclass=ABCMeta):
- downloader_count = 0
- def __init__(self, url, dic=""):
- self.url = url
- self.dir = dic
- self.log = Log(dic)
- PagedownloaderBase.downloader_count += 1
- self.page_source_dict = {} # 页面保存信息
- self.cookie_Thread = None # 子进程
- self.browser = None
- self.cookie_dict = {}
- self.cookie_dict_list = {} # sele的cookies
- self.last_mode = None
- def set_page_parser(self, parser):
- self.parser = parser
- @abstractmethod
- def monitoring_add_cookies(self, cookies):
- pass
- @abstractmethod
- def monitoring_clear_cookier(self):
- pass
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageDownloaderRun(PagedownloaderBase, metaclass=ABCMeta):
- def close(self):
- self.log.close()
- def stop(self):
- self.break_ = False
- if self.last_mode is not None:
- try:
- self.browser.quit()
- except InvalidSessionIdException:
- pass
- self.last_mode = None
- def start_to_run(self, *args, func_cookie): # 用get请求url ->得到一个页面信息
- self.break_ = False
- self.page_source_dict = {}
- self.url_text = self.url.get_url() # 获取一个url
- url = self.url_text.url
- try:
- if self.url_text.mode == "get":
- self.selenium_mode(func_cookie, url)
- else: # requests模式
- self.requests_mode(func_cookie, url)
- except BaseException:
- raise CookiesError
- finally:
- self.last_mode = self.url_text.mode
- self.parser.browser = self.browser
- self.parser.init(url)
- return self.browser
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageDownloaderCookies(PagedownloaderBase, metaclass=ABCMeta):
- def monitoring_del_cookies(self, name): # 删除指定cookies
- self.browser.delete_cookie(name)
- def monitoring_clear_cookier(self): # 清空cookies
- self.browser.delete_all_cookies()
- def monitoring_add_cookies(self, cookies: dict): # 新增cookies
- self.browser.add_cookie(cookies)
- def monitoring_update_cookies(self, name, cookies: dict):
- cookies_list = self.browser.get_cookies()
- for i in cookies_list:
- if i.get("name", None) == name:
- self.browser.delete_cookie(name) # 删除原来cookies
- i.update(cookies)
- self.browser.add_cookie(i)
- return
- raise Exception
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageDownloaderRequests(PageDownloaderRun, metaclass=ABCMeta):
- def requests_start_cookies(self, func_cookie, url):
- self.cookie_dict[url] = requests.utils.dict_from_cookiejar(
- self.browser.cookies
- ) # 保存cookies
- func_cookie([self.cookie_dict[url]])
- def requests_run(self, parameters, url):
- self.browser = self.url_text.requests(
- url,
- headers=self.url_text.headers,
- timeout=self.url_text.time_out,
- **parameters,
- )
- def requests_data(self, parameters):
- if self.url_text.mode == "post":
- parameters["data"] = self.url_text.data
- return parameters
- def requests_cookies(self, func_cookie):
- try:
- parameters = {"cookies": self.cookie_dict[self.url_text.cookies]}
- except KeyError:
- parameters = {}
- func_cookie([])
- else:
- func_cookie([parameters["cookies"]])
- return parameters
- def requests_mode(self, func_cookie, url):
- parameters = self.requests_cookies(func_cookie)
- parameters = self.requests_data(parameters)
- self.requests_run(parameters, url)
- self.requests_start_cookies(func_cookie, url)
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageDownloaderSelenium(PageDownloaderRun, metaclass=ABCMeta):
- def selenium_quit(self):
- try:
- self.browser.quit()
- except InvalidSessionIdException:
- pass
- def selenium_cookies(self):
- try:
- if not self.url_text.new:
- raise UrlError
- cookies_list = self.cookie_dict_list[self.url_text.cookies]
- except (UrlError, KeyError):
- pass
- else:
- self.monitoring_clear_cookier()
- try:
- for i in cookies_list:
- self.monitoring_add_cookies(i)
- except WebDriverException:
- pass
- def start_selenium(self, quit_=True):
- if quit_:
- self.selenium_quit()
- self.browser = webdriver.Chrome(chrome_options=self.url_text.options)
- def selenium_run(self, url):
- self.browser.set_page_load_timeout(self.url_text.time_out) # 设置页面加载超时
- self.browser.set_script_timeout(self.url_text.time_out) # 设置页面异步js执行超时
- self.browser.get(url)
- def selenium_start_cookies(self, func_cookie, url):
- self.break_ = True
- def update_cookie():
- nonlocal self
- while self.break_:
- try:
- cookies = self.browser.get_cookies()
- func_cookie(cookies) # 与GUI通信显示cookie
- self.cookie_dict[url] = cookies
- time.sleep(0.5)
- except WebDriverException:
- pass
- self.cookie_Thread = threading.Thread(target=update_cookie)
- self.cookie_Thread.start()
- def selenium_mode(self, func_cookie, url):
- if self.url_text.new and self.last_mode == "get": # 重新启动
- self.start_selenium()
- elif self.last_mode is None:
- self.start_selenium(False)
- try:
- self.selenium_run(url)
- except WebDriverException:
- self.start_selenium()
- self.selenium_run(url)
- self.selenium_cookies()
- self.selenium_start_cookies(func_cookie, url)
- class PageParserBase:
- def __init__(self, downloader):
- self.downloader = downloader
- self.downloader.set_page_parser(self)
- self.func_list = []
- self.func_dict = {}
- self.n = 0
- self.init()
- def init(self, url=""):
- self.element_dict = {} # 记录属性的名字
- self.url_text = url
- @staticmethod
- def add_base(func): # 装饰器
- def wrap(num=None, name=None, *args, **kwargs):
- try:
- func(num=num, name=name, *args, **kwargs)
- return True, ''
- except BaseException as e:
- return False, str(e)
- return wrap
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserFunc(PageParserBase):
- def tra_func(self):
- self.func_list = []
- self.func_dict = {}
- self.n = 0
- def add_func(self, name, func):
- self.func_list.append(f"{name}[{self.n}]")
- self.func_dict[f"{name}[{self.n}]"] = func
- self.n += 1
- def del_func(self, index, end=False):
- if end:
- index = len(self.func_list) - index - 1
- del self.func_dict[self.func_list[index]]
- self.func_list[index] = "Func_have_been_del"
- self.func_dict["Func_have_been_del"] = lambda *args, **kwargs: None
- def return_func(self, only=True):
- if only:
- return self.func_list.copy()
- else:
- return [
- f"var[{index}]@ {i}" for index, i in enumerate(self.func_list.copy())
- ]
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserFind(PageParserFunc):
- def find_id(self, id_, not_all=False, **kwargs):
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, id_
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_id(id_)
- ] # 返回必须是list
- else:
- self.element_dict[f"{name}[{num}]"] = self.browser.find_elements_by_id(id_)
- self.add_func(f"find_ID:{id_}", find) # 添加func
- def find_class(self, class_name, not_all=False, **kwargs):
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, class_name
- self.browser = self.browser
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_class_name(class_name)
- ] # 返回必须是list
- else:
- self.element_dict[
- f"{name}[{num}]"
- ] = self.browser.find_elements_by_class_name(
- class_name
- ) # 返回必须是list
- self.add_func(f"find_class:{class_name}", find) # 添加func
- def find_name(self, name_, not_all=False, **kwargs):
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, name_
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_name(name_)
- ] # 返回必须是list
- else:
- self.element_dict[f"{name}[{num}]"] = self.browser.find_elements_by_name(
- name_
- ) # 返回必须是list
- self.add_func(f"find_name:{name_}", find) # 添加func
- def find_xpath(self, xpath, not_all=False, **kwargs):
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, xpath
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_xpath(xpath)
- ] # 返回必须是list
- else:
- self.element_dict[f"{name}[{num}]"] = self.browser.find_elements_by_xpath(
- xpath
- ) # 返回必须是list
- self.add_func(f"find_xpath:{xpath}", find) # 添加func
- def find_css(self, css_selector, not_all=False, **kwargs):
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, css_selector
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_css_selector(css_selector)
- ] # 返回必须是list
- else:
- self.element_dict[
- f"{name}[{num}]"
- ] = self.browser.find_elements_by_css_selector(
- css_selector
- ) # 返回必须是list
- self.add_func(f"find_css:{css_selector}", find) # 添加func
- def find_tag_name(self, tag_name, not_all=False, **kwargs):
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, tag_name
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_tag_name(tag_name)
- ] # 返回必须是list
- else:
- self.element_dict[f"{name}[{num}]"] = self.browser.find_elements_by_tag_name(
- tag_name
- ) # 返回必须是list
- self.add_func(f"find_tagName:{tag_name}", find) # 添加func\
- def find_link_text(self, link_text, not_all=False, **kwargs): # 匹配link
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, link_text
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_link_text(link_text)
- ] # 返回必须是list
- else:
- self.element_dict[
- f"{name}[{num}]"
- ] = self.browser.find_elements_by_link_text(
- link_text
- ) # 返回必须是list
- self.add_func(f"find_link_text:{link_text}", find) # 添加func
- def find_partial_link_text(
- self, partial_link_text, not_all=False, **kwargs
- ): # 模糊匹配
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, partial_link_text
- if not_all:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_partial_link_text(partial_link_text)
- ] # 返回必须是list
- else:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.find_element_by_partial_link_text(partial_link_text)
- ] # 返回必须是list
- self.add_func(f"find_partial_link_text:{partial_link_text}", find) # 添加func
- def find_switch_to_alert(self, *args, **kwargs): # 定位弹出框
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [self.browser.switch_to.alert()]
- self.add_func(f"find_alert", find) # 添加func
- def find_switch_to_active_element(self, *args, **kwargs): # 定位焦点元素
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [self.browser.switch_to.active_element()]
- self.add_func(f"active_element", find) # 添加func
- def find_switch_to_frame(self, reference, is_id=False, *args, **kwargs): # 定位Frame
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self, reference, is_id
- if reference is None:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.default_content()
- ] # 回到主文档
- elif reference == "":
- self.element_dict[f"{name}[{num}]"] = [self.browser.parent_frame()] # 回到父文档
- else:
- if is_id:
- reference = int(reference)
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.switch_to.frame(str(reference))
- ] # 定位进入文档
- func_name = {None: "主文档", "": "父文档"}.get(reference, reference)
- self.add_func(f"find_frame:{func_name}", find) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserActionListBox(PageParserFunc):
- def deselect_by_index(
- self, element_value, deselect, index=0, **kwargs
- ): # 根据index取消选择
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].deselect_by_index(int(deselect))
- self.add_func(
- f"deselect_by_index:{deselect}>{element_value}[{index}]", action
- ) # 添加func
- def deselect_by_text(
- self, element_value, deselect, index=0, **kwargs
- ): # 根据text取消选择
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].deselect_by_visible_text(deselect)
- self.add_func(
- f"deselect_by_text:{deselect}>{element_value}[{index}]", action
- ) # 添加func
- def deselect_by_value(
- self, element_value, deselect, index=0, **kwargs
- ): # 根据value取消选择
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].deselect_by_value(deselect)
- self.add_func(
- f"deselect_by_value:{deselect}>{element_value}[{index}]", action
- ) # 添加func
- def select_by_index(self, element_value, deselect, index=0, **kwargs): # 根据index选择
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].select_by_index(int(deselect))
- self.add_func(
- f"select_by_index:{deselect}>{element_value}[{index}]", action
- ) # 添加func
- def select_by_text(self, element_value, deselect, index=0, **kwargs): # 根据text选择
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].select_by_visible_text(deselect)
- self.add_func(
- f"select_by_text:{deselect}>{element_value}[{index}]", action
- ) # 添加func
- def select_by_value(self, element_value, deselect, index=0, **kwargs): # 根据value选择
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].select_by_value(deselect)
- self.add_func(
- f"select_by_value:{deselect}>{element_value}[{index}]", action
- ) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserAction(PageParserFunc):
- def send_keys(self, text, element_value, index=0, **kwargs): # 输入文字
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].send_keys(text)
- self.add_func(f"sent_text:{text}>{element_value}[{index}]", action) # 添加func
- def authentication(
- self, user, passwd, element_value, index=0, **kwargs
- ): # 输入验证(User&Password)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].authenticate(user, passwd)
- self.add_func(
- f"Authentication:{user};{passwd}>{element_value}[{index}]", action
- ) # 添加func
- def clear(self, element_value, index=0, **kwargs): # 清空文本
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].clear()
- self.add_func(f"clear_text>{element_value}[{index}]", action) # 添加func
- def click(self, element_value, index=0, **kwargs): # 点击按钮
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].click()
- self.add_func(f"click>{element_value}[{index}]", action) # 添加func
- def accept(self, element_value, index=0, **kwargs): # 点击确定(弹出框)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].accept()
- self.add_func(f"accept>{element_value}[{index}]", action) # 添加func
- def dismiss(self, element_value, index=0, **kwargs): # 点击取消(弹出框)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].dismiss()
- self.add_func(f"dismiss>{element_value}[{index}]", action) # 添加func
- def submit(self, element_value, index=0, **kwargs): # 提交表单
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[element_value][index].submit()
- self.add_func(f"submit>{element_value}[{index}]", action) # 添加func
- def run_js(self, js, **kwargs):
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- get = self.browser.execute_script(js)
- if hasattr(get, "__getitem__"): # 可切片
- self.element_dict[f"{name}[{num}]"] = get # 返回必须是list
- else:
- self.element_dict[f"{name}[{num}]"] = [get]
- self.add_func(f"run_js:{js}", action)
- class PageParserAutomation(PageParserFind, PageParserActionListBox, PageParserAction):
- pass
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserCookies(PageParserFunc):
- def del_all_cookies(self, **kwargs): # 删除所有曲奇
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.delete_all_cookies()
- self.add_func(f"del_all_cookies", action)
- def del_cookies(self, cookies_name, **kwargs): # 删除指定曲奇
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.delete_cookie(cookies_name)
- self.add_func(f"del_cookies:{cookies_name}", action)
- def add_cookies(self, cookies, **kwargs): # 添加指定曲奇
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.add_cookie(cookies)
- self.add_func(f"add_cookies:{cookies}", action)
- def update_cookies(self, cookies_name, cookies, **kwargs): # 更新曲奇
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- now_cookies = self.browser.get_cookie(cookies_name)
- self.browser.delete_cookie(cookies_name)
- now_cookies.update(cookies)
- self.browser.add_cookie(now_cookies)
- self.add_func(f"add_cookies:{cookies}", action)
- def get_cookies(self, cookies_name, **kwargs): # 获取指定曲奇
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.get_cookie(cookies_name)
- ]
- self.add_func(f"get_cookies:{cookies_name}", action)
- def get_all_cookies(self, **kwargs): # 获取所有曲奇
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = self.browser.get_cookie()
- self.add_func(f"get_all_cookies", action)
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserBrowserActions(PageParserFunc):
- def back(self, **kwargs): # 返回
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.back()
- self.add_func(f"BACK", action)
- def forward(self, **kwargs): # 前进
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.forward()
- self.add_func(f"FORWARD", action)
- def refresh(self, **kwargs): # 刷新
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.refresh()
- self.add_func(f"REFRESH", action)
- def wait_sleep(self, time: int = 2, **kwargs): # 暴力等待
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- sleep(time)
- self.add_func(f"WAIT:{time}s", action)
- def set_wait(self, time: int = 2, **kwargs): # 隐式等待
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- sleep(time)
- self.add_func(f"Loading_wait:{time}s", action)
- class PageParserBrowser(PageParserBrowserActions, PageParserCookies):
- pass
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserNeighbor(PageParserFunc):
- def __get_other_base(
- self, element_value, index: (slice, int), who="children", **kwargs
- ): # 获得子、后代、兄弟标签的基类
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- paser_list = []
- for bs in iter_list:
- if who != "brothers":
- paser_list += {
- "children": bs.children,
- "offspring": bs.descendants,
- "down": bs.next_siblings,
- "up": bs.previous_siblings,
- }.get(who, bs.children)
- else:
- paser_list += bs.previous_siblings
- paser_list += bs.next_siblings
- self.element_dict[f"{name}[{num}]"] = list(set(paser_list))
- self.add_func(f"get_{who}:{element_value}[{index}]", action) # 添加func
- def get_children(self, element_value, index: (slice, int), **kwargs):
- return self.__get_other_base(element_value, index)
- def get_offspring(self, element_value, index: (slice, int), **kwargs):
- return self.__get_other_base(element_value, index, "offspring")
- def get_up(self, element_value, index: (slice, int), **kwargs):
- return self.__get_other_base(element_value, index, "up")
- def get_down(self, element_value, index: (slice, int), **kwargs):
- return self.__get_other_base(element_value, index, "down")
- def get_brothers(self, element_value, index: (slice, int), **kwargs):
- return self.__get_other_base(element_value, index, "brothers")
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserDataFindall(PageParserFunc):
- def findall(
- self,
- element_value,
- tag: (str, list),
- attribute: dict,
- limit,
- recursive,
- index: (slice, int),
- **kwargs,
- ): # 根据标签定位
- if isinstance(tag, str):
- tag = str(tag).split(",")
- try:
- limit = int(limit)
- except ValueError:
- limit = None
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- paser_list = []
- for bs in iter_list:
- try:
- re = bs.find_all(tag, attribute, limit=limit, recursive=recursive)
- except AttributeError:
- try:
- if str(bs.name) not in tag:
- raise PageParserError
- for agrs_name in attribute:
- text = attribute[agrs_name]
- if isinstance(text, str):
- if bs.attrs[agrs_name] != text:
- raise PageParserError
- else: # 正则匹配
- if not regular.match(text, bs.attrs[agrs_name]):
- raise PageParserError
- re = [bs]
- except PageParserError:
- re = []
- paser_list += re
- self.element_dict[f"{name}[{num}]"] = paser_list
- self.add_func(f"findAll:{element_value}[{index}]", action) # 添加func
- def findall_by_text(
- self,
- element_value,
- text: (regular.compile, str),
- limit,
- recursive,
- index: (slice, int),
- **kwargs,
- ): # 根据text定位
- try:
- limit = int(limit)
- except ValueError:
- limit = None
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- paser_list = []
- for bs in iter_list:
- try:
- re = bs.find_all(text=text, limit=limit, recursive=recursive)
- except AttributeError:
- try:
- if isinstance(text, str):
- if str(bs.string) != text:
- raise PageParserError
- else:
- if not regular.match(text, str(bs.string)):
- raise PageParserError
- re = [bs]
- except PageParserError:
- re = []
- paser_list += re
- self.element_dict[f"{name}[{num}]"] = paser_list
- self.add_func(f"findAll_by_text:{element_value}[{index}]", action) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserDatabase(PageParserFunc):
- def to_database(
- self, element_value, index, data: (str, list), database_name: str, **kwargs
- ): # 传入data Base
- @self.add_base
- def action(*args, **kwargs):
- global data_base
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- for bs in iter_list:
- new = []
- for i in data:
- if i == "$name&":
- new.append(bs.name)
- elif i == "$self&":
- new.append(str(bs).replace("\n", ""))
- elif i == "$string$":
- new.append(str(bs.string).replace("\n", ""))
- else:
- new.append(bs.attrs.get(i, ""))
- data_base.add_database(database_name, new)
- self.add_func(
- f"DataBase:{data}<{element_value}[{index}]>{database_name}", action
- ) # 添加func
- def to_database_by_re(
- self, element_value, index, data: str, database_name: str, **kwargs
- ): # 通过正则,传入dataBase
- data = regular.compile(data)
- @self.add_base
- def action(*args, **kwargs):
- global data_base
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- for bs in iter_list:
- new = regular.findall(data, str(bs))
- data_base.add_database(database_name, new)
- self.add_func(
- f"DataBase:{data}<{element_value}[{index}]>{database_name}", action
- ) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserDataSource(PageParserFunc):
- def to_text(self, **kwargs): # 获取网页源码
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- try:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.page_source,
- self.url_text,
- ]
- except AttributeError:
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.text,
- self.url_text,
- ] # request
- self.add_func(f"get_page_source", action)
- def out_html(self, element_value, **kwargs): # 输出网页源码
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- md5 = hashlib.md5() # 应用MD5算法
- md5.update(f"{time.time()}_{self.url_text}".encode("utf-8"))
- name = md5.hexdigest()
- save_dir = self.dir + f"{os.sep}" + name + ".cotan_source"
- print(save_dir)
- with open(save_dir, "w") as f:
- f.write(self.element_dict[element_value][0])
- with open(save_dir + ".CoTanURL", "w") as f:
- f.write(self.element_dict[element_value][1])
- self.add_func(f"write_html<{element_value}", action)
- def make_bs(self, element_value, **kwargs): # 解析成bs4对象
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [
- bs4.BeautifulSoup(self.element_dict[element_value][0], "html.parser")
- ]
- self.add_func(f"Parsing:{element_value}", action) # 添加func
- def add_url(
- self,
- element_value,
- index: (slice, int),
- url_name,
- update_func,
- url_args: dict,
- **kwargs,
- ): # 自动添加url
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- for bs in iter_list:
- try:
- if url_name == "$name&":
- new_url = bs.name
- elif url_name == "$self&":
- new_url = str(bs).replace("\n", "")
- elif url_name == "$string$":
- new_url = str(bs.string).replace("\n", "")
- else:
- new_url = bs.attrs.get(url_name, "")
- self.downloader.url.add_url(new_url, **url_args)
- except AttributeError:
- pass
- update_func() # 更新tkinter
- self.add_func(f"add_URL<{element_value}[{index}]:{url_name}", action) # 添加func
- def to_json(self, **kwargs):
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.json()
- ] # request 解析为 json
- self.add_func(f"to_json", action) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserTool(PageParserFunc):
- def list_slicing(self, index: (slice, int), element_value):
- if isinstance(index, int):
- return [self.element_dict[element_value][index]]
- else:
- return self.element_dict[element_value][index]
- def get_by_path(
- self, element_value, index: (slice, int), path, **kwargs
- ): # 根据bs4的目录选择
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- iter_list = self.list_slicing(index, element_value)
- paser_list = []
- for bs in iter_list:
- try:
- re = eval(str(path), {"self": bs})
- if re is None:
- raise PageParserError
- paser_list.append(re)
- except BaseException as e:
- logging.warning(str(e))
- self.element_dict[f"{name}[{num}]"] = paser_list
- self.add_func(f"get>{path}:{element_value}[{index}]", action) # 添加func
- def webpage_snapshot(self, **kwargs):
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- md5 = hashlib.md5() # 应用MD5算法
- md5.update(f"{time.time()}_{self.url_text}".encode("utf-8"))
- name = md5.hexdigest()
- with open(self.dir + f"{os.sep}" + name + ".png.CoTanURL", "w") as f:
- f.write(self.url_text)
- self.browser.save_screenshot(self.dir + f"{os.sep}" + name + ".png")
- sleep(1)
- self.add_func(f"Webpage_snapshot", action) # 添加func
- class PageParserData(PageParserDatabase, PageParserDataSource, PageParserDataFindall, PageParserTool):
- pass
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserChainsWindow(PageParserFunc):
- def get_all_windows(self, *args, **kwargs): # 获取所有句柄
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self
- # 获得窗口句柄
- self.element_dict[f"{name}[{num}]"] = self.browser.window_handles
- self.add_func(f"get_all_windows", find) # 添加func
- def get_now_windows(self, *args, **kwargs): # 获取当前窗口句柄
- @self.add_base
- def find(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [
- self.browser.current_window_handle
- ] # 获得当前窗口句柄
- self.add_func(f"get_now_window", find) # 添加func
- def switch_to_windwos(self, element_value, index=0, **kwargs): # 切换窗口
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.browser.switch_to.window(self.element_dict[element_value][index])
- self.add_func(f"switch_to_window>{element_value}[{index}]", action) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserClick(PageParserFunc):
- def action_click(self, chains, element_value, index, **kwargs): # 单击左
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].click(self.element_dict[element_value][index])
- self.add_func(f"[{chains}]click>[{element_value}][{index}]", action) # 添加func
- def action_double_click(self, chains, element_value, index, **kwargs): # 双击左
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].double_click(
- self.element_dict[element_value][index]
- )
- self.add_func(
- f"[{chains}]double_click>[{element_value}][{index}]", action
- ) # 添加func
- def action_click_right(self, chains, element_value, index, **kwargs): # 点击右
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].context_click(
- self.element_dict[element_value][index]
- )
- self.add_func(
- f"[{chains}]right_click>[{element_value}][{index}]", action
- ) # 添加func
- def action_click_and_hold(self, chains, element_value, index, **kwargs): # 按住左
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].click_and_hold(
- self.element_dict[element_value][index]
- )
- self.add_func(
- f"[{chains}]click_and_hold>[{element_value}][{index}]", action
- ) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserChainsMouse(PageParserFunc):
- def action_release(self, chains, element_value, index, **kwargs): # 松开左键
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].release(
- self.element_dict[element_value][index]
- )
- self.add_func(f"[{chains}]release>[{element_value}][{index}]", action) # 添加func
- def action_drag_and_drop(
- self, chains, element_value, index, element_value2, index2, **kwargs
- ): # 拽托、松开
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].drag_and_drop(
- self.element_dict[element_value][index],
- self.element_dict[element_value2][index2],
- )
- self.add_func(
- f"[{chains}]drag_and_drop>[{element_value}][{index}]", action
- ) # 添加func
- def action_move(self, chains, element_value, index, **kwargs): # 移动鼠标
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].move_to_element(
- self.element_dict[element_value][index]
- )
- self.add_func(
- f"[{chains}]drag_and_drop>[{element_value}][{index}]", action
- ) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserChainsKeys(PageParserFunc):
- @staticmethod
- def special_keys(key: str, is_special_keys):
- if is_special_keys:
- return keys_name_dict.get(key.lower(), key), f"[{key.upper()}]"
- else:
- return key, key
- def action_key_down(
- self, chains, key, element_value, index, is_special_keys, **kwargs
- ): # down
- new_key, key = self.special_keys(key, is_special_keys)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].key_down(
- new_key, self.element_dict[element_value][index]
- )
- self.add_func(
- f"[{chains}]key_down>{key}:[{element_value}][{index}]", action
- ) # 添加func
- def action_key_up(
- self, chains, key, element_value, index, is_special_keys, **kwargs
- ): # down
- new_key, key = self.special_keys(key, is_special_keys)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].key_up(
- new_key, self.element_dict[element_value][index]
- )
- self.add_func(
- f"[{chains}]key_up>{key}:[{element_value}][{index}]", action
- ) # 添加func
- def action_send_keys_to_element(
- self, chains, key, element_value, index, is_special_keys, **kwargs
- ):
- new_key, key = self.special_keys(key, is_special_keys)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].send_keys_to_element(
- self.element_dict[element_value][index], new_key
- )
- self.add_func(
- f"[{chains}]sent>{key}:[{element_value}][{index}]", action
- ) # 添加func
- def action_send_keys(self, chains, key, is_special_keys, **kwargs): # 发送到焦点元素
- new_key, key = self.special_keys(key, is_special_keys)
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].send_keys(new_key)
- self.add_func(f"[{chains}].sent>{key}", action) # 添加func
- @plugin_class_loading(get_path(r'template/crawler'))
- class PageParserChains(PageParserChainsWindow, PageParserClick, PageParserChainsMouse,
- PageParserChainsKeys):
- def make_action_chains(self, **kwargs): # 创建动作链
- @self.add_base
- def action(num, name, *args, **kwargs):
- nonlocal self
- self.element_dict[f"{name}[{num}]"] = [ActionChains(self.browser)]
- self.add_func(f"make_ActionChains", action) # 添加func
- def action_run(self, chains, run_time=1, **kwargs): # 执行
- @self.add_base
- def action(*args, **kwargs):
- nonlocal self
- self.element_dict[chains][0].perform()
- sleep(run_time)
- self.add_func(f"[{chains}].run<{run_time}s", action) # 添加func
- for i in range(1, 13): # F1 - F12按键
- keys_name_dict[f"f{i}"] = eval(f"Keys.F{i}", {'Keys': Keys})
- data_base = DatabaseController()
|