-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcertstream.go
152 lines (132 loc) · 4.4 KB
/
certstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package certstream
import (
"encoding/json"
"fmt"
"time"
"github.com/gorilla/websocket"
)
const (
PingPeriod time.Duration = 15 * time.Second
DefaultTimeout = 15
DefaultSleep = 5
)
type Message struct {
MessageType string `json:"message_type"`
Data struct {
CertIndex int `json:"cert_index"`
CertLink string `json:"cert_link"`
LeafCert struct {
AllDomains []string `json:"all_domains"`
Extensions struct {
AuthorityInfoAccess string `json:"authorityInfoAccess"`
AuthorityKeyIdentifier string `json:"authorityKeyIdentifier"`
BasicConstraints string `json:"basicConstraints"`
CertificatePolicies string `json:"certificatePolicies"`
CtlPoisonByte bool `json:"ctlPoisonByte"`
ExtendedKeyUsage string `json:"extendedKeyUsage"`
KeyUsage string `json:"keyUsage"`
SubjectAltName string `json:"subjectAltName"`
SubjectKeyIdentifier string `json:"subjectKeyIdentifier"`
} `json:"extensions"`
Fingerprint string `json:"fingerprint"`
NotAfter int `json:"not_after"`
NotBefore int `json:"not_before"`
SerialNumber string `json:"serial_number"`
SignatureAlgorithm string `json:"signature_algorithm"`
Subject Name `json:"subject"`
Issuer Name `json:"issuer"`
IsCA bool `json:"is_ca"`
} `json:"leaf_cert"`
Seen float64 `json:"seen"`
Source Source `json:"source"`
UpdateType string `json:"update_type"`
} `json:"data"`
}
type Name struct {
C string `json:"C"`
CN string `json:"CN"`
L string `json:"L"`
O string `json:"O"`
OU string `json:"OU"`
ST string `json:"ST"`
Aggregated string `json:"aggregated"`
EmailAddress string `json:"email_address"`
}
type Source struct {
Name string `json:"name"`
URL string `json:"url"`
}
// EventStream establishes a connection to the CertStream server and returns two channels:
//
// outputStream: a channel to receive CertStream messages
// errStream: a channel to receive any errors that occur during the connection
//
// The function takes the following parameters:
//
// skipHeartbeats: a boolean indicating whether to skip heartbeat messages
// certstreamServerURL: the URL of the CertStream server
// timeout: the timeout duration in seconds (0 for default timeout)
func EventStream(skipHeartbeats bool, certstreamServerURL string, timeout int) (chan Message, chan error) {
if timeout == 0 {
timeout = DefaultTimeout
}
outputStream := make(chan Message)
errStream := make(chan error)
go connectAndListen(certstreamServerURL, timeout, skipHeartbeats, outputStream, errStream)
return outputStream, errStream
}
func connectAndListen(url string, timeout int, skipHeartbeats bool, outputStream chan Message, errStream chan error) {
for {
c, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
errStream <- fmt.Errorf("Error connecting to certstream: %w", err)
time.Sleep(DefaultSleep * time.Second)
continue
}
done := make(chan struct{})
go sendPingMessages(c, done, errStream)
if err := readMessages(c, timeout, skipHeartbeats, outputStream); err != nil {
errStream <- fmt.Errorf("Error reading messages: %w", err)
close(done)
c.Close()
time.Sleep(DefaultSleep * time.Second)
continue
}
close(done)
c.Close()
}
}
func sendPingMessages(c *websocket.Conn, done chan struct{}, errStream chan error) {
ticker := time.NewTicker(PingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
errStream <- fmt.Errorf("Error sending ping message: %w", err)
return
}
case <-done:
return
}
}
}
func readMessages(c *websocket.Conn, timeout int, skipHeartbeats bool, outputStream chan Message) error {
for {
if err := c.SetReadDeadline(time.Now().Add(time.Duration(timeout) * time.Second)); err != nil {
return fmt.Errorf("Error creating wss deadline: %w", err)
}
_, rawMessage, err := c.ReadMessage()
if err != nil {
return fmt.Errorf("Error reading message: %w", err)
}
var message Message
if err := json.Unmarshal(rawMessage, &message); err != nil {
return fmt.Errorf("Error unmarshalling certstream message: %w", err)
}
if skipHeartbeats && message.MessageType == "heartbeat" {
continue
}
outputStream <- message
}
}