-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgitRepository.py
More file actions
174 lines (147 loc) · 7.02 KB
/
gitRepository.py
File metadata and controls
174 lines (147 loc) · 7.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import logging
import os
import threading
import requests
from qtCore import *
from git.repo import Repo
from git.repo.fun import is_git_dir
class GitRepository(QObject):
    """
    Manage the local Mixware Screen git checkout and firmware downloads.

    Every state transition is logged and published through ``state_changed``
    as a translated, human-readable message. The ``start_*`` methods run the
    matching operation on a background thread.
    """

    # Emitted with the new state message on every call to change_state().
    state_changed = pyqtSignal(str)

    # Seconds to wait for GitHub before a request is abandoned; without a
    # timeout ``requests`` may block a worker thread forever.
    _REQUEST_TIMEOUT = 15

    # Number of leading hex digits kept from a commit hash.
    _SHORT_HASH_LENGTH = 7

    def __init__(self, local_path):
        """
        :param local_path: directory of the Mixware Screen working tree;
            it is created when it does not exist yet.
        """
        super(GitRepository, self).__init__()
        self.local_path = local_path

        self.screen_repo_url = "https://github.com/Mixwarebot/MixwareScreen.git"
        self.screen_repo = None
        self.screen_local_commit = None
        self.screen_remote_commit = None

        self.latest_firmware_url = 'https://api.github.com/repos/Mixwarebot/Mixware-Hyper-X-Firmware/releases/latest'
        self.latest_firmware_version = None
        self.firmware_path = 'firmware.bin'
        self.firmware_url = None

        self.screen_check_thread = None
        self.screen_pull_thread = None
        self.firmware_check_thread = None
        self.firmware_download_thread = None

        self._state = None
        self.initial()

    def initial(self):
        """
        Create ``local_path`` if needed and open the git repository in it.

        ``screen_repo`` stays ``None`` when ``local_path`` holds no ``.git``
        directory.
        :return:
        """
        os.makedirs(self.local_path, exist_ok=True)

        git_local_path = os.path.join(self.local_path, '.git')
        if is_git_dir(git_local_path):
            self.screen_repo = Repo(self.local_path)
            # NOTE(review): success is reported only when a repository was
            # actually opened - confirm this matches the intended nesting.
            self.change_state(self.tr("Initialize git repository succeeded."))

    def change_state(self, state: str):
        """
        Store, log and broadcast a new state message.

        :param state: message to publish through ``state_changed``.
        :return:
        """
        self._state = state
        logging.info(self._state)
        self.state_changed.emit(self._state)

    def _is_updating_screen(self):
        """
        Tell whether the last published state is the "updating" message.

        Used by screen_check() so that it does not overwrite the state
        published by screen_pull().
        :return: bool
        """
        return bool(self._state) and self.tr("Updating Mixware Screen.") in self._state

    def _read_commits(self):
        """
        Refresh ``screen_local_commit`` and ``screen_remote_commit`` from the
        refs currently stored in the repository (no network access).

        Both values are cut to the same fixed length so they can be compared.
        :return:
        """
        local_branch = self.screen_repo.active_branch
        remote_branch = self.screen_repo.remotes.origin.refs[local_branch.name]
        # The tip of the remote-tracking branch is all that is needed; there
        # is no reason to walk the whole history.
        self.screen_remote_commit = remote_branch.commit.hexsha[:self._SHORT_HASH_LENGTH]
        self.screen_local_commit = self.screen_repo.head.commit.hexsha[:self._SHORT_HASH_LENGTH]
        logging.debug('local: %s, remote: %s', self.screen_local_commit, self.screen_remote_commit)

    def screen_check(self):
        """
        Fetch from the remote and record the local and remote commit hashes.

        Does nothing when no repository was opened. On failure both commit
        attributes are reset to ``None``.
        :return:
        """
        if not self.screen_repo:
            return

        if not self._is_updating_screen():
            self.change_state(self.tr("Checking Mixware Screen."))
        try:
            self.screen_repo.remote().fetch()
            logging.debug("git repository is_dirty: %s", self.screen_repo.is_dirty())
            self._read_commits()
        except Exception:
            # Thread entry point: log the cause and report the failure
            # instead of letting the worker thread die silently.
            logging.exception("Mixware Screen check failed")
            self.screen_local_commit = None
            self.screen_remote_commit = None
            if not self._is_updating_screen():
                self.change_state(self.tr("Mixware Screen check failed."))
        else:
            if not self._is_updating_screen():
                self.change_state(self.tr("Mixware Screen check successful."))

    def start_screen_check(self):
        """Run screen_check() on a background thread."""
        self.screen_check_thread = threading.Thread(target=self.screen_check)
        self.screen_check_thread.start()

    def screen_exist_update(self):
        """
        remote commit known and != local commit -> update exists -> True
        :return: bool
        """
        return bool(self.screen_remote_commit) and self.screen_remote_commit != self.screen_local_commit

    def screen_pull(self):
        """
        Pull the latest code from remote and report whether the local
        checkout now matches the remote branch.
        :return:
        """
        self.change_state(self.tr("Updating Mixware Screen."))
        try:
            if not self.screen_repo:
                raise RuntimeError("no git repository available at %r" % (self.local_path,))
            self.screen_repo.git.pull()
            # Re-read both hashes after the pull so success is judged against
            # the refs as they are now, not against an earlier check.
            self._read_commits()
            updated = bool(self.screen_remote_commit) and self.screen_local_commit == self.screen_remote_commit
        except Exception:
            logging.exception("Mixware Screen update failed")
            updated = False

        if updated:
            # The MixwareScreen service is not restarted here.
            self.change_state(self.tr("Mixware Screen updated successfully."))
        else:
            self.change_state(self.tr("Mixware Screen update failed."))

    def start_screen_pull(self):
        """Run screen_pull() on a background thread."""
        self.screen_pull_thread = threading.Thread(target=self.screen_pull)
        self.screen_pull_thread.start()

    def commits(self):
        """
        Get the abbreviated hashes of up to ten recent commits.

        :return: list of ``{'commit': <abbreviated hash>}`` dicts in the
            order printed by ``git log``.
        """
        # Ask git for the bare hash and build the dicts here, so no git
        # output ever has to be evaluated as Python code.
        commit_log = self.screen_repo.git.log('--pretty=%h', max_count=10)
        return [{'commit': short_hash} for short_hash in commit_log.splitlines() if short_hash]

    def firmware_check(self):
        """
        Query GitHub for the latest firmware release and store its tag in
        ``latest_firmware_version``.
        :return:
        """
        self.change_state(self.tr("Checking for firmware updates."))
        try:
            response = requests.get(self.latest_firmware_url, timeout=self._REQUEST_TIMEOUT)
            response.raise_for_status()
            self.latest_firmware_version = response.json()['tag_name']
            # Translate the template first and insert the version afterwards;
            # an already formatted string cannot match a translation entry.
            self.change_state(
                self.tr("Latest firmware version is Marlin {}.").format(self.latest_firmware_version))
        except Exception:
            logging.exception("Check for firmware update failed")
            self.change_state(self.tr("Check for firmware update failed."))
        finally:
            self.firmware_check_thread = None

    def start_firmware_check(self):
        """Run firmware_check() on a background daemon thread."""
        self.firmware_check_thread = threading.Thread(target=self.firmware_check, daemon=True)
        self.firmware_check_thread.start()

    def get_firmware_latest_version(self):
        """
        :return: release tag stored by the last successful firmware_check(),
            or ``None``.
        """
        return self.latest_firmware_version

    def set_firmware_path(self, path):
        """
        Store the download target as ``firmware.bin`` inside ``path``.

        :param path: directory that will receive the firmware file.
        :return:
        """
        self.firmware_path = os.path.join(path, 'firmware.bin')

    def firmware_download(self):
        """
        Download ``firmware.bin`` of the latest known release to
        ``firmware_path``.

        The file is written only after the server answered successfully with
        a non-empty body, so an error page never ends up on disk as firmware.
        :return:
        """
        self.change_state(self.tr("Downloading the latest firmware."))
        try:
            if not self.latest_firmware_version:
                raise ValueError("latest firmware version is unknown; run firmware_check() first")
            self.firmware_url = f'https://github.com/Mixwarebot/Mixware-Hyper-X-Firmware/releases/download/{self.latest_firmware_version}/firmware.bin'
            firmware_request = requests.get(self.firmware_url, timeout=self._REQUEST_TIMEOUT)
            # Rely on the HTTP status: the body is bytes, so comparing it
            # with the text "Not Found" can never detect a missing file.
            firmware_request.raise_for_status()
            if not firmware_request.content:
                raise ValueError("firmware download returned an empty body")
            with open(self.firmware_path, 'wb') as file:
                file.write(firmware_request.content)
        except Exception:
            logging.exception("Firmware download failed")
            self.change_state(self.tr("Firmware download failed."))
        else:
            self.change_state(self.tr("Firmware download successful."))

    def start_firmware_download(self):
        """Run firmware_download() on a background thread."""
        self.firmware_download_thread = threading.Thread(target=self.firmware_download)
        self.firmware_download_thread.start()