-
Notifications
You must be signed in to change notification settings - Fork 0
/
rdt_server.lua
221 lines (197 loc) · 5.07 KB
/
rdt_server.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
--[[
The handshake is initiated by the server and proceeds as follows:
server (HANDSHAKE_1)---> client
server <------(HANDSHAKE_2) client
server <--(handshake succ)---> client
]]
-- package.cpath = package.cpath .. ";./?.so"
local SOCKET = require "lsocket"
local SERVER = require "lsocket.server"
-- Listening port, taken from the script's argument list; must be numeric.
local port = assert(tonumber(...))
-- Listening socket bound to that port.
local so = assert(SOCKET.bind(port))
-- Last session id handed out; the LOGIN handler increments it before use.
local SESSION_START = 10000
-- Sockets handed to SOCKET.select; starts with just the listening socket.
local readsocket = { so }
-- client socket -> fd
local client = {}
-- fd -> client socket
local fds = {}
-- session id -> whether the rdt handshake has succeeded for that session.
-- (A stray earlier `local enable = false` was shadowed by this declaration
-- and has been dropped.)
local enable = {}
-- Only ever reset to nil (in recv); never read anywhere in this file.
local current_id
-- fd -> rdt session id
local fd2rdt = {}
-- session id -> os.time() at which its connection was closed
local session_2_delete = {}
-- Split `line` on every literal (non-pattern) occurrence of `sep`.
--
-- line     : string to split; nil/false or an empty string yields {}.
-- sep      : separator, matched as plain text; defaults to a single space.
--            Must not be empty.
-- maxsplit : maximum number of splits to perform; 0 or nil means unlimited.
--            When the limit is hit, the unsplit remainder becomes the last
--            item.
--
-- Returns an array of the pieces. Adjacent separators produce empty-string
-- items. Raises an error if `sep` is the empty string.
function string.split( line, sep, maxsplit )
  if not line or string.len(line) == 0 then
    return {}
  end
  sep = sep or ' '
  if sep == '' then
    -- A plain find of "" matches a zero-width span at `pos`, so the scan
    -- position would never advance: an endless loop when maxsplit is 0 and
    -- bogus empty fields otherwise.
    error("string.split: separator must not be empty", 2)
  end
  maxsplit = maxsplit or 0
  local retval = {}
  local idx = 0
  local pos = 1
  while true do
    local from, to = string.find(line, sep, pos, true)
    idx = idx + 1
    if from == nil or (maxsplit ~= 0 and idx > maxsplit) then
      -- No further separator, or split budget exhausted: take the rest.
      retval[idx] = string.sub(line, pos)
      break
    end
    retval[idx] = string.sub(line, pos, from - 1)
    pos = to + 1
  end
  return retval
end
-- Frame `msg` with SOCKET.pack_msg and write the whole frame to socket `so`.
--
-- so  : socket object exposing :send(data), which returns the number of
--       bytes written by that call, or a falsy value on failure.
-- msg : payload to frame and send.
--
-- Raises "write failed" if any send call returns a falsy value.
local function sendmsgbyso(so, msg)
  msg = SOCKET.pack_msg(msg)
  local total = #msg
  -- Cumulative count of bytes written so far. The byte count returned by a
  -- single send call only covers that call, so it must be accumulated to get
  -- the correct resend offset and the correct completion test after a
  -- partial write.
  local sent = 0
  while sent < total do
    local n = so:send(msg:sub(sent + 1))
    if not n then
      error("write failed")
    end
    sent = sent + n
  end
end
-- Hand `msg` to the rdt layer for delivery on session `session_id`.
-- The payload is passed through as-is; no SOCKET.pack_msg framing is
-- applied here.
local function sendmsgbyrdt(session_id, msg)
  local rdt_send = SERVER.rdt_send
  rdt_send(session_id, msg)
