-
Notifications
You must be signed in to change notification settings - Fork 2
/
motor.l
252 lines (205 loc) · 5.07 KB
/
motor.l
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
;; -*- mode: lisp -*-
(define ffi (require 'ffi))
(define buffer (require 'buffer))
;; C declarations for the FFI: the socket, poll and descriptor calls that this
;; module invokes through `ffi.C` (socket, setsockopt, bind, listen, accept,
;; fcntl, poll, close, read, write, strerror, htons, htonl), together with the
;; typedefs and structs their signatures mention.
;; NOTE(review): the typedef widths (socklen_t, sa_family_t, size_t, ssize_t)
;; are written out by hand here rather than taken from system headers --
;; confirm they are intentional for the target platform.
;; NOTE(review): `struct sockaddr_in` is used by `bind` further down but is not
;; declared in this block; presumably it is supplied elsewhere (perhaps by the
;; 'socket module) -- confirm.
(define-c ffi |
int socket(int domain, int type, int protocol);
int fcntl(int fildes, int cmd, ...);
typedef int socklen_t;
int bind(
int socket,
const struct sockaddr *address,
socklen_t address_len);
int listen(int socket, int backlog);
int accept(
int socket,
struct sockaddr *restrict address,
socklen_t *restrict address_len);
int setsockopt(
int socket,
int level,
int option_name,
const void *option_value,
socklen_t option_len);
typedef uint8_t sa_family_t;
typedef uint16_t in_port_t;
typedef uint32_t in_addr_t;
struct in_addr {
in_addr_t s_addr;
};
uint16_t htons(uint16_t hostshort);
uint32_t htonl(uint32_t hostlong);
typedef unsigned int nfds_t;
struct pollfd {
int fd;
short events;
short revents;
};
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int close(int fildes);
char *strerror(int errnum);
typedef unsigned int size_t;
typedef int ssize_t;
ssize_t read(int fildes, void *buf, size_t nbyte);
ssize_t write(int fildes, const void *buf, size_t nbyte);
|)
(require 'socket)
;; Short aliases used throughout this file: `cstr` is ffi.string (applied to
;; the result of strerror in `abort`) and `c` is the ffi.C namespace through
;; which the C functions declared above are called.
(define cstr ffi.string)
(define c ffi.C)
;; Raise an error of the form "<name>: <description>", where <description> is
;; the strerror text for the current errno. `name` falls back to "error" when
;; it is not supplied.
(define abort (name)
  (let (label (or name 'error)
        reason (cstr (c.strerror (ffi.errno))))
    (error (cat label ": " reason))))
;; Numeric socket constants: the first three are the arguments given to
;; c.socket in `socket`; AF_INET and INADDR_ANY also fill in the address in
;; `bind`.
;; NOTE(review): hard-coded rather than read from system headers -- confirm
;; the values for the target platform.
(define AF_INET 2)
(define SOCK_STREAM 1)
(define IPPROTO_TCP 6)
(define INADDR_ANY 0)
;; Create an AF_INET / SOCK_STREAM / IPPROTO_TCP socket, switch on its
;; SO_REUSEADDR option, and return the descriptor.
;; Aborts with 'socket or 'setsockopt when the corresponding call returns a
;; negative value.
(define socket ()
  (with fd (c.socket AF_INET SOCK_STREAM IPPROTO_TCP)
    (when (< fd 0)
      (abort 'socket))
    ;; setsockopt wants a pointer to the option value, hence the one-element
    ;; int array holding 1.
    (let (enabled (ffi.new "int[1]" 1)
          width (ffi.sizeof "int"))
      (when (< (c.setsockopt fd SOL_SOCKET SO_REUSEADDR enabled width) 0)
        (abort 'setsockopt)))))
;; Create a socket, bind it to `port` on INADDR_ANY and start listening with a
;; backlog of 10. Returns the listening descriptor.
;; Aborts with 'bind or 'listen when the corresponding call returns a negative
;; value.
(define bind (port)
  (with fd (socket)
    (let (storage (ffi.new "struct sockaddr_in[1]")
          addr (get storage 0)
          size (ffi.sizeof "struct sockaddr_in"))
      ;; Port and address are converted with htons / htonl before being stored.
      (set addr.sin_family AF_INET)
      (set addr.sin_port (c.htons port))
      (set addr.sin_addr.s_addr (c.htonl INADDR_ANY))
      (when (< (c.bind fd (ffi.cast "struct sockaddr*" storage) size) 0)
        (abort 'bind))
      (when (< (c.listen fd 10) 0)
        (abort 'listen)))))
;; Bit values for the `events` / `revents` fields of struct pollfd.
;; POLLNONE (no bits set) is the initial `events` value given to a thread by
;; `enter`; `tick` runs such a thread regardless of what poll reported.
;; NOTE(review): hard-coded rather than read from system headers -- confirm
;; the values for the target platform.
(define POLLNONE 0x0000)
(define POLLIN 0x0001)
(define POLLOUT 0x0004)
(define POLLERR 0x0008)
(define POLLHUP 0x0010)
(define POLLNVAL 0x0020)
;; Registry of scheduled threads, keyed by file descriptor. Entries are added
;; by `enter` (fields: fd, thread, final, events) and cleared by `leave`.
(define threads ())
;; True when the event mask `v` is numerically greater than 0x0007, which for
;; the masks defined in this file means POLLERR or a higher bit is set.
(define error? (v)
  (< 0x0007 v))
;; Close descriptor `fd`; aborts with 'close when c.close returns a negative
;; value.
(define close (fd)
  (let status (c.close fd)
    (when (< status 0)
      (abort 'close))))
;; Return the thread registered for descriptor `fd` in `threads`.
(define active (fd)
  (let entry (get threads fd)
    (get entry 'thread)))
;; Register `thread` under descriptor `fd` in `threads`.
;; `final` is the function `leave` calls when the entry is removed; when it is
;; omitted, a function that closes `fd` is used. The entry starts with
;; `events` set to POLLNONE.
(define enter (fd thread final)
  (let finish (or final (fn () (close fd)))
    (set (get threads fd)
         (list fd: fd
               thread: thread
               final: finish
               events: POLLNONE))))
;; Remove the entry for `fd`: call its `final` function, then clear the slot
;; in `threads`.
(define leave (fd)
  (let entry (get threads fd)
    (entry.final)
    (set (get threads fd) nil)))
;; Call `leave` for every descriptor still present in `threads`.
(define cleanup ()
  (each (fd entry) threads
    (leave fd)))
;; True when coroutine.status reports `c` as dead.
(define dead? (c)
  (let status (coroutine.status c)
    (= status 'dead)))
;; Resume thread `t` once on behalf of descriptor `fd`.
;; When the first result of `resume` is falsy, the second result is printed
;; after "error:"; when the thread is dead afterwards, `fd` is removed with
;; `leave`.
(define run (t fd)
;; NOTE(review): `|b,e|` appears to be a raw identifier used so that both
;; results of `resume` are captured in one binding -- confirm against the
;; compiler before restructuring this form.
(let |b,e| (resume t)
(unless b
(print (cat "error:" e)))
(when (dead? t)
(leave fd))))
;; Build a list holding one freshly allocated struct pollfd per entry in
;; `threads`, with `fd` and `events` copied from that entry.
(define polls ()
  (with result ()
    (each entry threads
      (let slot (ffi.new "struct pollfd")
        (set slot.fd entry.fd)
        (set slot.events entry.events)
        (add result slot)))))
;; Walk the first `n` pollfd records of array `a` and act on each one:
;;   - the thread is dead, or revents satisfies `error?`  -> `leave` the fd
;;   - the thread's events are POLLNONE, or revents > 0   -> `run` the thread
;;   - otherwise nothing happens for that fd on this pass.
(define tick (a n)
  (for i n
    (let (slot (get a i)
          fd slot.fd
          seen slot.revents
          entry (get threads fd)
          task entry.thread
          wanted entry.events)
      (if (or (dead? task) (error? seen))
          (leave fd)
        (when (or (= wanted POLLNONE) (> seen 0))
          (run task fd))))))
;; The two values `timeout` can return; `loop` hands the result to c.poll as
;; its timeout argument.
(define IMMEDIATE 0)
(define NEVER -1)
;; Choose the poll timeout: IMMEDIATE when at least one registered thread has
;; `events` equal to POLLNONE, NEVER otherwise.
(define timeout ()
  (let idle (fn (entry) (= entry.events POLLNONE))
    (if (find idle threads)
        IMMEDIATE
        NEVER)))
;; Event loop: while `threads` is non-empty, copy the current poll requests
;; into a C array, call c.poll with the timeout chosen by `timeout`, and let
;; `tick` act on the results.
(define loop ()
  (while (not (empty? threads))
    (let (wanted (polls)
          count (# wanted)
          fds (ffi.new "struct pollfd[?]" count wanted))
      ;; `timeout` is defined without parameters, so it is called without any.
      (c.poll fds count (timeout))
      (tick fds count))))
;; Run `loop` under `guard`; when the second element of guard's result is
;; truthy it is printed after "error: ". `cleanup` is called afterwards in
;; either case.
(define start ()
  (let ((ok problem) (guard (loop)))
    (if problem
        (print (cat "error: " problem))))
  (cleanup))
;; fcntl command and flag used by `accept` on each accepted descriptor.
;; NOTE(review): hard-coded rather than read from system headers; these
;; numeric values are platform-dependent -- confirm they match the target OS.
(define F_SETFL 4)
(define O_NONBLOCK 0x0004)
;; Accept one connection on the listening descriptor `fd` and return the new
;; descriptor after requesting O_NONBLOCK on it via fcntl(F_SETFL).
;; Aborts with 'accept or 'fcntl when the corresponding call returns a
;; negative value.
(define accept (fd)
  (with conn (c.accept fd nil nil)
    (when (< conn 0)
      (abort 'accept))
    ;; fcntl is declared variadic, and LuaJIT's FFI passes plain numbers to
    ;; variadic parameters as doubles. Box the flag as a C int so that fcntl
    ;; really receives O_NONBLOCK as its third argument.
    (let flags (ffi.new "int" O_NONBLOCK)
      (when (< (c.fcntl conn F_SETFL flags) 0)
        (abort 'fcntl)))))
;; Record which event the thread registered for `fd` is waiting on -- POLLOUT
;; when `o` is 'out, POLLIN otherwise -- and then yield.
(define wait (fd o)
  (let entry (get threads fd)
    (set entry.events (if (= o 'out) POLLOUT POLLIN)))
  (yield))
;; Listen on `port` and register an acceptor thread for the listening socket.
;; Each time the acceptor is woken it accepts one connection and registers a
;; new thread that calls `f` with the connection's descriptor; it then yields
;; and starts over.
(define listen (port f)
  (let server (bind port)
    (define connect ()
      (wait server)
      (let (client (accept server)
            handler (fn () (f client)))
        (enter client (thread handler)))
      ;; Yield once, then loop by calling connect again.
      (connect (yield)))
    (enter server (thread connect))))
;; Wait for `fd`, then read into buffer `b`: up to (buffer.space b) bytes are
;; read at (buffer.pointer b) and b.length is advanced by the count read.
;; Returns that count; when the buffer reports no space, nothing is read and
;; the result is nil. Aborts with 'read when c.read returns a negative value.
(define read (fd b)
  (wait fd)
  (let room (buffer.space b)
    (when (> room 0)
      (with got (c.read fd (buffer.pointer b) room)
        (when (< got 0)
          (abort 'read))
        (inc b.length got)))))
;; Read once from `fd` into a new buffer and return (buffer.string ...) of it
;; when the byte count returned by `read` is positive; otherwise the result
;; is nil.
(define receive (fd)
  (let buf (buffer.create)
    (if (> (read fd buf) 0)
        (buffer.string buf))))
;; Wait until `fd` is reported writable, then write up to `n` bytes starting
;; at pointer `p`. Returns the count reported by c.write, which may be less
;; than `n`.
;; Aborts with 'write when c.write returns a negative value, so the message
;; names the call that actually failed, as the other wrappers in this file do.
(define write (fd p n)
  (wait fd 'out)
  (with sent (c.write fd p n)
    (when (< sent 0)
      (abort 'write))))
;; Send all of string `s` over `fd`, calling `write` repeatedly with the
;; unsent remainder until the running total reaches the length of `s`.
(define send (fd s)
  (let (total (# s)
        base (ffi.cast "const char*" s)
        done 0)
    (while (< done total)
      (inc done (write fd (+ base done) (- total done))))))
;; Names this module exports; every other definition in the file stays
;; internal.
(export active
enter
wait
listen
read
receive
write
send
start)