1
0

template.py 18 KB


  1. import os
  2. import random
  3. import subprocess
  4. from abc import ABCMeta, abstractmethod
  5. from git import Repo
  6. from os.path import exists, split
  7. from time import time
  8. from system import plugin_class_loading, get_path
  9. sys_seeting = dict(
  10. shell=True,
  11. stdin=subprocess.PIPE,
  12. stdout=subprocess.PIPE,
  13. stderr=subprocess.STDOUT,
  14. universal_newlines=True,
  15. )
  16. git_path = "git" # git的地址,如果配置了环境变量则不需要修改
  17. stop_key = "【操作完成】" # 存储stopKey的global变量
  18. passwd = (
  19. "1234567890qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM" # stopKey的候选词库
  20. )
  21. class GitBase(metaclass=ABCMeta):
  22. def __init__(self, repo_dir, *args, **kwargs):
  23. self.repo_dir = ''
  24. self.repo = None
  25. self.init(repo_dir)
  26. @abstractmethod
  27. def init(self, repo_dir):
  28. pass
  29. @staticmethod
  30. def make_stop_key(): # 生成一个随机stopKey
  31. global stop_key
  32. code = ""
  33. for _ in range(8): # 八位随机数
  34. code += passwd[random.randint(0, len(passwd) - 1)] # 时间戳+8位随机数
  35. stop_key = (str(time()) + code).replace(".", "")
  36. return stop_key
  37. def get_flie_list(self, file_list, is_file=True, pat=" "):
  38. if file_list == ".":
  39. file = ".."
  40. else:
  41. file_ = []
  42. for i in file_list:
  43. if i[: len(self.repo_dir)] == self.repo_dir:
  44. file_.append(i[len(self.repo_dir) + 1:]) # +1是为了去除/
  45. if not is_file:
  46. return file_
  47. file = pat.join(file_)
  48. return file
  49. @plugin_class_loading(get_path(r'template/gitrepo'))
  50. class ViewClasses(GitBase, metaclass=ABCMeta):
  51. def status(self): # 执行status
  52. return subprocess.Popen(
  53. f"echo 仓库状态: && {git_path} status && echo {self.make_stop_key()}",
  54. cwd=self.repo_dir,
  55. **sys_seeting,
  56. )
  57. def rm(self, file_list): # 删除版本库中的文件
  58. file = self.get_flie_list(file_list)
  59. return subprocess.Popen(
  60. f"echo 删除... && {git_path} rm {file}", cwd=self.repo_dir, **sys_seeting
  61. )
  62. def dir_list(self, all_=True):
  63. listfile = []
  64. if all_:
  65. listfile += [
  66. f'[当前分支] {self.repo.active_branch} 工作区{"不" if self.repo.is_dirty() else ""}干净 -> {self.name}'
  67. ]
  68. listfile += [
  69. f'{"[配置文件]" if i == ".git" else "[未跟踪]"if i in self.repo.untracked_files else "[已跟踪]"} {i}'
  70. for i in os.listdir(self.repo_dir)
  71. ]
  72. return listfile
  73. def log(self, graph, pretty, abbrev): # 执行log
  74. args = ""
  75. if graph:
  76. args += " --graph"
  77. if pretty:
  78. args += " --pretty=oneline"
  79. if abbrev:
  80. args += " --abbrev-commit"
  81. return subprocess.Popen(
  82. f"echo 仓库日志: && {git_path} log{args} && echo {self.make_stop_key()}",
  83. cwd=self.repo_dir,
  84. **sys_seeting,
  85. )
  86. def do_log(self): # 执行reflog
  87. return subprocess.Popen(
  88. f"echo 操作记录: && {git_path} reflog && echo {self.make_stop_key()}",
  89. cwd=self.repo_dir,
  90. **sys_seeting,
  91. )
  92. def diff(self, master="HEAD"): # 执行diff
  93. return subprocess.Popen(
  94. f"echo 文件日志: && {git_path} diff {master} && echo {self.make_stop_key()}",
  95. cwd=self.repo_dir,
  96. **sys_seeting,
  97. )
  98. def branch_view(self): # 查看本地分支和远程分支
  99. return subprocess.Popen(
  100. f"echo 仓库分支: && {git_path} branch -a && echo 远程仓库信息: && {git_path} remote -v && "
  101. f"echo 分支详情: && {git_path} branch -vv && echo {self.make_stop_key()}",
  102. cwd=self.repo_dir,
  103. **sys_seeting,
  104. )
  105. def get_stash_list(self): # 工作区列表
  106. return subprocess.Popen(
  107. f"echo 工作区列表: && {git_path} stash list && echo {self.make_stop_key()}",
  108. cwd=self.repo_dir,
  109. **sys_seeting,
  110. )
  111. def get_tag_list(self, condition=""):
  112. if condition != "":
  113. condition = f" -l {condition}"
  114. return subprocess.Popen(
  115. f"echo 标签列表: && {git_path} tag{condition} && echo {self.make_stop_key()}",
  116. cwd=self.repo_dir,
  117. **sys_seeting,
  118. )
  119. def search_commit(self, condition):
  120. return subprocess.Popen(
  121. f"echo 查询结果: && {git_path} show {condition} && echo {self.make_stop_key()}",
  122. cwd=self.repo_dir,
  123. **sys_seeting,
  124. )
  125. @plugin_class_loading(get_path(r'template/gitrepo'))
  126. class NewClasses(GitBase, metaclass=ABCMeta):
  127. def add(self, file_list):
  128. file = self.get_flie_list(file_list)
  129. return subprocess.Popen(
  130. f"echo 添加文件... && {git_path} add {file} && echo {self.make_stop_key()}",
  131. cwd=self.repo_dir,
  132. **sys_seeting,
  133. )
  134. def commit_file(self, m):
  135. return subprocess.Popen(
  136. f'echo 提交文件: && {git_path} commit -m "{m}" && echo {self.make_stop_key()}',
  137. cwd=self.repo_dir,
  138. **sys_seeting,
  139. )
  140. def new_branch(self, branch_name, origin): # 新建分支
  141. return subprocess.Popen(
  142. f"echo 新建分支... && {git_path} branch {branch_name} {origin} && echo {self.make_stop_key()}",
  143. cwd=self.repo_dir,
  144. **sys_seeting,
  145. )
  146. def save_stash(self): # 保存工作区
  147. return subprocess.Popen(
  148. f"echo 保存工作区... && {git_path} stash && echo {self.make_stop_key()}",
  149. cwd=self.repo_dir,
  150. **sys_seeting,
  151. )
  152. def cherry_pick(self, commit): # 补丁
  153. return subprocess.Popen(
  154. f"echo 补丁... && {git_path} cherry-pick {commit} && echo {self.make_stop_key()}",
  155. cwd=self.repo_dir,
  156. **sys_seeting,
  157. )
  158. def remote_add(self, remote, remote_name):
  159. return subprocess.Popen(
  160. f"echo 添加远程仓库... && {git_path} remote add {remote_name} {remote} && echo {self.make_stop_key()}",
  161. cwd=self.repo_dir,
  162. **sys_seeting,
  163. )
  164. def bind_branch(self, local_name, remote_name):
  165. return subprocess.Popen(
  166. f"echo 分支绑定... && {git_path} branch --set-upstream-to={remote_name} {local_name} && echo "
  167. f"{self.make_stop_key()}",
  168. cwd=self.repo_dir,
  169. **sys_seeting,
  170. )
  171. def add_tag(self, tag, commit, message=""):
  172. a = " -a"
  173. if message != "":
  174. message = f' -m "{message}"' # 自带空格
  175. else:
  176. a = ""
  177. if commit != "":
  178. commit = f" {commit}" # 自带空格
  179. return subprocess.Popen(
  180. f"echo 添加标签... && {git_path} tag{a} {tag}{commit}{message} && echo {self.make_stop_key()}",
  181. cwd=self.repo_dir,
  182. **sys_seeting,
  183. )
  184. @plugin_class_loading(get_path(r'template/gitrepo'))
  185. class RemoveClass(GitBase, metaclass=ABCMeta):
  186. def del_cached_file(self, file_list):
  187. file = self.get_flie_list(file_list)
  188. return subprocess.Popen(
  189. f"echo 撤销文件... && {git_path} rm --cached {file} && echo {self.make_stop_key()}",
  190. cwd=self.repo_dir,
  191. **sys_seeting,
  192. )
  193. def del_branch(self, branch_name, del_type): # 删除分支
  194. return subprocess.Popen(
  195. f"echo 删除分支... && {git_path} branch -{del_type} {branch_name} && echo {self.make_stop_key()}",
  196. cwd=self.repo_dir,
  197. **sys_seeting,
  198. )
  199. def drop_stash(self, stash_num="0"): # 删除工作区
  200. return subprocess.Popen(
  201. f"echo 删除工作区... && {git_path} stash drop stash@{{{stash_num}}} && echo {self.make_stop_key()}",
  202. cwd=self.repo_dir,
  203. **sys_seeting,
  204. )
  205. def del_remote(self, remote_name):
  206. return subprocess.Popen(
  207. f"echo 删除远程仓库... && {git_path} remote remove {remote_name} && echo {self.make_stop_key()}",
  208. cwd=self.repo_dir,
  209. **sys_seeting,
  210. )
  211. def del_tag(self, tag):
  212. return subprocess.Popen(
  213. f"echo 删除本地标签... && {git_path} tag -d {tag} && echo {self.make_stop_key()}",
  214. cwd=self.repo_dir,
  215. **sys_seeting,
  216. )
  217. def del_branch_remote(self, remote, remote_branch):
  218. remote_split = remote.split("/")
  219. remote_name = remote_split[0] # 获取主机名 1)
  220. branch_split = remote_branch.split("/")
  221. if len(branch_split) >= 2 and remote_name == "":
  222. remote_name = branch_split[0] # 2)
  223. remote_branch = "/".join(branch_split[1:]) # 2)
  224. return subprocess.Popen(
  225. f"echo 删除远程分支... && {git_path} push {remote_name} :{remote_branch} && echo {self.make_stop_key()}",
  226. cwd=self.repo_dir,
  227. **sys_seeting,
  228. )
  229. def del_tag_remote(self, remote, tag):
  230. return subprocess.Popen(
  231. f"echo 删除远程标签... && {git_path} push {remote} :refs/tags/{tag} && echo {self.make_stop_key()}",
  232. cwd=self.repo_dir,
  233. **sys_seeting,
  234. )
  235. @plugin_class_loading(get_path(r'template/gitrepo'))
  236. class BackClasses(GitBase, metaclass=ABCMeta):
  237. def reset(self, head="HEAD~1", reset_type=0):
  238. if reset_type == 0:
  239. type_ = "--mixed" # 退回到工作区
  240. elif reset_type == 1:
  241. type_ = "--soft" # 退回到暂存区
  242. else:
  243. type_ = "--hard" # 退回到暂存区
  244. return subprocess.Popen(
  245. f"echo 回退... && {git_path} reset {type_} {head} && echo {self.make_stop_key()}",
  246. cwd=self.repo_dir,
  247. **sys_seeting,
  248. )
  249. def checkout(self, file_list):
  250. if len(file_list) >= 1: # 多于一个文件,不用--,空格
  251. file = self.get_flie_list(file_list, pat=" ")
  252. return subprocess.Popen(
  253. f"echo 丢弃修改: && {git_path} checkout {file} && echo {self.make_stop_key()}",
  254. cwd=self.repo_dir,
  255. **sys_seeting,
  256. )
  257. elif len(file_list) == 1:
  258. return subprocess.Popen(
  259. f"echo 丢弃修改: && {git_path} checkout -- {file_list[0]} && echo {self.make_stop_key()}",
  260. cwd=self.repo_dir,
  261. **sys_seeting,
  262. )
  263. else:
  264. return subprocess.Popen(
  265. f"echo 丢弃修改: && {git_path} checkout * && echo {self.make_stop_key()}",
  266. cwd=self.repo_dir,
  267. **sys_seeting,
  268. )
  269. def apply_stash(self, stash_num="0"): # 恢复工作区
  270. return subprocess.Popen(
  271. f"echo 恢复工作区... && {git_path} stash apply stash@{{{stash_num}}} && echo {self.make_stop_key()}",
  272. cwd=self.repo_dir,
  273. **sys_seeting,
  274. )
  275. def reset_file(self, hard, file_list): # 注意版本回退是:Reset_File
  276. file = self.get_flie_list(file_list)
  277. return subprocess.Popen(
  278. f"""echo 回退文件... && {git_path} reset {hard} {file} && echo {self.make_stop_key()}""",
  279. cwd=self.repo_dir,
  280. **sys_seeting,
  281. )
  282. @plugin_class_loading(get_path(r'template/gitrepo'))
  283. class ParallelClasses(GitBase, metaclass=ABCMeta):
  284. def switch_branch(self, branch_name): # 切换分支
  285. return subprocess.Popen(
  286. f"echo 切换分支... && {git_path} switch {branch_name} && echo {self.make_stop_key()}",
  287. cwd=self.repo_dir,
  288. **sys_seeting,
  289. )
  290. def merge_branch(self, branch_name, no_ff, m=""): # 合并分支
  291. if no_ff:
  292. no_ff = f' --no-ff -m "{m}"' # --no-ff前有空格
  293. else:
  294. no_ff = ""
  295. return subprocess.Popen(
  296. f"echo 合并分支... && {git_path} merge{no_ff} {branch_name} && echo {self.make_stop_key()}",
  297. cwd=self.repo_dir,
  298. **sys_seeting,
  299. )
  300. def merge_abort(self): # 退出冲突处理
  301. return subprocess.Popen(
  302. f"echo 冲突处理退出... && {git_path} merge --abort && echo {self.make_stop_key()}",
  303. cwd=self.repo_dir,
  304. **sys_seeting,
  305. )
  306. def rename_branch(self, old_name, new_name):
  307. return subprocess.Popen(
  308. f"""echo 回退文件... && {git_path} branch -m {old_name} {new_name} && echo {self.make_stop_key()}""",
  309. cwd=self.repo_dir,
  310. **sys_seeting,
  311. )
  312. @plugin_class_loading(get_path(r'template/gitrepo'))
  313. class RemoteClasses(GitBase, metaclass=ABCMeta):
  314. def push_tag(self, tag, remote_name):
  315. return subprocess.Popen(
  316. f"echo 推送标签... && {git_path} push {remote_name} {tag} && echo {self.make_stop_key()}",
  317. cwd=self.repo_dir,
  318. **sys_seeting,
  319. )
  320. def pull_push(
  321. self,
  322. pull_or_push=0,
  323. remote="",
  324. remote_branch="",
  325. local="",
  326. allow=False,
  327. u=False,
  328. f=False,
  329. ):
  330. # 处理逻辑
  331. # 1)remote去斜杠第一个作为主机名字
  332. # 2) 从remote分离主机名(如果没指定)
  333. # 3) 如果local为空,用HEAD填充
  334. # 4) 如果以上后,主机名仍为空,则local和分支均为空
  335. split_ = remote.split("/")
  336. remote_name = split_[0] # 获取主机名 1)
  337. branch_split = remote_branch.split("/")
  338. if len(branch_split) >= 2 and remote_name == "":
  339. remote_name = branch_split[0] # 2)
  340. remote_branch = "/".join(branch_split[1:]) # 2)
  341. if local.replace(" ", "") == "":
  342. local = "HEAD" # 3)
  343. if remote_name == "": # 4)
  344. branch = ""
  345. else:
  346. if pull_or_push == 1:
  347. # 注意,local不可以为空,也不会为空
  348. if remote_branch != "":
  349. # git push <远程主机名> <本地分支名>:<远程分支名>
  350. branch = f"{local}:{remote_branch}"
  351. else:
  352. branch = f"{local}" # 要去掉冒号
  353. else:
  354. if remote_branch != "HEAD":
  355. branch = f"{remote_branch}:{local}"
  356. else:
  357. branch = f"{remote_branch}"
  358. if allow:
  359. history = " --allow-unrelated-histories"
  360. else:
  361. history = ""
  362. push_pull = {0: "pull", 1: f"push{' -u' if u else ''}{' -f' if f else ''}"}
  363. return subprocess.Popen(
  364. f"""echo 与服务器连接... && {git_path} {push_pull.get(pull_or_push, "pull")}{history} {remote_name} {branch}
  365. && echo {self.make_stop_key()}""",
  366. cwd=self.repo_dir,
  367. **sys_seeting,
  368. )
  369. def fetch(self, remote, remote_branch, local):
  370. # 处理逻辑
  371. # 1)remote去斜杠第一个作为主机名字
  372. # 2) 从remote分离主机名(如果没指定)
  373. # 3) 如果local为空,用HEAD填充
  374. # 4) 如果以上后,主机名仍为空,则local和分支均为空
  375. split_ = remote.split("/")
  376. remote_name = split_[0] # 获取主机名 1)
  377. branch_split = remote_branch.split("/")
  378. if len(branch_split) >= 2 and remote_name == "":
  379. remote_name = branch_split[0] # 2)
  380. remote_branch = "/".join(branch_split[1:]) # 2)
  381. if local.replace(" ", "") == "":
  382. local = "HEAD" # 3)
  383. if remote_name == "": # 4)
  384. branch = ""
  385. else:
  386. if remote_branch != "HEAD":
  387. # git push <远程主机名> <本地分支名>:<远程分支名>
  388. branch = f"{remote_branch}:{local}"
  389. else:
  390. branch = f"{remote_branch}"
  391. return subprocess.Popen(
  392. f"""echo 更新远程仓库... && {git_path} fetch {remote_name} {branch} && echo {self.make_stop_key()}""",
  393. cwd=self.repo_dir,
  394. **sys_seeting,
  395. )
  396. def clone(self, url):
  397. return subprocess.Popen(
  398. f"echo 克隆操作不被允许 && echo {self.make_stop_key()}",
  399. cwd=self.repo_dir,
  400. **sys_seeting,
  401. )
  402. @plugin_class_loading(get_path(r'template/gitrepo'))
  403. class GitRepo(ViewClasses, NewClasses, RemoveClass, BackClasses, ParallelClasses, RemoteClasses): # git的基类
  404. def init(self, repo_dir):
  405. print(repo_dir)
  406. self.url = None
  407. try:
  408. if exists(repo_dir + rf"{os.sep}.git"): # 是否为git 仓库
  409. pass
  410. elif repo_dir.endswith(".git"):
  411. repo_dir = repo_dir[:-5] # -5,得把/去掉
  412. else:
  413. assert False
  414. except (AssertionError, IndexError):
  415. subprocess.Popen(
  416. f"{git_path} init", cwd=repo_dir, **sys_seeting
  417. ).wait()
  418. self.repo_dir = repo_dir # 仓库地址(末尾不带/)
  419. self.repo = Repo(repo_dir) # 读取一个仓库
  420. self.name = split(repo_dir)[-1]
  421. self.have_clone = True
  422. def customize_command(self, command: str):
  423. return subprocess.Popen(
  424. f"{command} && echo {self.make_stop_key()}",
  425. cwd=self.repo_dir,
  426. **sys_seeting,
  427. )
  428. def make_dir(self, dir_):
  429. if len(dir_) == "":
  430. return dir_
  431. if dir_[0].startswith(os.sep):
  432. inside = ""
  433. else:
  434. inside = os.sep
  435. return self.repo_dir + inside + dir_
  436. @plugin_class_loading(get_path(r'template/gitrepo'))
  437. class CloneGit(GitRepo): # Clone一个git
  438. def init(self, repo_dir, *args, **kwargs):
  439. self.Repo_Dic = repo_dir # 仓库地址
  440. self.url = None
  441. self.name = split(repo_dir)[-1]
  442. self.have_clone = False
  443. def clone(self, url):
  444. if self.have_clone:
  445. super(CloneGit, self).clone(url)
  446. self.have_clone = True
  447. return subprocess.Popen(
  448. f"echo 正在克隆... && {git_path} clone {url} {self.Repo_Dic}",
  449. cwd=split(self.Repo_Dic)[0],
  450. **sys_seeting,
  451. )
  452. def after_clone(self):
  453. self.repo = Repo(self.Repo_Dic)