-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathurlwatch-hook.py
324 lines (254 loc) · 12.3 KB
/
urlwatch-hook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# -*- coding: utf-8 -*-
import re
import logging
import os
import unicodedata
import string
import time
import random
from urllib.parse import urlparse
from appdirs import AppDirs
import lxml.html
import yaml
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
import urlwatch
from urlwatch import filters
from urlwatch import jobs
from urlwatch import reporters
from urlwatch import handler
logger = logging.getLogger(__name__)
class GitSubPath(filters.FilterBase):
    """Dummy filter whose subfilter string names a subfolder for gitreport.

    The data passes through untouched; only the subfilter value matters,
    and it is read later by the GitReport reporter as the target subfolder
    inside the git repository.
    """

    __kind__ = 'git-path'

    def filter(self, data, subfilter=None):
        # A subfolder name is mandatory -- fail loudly when it is missing.
        if subfilter is not None:
            return data
        raise ValueError('git-path needs a name for a Subfolder in the Git Repository')
class bUnicodeDummy(filters.FilterBase):
    """Dummy filter read by the gitreport reporter.

    When present on a job, GitReport switches its filename cleaner from an
    ASCII whitelist to a blacklist, so non-ASCII characters in the job name
    survive.  The data itself passes through unchanged.
    """

    __kind__ = 'bUnicode'

    def filter(self, data, subfilter=None):
        # NOTE: the original defaulted the local ``subfilter`` to True here,
        # but that value was never read again (the reporter parses the filter
        # spec itself), so the dead assignment was removed.
        return data
class bNoHash(filters.FilterBase):
    """Dummy filter read by the gitreport reporter.

    When present on a job, GitReport does not append the URL GUID hash to
    the generated filename.  The data itself passes through unchanged.
    """

    __kind__ = 'bNoHash'

    def filter(self, data, subfilter=None):
        # NOTE: the original defaulted the local ``subfilter`` to True here,
        # but that value was never read again (the reporter parses the filter
        # spec itself), so the dead assignment was removed.
        return data
class SyosetuFilter(filters.FilterBase):
    """Novel chapter filter for ncode.syosetu.com pages.

    Extracts the chapter body from the page via XPath (dropping the
    chapter-number div) and removes blank lines from the result.
    """

    __kind__ = "Syosetu"

    def filter(self, data, subfilter):
        # Only jobs whose URL points at ncode.syosetu.com are supported.
        MATCH = {'url': re.compile('(http|https)://ncode.syosetu.com/.*')}
        job_dict = self.job.to_dict()
        # A match requires at least one matching key/value pair and no
        # non-matching ones.
        checks = [pattern.match(job_dict[field])
                  for field, pattern in MATCH.items() if field in job_dict]
        if not (checks and all(checks)):
            raise ValueError("The Syosetsu Filter's just works with ncode.syosetu.com")
        # Select the chapter body, excluding the chapter-number block.
        base = "//div[contains(@id,'novel_contents')]/div[contains(@id,'novel_color')]"
        xpath_args = {
            'method': 'html',
            'path': base,
            'exclude': base + "/div[contains(@id,'novel_no')]",
        }
        data = filters.XPathFilter.filter(self, data, xpath_args)
        # Drop empty lines while keeping the original line endings.
        return "".join(line for line in data.splitlines(True)
                       if line.strip("\r\n").strip())
