1
0

Git_Ctrl.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. # -*- coding: utf-8 -*-
  2. from git import Repo
  3. from os.path import split,exists
  4. import os
  5. import subprocess
  6. from time import time
  7. import random
  8. sys_seeting = dict(shell=True,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True)
  9. git_path = 'git'# git的地址,如果配置了环境变量则不需要修改
  10. stopKey = '【操作完成】'#存储stopKey的global变量
  11. passwd = '1234567890qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM'#stopKey的候选词库
  12. class git_Repo:#git的基类
  13. def __init__(self,Dic,*args,**kwargs):
  14. self.url = None
  15. try:
  16. if exists(Dic + r'/.git'): # 是否为git 仓库
  17. pass
  18. elif Dic[-4:] =='.git':
  19. Dic = Dic[:-5] # -5,得把/去掉
  20. else:raise Exception
  21. except:
  22. subprocess.Popen(f'{git_path} init',cwd=self.Repo_Dic,**sys_seeting).wait()
  23. self.Repo_Dic = Dic # 仓库地址(末尾不带/)
  24. self.repo = Repo(Dic)
  25. self.name = split(Dic)[-1]
  26. self.have_clone = True
  27. def make_stopKey(self):#生成一个随机stopKey
  28. global stopKey,passwd
  29. code = ''
  30. for _ in range(8):#八位随机数
  31. code += passwd[random.randint(0, len(passwd) - 1)]#时间戳+8位随机数
  32. stopKey = (str(time()) + code).replace('.','')
  33. return stopKey
  34. def Flie_List(self,file_list,is_file=True,pat=' '):
  35. if file_list == '.':
  36. file = '.'
  37. else:
  38. file_ = []
  39. for i in file_list:
  40. if i[:len(self.Repo_Dic)] == self.Repo_Dic:
  41. file_.append(i[len(self.Repo_Dic) + 1:]) # +1是为了去除/
  42. if not is_file:return file_
  43. file = pat.join(file_)
  44. return file
  45. def dir_list(self,all=True):
  46. listfile = []
  47. if all:
  48. listfile += [f'[当前分支] {self.repo.active_branch} 工作区{"不" if self.repo.is_dirty() else ""}干净 -> {self.name}']
  49. listfile += [f'{"[配置文件]" if i == ".git" else "[未跟踪]"if i in self.repo.untracked_files else "[已跟踪]"} {i}'
  50. for i in os.listdir(self.Repo_Dic)]
  51. return listfile
  52. def Add_File(self,file_list):
  53. file = self.Flie_List(file_list)
  54. return subprocess.Popen(f'echo 添加文件... && {git_path} add {file} && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  55. def Reset_File(self,file_list):
  56. file = self.Flie_List(file_list)
  57. return subprocess.Popen(f'echo 撤销文件... && {git_path} rm --cached {file} && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  58. def Commit_File(self,m):
  59. return subprocess.Popen(f'echo 提交文件: && {git_path} commit -m "{m}" && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  60. def Status(self):#执行status
  61. return subprocess.Popen(f'echo 仓库状态: && {git_path} status && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  62. def Log(self,graph,pretty,abbrev):#执行log
  63. args = ''
  64. if graph:
  65. args += ' --graph'
  66. if pretty:
  67. args += ' --pretty=oneline'
  68. if abbrev:
  69. args += ' --abbrev-commit'
  70. return subprocess.Popen(f'echo 仓库日志: && {git_path} log{args} && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  71. def refLog(self):#执行reflog
  72. return subprocess.Popen(f'echo 操作记录: && {git_path} reflog && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  73. def Diff_File(self,MASTER='HEAD'):#执行diff
  74. return subprocess.Popen(f'echo 文件日志: && {git_path} diff {MASTER} && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  75. def reset(self,HEAD='HEAD~1',Type=0):
  76. if Type == 0:
  77. type_ = '--mixed'#退回到工作区
  78. elif Type == 1:
  79. type_ = '--soft'#退回到暂存区
  80. else:
  81. type_ = '--hard' # 退回到暂存区
  82. return subprocess.Popen(f'echo 回退... && {git_path} reset {type_} {HEAD} && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  83. def checkout(self,file_list):
  84. if len(file_list) >= 1:#多于一个文件,不用--,空格
  85. file = self.Flie_List(file_list,pat=' ')
  86. return subprocess.Popen(f'echo 丢弃修改: && {git_path} checkout {file} && echo {self.make_stopKey()}',cwd=self.Repo_Dic,**sys_seeting)
  87. elif len(file_list) == 1:
  88. return subprocess.Popen(f'echo 丢弃修改: && {git_path} checkout -- {file_list[0]} && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  89. else:
  90. return subprocess.Popen(f'echo 丢弃修改: && {git_path} checkout * && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  91. def rm(self,file_list):#删除版本库中的文件
  92. file = self.Flie_List(file_list)
  93. return subprocess.Popen(f'echo 删除... && {git_path} rm {file}', cwd=self.Repo_Dic,**sys_seeting)
  94. def check_Branch(self):#查看本地分支和远程分支
  95. return subprocess.Popen(f'echo 仓库分支: && {git_path} branch -a && echo 远程仓库信息: && {git_path} remote -v && '
  96. f'echo 分支详情: && {git_path} branch -vv && echo {self.make_stopKey()}',
  97. cwd=self.Repo_Dic,**sys_seeting)
  98. def new_Branch(self,branch_name, origin):#新建分支
  99. return subprocess.Popen(f'echo 新建分支... && {git_path} branch {branch_name} {origin} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  100. def switch_Branch(self,branch_name):#切换分支
  101. return subprocess.Popen(f'echo 切换分支... && {git_path} switch {branch_name} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  102. def del_Branch(self,branch_name,del_type):#删除分支
  103. return subprocess.Popen(f'echo 删除分支... && {git_path} branch -{del_type} {branch_name} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  104. def merge_Branch(self,branch_name,no_ff,m=''):#合并分支
  105. if no_ff:no_ff = f' --no-ff -m "{m}"'#--no-ff前有空格
  106. else:no_ff = ''
  107. return subprocess.Popen(f'echo 合并分支... && {git_path} merge{no_ff} {branch_name} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  108. def merge_abort(self):#退出冲突处理
  109. return subprocess.Popen(f'echo 冲突处理退出... && {git_path} merge --abort && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  110. def Save_stash(self):#保存工作区
  111. return subprocess.Popen(f'echo 保存工作区... && {git_path} stash && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  112. def Stash_List(self):#工作区列表
  113. return subprocess.Popen(f'echo 工作区列表: && {git_path} stash list && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  114. def Apply_stash(self,stash_num = '0'):#恢复工作区
  115. return subprocess.Popen(f'echo 恢复工作区... && {git_path} stash apply stash@{{{stash_num}}} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  116. def Drop_stash(self,stash_num = '0'):#删除工作区
  117. return subprocess.Popen(f'echo 删除工作区... && {git_path} stash drop stash@{{{stash_num}}} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  118. def cherry_pick(self,commit):#补丁
  119. return subprocess.Popen(f'echo 补丁... && {git_path} cherry-pick {commit} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  120. def Del_remote(self,remote_name):
  121. return subprocess.Popen(f'echo 删除远程仓库... && {git_path} remote remove {remote_name} && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  122. def Add_remote(self,remote,remote_name):
  123. return subprocess.Popen(f'echo 添加远程仓库... && {git_path} remote add {remote_name} {remote} && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  124. def Bind_remote(self,local_name,remote_name):
  125. return subprocess.Popen(f'echo 分支绑定... && {git_path} branch --set-upstream-to={remote_name} {local_name} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,
  126. **sys_seeting)
  127. def push_Tag(self,tag,remote_name):
  128. return subprocess.Popen(f'echo 推送标签... && {git_path} push {remote_name} {tag} && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  129. def del_tag(self,tag):
  130. return subprocess.Popen(f'echo 删除本地标签... && {git_path} tag -d {tag} && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  131. def Add_Tag(self,tag,commit,message=''):
  132. a = ' -a'
  133. if message != '':
  134. message = f' -m "{message}"'#自带空格
  135. else:
  136. a = ''
  137. if commit != '':
  138. commit = f' {commit}'#自带空格
  139. return subprocess.Popen(f'echo 添加标签... && {git_path} tag{a} {tag}{commit}{message} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,
  140. **sys_seeting)
  141. def Tag(self,condition=''):
  142. if condition != '':
  143. condition = f' -l {condition}'
  144. return subprocess.Popen(f'echo 标签列表: && {git_path} tag{condition} && echo {self.make_stopKey()}', cwd=self.Repo_Dic,**sys_seeting)
  145. def show_new(self,condition):
  146. return subprocess.Popen(f'echo 查询结果: && {git_path} show {condition} && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  147. def Pull_Push_remote(self,Pull_Push=0,remote='',remote_branch='',local='',allow=False,u=False,f=False):
  148. #处理逻辑
  149. # 1)remote去斜杠第一个作为主机名字
  150. # 2) 从remote分离主机名(如果没指定)
  151. # 3) 如果local为空,用HEAD填充
  152. # 4) 如果以上后,主机名仍为空,则local和分支均为空
  153. split = remote.split('/')
  154. try:
  155. remote_name = split[0]#获取主机名 1)
  156. except:
  157. remote_name = ''#没有主机名 1)
  158. branch_split = remote_branch.split('/')
  159. if len(branch_split) >= 2 and remote_name == '':
  160. remote_name = branch_split[0]# 2)
  161. remote_branch = '/'.join(branch_split[1:])# 2)
  162. if local.replace(' ','') == '':local = 'HEAD'# 3)
  163. if remote_name == '':# 4)
  164. branch = ''
  165. else:
  166. if Pull_Push == 1:
  167. # 注意,local不可以为空,也不会为空
  168. if remote_branch != '':branch = f'{local}:{remote_branch}'# git push <远程主机名> <本地分支名>:<远程分支名>
  169. else:branch = f'{local}'#要去掉冒号
  170. else:
  171. if remote_branch != 'HEAD':branch = f'{remote_branch}:{local}' # git push <远程主机名> <本地分支名>:<远程分支名>
  172. else:branch = f'{remote_branch}'
  173. if allow:
  174. history = ' --allow-unrelated-histories'
  175. else:
  176. history = ''
  177. push_pull = {0:"pull",1:f"push{' -u' if u else ''}{' -f' if f else ''}"}
  178. return subprocess.Popen(f'''echo 与服务器连接... && {git_path} {push_pull.get(Pull_Push,"pull")}{history} {remote_name} {branch} && echo {self.make_stopKey()}''',
  179. cwd=self.Repo_Dic, **sys_seeting)
  180. def del_Branch_remote(self,remote,remote_branch):
  181. split = remote.split('/')
  182. try:
  183. remote_name = split[0] # 获取主机名 1)
  184. except:
  185. remote_name = '' # 没有主机名 1)
  186. branch_split = remote_branch.split('/')
  187. if len(branch_split) >= 2 and remote_name == '':
  188. remote_name = branch_split[0] # 2)
  189. remote_branch = '/'.join(branch_split[1:]) # 2)
  190. return subprocess.Popen(f'echo 删除远程分支... && {git_path} push {remote_name} :{remote_branch} && echo {self.make_stopKey()}',cwd=self.Repo_Dic, **sys_seeting)
  191. def del_Tag_remote(self,remote,tag):
  192. return subprocess.Popen(f'echo 删除远程标签... && {git_path} push {remote} :refs/tags/{tag} && echo {self.make_stopKey()}',cwd=self.Repo_Dic, **sys_seeting)
  193. def fetch(self,remote,remote_branch,local):
  194. # 处理逻辑
  195. # 1)remote去斜杠第一个作为主机名字
  196. # 2) 从remote分离主机名(如果没指定)
  197. # 3) 如果local为空,用HEAD填充
  198. # 4) 如果以上后,主机名仍为空,则local和分支均为空
  199. split = remote.split('/')
  200. try:
  201. remote_name = split[0] # 获取主机名 1)
  202. except:
  203. remote_name = '' # 没有主机名 1)
  204. branch_split = remote_branch.split('/')
  205. if len(branch_split) >= 2 and remote_name == '':
  206. remote_name = branch_split[0] # 2)
  207. remote_branch = '/'.join(branch_split[1:]) # 2)
  208. if local.replace(' ', '') == '': local = 'HEAD' # 3)
  209. if remote_name == '': # 4)
  210. branch = ''
  211. else:
  212. if remote_branch != 'HEAD':
  213. branch = f'{remote_branch}:{local}' # git push <远程主机名> <本地分支名>:<远程分支名>
  214. else:
  215. branch = f'{remote_branch}'
  216. return subprocess.Popen(f'''echo 更新远程仓库... && {git_path} fetch {remote_name} {branch} && echo {self.make_stopKey()}''', cwd=self.Repo_Dic, **sys_seeting)
  217. def Customize(self,command:str):
  218. return subprocess.Popen(command,cwd=self.Repo_Dic, **sys_seeting)
  219. def Clone(self,url):
  220. return subprocess.Popen(f'echo 克隆操作不被允许 && echo {self.make_stopKey()}', cwd=self.Repo_Dic, **sys_seeting)
  221. def make_Dir(self,dir):
  222. if len(dir) == '':return dir
  223. inside = '/'
  224. if dir[0] == '/':
  225. inside = ''
  226. return self.Repo_Dic + inside + dir
  227. def reset_File(self,hard,file_list):#注意版本回退是:Reset_File
  228. file = self.Flie_List(file_list)
  229. return subprocess.Popen(
  230. f'''echo 回退文件... && {git_path} reset {hard} {file} && echo {self.make_stopKey()}''',
  231. cwd=self.Repo_Dic, **sys_seeting)
  232. def Branch_new(self,old_name,new_name):
  233. return subprocess.Popen(
  234. f'''echo 回退文件... && {git_path} branch -m {old_name} {new_name} && echo {self.make_stopKey()}''',
  235. cwd=self.Repo_Dic, **sys_seeting)
  236. class Clone_git(git_Repo):#Clone一个git
  237. def __init__(self, Dic, *args, **kwargs):
  238. self.Repo_Dic = Dic # 仓库地址
  239. self.url = None
  240. self.name = split(Dic)[-1]
  241. self.have_clone = False
  242. def Clone(self,url):
  243. if self.have_clone:super(Clone_git, self).Clone(url)
  244. self.have_clone = True
  245. return subprocess.Popen(f'echo 正在克隆... && {git_path} clone {url} {self.Repo_Dic}', cwd=split(self.Repo_Dic)[0], **sys_seeting)
  246. def after_Clone(self):
  247. self.repo = Repo(self.Repo_Dic)
  248. class git_Ctrol:
  249. def __init__(self):
  250. self.git_Dic = {}#名字-文件位置
  251. self.gitType_Dic = {}#名字-类型
  252. def Add_init(self,Dic,**kwargs):
  253. git = git_Repo(Dic)
  254. self.git_Dic[git.name] = git
  255. self.gitType_Dic[git.name] = 'init'
  256. return git.name
  257. def Clone_init(self,Dic,**kwargs):
  258. git = Clone_git(Dic)
  259. self.git_Dic[git.name] = git
  260. self.gitType_Dic[git.name] = 'clone'
  261. return git.name
  262. def get_git(self,name):
  263. return self.git_Dic[name]
  264. def get_git_Dic(self):
  265. return self.git_Dic.copy()
  266. def get_Dir(self,name):
  267. return self.get_git(name).dir_list()
  268. def add_File(self,name,dic_list):
  269. return self.get_git(name).Add_File(dic_list)
  270. def reset_File(self,name,dic_list):
  271. return self.get_git(name).Reset_File(dic_list)
  272. def commit_File(self,name,m):
  273. return self.get_git(name).Commit_File(m)
  274. def log(self,name,graph,pretty,abbrev):
  275. return self.get_git(name).Log(graph,pretty,abbrev)
  276. def reflog(self,name):
  277. return self.get_git(name).refLog()
  278. def status(self,name):
  279. return self.get_git(name).Status()
  280. def diff_File(self,name,MASTER):
  281. return self.get_git(name).Diff_File(MASTER)
  282. def back_version(self,name,HEAD,Type=0):
  283. return self.get_git(name).reset(HEAD,Type)
  284. def back_version_file(self,name,HEAD,File_list):
  285. return self.get_git(name).reset_File(HEAD, File_list)
  286. def checkout_version(self,name,file):
  287. return self.get_git(name).checkout(file)
  288. def rm(self,name,file):
  289. return self.get_git(name).rm(file)
  290. def check_Branch(self,name):
  291. return self.get_git(name).check_Branch()
  292. def new_Branch(self,name, new_branch, origin):
  293. return self.get_git(name).new_Branch(new_branch, origin)
  294. def switch_Branch(self,name,branch_name):
  295. return self.get_git(name).switch_Branch(branch_name)
  296. def Del_Branch(self,name,branch_name,type_):
  297. d = {1:'d',0:'D'}.get(type_,'d')
  298. return self.get_git(name).del_Branch(branch_name,d)
  299. def merge_Branch(self,name,branch_name,no_ff,m):
  300. return self.get_git(name).merge_Branch(branch_name,no_ff,m)
  301. def merge_abort(self,name):
  302. return self.get_git(name).merge_abort()
  303. def Save_stash(self,name):
  304. return self.get_git(name).Save_stash()
  305. def Stash_List(self,name):
  306. return self.get_git(name).Stash_List()
  307. def Apply_stash(self,name,stash_num='0'):
  308. return self.get_git(name).Apply_stash(stash_num)
  309. def Drop_stash(self,name,stash_num='0'):
  310. return self.get_git(name).Drop_stash(stash_num)
  311. def cherry_pick(self,name,commit):
  312. return self.get_git(name).cherry_pick(commit)
  313. def Del_remote(self,name,remote_name):
  314. return self.get_git(name).Del_remote(remote_name)
  315. def Add_remote(self,name,remote,remote_name):
  316. return self.get_git(name).Add_remote(remote,remote_name)
  317. def Bind_remote(self,name,local_name,remote_name):
  318. return self.get_git(name).Bind_remote(local_name,remote_name)
  319. def Pull_remote(self,name,local_name,remote_name,remote_branch,allow=False,u=False,f=False):
  320. return self.get_git(name).Pull_Push_remote(0,remote_name,remote_branch,local_name,allow,u,False)
  321. def Push_remote(self,name,local_name,remote_name,remote_branch,allow=False,u=False,f=False):
  322. return self.get_git(name).Pull_Push_remote(1,remote_name,remote_branch,local_name,False,u,f)#push没有allow选项
  323. def Tag(self,name, condition=''):
  324. return self.get_git(name).Tag(condition) # push没有allow选项
  325. def show_new(self,name, condition):
  326. return self.get_git(name).show_new(condition) # push没有allow选项
  327. def Add_Tag(self,name,tag,commit,message=''):
  328. return self.get_git(name).Add_Tag(tag,commit,message) # push没有allow选项
  329. def push_Tag(self,name, tag, remoto):
  330. return self.get_git(name).push_Tag(tag, remoto)
  331. def push_allTag(self,name, remoto):
  332. return self.get_git(name).push_Tag('--tags', remoto)
  333. def del_Tag_remote(self,name , remote, tag):
  334. return self.get_git(name).del_Tag_remote(remote, tag)
  335. def del_Branch_remote(self,name , remote, remote_Branch):
  336. return self.get_git(name).del_Branch_remote(remote, remote_Branch)
  337. def del_tag(self,name,tag):
  338. return self.get_git(name).del_tag(tag)
  339. def Fetch(self,name,local_name,remote_name,remote_branch):
  340. return self.get_git(name).fetch(remote_name,remote_branch,local_name)
  341. def Customize(self,name,command:str):
  342. return self.get_git(name).Customize(command)
  343. def Clone(self,name,url):
  344. return self.get_git(name).Clone(url)
  345. def After_Clone(self,name):
  346. try:
  347. return self.get_git(name).after_Clone()
  348. except:
  349. return None
  350. def make_Dir(self,name,dir):
  351. return self.get_git(name).make_Dir(dir)
  352. def Branch_new(self,name, old_name, new_name):
  353. return self.get_git(name).Branch_new(old_name,new_name)