-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprovide.go
299 lines (267 loc) · 11.1 KB
/
provide.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
/*
* Copyright (c) 2021-present Sigma-Soft, Ltd. Aleksei Ponomarev
*/
package router2
import (
"context"
"crypto/tls"
"strings"
"github.com/voedger/voedger/pkg/in10n"
"github.com/voedger/voedger/pkg/iprocbusmem"
istructs "github.com/voedger/voedger/pkg/istructs"
coreutils "github.com/voedger/voedger/pkg/utils"
"fmt"
"log"
"net"
"net/http"
"net/http/pprof"
"os"
"strconv"
"time"
"github.com/gorilla/mux"
flag "github.com/spf13/pflag"
ibus "github.com/untillpro/airs-ibus"
"github.com/untillpro/goutils/logger"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/net/netutil"
)
// ProvideBP2 builds the router services for the BP2 setup.
//
// It delegates to ProvideBP3 with no notifications broker, empty quotas, no
// BLOB support, no autocert cache, an implIBusBP2 bus and no per-application
// workspace amounts.
func ProvideBP2(ctx context.Context, rp RouterParams, busTimeout time.Duration) []interface{} {
	var (
		noBroker    in10n.IN10nBroker
		noBlobber   *BlobberParams
		noCertCache autocert.Cache
		noWSAmounts map[istructs.AppQName]istructs.AppWSAmount
	)
	return ProvideBP3(ctx, rp, busTimeout, noBroker, in10n.Quotas{}, noBlobber, noCertCache, &implIBusBP2{}, noWSAmounts)
}
// ProvideBP3 builds the router services that the caller is expected to run.
//
// http -> return []interface{pipeline.IService(httpService)}, https -> []interface{pipeline.IService(httpsService), pipeline.IService(acmeService)}
//
// The HTTPS variant is chosen when rp.Port equals HTTPSPort. In that case an
// autocert.Manager is created; it uses autocertCache or, when autocertCache is
// nil, a directory cache located at rp.CertDir.
//
// If bp is not nil then bp.procBus is initialized and bp.BLOBWorkersNum
// goroutines running blobMessageHandler are started. They are registered in
// httpService.blobWG, which httpService.Stop() waits on.
//
// quotas is not used by this function.
func ProvideBP3(hvmCtx context.Context, rp RouterParams, aBusTimeout time.Duration, broker in10n.IN10nBroker, quotas in10n.Quotas, bp *BlobberParams, autocertCache autocert.Cache,
	bus ibus.IBus, appsWSAmount map[istructs.AppQName]istructs.AppWSAmount) []interface{} {
	// locals are deliberately not named after the types they hold to keep the type names usable below
	httpSrv := &httpService{
		RouterParams:  rp,
		queues:        rp.QueuesPartitions,
		n10n:          broker,
		BlobberParams: bp,
		bus:           bus,
		busTimeout:    aBusTimeout,
		appsWSAmount:  appsWSAmount,
	}

	if bp != nil {
		bp.procBus = iprocbusmem.Provide(bp.ServiceChannels)
		for i := 0; i < bp.BLOBWorkersNum; i++ {
			httpSrv.blobWG.Add(1)
			go func() {
				defer httpSrv.blobWG.Done()
				// every worker reads from the same service channel (0, 0)
				blobMessageHandler(hvmCtx, bp.procBus.ServiceChannel(0, 0), bp.BLOBStorage, bus, aBusTimeout)
			}()
		}
	}

	if rp.Port != HTTPSPort {
		return []interface{}{httpSrv}
	}

	crtMgr := &autocert.Manager{
		/*
			If issuing a large number of certificates for different domains has to be tested,
			then the staging environment must be used. To do that, override DirectoryURL in the Manager's client with
			https://acme-staging-v02.api.letsencrypt.org/directory :
			Client: &acme.Client{
				DirectoryURL: "https://acme-staging-v02.api.letsencrypt.org/directory",
			},
			because there are quotas on certificate issuance - on the number of domains, certificates per unit of time, etc.
		*/
		Prompt:     autocert.AcceptTOS,
		HostPolicy: autocert.HostWhitelist(rp.HTTP01ChallengeHosts...),
		Cache:      autocertCache,
	}
	if crtMgr.Cache == nil {
		crtMgr.Cache = autocert.DirCache(rp.CertDir)
	}

	httpsSrv := &httpsService{
		httpService: httpSrv,
		crtMgr:      crtMgr,
	}

	// The ACME handler is built exactly once; in verbose mode it is wrapped with request logging.
	acmeHandler := crtMgr.HTTPHandler(nil)
	if logger.IsVerbose() {
		plainACMEHandler := acmeHandler
		acmeHandler = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
			logger.Verbose("acme server request:", r.Method, r.Host, r.RemoteAddr, r.RequestURI, r.URL.String())
			plainACMEHandler.ServeHTTP(rw, r)
		})
	}

	// handle Let's Encrypt callback over port 80 - only port 80 allowed
	acmeSrv := &acmeService{
		Server: http.Server{
			Addr:         ":80",
			ReadTimeout:  DefaultACMEServerReadTimeout,
			WriteTimeout: DefaultACMEServerWriteTimeout,
			Handler:      acmeHandler,
		},
	}
	return []interface{}{httpsSrv, acmeSrv}
}
// ProvideRouterParamsFromCmdLine builds RouterParams from the process command line (os.Args[1:]).
//
// Recognized flags: ns, p, wt, rt, cl, v, rht, rhtr, rch, rhtd, rcd.
// The flag set is created with flag.ExitOnError, so a malformed command line
// terminates the process. The function panics if coreutils.PairsToMap rejects
// an rht or rhtr value. When the v flag is set, the logger is switched to the
// verbose level.
func ProvideRouterParamsFromCmdLine() RouterParams {
	fs := flag.NewFlagSet("", flag.ExitOnError)
	rp := RouterParams{}
	routes := []string{}
	routesRewrite := []string{}
	natsServers := ""
	fs.StringVar(&natsServers, "ns", "", "The nats server URLs (separated by comma)")
	fs.IntVar(&rp.Port, "p", DefaultRouterPort, "Server port")
	fs.IntVar(&rp.WriteTimeout, "wt", DefaultRouterWriteTimeout, "Write timeout in seconds")
	fs.IntVar(&rp.ReadTimeout, "rt", DefaultRouterReadTimeout, "Read timeout in seconds")
	fs.IntVar(&rp.ConnectionsLimit, "cl", DefaultRouterConnectionsLimit, "Limit of incoming connections")
	fs.BoolVar(&rp.Verbose, "v", false, "verbose, log raw NATS traffic")

	// actual for airs-bp3 only
	fs.StringSliceVar(&routes, "rht", []string{}, "reverse proxy </url-part-after-ip>=<target> mapping")
	fs.StringSliceVar(&routesRewrite, "rhtr", []string{}, "reverse proxy </url-part-after-ip>=<target> rewriting mapping")
	fs.StringSliceVar(&rp.HTTP01ChallengeHosts, "rch", []string{}, "HTTP-01 Challenge host for let's encrypt service. Must be specified if router-port is 443, ignored otherwise")
	fs.StringVar(&rp.RouteDefault, "rhtd", "", "url to be redirected to if url is unknown")
	fs.StringVar(&rp.CertDir, "rcd", ".", "SSL certificates dir")

	_ = fs.Parse(os.Args[1:]) // os.Exit on error
	if len(natsServers) > 0 {
		_ = rp.NATSServers.Set(natsServers) // error impossible
	}

	// NOTE(review): rp.Routes and rp.RoutesRewrite come from the zero-valued RouterParams above.
	// If they are maps and PairsToMap writes into the map it is given, they have to be
	// initialized before these calls - confirm against RouterParams and coreutils.PairsToMap.
	if err := coreutils.PairsToMap(routes, rp.Routes); err != nil {
		panic(err)
	}
	if err := coreutils.PairsToMap(routesRewrite, rp.RoutesRewrite); err != nil {
		panic(err)
	}

	// The "v" flag is bound to rp.Verbose, so that is the value that must drive the log level.
	if rp.Verbose {
		logger.SetLogLevel(logger.LogLevelVerbose)
	}
	return rp
}
// Prepare prepares the underlying httpService and, on success, attaches a TLS
// configuration whose certificates are provided by the autocert manager.
// An error of the underlying Prepare is returned as is.
func (s *httpsService) Prepare(work interface{}) error {
	err := s.httpService.Prepare(work)
	if err == nil {
		// TLS 1.2 is the minimum: VersionTLS13 is unsupported by Chargebee
		s.server.TLSConfig = &tls.Config{
			MinVersion:     tls.VersionTLS12,
			GetCertificate: s.crtMgr.GetCertificate,
		}
	}
	return err
}
// Run serves HTTPS on the listener created by Prepare until the server is closed.
//
// Certificates come from the TLS config set in Prepare, hence the empty
// cert and key file names passed to ServeTLS. Any error other than
// http.ErrServerClosed terminates the process via log.Fatalf.
func (s *httpsService) Run(ctx context.Context) {
	// Same as in httpService.Run: derive request contexts from ctx,
	// need to track both client disconnect and app finalize
	s.server.BaseContext = func(net.Listener) context.Context {
		return ctx
	}
	log.Printf("Starting HTTPS server on %s\n", s.server.Addr)
	logger.Info("HTTPS server Write Timeout: ", s.server.WriteTimeout)
	logger.Info("HTTPS server Read Timeout: ", s.server.ReadTimeout)
	if err := s.server.ServeTLS(s.listener, "", ""); err != http.ErrServerClosed {
		log.Fatalf("Service.ServeTLS() failure: %s", err)
	}
}
// pipeline.IService
//
// Prepare creates the router and registers the handlers, starts listening on
// the TCP port given by RouterParams.Port and builds the http.Server.
// The server is not started here - that is done by Run.
func (s *httpService) Prepare(work interface{}) (err error) {
	// https://dev.untill.com/projects/#!627072
	s.router = mux.NewRouter().SkipClean(true)

	err = s.registerHandlers(s.busTimeout, s.appsWSAmount)
	if err != nil {
		return err
	}

	addr := ":" + strconv.Itoa(s.RouterParams.Port)
	s.listener, err = net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	// a non-positive limit means the listener is left unlimited
	if limit := s.RouterParams.ConnectionsLimit; limit > 0 {
		s.listener = netutil.LimitListener(s.listener, limit)
	}

	// timeouts in RouterParams are expressed in seconds
	readTimeout := time.Duration(s.RouterParams.ReadTimeout) * time.Second
	writeTimeout := time.Duration(s.RouterParams.WriteTimeout) * time.Second
	s.server = &http.Server{
		Addr:         addr,
		Handler:      s.router,
		ReadTimeout:  readTimeout,
		WriteTimeout: writeTimeout,
	}
	return nil
}
// pipeline.IService
//
// Run serves HTTP on the listener created by Prepare and returns when Serve
// returns. An error other than http.ErrServerClosed is logged.
func (s *httpService) Run(ctx context.Context) {
	// request contexts are derived from ctx: need to track both client disconnect and app finalize
	s.server.BaseContext = func(net.Listener) context.Context { return ctx }

	tcpAddr := s.listener.Addr().(*net.TCPAddr)
	logger.Info("Starting HTTP server on", tcpAddr.String())

	err := s.server.Serve(s.listener)
	if err == http.ErrServerClosed {
		return
	}
	log.Println("main HTTP server failure: " + err.Error())
}
// pipeline.IService
//
// Stop shuts the HTTP server down, then waits until the notifications broker
// (if any) reports no subscriptions and until all BLOB workers have finished.
func (s *httpService) Stop() {
	// Shutdown gets a context without a deadline: connections and listeners are closed
	// in the explicit way (they track ctx.Done()), so it is not necessary to track ctx here.
	shutdownErr := s.server.Shutdown(context.Background())
	if shutdownErr != nil {
		log.Println("http server Shutdown() failed: " + shutdownErr.Error())
		// graceful shutdown failed -> close the listener and the server forcibly
		s.listener.Close()
		s.server.Close()
	}

	if s.n10n != nil {
		hasSubscriptions := func() bool { return s.n10n.MetricNumSubcriptions() > 0 }
		// poll until the broker has released every subscription
		for hasSubscriptions() {
			time.Sleep(subscriptionsCloseCheckInterval)
		}
	}

	s.blobWG.Wait()
}
// GetPort returns the TCP port of the service listener.
// The listener must already exist and its address must be a *net.TCPAddr,
// otherwise the call panics.
func (s *httpService) GetPort() int {
	tcpAddr := s.listener.Addr().(*net.TCPAddr)
	return tcpAddr.Port
}
// registerHandlers registers all routes on s.router: router check, queue names,
// BLOB read/write (only when BlobberParams is set), the api route (its shape
// depends on RouterParams.UseBP3), n10n, pprof and, last, the reverse proxy matcher.
// Returns the error of getRedirectMatcher, if any; nothing is registered in that case.
func (s *httpService) registerHandlers(busTimeout time.Duration, appsWSAmount map[istructs.AppQName]istructs.AppWSAmount) (err error) {
	redirectMatcher, err := s.getRedirectMatcher()
	if err != nil {
		return err
	}
	rt := s.router

	rt.HandleFunc("/api/check", corsHandler(checkHandler())).Methods("POST", "OPTIONS").Name("router check")
	rt.HandleFunc("/api", corsHandler(queueNamesHandler())).Name("queues names")

	/*
		launching app from localhost from browser. Trying to execute POST from web app within browser.
		Browser sees that hosts differ: from localhost to alpha -> need CORS -> denies POST and executes the same request with OPTIONS header
		-> need to allow OPTIONS
	*/
	if s.BlobberParams != nil {
		blobWritePattern := fmt.Sprintf("/blob/{%s}/{%s}/{%s:[0-9]+}", bp3AppOwner, bp3AppName, wSIDVar)
		rt.Handle(blobWritePattern, corsHandler(s.blobWriteRequestHandler())).
			Methods("POST", "OPTIONS").
			Name("blob write")

		blobReadPattern := fmt.Sprintf("/blob/{%s}/{%s}/{%s:[0-9]+}/{%s:[0-9]+}", bp3AppOwner, bp3AppName, wSIDVar, bp3BLOBID)
		rt.Handle(blobReadPattern, corsHandler(s.blobReadRequestHandler())).
			Methods("POST", "GET", "OPTIONS").
			Name("blob read")
	}

	// the api route differs only by its pattern: app owner + app name for BP3, queue alias otherwise
	var apiPattern string
	if s.RouterParams.UseBP3 {
		apiPattern = fmt.Sprintf("/api/{%s}/{%s}/{%s:[0-9]+}/{%s:[a-zA-Z_/.]+}", bp3AppOwner, bp3AppName, wSIDVar, resourceNameVar)
	} else {
		apiPattern = fmt.Sprintf("/api/{%s}/{%s:[0-9]+}/{%s:[a-zA-Z_/.]+}", queueAliasVar, wSIDVar, resourceNameVar)
	}
	rt.HandleFunc(apiPattern, corsHandler(partitionHandler(s.queues, s.bus, busTimeout, appsWSAmount))).
		Methods("POST", "PATCH", "OPTIONS").Name("api")

	rt.Handle("/n10n/channel", corsHandler(s.subscribeAndWatchHandler())).Methods("GET")
	rt.Handle("/n10n/subscribe", corsHandler(s.subscribeHandler())).Methods("GET")
	rt.Handle("/n10n/unsubscribe", corsHandler(s.unSubscribeHandler())).Methods("GET")
	rt.Handle("/n10n/update/{offset:[0-9]{1,10}}", corsHandler(s.updateHandler()))

	// pprof profile
	pprofRoutes := []struct {
		path    string
		handler http.HandlerFunc
	}{
		{"/debug/pprof", pprof.Index},
		{"/debug/cmdline", pprof.Cmdline},
		{"/debug/profile", pprof.Profile},
		{"/debug/symbol", pprof.Symbol},
		{"/debug/trace", pprof.Trace},
	}
	for _, pr := range pprofRoutes {
		rt.Handle(pr.path, pr.handler)
	}
	// any other /debug/<cmd> is served by pprof.Index under /debug/pprof/<cmd>; must be the last of /debug routes
	rt.Handle("/debug/{cmd}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		r.URL.Path = "/debug/pprof/" + strings.TrimPrefix(r.URL.Path, "/debug/")
		pprof.Index(w, r)
	}))

	// must be the last handler
	rt.MatcherFunc(redirectMatcher).Name("reverse proxy")
	return nil
}
// pipeline.IService
//
// Prepare has nothing to do for the ACME service: the server is fully
// configured at construction and starts listening in Run. Always returns nil.
func (s *acmeService) Prepare(work interface{}) error { return nil }
// pipeline.IService
//
// Run starts the ACME HTTP server and returns when ListenAndServe returns.
// An error other than http.ErrServerClosed is logged.
func (s *acmeService) Run(ctx context.Context) {
	// request contexts are derived from ctx: need to track both client disconnect and app finalize
	s.BaseContext = func(net.Listener) context.Context { return ctx }

	log.Println("Starting ACME HTTP server on :80")
	err := s.ListenAndServe()
	if err == http.ErrServerClosed {
		return
	}
	log.Println("ACME HTTP server failure: ", err.Error())
}
// pipeline.IService
func (s *acmeService) Stop() {
// ctx here is used to avoid eternal waiting for close idle connections and listeners
// all connections and listeners are closed in the explicit way so it is not necessary to track ctx
if err := s.Shutdown(context.Background()); err != nil {
s.Close()
}
}