end
-- Register a newly accepted client socket `c`: add it to the select list
-- and record it in both lookup tables (socket -> fd and fd -> socket).
local function accept(c)
  readsocket[#readsocket + 1] = c
  local descriptor = c:info().fd
  client[c] = descriptor
  fds[descriptor] = c
end
-- Tear down the bookkeeping for client socket `s` and close it.
--
-- Removes `s` from the select list and from the socket/fd/session lookup
-- tables. If an rdt session was bound to the connection, the session is
-- marked disabled and the time of the disconnect is recorded.
--
-- Calling this for a socket that is not (or no longer) registered is a
-- no-op.
local function close(s)
  local fd = client[s]
  if fd == nil then
    -- Unknown or already-closed socket: nothing to clean up. Without this
    -- guard the table writes below would raise "table index is nil".
    return
  end
  local session_id = fd2rdt[fd]
  fds[fd] = nil
  client[s] = nil
  fd2rdt[fd] = nil
  -- Remove in place, iterating downward so removal does not skip entries
  -- and the order of the remaining sockets is preserved.
  for i = #readsocket, 1, -1 do
    if readsocket[i] == s then
      table.remove(readsocket, i)
    end
  end
  -- Release the descriptor now instead of waiting for garbage collection.
  s:close()
  -- The rdt object inside the engine is deliberately NOT deleted, because
  -- reconnecting to it is demonstrated later.
  if session_id then
    print("disable rdt session: ", session_id)
    enable[session_id] = false
    session_2_delete[session_id] = os.time()
  end
end
-- Handle a packet received on client socket `s` while no enabled rdt
-- session is bound to it. `str` is a space-separated command line:
--
--   LOGIN                    allocate a new session id and send
--                            "HANDSHAKE_1 <id>" over the raw socket.
--   HANDSHAKE_2              create the rdt session allocated at LOGIN,
--                            enable it and send "HANDSHAKE_3" over rdt.
--   RECONN_HANDSHAKE_1 <id>  rebind an existing session to this socket and
--                            run the reconnect exchange.
--
-- Anything else is just printed. Malformed or out-of-order commands are
-- logged and ignored rather than being allowed to raise.
local function on_recv_raw(s, str)
  local args = string.split(str, " ")
  local cmd = args[1]
  local fd = client[s]
  if cmd == "HANDSHAKE_2" then
    local session_id = fd2rdt[fd]
    if not session_id then
      -- No LOGIN happened on this connection, so there is no session id to
      -- create; indexing `enable` with nil would raise.
      print("HANDSHAKE_2 without LOGIN, ignored: ", fd)
      return
    end
    SERVER.rdt_create(session_id)
    enable[session_id] = true
    print("rdt handshake succ: ", fd, session_id)
    sendmsgbyrdt(session_id, "HANDSHAKE_3")
  elseif cmd == "LOGIN" then
    -- When a client logs in, start the rdt handshake.
    SESSION_START = SESSION_START + 1
    local session_id = SESSION_START
    fd2rdt[fd] = session_id
    enable[session_id] = false
    sendmsgbyso(s, "HANDSHAKE_1 " .. session_id)
  elseif cmd == "RECONN_HANDSHAKE_1" then
    local session_id = tonumber(args[2])
    -- Every id this server has issued has an entry in `enable` (LOGIN sets
    -- it), so a nil entry means the id is missing, non-numeric, or was
    -- never handed out.
    if not session_id or enable[session_id] == nil then
      print("RECONN_HANDSHAKE_1 with unknown session, ignored: ", fd, args[2])
      return
    end
    fd2rdt[fd] = session_id
    enable[session_id] = false
    print("RECONN_HANDSHAKE_1: ", fd, session_id)
    SERVER.rdt_disable(session_id)
    sendmsgbyso(s, "RECONN_HANDSHAKE_2")
    SERVER.rdt_reconnect(session_id)
    -- The first poll after rdt_reconnect is required to yield an event of
    -- type 2, whose payload is forwarded to the client over the raw socket.
    local t, sid, msg = SERVER.rdt_poll(session_id)
    assert(t == 2, t)
    sendmsgbyso(s, msg)
    enable[session_id] = true
  else
    print("server: ", str)
  end
end
-- Read one packet from client socket `s` and route it.
--
-- A nil result from recv_packet is treated as a disconnect: the socket is
-- closed via close() and current_id is reset. Any other falsy result is
-- ignored (presumably "no complete packet yet" -- confirm against
-- recv_packet). A packet is fed to the rdt layer when the connection has an
-- enabled session, otherwise it goes to the raw command handler.
local function recv(s)
  local fd = client[s]
  local session_id = fd2rdt[fd]
  local packet = s:recv_packet()

  if packet == nil then
    print("disconnect", fd)
    close(s)
    current_id = nil
    return
  end

  if not packet then
    return
  end

  if session_id and enable[session_id] then
    SERVER.rdt_recv(session_id, packet)
  else
    on_recv_raw(s, packet)
  end
end
-- Drain pending rdt events for every enabled session.
--
-- For each fd bound to an enabled session, SERVER.rdt_poll is called until
-- it reports no event (nil type). Events of type 1 are incoming messages:
-- they are printed and the fd is noted in the result. Any other event has
-- its payload written to the connection's raw socket.
--
-- Returns a table mapping fd -> session id for every connection that
-- received at least one incoming message during this call.
local function poll()
  local pending = {}
  for fd, session_id in pairs(fd2rdt) do
    if enable[session_id] then
      local kind, sid, payload = SERVER.rdt_poll(session_id)
      while kind ~= nil do
        if kind == 1 then
          -- message in
          pending[fd] = session_id
          print("<==========:", sid, payload)
        else
          local sock = assert(fds[fd])
          sendmsgbyso(sock, payload)
        end
        kind, sid, payload = SERVER.rdt_poll(session_id)
      end
    end
  end
  return pending
end
-- Global hook: per the original note, the engine calls this back once a
-- reconnect has succeeded. It only logs the session id; the session is
-- not re-enabled here.
_G.OnSessionReconnected = function(session_id)
  print("server reconnect succ: ", session_id)
end
-- Main loop: wait for readable sockets, accept new connections on the
-- listening socket, and service client sockets.
print("start server: ", port)
while true do
  local ready = SOCKET.select(readsocket)
  -- Counter of client sockets serviced in this select round; its value is
  -- what gets sent back as the reply to incoming rdt messages.
  local t = 0
  for _, s in ipairs(ready) do
    if s ~= so then
      recv(s)
      -- Reply to every session that just received a message, then poll
      -- again so the replies are flushed out to the sockets.
      local msg_in = poll()
      for _, session_id in pairs(msg_in) do
        sendmsgbyrdt(session_id, tostring(t) .. "\n")
      end
      t = t + 1
      poll()
    else
      local c, ip, peer_port = so:accept()
      if c then
        print("accept :", ip, peer_port)
        accept(c)
      end
    end
  end
end