Skip to content

Commit 36084f9

Browse files
committed
Serializing crawler queue transactions
Fix #867
1 parent 8a6ec96 commit 36084f9

File tree

2 files changed

+48
-76
lines changed

2 files changed

+48
-76
lines changed

minet/crawl/queue.py

Lines changed: 25 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ class BrokenCrawlerQueue(Exception):
182182
# because its #.get method can block, but will raise Empty when drained.
183183
# This is fine with `quenouille` if we don't forget to call #.unblock
184184
# on panic.
185+
# NOTE: sqlite needs to serialize writes, which means it is not particularly
186+
# useful in our case to allow for concurrent access to the queue because most
187+
# of its operations need to update the database somehow. This also means we
188+
# can rely on a single lock to make multithreaded transactions safe.
185189
class CrawlerQueue:
186190
# Params
187191
persistent: bool
@@ -192,15 +196,13 @@ class CrawlerQueue:
192196

193197
# State
194198
tasks: Dict[str, int]
195-
put_connection: sqlite3.Connection
196-
task_connection: sqlite3.Connection
199+
connection: sqlite3.Connection
197200
counter: int
198201
cleanup_interval: int
199202
vacuum_interval: int
200203
waiter: Condition
201204
currently_waiting_count: int
202-
put_lock: Lock
203-
task_lock: Lock
205+
transaction_lock: Lock
204206
broken: bool
205207

