GitController.py 23 KB


  1. # -*- coding: <encoding name> -*-
  2. from git import Repo
  3. from os.path import split, exists
  4. import os
  5. import subprocess
  6. from time import time
  7. import random
  8. sys_seeting = dict(
  9. shell=True,
  10. stdin=subprocess.PIPE,
  11. stdout=subprocess.PIPE,
  12. stderr=subprocess.STDOUT,
  13. universal_newlines=True,
  14. )
  15. git_path = "git" # git的地址,如果配置了环境变量则不需要修改
  16. stop_key = "【操作完成】" # 存储stopKey的global变量
  17. passwd = (
  18. "1234567890qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM" # stopKey的候选词库
  19. )
  20. class GitRepo: # git的基类
  21. def __init__(self, repo_dir, *args, **kwargs):
  22. self.url = None
  23. try:
  24. if exists(repo_dir + r"/.git"): # 是否为git 仓库
  25. pass
  26. elif repo_dir[-4:] == ".git":
  27. repo_dir = repo_dir[:-5] # -5,得把/去掉
  28. else:
  29. raise Exception
  30. except BaseException:
  31. subprocess.Popen(
  32. f"{git_path} init", cwd=self.repo_dir, **sys_seeting
  33. ).wait()
  34. self.repo_dir = repo_dir # 仓库地址(末尾不带/)
  35. self.repo = Repo(repo_dir) # 读取一个仓库
  36. self.name = split(repo_dir)[-1]
  37. self.have_clone = True
  38. def make_stop_key(self): # 生成一个随机stopKey
  39. global stop_key, passwd
  40. code = ""
  41. for _ in range(8): # 八位随机数
  42. code += passwd[random.randint(0, len(passwd) - 1)] # 时间戳+8位随机数
  43. stop_key = (str(time()) + code).replace(".", "")
  44. return stop_key
  45. def get_flie_list(self, file_list, is_file=True, pat=" "):
  46. if file_list == ".":
  47. file = ".."
  48. else:
  49. file_ = []
  50. for i in file_list:
  51. if i[: len(self.repo_dir)] == self.repo_dir:
  52. file_.append(i[len(self.repo_dir) + 1 :]) # +1是为了去除/
  53. if not is_file:
  54. return file_
  55. file = pat.join(file_)
  56. return file
  57. def dir_list(self, all=True):
  58. listfile = []
  59. if all:
  60. listfile += [
  61. f'[当前分支] {self.repo.active_branch} 工作区{"不" if self.repo.is_dirty() else ""}干净 -> {self.name}'
  62. ]
  63. listfile += [
  64. f'{"[配置文件]" if i == ".git" else "[未跟踪]"if i in self.repo.untracked_files else "[已跟踪]"} {i}'
  65. for i in os.listdir(self.repo_dir)
  66. ]
  67. return listfile
  68. def add(self, file_list):
  69. file = self.get_flie_list(file_list)
  70. return subprocess.Popen(
  71. f"echo 添加文件... && {git_path} add {file} && echo {self.make_stop_key()}",
  72. cwd=self.repo_dir,
  73. **sys_seeting,
  74. )
  75. def del_cached_file(self, file_list):
  76. file = self.get_flie_list(file_list)
  77. return subprocess.Popen(
  78. f"echo 撤销文件... && {git_path} rm --cached {file} && echo {self.make_stop_key()}",
  79. cwd=self.repo_dir,
  80. **sys_seeting,
  81. )
  82. def commit_file(self, m):
  83. return subprocess.Popen(
  84. f'echo 提交文件: && {git_path} commit -m "{m}" && echo {self.make_stop_key()}',
  85. cwd=self.repo_dir,
  86. **sys_seeting,
  87. )
  88. def status(self): # 执行status
  89. return subprocess.Popen(
  90. f"echo 仓库状态: && {git_path} status && echo {self.make_stop_key()}",
  91. cwd=self.repo_dir,
  92. **sys_seeting,
  93. )
  94. def log(self, graph, pretty, abbrev): # 执行log
  95. args = ""
  96. if graph:
  97. args += " --graph"
  98. if pretty:
  99. args += " --pretty=oneline"
  100. if abbrev:
  101. args += " --abbrev-commit"
  102. return subprocess.Popen(
  103. f"echo 仓库日志: && {git_path} log{args} && echo {self.make_stop_key()}",
  104. cwd=self.repo_dir,
  105. **sys_seeting,
  106. )
  107. def do_log(self): # 执行reflog
  108. return subprocess.Popen(
  109. f"echo 操作记录: && {git_path} reflog && echo {self.make_stop_key()}",
  110. cwd=self.repo_dir,
  111. **sys_seeting,
  112. )
  113. def diff(self, master="HEAD"): # 执行diff
  114. return subprocess.Popen(
  115. f"echo 文件日志: && {git_path} diff {master} && echo {self.make_stop_key()}",
  116. cwd=self.repo_dir,
  117. **sys_seeting,
  118. )
  119. def reset(self, head="HEAD~1", reset_type=0):
  120. if reset_type == 0:
  121. type_ = "--mixed" # 退回到工作区
  122. elif reset_type == 1:
  123. type_ = "--soft" # 退回到暂存区
  124. else:
  125. type_ = "--hard" # 退回到暂存区
  126. return subprocess.Popen(
  127. f"echo 回退... && {git_path} reset {type_} {head} && echo {self.make_stop_key()}",
  128. cwd=self.repo_dir,
  129. **sys_seeting,
  130. )
  131. def checkout(self, file_list):
  132. if len(file_list) >= 1: # 多于一个文件,不用--,空格
  133. file = self.get_flie_list(file_list, pat=" ")
  134. return subprocess.Popen(
  135. f"echo 丢弃修改: && {git_path} checkout {file} && echo {self.make_stop_key()}",
  136. cwd=self.repo_dir,
  137. **sys_seeting,
  138. )
  139. elif len(file_list) == 1:
  140. return subprocess.Popen(
  141. f"echo 丢弃修改: && {git_path} checkout -- {file_list[0]} && echo {self.make_stop_key()}",
  142. cwd=self.repo_dir,
  143. **sys_seeting,
  144. )
  145. else:
  146. return subprocess.Popen(
  147. f"echo 丢弃修改: && {git_path} checkout * && echo {self.make_stop_key()}",
  148. cwd=self.repo_dir,
  149. **sys_seeting,
  150. )
  151. def rm(self, file_list): # 删除版本库中的文件
  152. file = self.get_flie_list(file_list)
  153. return subprocess.Popen(
  154. f"echo 删除... && {git_path} rm {file}", cwd=self.repo_dir, **sys_seeting
  155. )
  156. def branch_view(self): # 查看本地分支和远程分支
  157. return subprocess.Popen(
  158. f"echo 仓库分支: && {git_path} branch -a && echo 远程仓库信息: && {git_path} remote -v && "
  159. f"echo 分支详情: && {git_path} branch -vv && echo {self.make_stop_key()}",
  160. cwd=self.repo_dir,
  161. **sys_seeting,
  162. )
  163. def new_branch(self, branch_name, origin): # 新建分支
  164. return subprocess.Popen(
  165. f"echo 新建分支... && {git_path} branch {branch_name} {origin} && echo {self.make_stop_key()}",
  166. cwd=self.repo_dir,
  167. **sys_seeting,
  168. )
  169. def switch_branch(self, branch_name): # 切换分支
  170. return subprocess.Popen(
  171. f"echo 切换分支... && {git_path} switch {branch_name} && echo {self.make_stop_key()}",
  172. cwd=self.repo_dir,
  173. **sys_seeting,
  174. )
  175. def del_branch(self, branch_name, del_type): # 删除分支
  176. return subprocess.Popen(
  177. f"echo 删除分支... && {git_path} branch -{del_type} {branch_name} && echo {self.make_stop_key()}",
  178. cwd=self.repo_dir,
  179. **sys_seeting,
  180. )
  181. def merge_branch(self, branch_name, no_ff, m=""): # 合并分支
  182. if no_ff:
  183. no_ff = f' --no-ff -m "{m}"' # --no-ff前有空格
  184. else:
  185. no_ff = ""
  186. return subprocess.Popen(
  187. f"echo 合并分支... && {git_path} merge{no_ff} {branch_name} && echo {self.make_stop_key()}",
  188. cwd=self.repo_dir,
  189. **sys_seeting,
  190. )
  191. def merge_abort(self): # 退出冲突处理
  192. return subprocess.Popen(
  193. f"echo 冲突处理退出... && {git_path} merge --abort && echo {self.make_stop_key()}",
  194. cwd=self.repo_dir,
  195. **sys_seeting,
  196. )
  197. def save_stash(self): # 保存工作区
  198. return subprocess.Popen(
  199. f"echo 保存工作区... && {git_path} stash && echo {self.make_stop_key()}",
  200. cwd=self.repo_dir,
  201. **sys_seeting,
  202. )
  203. def get_stash_list(self): # 工作区列表
  204. return subprocess.Popen(
  205. f"echo 工作区列表: && {git_path} stash list && echo {self.make_stop_key()}",
  206. cwd=self.repo_dir,
  207. **sys_seeting,
  208. )
  209. def apply_stash(self, stash_num="0"): # 恢复工作区
  210. return subprocess.Popen(
  211. f"echo 恢复工作区... && {git_path} stash apply stash@{{{stash_num}}} && echo {self.make_stop_key()}",
  212. cwd=self.repo_dir,
  213. **sys_seeting,
  214. )
  215. def drop_stash(self, stash_num="0"): # 删除工作区
  216. return subprocess.Popen(
  217. f"echo 删除工作区... && {git_path} stash drop stash@{{{stash_num}}} && echo {self.make_stop_key()}",
  218. cwd=self.repo_dir,
  219. **sys_seeting,
  220. )
  221. def cherry_pick(self, commit): # 补丁
  222. return subprocess.Popen(
  223. f"echo 补丁... && {git_path} cherry-pick {commit} && echo {self.make_stop_key()}",
  224. cwd=self.repo_dir,
  225. **sys_seeting,
  226. )
  227. def del_remote(self, remote_name):
  228. return subprocess.Popen(
  229. f"echo 删除远程仓库... && {git_path} remote remove {remote_name} && echo {self.make_stop_key()}",
  230. cwd=self.repo_dir,
  231. **sys_seeting,
  232. )
  233. def remote_add(self, remote, remote_name):
  234. return subprocess.Popen(
  235. f"echo 添加远程仓库... && {git_path} remote add {remote_name} {remote} && echo {self.make_stop_key()}",
  236. cwd=self.repo_dir,
  237. **sys_seeting,
  238. )
  239. def bind_branch(self, local_name, remote_name):
  240. return subprocess.Popen(
  241. f"echo 分支绑定... && {git_path} branch --set-upstream-to={remote_name} {local_name} && echo {self.make_stop_key()}",
  242. cwd=self.repo_dir,
  243. **sys_seeting,
  244. )
  245. def push_tag(self, tag, remote_name):
  246. return subprocess.Popen(
  247. f"echo 推送标签... && {git_path} push {remote_name} {tag} && echo {self.make_stop_key()}",
  248. cwd=self.repo_dir,
  249. **sys_seeting,
  250. )
  251. def del_tag(self, tag):
  252. return subprocess.Popen(
  253. f"echo 删除本地标签... && {git_path} tag -d {tag} && echo {self.make_stop_key()}",
  254. cwd=self.repo_dir,
  255. **sys_seeting,
  256. )
  257. def add_tag(self, tag, commit, message=""):
  258. a = " -a"
  259. if message != "":
  260. message = f' -m "{message}"' # 自带空格
  261. else:
  262. a = ""
  263. if commit != "":
  264. commit = f" {commit}" # 自带空格
  265. return subprocess.Popen(
  266. f"echo 添加标签... && {git_path} tag{a} {tag}{commit}{message} && echo {self.make_stop_key()}",
  267. cwd=self.repo_dir,
  268. **sys_seeting,
  269. )
  270. def get_tag_list(self, condition=""):
  271. if condition != "":
  272. condition = f" -l {condition}"
  273. return subprocess.Popen(
  274. f"echo 标签列表: && {git_path} tag{condition} && echo {self.make_stop_key()}",
  275. cwd=self.repo_dir,
  276. **sys_seeting,
  277. )
  278. def search_commit(self, condition):
  279. return subprocess.Popen(
  280. f"echo 查询结果: && {git_path} show {condition} && echo {self.make_stop_key()}",
  281. cwd=self.repo_dir,
  282. **sys_seeting,
  283. )
  284. def pull_push(
  285. self,
  286. pull_or_push=0,
  287. remote="",
  288. remote_branch="",
  289. local="",
  290. allow=False,
  291. u=False,
  292. f=False,
  293. ):
  294. # 处理逻辑
  295. # 1)remote去斜杠第一个作为主机名字
  296. # 2) 从remote分离主机名(如果没指定)
  297. # 3) 如果local为空,用HEAD填充
  298. # 4) 如果以上后,主机名仍为空,则local和分支均为空
  299. split = remote.split("/")
  300. try:
  301. remote_name = split[0] # 获取主机名 1)
  302. except BaseException:
  303. remote_name = "" # 没有主机名 1)
  304. branch_split = remote_branch.split("/")
  305. if len(branch_split) >= 2 and remote_name == "":
  306. remote_name = branch_split[0] # 2)
  307. remote_branch = "/".join(branch_split[1:]) # 2)
  308. if local.replace(" ", "") == "":
  309. local = "HEAD" # 3)
  310. if remote_name == "": # 4)
  311. branch = ""
  312. else:
  313. if pull_or_push == 1:
  314. # 注意,local不可以为空,也不会为空
  315. if remote_branch != "":
  316. # git push <远程主机名> <本地分支名>:<远程分支名>
  317. branch = f"{local}:{remote_branch}"
  318. else:
  319. branch = f"{local}" # 要去掉冒号
  320. else:
  321. if remote_branch != "HEAD":
  322. # git push <远程主机名> <本地分支名>:<远程分支名>
  323. branch = f"{remote_branch}:{local}"
  324. else:
  325. branch = f"{remote_branch}"
  326. if allow:
  327. history = " --allow-unrelated-histories"
  328. else:
  329. history = ""
  330. push_pull = {0: "pull", 1: f"push{' -u' if u else ''}{' -f' if f else ''}"}
  331. return subprocess.Popen(
  332. f"""echo 与服务器连接... && {git_path} {push_pull.get(pull_or_push, "pull")}{history} {remote_name} {branch}
  333. && echo {self.make_stop_key()}""",
  334. cwd=self.repo_dir,
  335. **sys_seeting,
  336. )
  337. def del_branch_remote(self, remote, remote_branch):
  338. remote_split = remote.split("/")
  339. try:
  340. remote_name = remote_split[0] # 获取主机名 1)
  341. except BaseException:
  342. remote_name = "" # 没有主机名 1)
  343. branch_split = remote_branch.split("/")
  344. if len(branch_split) >= 2 and remote_name == "":
  345. remote_name = branch_split[0] # 2)
  346. remote_branch = "/".join(branch_split[1:]) # 2)
  347. return subprocess.Popen(
  348. f"echo 删除远程分支... && {git_path} push {remote_name} :{remote_branch} && echo {self.make_stop_key()}",
  349. cwd=self.repo_dir,
  350. **sys_seeting,
  351. )
  352. def del_tag_remote(self, remote, tag):
  353. return subprocess.Popen(
  354. f"echo 删除远程标签... && {git_path} push {remote} :refs/tags/{tag} && echo {self.make_stop_key()}",
  355. cwd=self.repo_dir,
  356. **sys_seeting,
  357. )
  358. def fetch(self, remote, remote_branch, local):
  359. # 处理逻辑
  360. # 1)remote去斜杠第一个作为主机名字
  361. # 2) 从remote分离主机名(如果没指定)
  362. # 3) 如果local为空,用HEAD填充
  363. # 4) 如果以上后,主机名仍为空,则local和分支均为空
  364. split = remote.split("/")
  365. try:
  366. remote_name = split[0] # 获取主机名 1)
  367. except BaseException:
  368. remote_name = "" # 没有主机名 1)
  369. branch_split = remote_branch.split("/")
  370. if len(branch_split) >= 2 and remote_name == "":
  371. remote_name = branch_split[0] # 2)
  372. remote_branch = "/".join(branch_split[1:]) # 2)
  373. if local.replace(" ", "") == "":
  374. local = "HEAD" # 3)
  375. if remote_name == "": # 4)
  376. branch = ""
  377. else:
  378. if remote_branch != "HEAD":
  379. # git push <远程主机名> <本地分支名>:<远程分支名>
  380. branch = f"{remote_branch}:{local}"
  381. else:
  382. branch = f"{remote_branch}"
  383. return subprocess.Popen(
  384. f"""echo 更新远程仓库... && {git_path} fetch {remote_name} {branch} && echo {self.make_stop_key()}""",
  385. cwd=self.repo_dir,
  386. **sys_seeting,
  387. )
  388. def customize_command(self, command: str):
  389. return subprocess.Popen(
  390. f"{command} && echo {self.make_stop_key()}",
  391. cwd=self.repo_dir,
  392. **sys_seeting,
  393. )
  394. def clone(self, url):
  395. return subprocess.Popen(
  396. f"echo 克隆操作不被允许 && echo {self.make_stop_key()}",
  397. cwd=self.repo_dir,
  398. **sys_seeting,
  399. )
  400. def make_dir(self, dir):
  401. if len(dir) == "":
  402. return dir
  403. inside = "/"
  404. if dir[0] == "/":
  405. inside = ""
  406. return self.repo_dir + inside + dir
  407. def reset_file(self, hard, file_list): # 注意版本回退是:Reset_File
  408. file = self.get_flie_list(file_list)
  409. return subprocess.Popen(
  410. f"""echo 回退文件... && {git_path} reset {hard} {file} && echo {self.make_stop_key()}""",
  411. cwd=self.repo_dir,
  412. **sys_seeting,
  413. )
  414. def rename_branch(self, old_name, new_name):
  415. return subprocess.Popen(
  416. f"""echo 回退文件... && {git_path} branch -m {old_name} {new_name} && echo {self.make_stop_key()}""",
  417. cwd=self.repo_dir,
  418. **sys_seeting,
  419. )
  420. class CloneGit(GitRepo): # Clone一个git
  421. def __init__(self, repo_dir, *args, **kwargs):
  422. self.Repo_Dic = repo_dir # 仓库地址
  423. self.url = None
  424. self.name = split(repo_dir)[-1]
  425. self.have_clone = False
  426. def clone(self, url):
  427. if self.have_clone:
  428. super(CloneGit, self).clone(url)
  429. self.have_clone = True
  430. return subprocess.Popen(
  431. f"echo 正在克隆... && {git_path} clone {url} {self.Repo_Dic}",
  432. cwd=split(self.Repo_Dic)[0],
  433. **sys_seeting,
  434. )
  435. def after_clone(self):
  436. self.repo = Repo(self.Repo_Dic)
  437. class GitCtrol:
  438. def __init__(self):
  439. self.git_dict = {} # 名字-文件位置
  440. self.git_type_dict = {} # 名字-类型
  441. def open_repo(self, repo_dir, **kwargs):
  442. git = GitRepo(repo_dir)
  443. self.git_dict[git.name] = git
  444. self.git_type_dict[git.name] = "init"
  445. return git.name
  446. def clone_repo(self, repo_dir, **kwargs):
  447. git = CloneGit(repo_dir)
  448. self.git_dict[git.name] = git
  449. self.git_type_dict[git.name] = "clone"
  450. return git.name
  451. def get_git(self, name):
  452. return self.git_dict[name]
  453. def get_git_dict(self):
  454. return self.git_dict.copy()
  455. def get_dir(self, name):
  456. return self.get_git(name).dir_list()
  457. def add_file(self, name, file_list):
  458. return self.get_git(name).add(file_list)
  459. def del_cached_file(self, name, file_list):
  460. return self.get_git(name).del_cached_file(file_list) # 移除出去暂存区
  461. def commit_file(self, name, message):
  462. return self.get_git(name).commit_file(message)
  463. def log(self, name, graph, pretty, abbrev):
  464. return self.get_git(name).log(graph, pretty, abbrev)
  465. def do_log(self, name):
  466. return self.get_git(name).do_log()
  467. def status(self, name):
  468. return self.get_git(name).status()
  469. def diff_file(self, name, brach):
  470. return self.get_git(name).diff(brach)
  471. def back_version(self, name, head, reset_type=0):
  472. return self.get_git(name).reset(head, reset_type) # 版本回退HEAD
  473. def back_version_file(self, name, head, file_list):
  474. return self.get_git(name).reset_file(head, file_list) # 文件回退
  475. def checkout_version(self, name, file):
  476. return self.get_git(name).checkout(file) # 弹出
  477. def rm(self, name, file):
  478. return self.get_git(name).rm(file)
  479. def branch_view(self, name):
  480. return self.get_git(name).branch_view()
  481. def new_branch(self, name, new_branch, origin):
  482. return self.get_git(name).new_branch(new_branch, origin)
  483. def switch_branch(self, name, branch_name):
  484. return self.get_git(name).switch_branch(branch_name)
  485. def del_branch(self, name, branch_name, type_):
  486. d = {1: "d", 0: "D"}.get(type_, "d")
  487. return self.get_git(name).del_branch(branch_name, d)
  488. def merge_branch(self, name, branch_name, no_ff, m):
  489. return self.get_git(name).merge_branch(branch_name, no_ff, m)
  490. def merge_abort(self, name):
  491. return self.get_git(name).merge_abort()
  492. def save_stash(self, name):
  493. return self.get_git(name).save_stash()
  494. def stash_list(self, name):
  495. return self.get_git(name).get_stash_list()
  496. def apply_stash(self, name, stash_num="0"):
  497. return self.get_git(name).apply_stash(stash_num)
  498. def drop_stash(self, name, stash_num="0"):
  499. return self.get_git(name).drop_stash(stash_num)
  500. def cherry_pick(self, name, commit):
  501. return self.get_git(name).cherry_pick(commit)
  502. def del_remote(self, name, remote_name):
  503. return self.get_git(name).del_remote(remote_name)
  504. def remote_add(self, name, remote, remote_name):
  505. return self.get_git(name).remote_add(remote, remote_name)
  506. def bind_branch(self, name, local_name, remote_name):
  507. return self.get_git(name).bind_branch(local_name, remote_name)
  508. def pull_from_remote(
  509. self, name, local_name, remote_name, remote_branch, allow=False, u=False
  510. ):
  511. return self.get_git(name).pull_push(
  512. 0, remote_name, remote_branch, local_name, allow, u, False
  513. )
  514. def push_to_remote(
  515. self, name, local_name, remote_name, remote_branch, u=False, f=False
  516. ):
  517. return self.get_git(name).pull_push(
  518. 1, remote_name, remote_branch, local_name, False, u, f
  519. ) # push没有allow选项
  520. def tag(self, name, condition=""):
  521. return self.get_git(name).get_tag_list(condition) # push没有allow选项
  522. def show_new(self, name, condition):
  523. return self.get_git(name).search_commit(condition) # push没有allow选项
  524. def add_tag(self, name, tag, commit, message=""):
  525. return self.get_git(name).add_tag(tag, commit, message) # push没有allow选项
  526. def push_tag(self, name, tag, remoto):
  527. return self.get_git(name).push_tag(tag, remoto)
  528. def push_all_tag(self, name, remoto):
  529. return self.get_git(name).push_tag("--tags", remoto)
  530. def del_tag_remote(self, name, remote, tag):
  531. return self.get_git(name).del_tag_remote(remote, tag)
  532. def del_branch_remote(self, name, remote, remote_branch):
  533. return self.get_git(name).del_branch_remote(remote, remote_branch)
  534. def del_tag(self, name, tag):
  535. return self.get_git(name).del_tag(tag)
  536. def fetch(self, name, local_name, remote_name, remote_branch):
  537. return self.get_git(name).fetch(remote_name, remote_branch, local_name)
  538. def customize_command(self, name, command: str):
  539. return self.get_git(name).customize_command(command)
  540. def clone(self, name, url):
  541. return self.get_git(name).clone(url)
  542. def after_clone(self, name):
  543. try:
  544. return self.get_git(name).after_clone()
  545. except BaseException:
  546. return None
  547. def make_dir(self, name, dir):
  548. return self.get_git(name).make_dir(dir)
  549. def rename_branch(self, name, old_name, new_name):
  550. return self.get_git(name).rename_branch(old_name, new_name)