class SyosetuIndexFilter(filters.FilterBase):
    """Index filter that turns a ncode.syosetu.com novel index into a urls.yaml.

    Reads the chapter index of a novel on ncode.syosetu.com and returns a
    YAML document stream of urlwatch jobs, one per chapter.

    Subfilters (string is treated as ``shortname``; dict accepts both keys):
        path: <string>       # target path/filename for the new urls.yaml (currently unused)
        shortname: <string>  # short novel name, embedded in job name and git-path filter
    """

    __kind__ = "SyosetuIndex"

    def filter(self, data, subfilter):
        # Only jobs whose URL points at ncode.syosetu.com are supported.
        MATCH = {'url': re.compile('(http|https)://ncode.syosetu.com/.*')}
        d = self.job.to_dict()
        # It's a match if we have at least one key/value pair that matches,
        # and no key/value pairs that do not match.
        matches = [v.match(d[k]) for k, v in MATCH.items() if k in d]
        if not (len(matches) > 0 and all(matches)):
            raise ValueError("The Syosetsu Filter's just works with ncode.syosetu.com")
        # Read the subfilter: a bare string is the shortname, a dict may
        # carry shortname and path.
        if isinstance(subfilter, str):
            shortname = subfilter
            path = None
        elif isinstance(subfilter, dict):
            shortname = subfilter.get("shortname", None)
            path = subfilter.get("path", None)
        else:
            shortname = None
            path = None  # reserved for later use (writing the yaml to disk)
        # Store this index job itself under git-path "SyosetuIndex" with the
        # blacklist (non-ASCII friendly) filename handling.
        name = "SyosetuIndex"
        extra = "git-path:" + name + ",bUnicode:True"
        if self.job.filter is None:
            self.job.filter = extra
        else:
            self.job.filter = self.job.filter + "," + extra
        # Pull the chapter links out of the index box.
        xpath = ("//div[contains(@id,'novel_contents')]"
                 "/div[contains(@id,'novel_color')]"
                 "/div[contains(@class,'index_box')]/dl/dd/a")
        xpathsubfilter = {'method': 'html',
                          'path': xpath,
                          'exclude': ""}
        linklist = filters.XPathFilter.filter(self, data, xpathsubfilter)
        # Convert each chapter link into a urlwatch UrlJob.
        html = lxml.html.fromstring(linklist)
        jobList = []
        for link in html.xpath("//a"):
            sLinkname = link.text
            sLinkhref = link.attrib.get("href", None)
            iId = sLinkhref.split('/')[-2]  # chapter ID (second-to-last path segment)
            sTargetUrl = self.job.get_location() + '/' + iId + '/'
            if shortname is None:
                # Fall back to the novel ID from the first link; the value is
                # deliberately reused for every following chapter.
                shortname = sLinkhref.split('/')[-3]
            sLinkname = "{id}-{shortname}-{sLinkname}".format(
                id=iId, shortname=shortname, sLinkname=sLinkname)
            links = {'kind': 'url',
                     'filter': 'Syosetu,bUnicode:True,git-path:{name}/{shortname}'.format(
                         name=name, shortname=shortname),
                     'name': sLinkname,
                     'url': sTargetUrl}
            jobList.append(jobs.UrlJob.from_dict(links))
        # NOTE: the original also rewrote each link's href and re-serialized
        # the tree with lxml.html.tostring, but the serialized string was
        # never used -- that dead code has been removed.
        return yaml.dump_all([job.serialize() for job in jobList],
                             default_flow_style=False)