206208
def __init__(
@@ -257,29 +259,22 @@ def __init__(
257259

258260
self.waiter = Condition()
259261
self.currently_waiting_count = 0
260-
self.put_lock = Lock()
261-
self.task_lock = Lock()
262+
self.transaction_lock = Lock()
262263
self.broken = False
263264

264-
# NOTE: we need two connections if we are to allow concurrent
265-
# put and task acquisition.
266-
self.put_connection = sqlite3.connect(full_path, check_same_thread=False)
267-
self.task_connection = sqlite3.connect(full_path, check_same_thread=False)
265+
self.connection = sqlite3.connect(full_path, check_same_thread=False)
268266

269267
# NOTE: it seems it is safer and common practice to
270268
# reexecute pragmas each time because they might not
271269
# be stored persistently in some instances.
272-
self.put_connection.executescript(SQL_PRAGMAS)
273-
self.put_connection.commit()
274-
275-
self.task_connection.executescript(SQL_PRAGMAS)
276-
self.task_connection.commit()
270+
self.connection.executescript(SQL_PRAGMAS)
271+
self.connection.commit()
277272

278273
if inspect:
279274
return
280275

281276
# Setup
282-
with self.global_transaction() as cursor:
277+
with self.transaction() as cursor:
283278
if not self.resuming:
284279
cursor.executescript(SQL_CREATE)
285280
else:
@@ -302,33 +297,11 @@ def __init__(
302297
cursor.execute("VACUUM;")
303298

304299
@contextmanager
305-
def put_transaction(self):
306-
cursor = None
307-
try:
308-
with self.put_lock, self.put_connection:
309-
cursor = self.put_connection.cursor()
310-
yield cursor
311-
finally:
312-
if cursor is not None:
313-
cursor.close()
314-
315-
@contextmanager
316-
def task_transaction(self):
317-
cursor = None
318-
try:
319-
with self.task_lock, self.task_connection:
320-
cursor = self.task_connection.cursor()
321-
yield cursor
322-
finally:
323-
if cursor is not None:
324-
cursor.close()
325-
326-
@contextmanager
327-
def global_transaction(self):
300+
def transaction(self):
328301
cursor = None
329302
try:
330-
with self.put_lock, self.task_lock, self.task_connection:
331-
cursor = self.task_connection.cursor()
303+
with self.transaction_lock, self.connection:
304+
cursor = self.connection.cursor()
332305
yield cursor
333306
finally:
334307
if cursor is not None:
@@ -340,7 +313,7 @@ def explain_query_plan(self, sql: str) -> str:
340313

341314
sql = sql.replace("?", "1")
342315

343-
with self.global_transaction() as cursor:
316+
with self.transaction() as cursor:
344317
cursor.execute("EXPLAIN QUERY PLAN %s" % sql)
345318

346319
return "\n".join(row[3] for row in iterate_over_sqlite_cursor(cursor))
@@ -350,14 +323,14 @@ def __count(self, cursor: sqlite3.Cursor) -> int:
350323
return cursor.fetchone()[0]
351324

352325
def qsize(self) -> int:
353-
with self.global_transaction() as cursor:
326+
with self.transaction() as cursor:
354327
return self.__count(cursor)
355328

356329
def __len__(self) -> int:
357330
return self.qsize()
358331

359332
def put_many(self, jobs: Iterable[CrawlJob]) -> int:
360-
with self.put_transaction() as cursor:
333+
with self.transaction() as cursor:
361334
rows = []
362335

363336
for job in jobs:
@@ -422,7 +395,7 @@ def get(self, block=True) -> CrawlJob:
422395
if self.broken:
423396
raise BrokenCrawlerQueue
424397

425-
with self.task_transaction() as cursor:
398+
with self.transaction() as cursor:
426399
cursor.execute(
427400
SQL_GET_JOB % ("ASC" if not self.is_lifo else "DESC"),
428401
(now(),),
@@ -497,7 +470,7 @@ def unblock(self) -> None:
497470
def worked_groups(self) -> Dict[str, Tuple[int, int]]:
498471
g = {}
499472

500-
with self.global_transaction() as cursor:
473+
with self.transaction() as cursor:
501474
cursor.execute(
502475
'SELECT "group", "count", "allowed" FROM "parallelism" WHERE "count" > 0;'
503476
)
@@ -508,7 +481,7 @@ def worked_groups(self) -> Dict[str, Tuple[int, int]]:
508481
return g
509482

510483
def dump(self) -> Iterator[CrawlerQueueRecord]:
511-
with self.global_transaction() as cursor:
484+
with self.transaction() as cursor:
512485
cursor.execute(SQL_DUMP)
513486

514487
for row in iterate_over_sqlite_cursor(cursor):
@@ -554,11 +527,11 @@ def __vacuum_and_analyze(self, cursor: sqlite3.Cursor) -> None:
554527
cursor.connection.commit()
555528

556529
def cleanup(self) -> None:
557-
with self.global_transaction() as cursor:
530+
with self.transaction() as cursor:
558531
self.__cleanup(cursor)
559532

560533
def clear(self) -> None:
561-
with self.global_transaction() as cursor:
534+
with self.transaction() as cursor:
562535
self.__clear(cursor)
563536

564537
# NOTE: there is a subtle difference between #.release_group
@@ -570,7 +543,7 @@ def clear(self) -> None:
570543
# task. This is important to ensure atomicity as well as possible,
571544
# without creating backpressure on the queue's constraints.
572545
def release_group(self, job: CrawlJob) -> None:
573-
with self.task_transaction() as cursor:
546+
with self.transaction() as cursor:
574547
if job.id not in self.tasks:
575548
raise RuntimeError("job is not being worked")
576549

@@ -601,7 +574,7 @@ def group_releaser(self, job: CrawlJob):
601574
self.release_group(job)
602575

603576
def task_done(self, job: CrawlJob) -> None:
604-
with self.task_transaction() as cursor:
577+
with self.transaction() as cursor:
605578
index = self.tasks.pop(job.id, None)
606579

607580
if index is None:
@@ -620,8 +593,7 @@ def task_done(self, job: CrawlJob) -> None:
620593
self.__vacuum_and_analyze(cursor)
621594

622595
def close(self) -> None:
623-
self.put_connection.close()
624-
self.task_connection.close()
596+
self.connection.close()
625597

626598
def __del__(self) -> None:
627599
self.close()

minet/crawl/url_cache.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,22 @@ class AtomicSet(Generic[T]):
3232
"""
3333

3434
__items: Set[T]
35-
__lock: Lock
35+
lock: Lock
3636

3737
def __init__(self):
3838
self.__items = set()
39-
self.__lock = Lock()
39+
self.lock = Lock()
4040

4141
def add(self, item: T) -> bool:
42-
with self.__lock:
42+
with self.lock:
4343
len_before = len(self.__items)
4444

4545
self.__items.add(item)
4646

4747
return len(self.__items) > len_before
4848

4949
def add_many(self, items: Iterable[T]) -> int:
50-
with self.__lock:
50+
with self.lock:
5151
len_before = len(self.__items)
5252

5353
for item in items:
@@ -66,7 +66,7 @@ def add_many_and_keep_new(self, items: Iterable[T], key: None = ...) -> List[T]:
6666
...
6767

6868
def add_many_and_keep_new(self, items, key=None):
69-
with self.__lock:
69+
with self.lock:
7070
new = []
7171

7272
for item in items:
@@ -81,15 +81,15 @@ def add_many_and_keep_new(self, items, key=None):
8181
return new
8282

8383
def __len__(self) -> int:
84-
with self.__lock:
84+
with self.lock:
8585
return len(self.__items)
8686

8787
def __contains__(self, item: T) -> bool:
88-
with self.__lock:
88+
with self.lock:
8989
return item in self.__items
9090

9191
def __iter__(self) -> Iterator[T]:
92-
with self.__lock:
92+
with self.lock:
9393
yield from self.__items
9494

9595
def close(self) -> None:
@@ -112,30 +112,30 @@ def __init__(self, path: str, db_name: str = "set.db"):
112112

113113
self.path = path
114114

115-
self.__connection = sqlite3.connect(
115+
self.connection = sqlite3.connect(
116116
join(self.path, db_name), check_same_thread=False
117117
)
118-
self.__lock = Lock()
118+
self.lock = Lock()
119119

120120
# Setup
121121
# NOTE: this is reexecuted on resume and this is fine
122122
# NOTE: it seems it's safer to reexecute pragmas anyway
123-
self.__connection.executescript(SQL_CREATE)
124-
self.__connection.commit()
123+
self.connection.executescript(SQL_CREATE)
124+
self.connection.commit()
125125

126126
@contextmanager
127-
def __transaction(self):
127+
def transaction(self):
128128
cursor = None
129129
try:
130-
with self.__lock, self.__connection:
131-
cursor = self.__connection.cursor()
130+
with self.lock, self.connection:
131+
cursor = self.connection.cursor()
132132
yield cursor
133133
finally:
134134
if cursor is not None:
135135
cursor.close()
136136

137137
def add(self, item: str) -> bool:
138-
with self.__transaction() as cursor:
138+
with self.transaction() as cursor:
139139
try:
140140
cursor.execute('INSERT INTO "set" ("key") VALUES (?);', (item,))
141141
except sqlite3.IntegrityError:
@@ -144,7 +144,7 @@ def add(self, item: str) -> bool:
144144
return True
145145

146146
def add_many(self, items: Iterable[str]) -> int:
147-
with self.__transaction() as cursor:
147+
with self.transaction() as cursor:
148148
rows = [(item,) for item in items]
149149
cursor.executemany('INSERT OR IGNORE INTO "set" ("key") VALUES (?);', rows)
150150
return cursor.rowcount
@@ -160,7 +160,7 @@ def add_many_and_keep_new(self, items: Iterable[str], key: None = ...) -> List[s
160160
...
161161

162162
def add_many_and_keep_new(self, items, key=None):
163-
with self.__transaction() as cursor:
163+
with self.transaction() as cursor:
164164
new = []
165165

166166
for item in items:
@@ -175,31 +175,31 @@ def add_many_and_keep_new(self, items, key=None):
175175

176176
# NOTE: I keep this for the ugly trick.
177177
# def contains_many(self, items: Iterable[str]) -> List[str]:
178-
# with self.__transaction() as cursor:
178+
# with self.transaction() as cursor:
179179
# cursor.execute(
180180
# 'SELECT "key" FROM "set" WHERE "key" in (SELECT "value" FROM JSON_EACH(?));',
181181
# (json.dumps(items),),
182182
# )
183183

184184
def __contains__(self, item: str) -> bool:
185-
with self.__transaction() as cursor:
185+
with self.transaction() as cursor:
186186
cursor.execute('SELECT 1 FROM "set" WHERE "key" = ?;', (item,))
187187
return cursor.fetchone() is not None
188188

189189
def __len__(self) -> int:
190-
with self.__transaction() as cursor:
190+
with self.transaction() as cursor:
191191
cursor.execute('SELECT COUNT(*) FROM "set";')
192192
return cursor.fetchone()[0]
193193

194194
def __iter__(self) -> Iterator[str]:
195-
with self.__transaction() as cursor:
195+
with self.transaction() as cursor:
196196
cursor.execute('SELECT "key" FROM "set";')
197197

198198
for row in iterate_over_sqlite_cursor(cursor):
199199
yield row[0]
200200

201201
def close(self) -> None:
202-
self.__connection.close()
202+
self.connection.close()
203203

204204
def __del__(self) -> None:
205205
self.close()

0 commit comments

Comments
 (0)