-
Notifications
You must be signed in to change notification settings - Fork 0
/
lruqueue-broker.scm
100 lines (86 loc) · 3.72 KB
/
lruqueue-broker.scm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
;; Least-recently used (LRU) queue device
;; This contains the polling version of the LRU broker.
;; Polling is a blocking operation and prevents other Chicken
;; threads from running, so the front- and backends are in
;; a separate process. See lruqueue-ends.scm.
(use zmq data-structures)
(include "zhelpers.scm")
;; Number of client replies the broker forwards before it shuts down;
;; handle-worker uses this as the initial value of its countdown.
(define +num-clients+ 10)
;; Not referenced anywhere else in this file.
;; NOTE(review): presumably mirrors the number of workers started by
;; lruqueue-ends.scm -- confirm against that file.
(define +num-workers+ 3)
;; Read the next message part from socket S and check that it is the
;; empty delimiter frame.  Always returns "" when the check passes.
;;
;; The receive is deliberately done outside of `assert': in CHICKEN,
;; `assert' expands to an unspecified value when the code is compiled in
;; unsafe mode, so a receive written inside the assertion would be
;; skipped entirely and the delimiter frame would stay unread on the
;; socket, shifting every subsequent frame by one.
(define (receive-empty-message s)
  (let ((frame (receive-message* s)))
    (assert (string=? "" frame))
    ""))
;; The broker's two sockets, both created with type 'xrep.
;; `frontend' is read by handle-client and written by handle-worker;
;; `backend' is read by handle-worker and written by handle-client.
(define frontend (make-socket 'xrep))
(define backend (make-socket 'xrep))
;; Bind each socket to its own ipc endpoint.
(bind-socket frontend "ipc://frontend.ipc")
(bind-socket backend "ipc://backend.ipc")
;; Queue of worker addresses: handle-worker appends an address each time
;; a worker reports in, and handle-client removes one per routed request.
(define q (make-queue))
;; Process one multipart message arriving from a worker on `backend'.
;;
;; A worker message has one of two shapes:
;;   worker-addr "" "READY"                 -- worker announcing itself
;;   worker-addr "" client-addr "" reply    -- worker answering a client
;;
;; In both cases the worker's address is appended to the LRU queue `q',
;; since the worker is now free.  For a reply, the payload is forwarded
;; to the client on `frontend' and the count of outstanding client
;; replies (initially +num-clients+) is decremented.
;;
;; Returns #t to keep the broker running and #f once the last expected
;; client reply has been forwarded.
(define handle-worker
  (let ((replies-left +num-clients+))
    (lambda ()
      (let ((worker (receive-message* backend)))
        ;; Delimiter frame between the worker address and the rest.
        (receive-empty-message backend)
        (let ((third-frame (receive-message* backend)))
          ;; The data-structures queue does not track its length, so no
          ;; assertion against +num-workers+ is made here.
          (queue-add! q worker)
          (if (string=? third-frame "READY")
              #t                        ; first-time connect: nothing to forward
              (begin
                ;; third-frame is the client address; a delimiter and
                ;; the reply payload follow.
                (receive-empty-message backend)
                (let ((payload (receive-message* backend)))
                  (send-multipart-message* frontend
                                           third-frame "" payload)
                  (printf "Worker ~A -> ~A ~S\n"
                          worker third-frame payload)
                  (set! replies-left (- replies-left 1))
                  (positive? replies-left)))))))))
;; Process one multipart request arriving from a client on `frontend'
;; and route it to the worker at the head of the LRU queue `q'.
;;
;; Incoming client message:   client-addr "" request
;; Outgoing routed message:   worker-addr "" client-addr "" request
;;
;; Always returns #t (continue).
(define (handle-client)
  (let ((client (receive-message* frontend)))
    ;; Delimiter frame between the client address and the request.
    (receive-empty-message frontend)
    (let* ((payload (receive-message* frontend))
           (worker (queue-remove! q)))
      (send-multipart-message* backend
                               worker ""
                               client "" payload)
      (printf "Client ~A -> ~A ~S\n"
              client worker payload)
      #t)))
;;; main
;; The backend is polled on every iteration; the frontend is polled only
;; while the LRU queue holds at least one worker.  The native C example
;; does the same, so that no CPU is burned waiting for a worker to become
;; available.  The Python example instead always polls the frontend and
;; checks for a worker afterwards, which spins; the C approach is the
;; one followed here.
(define backend-items (poll-items (list backend) '()))
(define bothend-items (poll-items (list backend frontend) '()))

;; Loop until a handler returns #f, at which point the `finish'
;; continuation is invoked to leave the loop.
(call/cc
 (lambda (finish)
   (forever
    (let* ((workers-free? (not (queue-empty? q)))
           ;; Index 0 is the backend in both poll sets; index 1 (the
           ;; frontend) exists only in bothend-items.
           (items (if workers-free? bothend-items backend-items)))
      (poll items #t) ;; maybe poll!
      (when (and (poll-items-in? items 0)
                 (not (handle-worker)))
        (finish #t))
      (when (and workers-free?
                 (poll-items-in? items 1)
                 (not (handle-client)))
        (finish #t))))))
;; Shutdown: close the frontend socket, then the backend socket, and
;; finally terminate the default zmq context.
(for-each close-socket (list frontend backend))
(terminate-context (zmq-default-context))