-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
116 lines (95 loc) · 2.27 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package packetize
import (
"context"
"encoding/binary"
"errors"
bufpool "github.com/valyala/bytebufferpool"
"io"
"net"
"time"
)
// Conn wraps a net.Conn and exchanges discrete packets over it. Each packet
// on the wire is a 2-byte big-endian length followed by that many payload
// bytes (see Write and process).
type Conn struct {
// conn is the underlying connection that framed packets are written to and
// read from.
conn net.Conn
// readDeadline is selected on by Read to abort a blocked read; it is
// replaced by SetReadDeadline.
readDeadline <-chan time.Time
// packetChan carries fully read packet payloads from process to Read.
packetChan chan []byte
// closeCtx is done once the Conn has been closed; Read, Write, Close and
// process all consult it.
closeCtx context.Context
// close cancels closeCtx; it is invoked by Close.
// NOTE(review): presumably the CancelFunc paired with closeCtx by a
// constructor that is not visible here — confirm.
close context.CancelFunc
}
// Read blocks until one packet is available, the Conn is closed, or the read
// deadline channel fires, whichever happens first.
//
// On success the packet payload is copied into b and the number of bytes
// copied is returned. If b is shorter than the packet, only the bytes that fit
// are copied, the rest of the packet is dropped, and a non-nil error is
// returned together with the byte count.
func (conn *Conn) Read(b []byte) (n int, err error) {
	select {
	case <-conn.readDeadline:
		return 0, errors.New("error reading from conn: read timeout")

	case <-conn.closeCtx.Done():
		return 0, errors.New("error reading from conn: connection closed")

	case payload := <-conn.packetChan:
		n = copy(b, payload)
		// copy stops at the shorter of the two slices, so a short count
		// means the caller's buffer could not hold the whole packet.
		if n < len(payload) {
			err = errors.New("message sent on a socket was larger than the buffer used to receive the message into")
		}
		return n, err
	}
}
// Write sends b as a single packet: a 2-byte big-endian length prefix followed
// by the payload, written to the underlying connection in one call.
//
// It returns len(b) and a nil error when the whole frame was handed to the
// underlying connection, as required by the io.Writer contract. It returns 0
// and an error if the Conn is closed, if b is longer than 65535 bytes (the
// largest length the 2-byte prefix can represent), or if the underlying write
// fails.
func (conn *Conn) Write(b []byte) (n int, err error) {
	if conn.closeCtx.Err() != nil {
		return 0, errors.New("error writing to conn: connection closed")
	}

	// The length prefix is a uint16. Converting a larger length would silently
	// truncate it and desynchronise the peer's framing, so reject it up front.
	const maxPacketLen = 1<<16 - 1
	if len(b) > maxPacketLen {
		return 0, errors.New("error writing to conn: packet exceeds maximum length of 65535 bytes")
	}

	pk := bufpool.Get()
	defer bufpool.Put(pk)

	var header [2]byte
	binary.BigEndian.PutUint16(header[:], uint16(len(b)))
	if _, err = pk.Write(header[:]); err != nil {
		return 0, err
	}
	if _, err = pk.Write(b); err != nil {
		return 0, err
	}

	// Header and payload go out in a single Write so the frame is not split
	// across separate calls on the underlying connection.
	if _, err = conn.conn.Write(pk.B); err != nil {
		return 0, err
	}
	return len(b), nil
}
// Close marks the Conn as closed by cancelling its close context and then
// closes the underlying connection, returning that connection's Close error.
// Calling Close on a Conn that is already closed returns an error and does
// nothing else.
func (conn *Conn) Close() error {
	if alreadyClosed := conn.closeCtx.Err() != nil; alreadyClosed {
		return errors.New("conn is already closed")
	}

	// Cancel first so Read, Write and process observe the closed state.
	conn.close()

	closeErr := conn.conn.Close()
	return closeErr
}
// LocalAddr returns the local network address reported by the underlying
// connection.
func (conn *Conn) LocalAddr() net.Addr {
	inner := conn.conn
	return inner.LocalAddr()
}
// RemoteAddr returns the remote network address reported by the underlying
// connection.
func (conn *Conn) RemoteAddr() net.Addr {
	inner := conn.conn
	return inner.RemoteAddr()
}
// SetDeadline applies t as the read deadline by delegating to
// SetReadDeadline and returns its result. No write deadline is set here.
func (conn *Conn) SetDeadline(t time.Time) error {
	err := conn.SetReadDeadline(t)
	return err
}
// SetReadDeadline sets the absolute time after which calls to Read fail with a
// timeout instead of blocking.
//
// A zero t clears the deadline, so Read never times out. A t that is already
// in the past is rejected with an error and the current deadline is left
// unchanged. Otherwise, once t is reached, every subsequent Read that finds
// the deadline channel ready times out until a new deadline is set.
//
// The new deadline is picked up by Read calls that start after this method
// returns.
func (conn *Conn) SetReadDeadline(t time.Time) error {
	if t.IsZero() {
		// A channel that nobody sends on or closes is never ready, so the
		// deadline case in Read can never fire.
		conn.readDeadline = make(chan time.Time)
		return nil
	}
	if t.Before(time.Now()) {
		return errors.New("read deadline cannot be before now")
	}

	// Close the channel when the deadline passes rather than sending one
	// value on it: a closed channel stays ready, so the deadline remains
	// expired for every later Read. A single-shot timer channel would be
	// drained by the first timed-out Read and later Reads would block forever.
	expired := make(chan time.Time)
	time.AfterFunc(time.Until(t), func() { close(expired) })
	conn.readDeadline = expired
	return nil
}
// SetWriteDeadline accepts a deadline but ignores it and always returns nil;
// Write is not subject to any deadline set through this method.
func (conn *Conn) SetWriteDeadline(_ time.Time) error {
	return nil
}
// process reads length-prefixed packets from the underlying connection and
// hands each payload to Read through packetChan. It loops until the Conn is
// closed or a read from the underlying connection fails; on a read failure it
// closes the Conn itself.
//
// NOTE(review): presumably started in its own goroutine by a constructor that
// is not visible here — confirm.
func (conn *Conn) process() {
	lenBuf := make([]byte, 2)
	for conn.closeCtx.Err() == nil {
		// io.ReadFull returns a non-nil error whenever it reads fewer bytes
		// than requested, so no separate length check is needed.
		if _, err := io.ReadFull(conn.conn, lenBuf); err != nil {
			// Best effort: Close only reports "already closed" or the
			// underlying close error, and there is no caller to pass it to.
			_ = conn.Close()
			return
		}

		pkLen := int(binary.BigEndian.Uint16(lenBuf))
		pk := make([]byte, pkLen)
		if _, err := io.ReadFull(conn.conn, pk); err != nil {
			// Best effort, as above.
			_ = conn.Close()
			return
		}

		// Stop waiting for a reader once the Conn is closed; a bare send would
		// block forever and leak this goroutine when nobody calls Read again.
		select {
		case conn.packetChan <- pk:
		case <-conn.closeCtx.Done():
			return
		}
	}
}