# Custom Git Reporter
class GitReport(reporters.ReporterBase):
    """Write each changed job's data to a file and commit it to a Git repository.

    Reporter configuration keys:
        enabled: bool  # reporter is a no-op unless True
        path: str      # git working tree; empty/missing -> urlwatch cache fallback

    Per-job behaviour is tuned through the dummy filters ``git-path``
    (target subfolder), ``bUnicode`` (keep non-ASCII in filenames) and
    ``bNoHash`` (omit the GUID suffix) defined in this module.
    """

    __kind__ = 'gitreport'

    @staticmethod
    def _parse_filters(job_filter):
        """Return a job's filter spec as a {kind: value} dict.

        Accepts both the list-of-dicts form and the legacy comma-separated
        string form; string entries without a ``kind:value`` pair are skipped.
        """
        parsed = {}
        if job_filter is None:
            return parsed
        if isinstance(job_filter, list):
            for item in job_filter:
                key = next(iter(item))
                parsed[key] = item[key]
        elif isinstance(job_filter, str):
            for entry in job_filter.split(','):
                parts = entry.split(':', 1)
                if len(parts) == 2:
                    parsed[parts[0]] = parts[1]
        return parsed

    def submit(self):
        """Write changed job data into the repository and commit/push it."""
        if self.config.get('enabled', False) is False:
            return
        from git import Repo
        # Use the configured git path, or fall back to the urlwatch cache dir.
        urlwatch_cache_dir = AppDirs(urlwatch.pkgname).user_cache_dir
        fallback = os.path.join(urlwatch_cache_dir, 'git')
        git_path = self.config.get('path', fallback)
        if git_path == '':
            # Fix: original log message read "emptry".
            logger.info('Git path is empty. Using: %s', os.path.abspath(fallback))
            git_path = fallback
        # Create the working tree (and a fresh repository) if it is missing.
        if not os.path.exists(git_path):
            logger.debug('Create Folder: %s', git_path)
            # makedirs (not mkdir) so a missing parent cache dir is no error.
            os.makedirs(git_path)
            repo = Repo.init(os.path.abspath(git_path))
        else:
            repo = Repo(os.path.abspath(git_path))
        # Sync with the remote (if any) before adding or changing files.
        remote = repo.remotes != []
        if remote:
            print("Fetch and Pull from Git Repository")
            repo.remotes.origin.fetch()  # these two steps can take a while
            repo.remotes.origin.pull()
        commit_message = ""
        new_files = []
        # Write all changes.
        for job_state in self.report.get_filtered_job_states(self.job_states):
            # Nothing to store for unchanged or errored jobs.
            if job_state.verb == "unchanged" or job_state.verb == "error":
                continue
            filters = self._parse_filters(job_state.job.filter)
            parsed_uri = urlparse(job_state.job.get_location())
            result = '{uri.netloc}'.format(uri=parsed_uri)
            # Files go into the git-path subfolder when given, otherwise into
            # a folder named after the job's network location.
            if filters.get('git-path', None) is not None:
                job_path = os.path.join(git_path, filters['git-path'])
            else:
                job_path = os.path.join(git_path, result)
            if not os.path.exists(job_path):
                os.makedirs(job_path)
            # Build a safe filename; bUnicode selects the blacklist cleaner
            # that keeps non-ASCII characters.
            if filters.get('bUnicode', False):
                filename = self.clean_filename2(job_state.job.pretty_name())
            else:
                filename = self.clean_filename(job_state.job.pretty_name())
            # Reuse an existing hash-less file, or honor bNoHash; otherwise
            # append the job GUID to keep names unique.
            if (os.path.exists(os.path.join(job_path, filename + '.txt'))
                    and os.path.isfile(os.path.join(job_path, filename + '.txt'))
                    or filters.get('bNoHash', False)):
                filename += '.txt'
            else:
                filename += '.' + job_state.job.get_guid() + '.txt'
            # Create the file or overwrite the old one.
            with open(os.path.join(job_path, filename), 'w+', encoding='utf-8') as writer:
                writer.write(job_state.new_data)
            new_files.append(os.path.join(job_path, filename))
            commit_message += "%s\n%s \n%s\n\n" % (
                job_state.job.pretty_name(), result, job_state.job.get_location())
        # One commit for all changes -- but only when something was actually
        # written (the original could create an empty commit when every job
        # state was unchanged/error).
        if new_files:
            repo.index.add(new_files)
            repo.index.commit(commit_message)
            # Push the changes when a remote repository exists.
            if remote:
                print("Push Changes to the Repository ...")
                repo.remotes.origin.push()
                print("Done.")

    # This function is from https://gist.github.com/wassname/1393c4a57cfcbf03641dbc31886123b8
    @staticmethod
    def clean_filename(filename, replace=' '):
        """Return *filename* reduced to a safe ASCII whitelist.

        Every character in *replace* becomes '_'; the rest is NFKD-normalized,
        stripped to ASCII and filtered against a whitelist, then truncated.
        """
        whitelist = "-_.() %s%s" % (string.ascii_letters, string.digits)
        char_limit = 210  # leaves room for the SHA-1 hash and file extension
        # replace unwanted characters (each char of *replace*) with '_'
        for r in replace:
            filename = filename.replace(r, '_')
        # keep only valid ascii chars
        cleaned_filename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode()
        # keep only whitelisted chars
        cleaned_filename = ''.join(c for c in cleaned_filename if c in whitelist)
        if len(cleaned_filename) > char_limit:
            logger.info("Warning, filename truncated because it was over {}. Filenames may no longer be unique".format(char_limit))
        return cleaned_filename[:char_limit]

    # This function is from https://gist.github.com/wassname/1393c4a57cfcbf03641dbc31886123b8
    # Changed to a blacklist to cope with Asian (non-ASCII) filenames.
    @staticmethod
    def clean_filename2(filename, replace=' '):
        """Return *filename* with blacklisted characters removed (keeps non-ASCII).

        Every character in *replace* becomes '_'; the rest is NFKD-normalized
        and filtered against a blacklist of filesystem-hostile characters.
        """
        blacklist = "|*/\\%&$§!?=<>:\""
        char_limit = 210  # leaves room for the SHA-1 hash and file extension
        # replace unwanted characters (each char of *replace*) with '_'
        for r in replace:
            filename = filename.replace(r, '_')
        # normalize but keep the full unicode range
        cleaned_filename = unicodedata.normalize('NFKD', filename)
        # remove blacklisted chars
        cleaned_filename = ''.join(c for c in cleaned_filename if c not in blacklist)
        if len(cleaned_filename) > char_limit:
            logger.info("Warning, filename truncated because it was over {}. Filenames may no longer be unique".format(char_limit))
        return cleaned_filename[:char_limit]