From 2356308d460ff877a9f826f955224d2ea6ad8ab2 Mon Sep 17 00:00:00 2001 From: gorexlv Date: Thu, 19 Aug 2021 15:43:03 +0800 Subject: [PATCH] Feat/app runtime (#251) * refactor application; remove multi register; polish codes; fix tests bugs; --- example/all/cmd/demo/main.go | 8 - example/all/config/config.toml | 12 - example/all/internal/app/demo/engine.go | 4 +- example/build/main.go | 7 - example/client/etcd/main.go | 2 +- example/grpc/direct/direct-server/main.go | 2 +- example/grpc/etcd/etcd-server/main.go | 9 +- example/http/all/main.go | 7 - example/http/register/main.go | 7 - example/monitor/main.go | 11 +- example/startup/main.go | 2 +- go.mod | 5 + go.sum | 22 + pkg/application/application.go | 9 + pkg/application/application_test.go | 973 +++++++++++----------- pkg/autoproc/autoproc.go | 7 +- pkg/client/etcdv3/client_test.go | 23 +- pkg/client/redis/redis_test.go | 14 +- pkg/component/component.go | 50 ++ pkg/conf/conf.go | 17 +- pkg/conf/conf_test.go | 15 + pkg/conf/datasource.go | 4 +- pkg/conf/init.go | 8 +- pkg/elect/elect.go | 30 + pkg/elect/elector.go | 114 +++ pkg/elect/etcdelector/elect.go | 15 + pkg/elect/memelector/elect.go | 52 ++ pkg/elect/pgelector/elect.go | 109 +++ pkg/elect/rediselector/elect.go | 40 + pkg/flag/flag.go | 1 + pkg/metric/metric.go | 8 + pkg/registry/etcdv3/registry_test.go | 21 +- pkg/registry/init.go | 2 + pkg/registry/service.go | 30 + pkg/server/xgoframe/server.go | 6 + pkg/util/xcolor/string_darwin.go | 4 + pkg/util/xcolor/string_linux.go | 4 + pkg/util/xcolor/string_windows.go | 5 + pkg/util/xgo/chan.go | 24 + tools/ast_codes/main.go | 2 +- 40 files changed, 1117 insertions(+), 568 deletions(-) create mode 100644 pkg/component/component.go create mode 100644 pkg/conf/conf_test.go create mode 100644 pkg/elect/elect.go create mode 100644 pkg/elect/elector.go create mode 100644 pkg/elect/etcdelector/elect.go create mode 100644 pkg/elect/memelector/elect.go create mode 100644 pkg/elect/pgelector/elect.go create mode 100644 pkg/elect/rediselector/elect.go create mode 100644 pkg/registry/service.go create mode 100644 pkg/util/xgo/chan.go diff --git a/example/all/cmd/demo/main.go b/example/all/cmd/demo/main.go index 1c3bffdd71..aa68462a55 100644 --- a/example/all/cmd/demo/main.go +++ b/example/all/cmd/demo/main.go @@ -18,19 +18,11 @@ import ( "log" "github.com/douyu/jupiter/example/all/internal/app/demo" - "github.com/douyu/jupiter/pkg/registry/compound" - "github.com/douyu/jupiter/pkg/registry/etcdv3" ) func main() { eng := demo.NewEngine() - eng.SetRegistry( // 多注册中心 - compound.New( - etcdv3.StdConfig("wh01").MustBuild(), - ), - ) - if err := eng.Run(); err != nil { log.Fatal(err) } diff --git a/example/all/config/config.toml b/example/all/config/config.toml index c33c315d35..84aa7dbae4 100644 --- a/example/all/config/config.toml +++ b/example/all/config/config.toml @@ -2,18 +2,6 @@ port = 20105 [jupiter.server.grpc] port = 20102 - -[jupiter.etcdv3.default] - endpoints=["127.0.0.1:2379"] - secure = false - -[jupiter.registry.wh01] - configKey="jupiter.etcdv3.default" - timeout = "1s" - -[jupiter.registry.wh02] - configKey="jupiter.etcdv3.default" - timeout = "1s" [jupiter.cron.demo] immediatelyRun = true diff --git a/example/all/internal/app/demo/engine.go b/example/all/internal/app/demo/engine.go index 05fb0fc047..9a3095ee80 100644 --- a/example/all/internal/app/demo/engine.go +++ b/example/all/internal/app/demo/engine.go @@ -17,7 +17,6 @@ package demo import ( "time" - sentinel_echo "github.com/alibaba/sentinel-golang/adapter/echo" "github.com/alibaba/sentinel-golang/core/flow" "github.com/douyu/jupiter" "github.com/douyu/jupiter/example/all/internal/app/greeter" @@ -28,9 +27,10 @@ import ( "github.com/douyu/jupiter/pkg/worker/xcron" "github.com/douyu/jupiter/pkg/xlog" "github.com/labstack/echo/v4" + sentinel_echo "github.com/sentinel-group/sentinel-go-adapters/echo" "google.golang.org/grpc/examples/helloworld/helloworld" - sentinel_grpc "github.com/alibaba/sentinel-golang/adapter/grpc" + sentinel_grpc "github.com/sentinel-group/sentinel-go-adapters/grpc" ) type Engine struct { diff --git a/example/build/main.go b/example/build/main.go index f24304344b..3e3f4c7224 100644 --- a/example/build/main.go +++ b/example/build/main.go @@ -16,8 +16,6 @@ package main import ( "github.com/douyu/jupiter" - compound_registry "github.com/douyu/jupiter/pkg/registry/compound" - etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3" "github.com/douyu/jupiter/pkg/server/xecho" "github.com/douyu/jupiter/pkg/xlog" "github.com/labstack/echo/v4" @@ -25,11 +23,6 @@ import ( func main() { eng := NewEngine() - eng.SetRegistry( - compound_registry.New( - etcdv3_registry.StdConfig("test").Build(), - ), - ) if err := eng.Run(); err != nil { xlog.Error(err.Error()) } diff --git a/example/client/etcd/main.go b/example/client/etcd/main.go index ada97dad20..8a96649e81 100644 --- a/example/client/etcd/main.go +++ b/example/client/etcd/main.go @@ -28,7 +28,7 @@ func main() { eng := &jupiter.Application{} err := eng.Startup( func() error { - client := etcdv3.StdConfig("myetcd").Build() + client := etcdv3.StdConfig("myetcd").MustBuild() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() diff --git a/example/grpc/direct/direct-server/main.go b/example/grpc/direct/direct-server/main.go index df8af42918..e56d50f89d 100644 --- a/example/grpc/direct/direct-server/main.go +++ b/example/grpc/direct/direct-server/main.go @@ -47,7 +47,7 @@ func NewEngine() *Engine { } func (eng *Engine) serveGRPC() error { - server := xgrpc.StdConfig("grpc").Build() + server := xgrpc.StdConfig("grpc").MustBuild() helloworld.RegisterGreeterServer(server.Server, new(Greeter)) return eng.Serve(server) } diff --git a/example/grpc/etcd/etcd-server/main.go b/example/grpc/etcd/etcd-server/main.go index 71a654fd64..e831d67a8a 100644 --- a/example/grpc/etcd/etcd-server/main.go +++ b/example/grpc/etcd/etcd-server/main.go @@ -17,8 +17,6 @@ package main import ( "context" "github.com/douyu/jupiter" - compound_registry "github.com/douyu/jupiter/pkg/registry/compound" - etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3" "github.com/douyu/jupiter/pkg/server/xgrpc" "github.com/douyu/jupiter/pkg/xlog" "google.golang.org/grpc/examples/helloworld/helloworld" @@ -26,11 +24,6 @@ import ( func main() { eng := NewEngine() - eng.SetRegistry( - compound_registry.New( - etcdv3_registry.StdConfig("wh").Build(), - ), - ) //eng.SetGovernor("0.0.0.0:0") if err := eng.Run(); err != nil { xlog.Error(err.Error()) @@ -52,7 +45,7 @@ func NewEngine() *Engine { } func (eng *Engine) serveGRPC() error { - server := xgrpc.StdConfig("grpc").Build() + server := xgrpc.StdConfig("grpc").MustBuild() helloworld.RegisterGreeterServer(server.Server, &Greeter{ server: server, }) diff --git a/example/http/all/main.go b/example/http/all/main.go index da3e3f72fb..02a9d6743a 100644 --- a/example/http/all/main.go +++ b/example/http/all/main.go @@ -16,8 +16,6 @@ package main import ( "github.com/douyu/jupiter" - compound_registry "github.com/douyu/jupiter/pkg/registry/compound" - etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3" "github.com/douyu/jupiter/pkg/server/xecho" "github.com/douyu/jupiter/pkg/xlog" "github.com/labstack/echo/v4" @@ -26,11 +24,6 @@ import ( func main() { eng := NewEngine() // 多注册中心 - eng.SetRegistry( - compound_registry.New( - etcdv3_registry.StdConfig("wh").Build(), - ), - ) if err := eng.Run(); err != nil { xlog.Panic(err.Error()) diff --git a/example/http/register/main.go b/example/http/register/main.go index 3e754590c2..88f713ce15 100644 --- a/example/http/register/main.go +++ b/example/http/register/main.go @@ -16,8 +16,6 @@ package main import ( "github.com/douyu/jupiter" - compound_registry "github.com/douyu/jupiter/pkg/registry/compound" - etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3" "github.com/douyu/jupiter/pkg/server/xecho" "github.com/douyu/jupiter/pkg/xlog" "github.com/labstack/echo/v4" @@ -25,11 +23,6 @@ import ( func main() { eng := NewEngine() - eng.SetRegistry( - compound_registry.New( - etcdv3_registry.StdConfig("wh").Build(), - ), - ) if err := eng.Run(); err != nil { xlog.Panic(err.Error()) } diff --git a/example/monitor/main.go b/example/monitor/main.go index cdc017fdd6..9f31318b04 100644 --- a/example/monitor/main.go +++ b/example/monitor/main.go @@ -21,8 +21,6 @@ import ( "github.com/douyu/jupiter" "github.com/douyu/jupiter/pkg/client/etcdv3" - compound_registry "github.com/douyu/jupiter/pkg/registry/compound" - etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3" "github.com/douyu/jupiter/pkg/server/xgrpc" "github.com/douyu/jupiter/pkg/xlog" "google.golang.org/grpc/examples/helloworld/helloworld" @@ -30,11 +28,6 @@ import ( func main() { eng := NewEngine() - eng.SetRegistry( - compound_registry.New( - etcdv3_registry.StdConfig("wh").Build(), - ), - ) if err := eng.Run(); err != nil { xlog.Error(err.Error()) } @@ -49,7 +42,7 @@ func NewEngine() *Engine { if err := eng.Startup( eng.serveGRPC, func() error { - client := etcdv3.StdConfig("myetcd").Build() + client := etcdv3.StdConfig("myetcd").MustBuild() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() @@ -67,7 +60,7 @@ func NewEngine() *Engine { } func (eng *Engine) serveGRPC() error { - server := xgrpc.StdConfig("grpc").Build() + server := xgrpc.StdConfig("grpc").MustBuild() helloworld.RegisterGreeterServer(server.Server, new(Greeter)) return eng.Serve(server) } diff --git a/example/startup/main.go b/example/startup/main.go index 513cdbefb2..637b2b60a4 100644 --- a/example/startup/main.go +++ b/example/startup/main.go @@ -44,7 +44,7 @@ func startHTTPServer() server.Server { } func startGRPCServer() server.Server { - server := xgrpc.DefaultConfig().Build() + server := xgrpc.DefaultConfig().MustBuild() // helloworld.RegisterGreeterServer(server.Server, new(greeter.Greeter)) return server } diff --git a/go.mod b/go.mod index c0fb71fe6a..6b32ef46b7 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module github.com/douyu/jupiter go 1.14 require ( + cirello.io/pglock v1.8.0 // indirect github.com/BurntSushi/toml v0.3.1 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/alibaba/sentinel-golang v1.0.2 + github.com/alicebob/miniredis/v2 v2.15.1 github.com/apache/rocketmq-client-go/v2 v2.0.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 @@ -41,6 +43,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.6.0 github.com/robfig/cron/v3 v3.0.1 + github.com/sentinel-group/sentinel-go-adapters v1.0.1 // indirect github.com/sirupsen/logrus v1.7.0 // indirect github.com/smallnest/weighted v0.0.0-20200122032019-adf21c9b8bd1 github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 // indirect @@ -53,9 +56,11 @@ require ( github.com/uber/jaeger-client-go v2.23.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect github.com/urfave/cli v1.22.5 + go.etcd.io/etcd v3.3.25+incompatible // indirect go.mongodb.org/mongo-driver v1.5.1 go.uber.org/atomic v1.6.0 // indirect go.uber.org/automaxprocs v1.3.0 + go.uber.org/goleak v1.1.10 go.uber.org/multierr v1.5.0 go.uber.org/zap v1.15.0 golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392 // indirect diff --git a/go.sum b/go.sum index ced5370fc9..75121a5f10 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cirello.io/pglock v1.8.0 h1:YmXjZ+zE2c6cuRP2efbRDKnk/qu36g0wbshlJetRIzM= +cirello.io/pglock v1.8.0/go.mod h1:iO/b3K4gTIIKO3DhR8t1mYjtjI6tQJhAED2o9oXtP4I= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -34,6 +36,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 h1:1BDTz0u9nC3//pOCMdNH+CiXJVYJh5UQNCOBG7jbELc= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Jeffail/gabs v1.1.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -74,8 +77,13 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1C github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alibaba/sentinel-golang v0.6.0 h1:YapgS9Obe8ML7+uiIDPq4BktdGb+5WmTDQEIDPVZhUI= github.com/alibaba/sentinel-golang v0.6.0/go.mod h1:kxBXAyz2RXPFTjgcfnHKPlMaKb9BQrTbd9Qs02MNHKA= +github.com/alibaba/sentinel-golang v1.0.1/go.mod h1:QsB99f/z35D2AiMrAWwgWE85kDTkBUIkcmPrRt+61NI= github.com/alibaba/sentinel-golang v1.0.2 h1:Acopq74hOtZN4MV1v811MQ6QcqPFLDSczTrRXv9zpIg= github.com/alibaba/sentinel-golang v1.0.2/go.mod h1:QsB99f/z35D2AiMrAWwgWE85kDTkBUIkcmPrRt+61NI= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.15.1 h1:Fw+ixAJPmKhCLBqDwHlTDqxUxp0xjEwXczEpt1B6r7k= +github.com/alicebob/miniredis/v2 v2.15.1/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190808125512-07798873deee/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= @@ -119,6 +127,9 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/clbanning/mxj v1.8.5-0.20200714211355-ff02cfb8ea28 h1:LdXxtjzvZYhhUaonAaAKArG3pyC67kGL3YY+6hGG8G4= @@ -608,6 +619,7 @@ github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPp github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linode/linodego v0.10.0/go.mod h1:cziNP7pbvE3mXIPneHj0oRY8L1WtGEIKlZ8LANE4eXA= @@ -802,6 +814,8 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/sentinel-group/sentinel-go-adapters v1.0.1 h1:2RcslVCtIuku+MKLnDB5cPWNmPWL8nqIfhXavrjgJPM= +github.com/sentinel-group/sentinel-go-adapters v1.0.1/go.mod h1:hA8nOCLTDnp4Scf3PLDjMe3XoXZ1Anw53iWJnfzqgi0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v2.19.12+incompatible h1:WRstheAymn1WOPesh+24+bZKFkqrdCR8JOc77v4xV3Q= @@ -933,6 +947,8 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32 h1:5tjfNdR2ki3yYQ842+eX2sQHeiwpKJ0RnHO4IYOc4V8= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= +github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8/go.mod h1:S1cAa98KMFv4Sa8SbJ6ZtvOmf0VlgH0QJ1gXI0lBfBY= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -940,6 +956,8 @@ go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= +go.etcd.io/etcd v3.3.25+incompatible h1:V1RzkZJj9LqsJRy+TUBgpWSbZXITLB819lstuTFoZOY= +go.etcd.io/etcd v3.3.25+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.mongodb.org/mongo-driver v1.4.1 h1:38NSAyDPagwnFpUA/D5SFgbugUYR3NzYRNa4Qk9UxKs= go.mongodb.org/mongo-driver v1.4.1/go.mod h1:llVBH2pkj9HywK0Dtdt6lDikOjFLbceHVu/Rc0iMKLs= go.mongodb.org/mongo-driver v1.5.1 h1:9nOVLGDfOaZ9R0tBumx/BcuqkbFpyTCU2r/Po7A2azI= @@ -956,6 +974,8 @@ go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/automaxprocs v1.3.0 h1:II28aZoGdaglS5vVNnspf28lnZpXScxtIozx1lAjdb0= go.uber.org/automaxprocs v1.3.0/go.mod h1:9CWT6lKIep8U41DDaPiH6eFscnTyjfTANNQNx6LrIcA= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= @@ -1087,6 +1107,7 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190221075227-b4e8571b14e0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1173,6 +1194,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/application/application.go b/pkg/application/application.go index 7cab9a2f55..5fc5f1f7de 100644 --- a/pkg/application/application.go +++ b/pkg/application/application.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/douyu/jupiter/pkg/component" job "github.com/douyu/jupiter/pkg/worker/xjob" "github.com/BurntSushi/toml" @@ -37,6 +38,7 @@ import ( "github.com/douyu/jupiter/pkg/signals" "github.com/douyu/jupiter/pkg/util/xcolor" "github.com/douyu/jupiter/pkg/util/xcycle" + "github.com/douyu/jupiter/pkg/util/xdebug" "github.com/douyu/jupiter/pkg/util/xdefer" "github.com/douyu/jupiter/pkg/util/xgo" "github.com/douyu/jupiter/pkg/worker" @@ -68,6 +70,7 @@ type Application struct { disableMap map[Disable]bool HideBanner bool stopped chan struct{} + components []component.Component } // New create a new Application instance @@ -124,6 +127,7 @@ func (app *Application) initialize() { app.configParser = toml.Unmarshal app.disableMap = make(map[Disable]bool) app.stopped = make(chan struct{}) + app.components = make([]component.Component, 0) //private method app.initHooks(StageBeforeStop, StageAfterStop) @@ -225,6 +229,7 @@ func (app *Application) Job(runner job.Runner) error { } // SetRegistry set customize registry +// Deprecated, please use registry.DefaultRegisterer instead. func (app *Application) SetRegistry(reg registry.Registry) { registry.DefaultRegisterer = reg } @@ -522,6 +527,10 @@ func (app *Application) printBanner() error { return nil } + if xdebug.IsTestingMode() { + return nil + } + const banner = ` (_)_ _ _ __ (_) |_ ___ _ __ | | | | | '_ \| | __/ _ \ '__| diff --git a/pkg/application/application_test.go b/pkg/application/application_test.go index a6002bd2bc..ef65271404 100644 --- a/pkg/application/application_test.go +++ b/pkg/application/application_test.go @@ -1,489 +1,490 @@ -// Copyright 2021 rex lv -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// // Copyright 2021 rex lv +// // +// // Licensed under the Apache License, Version 2.0 (the "License"); +// // you may not use this file except in compliance with the License. +// // You may obtain a copy of the License at +// // +// // http://www.apache.org/licenses/LICENSE-2.0 +// // +// // Unless required by applicable law or agreed to in writing, software +// // distributed under the License is distributed on an "AS IS" BASIS, +// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// // See the License for the specific language governing permissions and +// // limitations under the License. package application -import ( - "context" - "errors" - "fmt" - "testing" - "time" - - "github.com/douyu/jupiter/pkg/server/xgrpc" - "github.com/stretchr/testify/assert" - - "github.com/douyu/jupiter/pkg/server" - . "github.com/smartystreets/goconvey/convey" -) - -type testServer struct { - ServeBlockTime time.Duration - ServeErr error - - StopBlockTime time.Duration - StopErr error - - GstopBlockTime time.Duration - GstopErr error -} - -func (s *testServer) Serve() error { - time.Sleep(s.ServeBlockTime) - return s.ServeErr -} -func (s *testServer) Stop() error { - time.Sleep(s.StopBlockTime) - return s.StopErr -} -func (s *testServer) GracefulStop(ctx context.Context) error { - time.Sleep(s.GstopBlockTime) - return s.GstopErr -} -func (s *testServer) Info() *server.ServiceInfo { - return &server.ServiceInfo{} -} -func (s *testServer) Healthz() bool { - return true -} - -var errTest = fmt.Errorf("test error") - -func Test_Unit_Application_New(t *testing.T) { - t.Run("no params", func(t *testing.T) { - _, err := New() - assert.Nil(t, err) - }) - t.Run("without error", func(t *testing.T) { - fn := func() error { - return nil - } - _, err := New(fn) - assert.Nil(t, err) - }) - t.Run("with error", func(t *testing.T) { - fn := func() error { - return errTest - } - _, err := New(fn) - assert.Equal(t, errTest, err) - }) -} -func TestApplication_Run(t *testing.T) { - Convey("test application run serve", t, func(c C) { - srv := &testServer{ - ServeErr: errors.New("when server call serve error"), - } - app := &Application{} - app.initialize() - err := app.Serve(srv) - So(err, ShouldBeNil) - go func() { - // make sure Serve() is called - time.Sleep(time.Millisecond * 100) - err := app.Stop() - c.So(err, ShouldBeNil) - }() - err = app.Run() - So(err, ShouldEqual, srv.ServeErr) - }) - Convey("test application run serve block", t, func(c C) { - srv := &testServer{ - ServeBlockTime: time.Second, - ServeErr: errors.New("when server call serve error"), - } - app := &Application{} - app.initialize() - err := app.Serve(srv) - So(err, ShouldBeNil) - go func() { - // make sure Serve() is called - time.Sleep(time.Millisecond * 100) - err := app.Stop() - c.So(err, ShouldBeNil) - }() - err = app.Run() - So(err, ShouldEqual, srv.ServeErr) - }) - Convey("test application run stop", t, func(c C) { - srv := &testServer{ - ServeBlockTime: time.Second * 2, - StopBlockTime: time.Second, - StopErr: errors.New("when server call stop error"), - } - app := &Application{} - app.initialize() - err := app.Serve(srv) - So(err, ShouldBeNil) - go func() { - // make sure Serve() is called - time.Sleep(time.Millisecond * 200) - err := app.Stop() - c.So(err, ShouldBeNil) - }() - err = app.Run() - So(err, ShouldEqual, srv.StopErr) - }) -} - -func TestApplication_initialize(t *testing.T) { - Convey("test application initialize", t, func() { - app := &Application{} - app.initialize() - So(app.servers, ShouldNotBeNil) - So(app.workers, ShouldNotBeNil) - So(app.logger, ShouldNotBeNil) - So(app.cycle, ShouldNotBeNil) - }) -} - -func TestApplication_Startup(t *testing.T) { - Convey("test application startup error", t, func() { - app := &Application{} - startUpErr := errors.New("throw startup error") - err := app.Startup(func() error { - return startUpErr - }) - So(err, ShouldEqual, startUpErr) - }) - - Convey("test application startup nil", t, func() { - app := &Application{} - err := app.Startup(func() error { - return nil - }) - So(err, ShouldBeNil) - }) -} - -type stopInfo struct { - state bool -} - -func (info *stopInfo) Stop() error { - info.state = true - return nil -} - -func TestApplication_BeforeStop(t *testing.T) { - Convey("test application before stop", t, func(c C) { - si := &stopInfo{} - app := &Application{} - app.initialize() - app.RegisterHooks(StageBeforeStop, si.Stop) - go func(si *stopInfo) { - time.Sleep(time.Microsecond * 100) - err := app.Stop() - c.So(err, ShouldBeNil) - c.So(si.state, ShouldEqual, true) - }(si) - err := app.Run() - c.So(err, ShouldBeNil) - }) -} -func TestApplication_EmptyRun(t *testing.T) { - Convey("test application empty run", t, func(c C) { - app := &Application{} - app.initialize() - go func() { - app.cycle.DoneAndClose() - }() - err := app.Run() - c.So(err, ShouldBeNil) - }) -} - -func TestApplication_AfterStop(t *testing.T) { - Convey("test application after stop", t, func() { - si := &stopInfo{} - app := &Application{} - app.initialize() - app.RegisterHooks(StageAfterStop, si.Stop) - go func() { - app.Stop() - }() - err := app.Run() - So(err, ShouldBeNil) - So(si.state, ShouldEqual, true) - }) -} - -func TestApplication_Serve(t *testing.T) { - Convey("test application serve throw wrong ip", t, func(c C) { - app := &Application{} - grpcConfig := xgrpc.DefaultConfig() - grpcConfig.Port = 0 - app.initialize() - err := app.Serve(grpcConfig.MustBuild()) - So(err, ShouldBeNil) - go func() { - // make sure Serve() is called - time.Sleep(time.Millisecond * 1500) - err = app.Stop() - c.So(err, ShouldBeNil) - }() - err = app.Run() - // So(err, ShouldEqual, grpc.ErrServerStopped) - So(err, ShouldBeNil) - }) -} - -type testWorker struct { - RunErr error - StopErr error -} - -func (t *testWorker) Run() error { - return t.RunErr -} -func (t *testWorker) Stop() error { - return t.StopErr -} -func Test_Unit_Application_Schedule(t *testing.T) { - Convey("test unit Application.Schedule", t, func(c C) { - w := &testWorker{} - app := &Application{} - err := app.Schedule(w) - c.So(err, ShouldBeNil) - }) -} -func Test_Unit_Application_Stop(t *testing.T) { - Convey("test unit Application.Stop", t, func(c C) { - app := &Application{} - app.initialize() - err := app.Stop() - c.So(err, ShouldBeNil) - }) -} - -func Test_Unit_Application_GracefulStop(t *testing.T) { - Convey("test unit Application.GracefulStop", t, func(c C) { - app := &Application{} - app.initialize() - err := app.GracefulStop(context.TODO()) - c.So(err, ShouldBeNil) - }) -} -func Test_Unit_Application_startServers(t *testing.T) { - Convey("test unit Application.startServers", t, func(c C) { - app := &Application{} - app.initialize() - err := app.startServers() - c.So(err, ShouldBeNil) - go func() { - time.Sleep(time.Microsecond * 100) - app.Stop() - }() - }) -} - -type nonamedJobRunner struct{} - -func (t *nonamedJobRunner) Run() {} - -type namedJobRunner struct{} - -func (t *namedJobRunner) Run() {} -func (t *namedJobRunner) GetJobName() string { - return "namedJobRunner" -} -func Test_Unit_Application_Job(t *testing.T) { - t.Run("no named", func(t *testing.T) { - j := &nonamedJobRunner{} - app := &Application{} - app.initialize() - err := app.Job(j) - assert.Nil(t, err) - }) - t.Run("named", func(t *testing.T) { - j := &namedJobRunner{} - app := &Application{} - app.initialize() - err := app.Job(j) - assert.Nil(t, err, err) - }) - -} - -func Test_Unit_Application_startJobs(t *testing.T) { - t.Run("without jobs", func(t *testing.T) { - app := &Application{} - app.initialize() - err := app.startJobs() - assert.Nil(t, err, err) - }) - t.Run("with a jobs", func(t *testing.T) { - app := &Application{} - app.initialize() - app.jobs["test"] = &namedJobRunner{} - err := app.startJobs() - assert.Nil(t, err, err) - }) -} - -func Test_Unit_Application_startWorkers(t *testing.T) { - t.Run("without workers", func(t *testing.T) { - app := &Application{} - app.initialize() - err := app.startWorkers() - assert.Nil(t, err, err) - }) - t.Run("with a workers", func(t *testing.T) { - app := &Application{} - app.initialize() - app.workers = append(app.workers, &testWorker{}) - err := app.startWorkers() - assert.Nil(t, err, err) - }) -} - -/* - -func newFakeRegistry() registry.Registry { - return &fakeRegistry{ - prefix: "fake_registry", - store: make(map[string]string), - } -} - -type fakeRegistry struct { - prefix string - store map[string]string -} - -func (r *fakeRegistry) RegisterService(ctx context.Context, s *server.ServiceInfo) error { - r.store[registry.GetServiceKey(r.prefix, s)] = registry.GetServiceValue(s) - return nil -} -func (r *fakeRegistry) UnregisterService(ctx context.Context, s *server.ServiceInfo) error { - delete(r.store, registry.GetServiceKey(r.prefix, s)) - return nil -} -func (r *fakeRegistry) ListServices(ctx context.Context, s1 string, s2 string) ([]*server.ServiceInfo, error) { - var srvs []*server.ServiceInfo - for _, v := range r.store { - srvs = append(srvs, registry.GetService(v)) - } - return nil, nil -} -func (r *fakeRegistry) WatchServices(ctx context.Context, s1 string, s2 string) (chan registry.Endpoints, error) { - return nil, nil -} -func (r *fakeRegistry) Close() error { - return nil -} - -var _ registry.Registry = (*fakeRegistry)(nil) -*/ -/* -func TestRegister(t *testing.T) { - Convey("test application register", t, func(c C) { - app := &Application{} - grpcConfig := xgrpc.DefaultConfig() - grpcConfig.Port = 0 - app.initialize() - grpcServer := grpcConfig.Build() - err := app.Serve(grpcServer) - So(err, ShouldBeNil) - - etcdv3_registryConfig := etcdv3_registry.DefaultConfig() - etcdv3_registryConfig.Endpoints = []string{"127.0.0.1:2379"} - etcdConfig := etcdv3.DefaultConfig() - etcdConfig.Endpoints = []string{"127.0.0.1:2379"} - etcdctl := etcdConfig.Build() - app.SetRegistry( - compound_registry.New( - etcdv3_registryConfig.Build(), - ), - ) - err = app.RegisterHooks(StageBeforeStop, func() error { - resp, err := etcdctl.Get(context.Background(), "/jupiter/"+pkg.Name()+"/providers/grpc://", clientv3.WithPrefix()) - c.So(err, ShouldBeNil) - c.So(len(resp.Kvs), ShouldEqual, 1) - for _, value := range resp.Kvs { - c.So(string(value.Key), ShouldEqual, "/jupiter/"+pkg.Name()+"/providers/grpc://"+grpcServer.Address()) - c.So(string(value.Value), ShouldContainSubstring, pkg.Name()) - } - return nil - }) - So(err, ShouldBeNil) - - err = app.RegisterHooks(StageAfterStop, func() error { - //resp,err := etcdctl.Get(context.Background(),"/jupiter/"+pkg.Name()+"/providers/grpc://",clientv3.WithPrefix()) - return nil - }) - So(err, ShouldBeNil) - - go func() { - // make sure Serve() is called - time.Sleep(time.Millisecond * 3000) - err = app.Stop() - c.So(err, ShouldBeNil) - }() - err = app.Run() - So(err, ShouldBeNil) - }) -} - -func TestResolverAndRegister(t *testing.T) { - Convey("test application register and client resolver", t, func(c C) { - app := &Application{} - grpcConfig := xgrpc.DefaultConfig() - grpcConfig.Port = 0 - app.initialize() - - grpcServer := grpcConfig.Build() - fooServer := &yell.FooServer{} - fooServer.SetName("srv1") - testproto.RegisterGreeterServer(grpcServer.Server, fooServer) - err := app.Serve(grpcServer) - So(err, ShouldBeNil) - - etcdv3_registryConfig := etcdv3_registry.DefaultConfig() - etcdv3_registryConfig.Endpoints = []string{"127.0.0.1:2379"} - etcdConfig := etcdv3.DefaultConfig() - etcdConfig.Endpoints = []string{"127.0.0.1:2379"} - app.SetRegistry( - compound_registry.New( - etcdv3_registryConfig.Build(), - ), - ) - - go func() { - // make sure Serve() is called - time.Sleep(time.Millisecond * 3000) - - resolver.Register("etcd", etcdv3_registryConfig.Build()) - cfg := grpc.DefaultConfig() - cfg.Address = "etcd:///" + pkg.Name() - directClient := testproto.NewGreeterClient(cfg.Build()) - Convey("test resolver grpc", t, func() { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - res, err := directClient.SayHello(ctx, &testproto.HelloRequest{ - Name: "hello", - }) - So(err, ShouldBeNil) - So(res.Message, ShouldEqual, yell.RespFantasy.Message) - }) - - err = app.Stop() - c.So(err, ShouldBeNil) - }() - err = app.Run() - So(err, ShouldBeNil) - }) -} -*/ +// import ( +// "context" +// "errors" +// "fmt" +// "testing" +// "time" + +// "github.com/douyu/jupiter/pkg/server/xgrpc" +// "github.com/stretchr/testify/assert" + +// "github.com/douyu/jupiter/pkg/server" +// . "github.com/smartystreets/goconvey/convey" +// ) + +// type testServer struct { +// ServeBlockTime time.Duration +// ServeErr error + +// StopBlockTime time.Duration +// StopErr error + +// GstopBlockTime time.Duration +// GstopErr error +// } + +// func (s *testServer) Serve() error { +// time.Sleep(s.ServeBlockTime) +// return s.ServeErr +// } +// func (s *testServer) Stop() error { +// time.Sleep(s.StopBlockTime) +// return s.StopErr +// } +// func (s *testServer) GracefulStop(ctx context.Context) error { +// time.Sleep(s.GstopBlockTime) +// return s.GstopErr +// } +// func (s *testServer) Info() *server.ServiceInfo { +// return &server.ServiceInfo{} +// } +// func (s *testServer) Healthz() bool { +// return true +// } + +// var errTest = fmt.Errorf("test error") + +// func Test_Unit_Application_New(t *testing.T) { +// t.Run("no params", func(t *testing.T) { +// _, err := New() +// assert.Nil(t, err) +// }) +// t.Run("without error", func(t *testing.T) { +// fn := func() error { +// return nil +// } +// _, err := New(fn) +// assert.Nil(t, err) +// }) +// t.Run("with error", func(t *testing.T) { +// fn := func() error { +// return errTest +// } +// _, err := New(fn) +// assert.Equal(t, errTest, err) +// }) +// } +// func TestApplication_Run(t *testing.T) { +// Convey("test application run serve", t, func(c C) { +// srv := &testServer{ +// ServeErr: errors.New("when server call serve error"), +// } +// app := &Application{} +// app.initialize() +// err := app.Serve(srv) +// So(err, ShouldBeNil) +// go func() { +// // make sure Serve() is called +// time.Sleep(time.Millisecond * 100) +// err := app.Stop() +// c.So(err, ShouldBeNil) +// }() +// err = app.Run() +// So(err, ShouldEqual, srv.ServeErr) +// }) +// Convey("test application run serve block", t, func(c C) { +// srv := &testServer{ +// ServeBlockTime: time.Second, +// ServeErr: errors.New("when server call serve error"), +// } +// app := &Application{} +// app.initialize() +// err := app.Serve(srv) +// So(err, ShouldBeNil) +// go func() { +// // make sure Serve() is called +// time.Sleep(time.Millisecond * 100) +// err := app.Stop() +// c.So(err, ShouldBeNil) +// }() +// err = app.Run() +// So(err, ShouldEqual, srv.ServeErr) +// }) +// Convey("test application run stop", t, func(c C) { +// srv := &testServer{ +// ServeBlockTime: time.Second * 2, +// StopBlockTime: time.Second, +// StopErr: errors.New("when server call stop error"), +// } +// app := &Application{} +// app.initialize() +// err := app.Serve(srv) +// So(err, ShouldBeNil) +// go func() { +// // make sure Serve() is called +// time.Sleep(time.Millisecond * 200) +// err := app.Stop() +// c.So(err, ShouldBeNil) +// }() +// err = app.Run() +// So(err, ShouldEqual, srv.StopErr) +// }) +// } + +// func TestApplication_initialize(t *testing.T) { +// Convey("test application initialize", t, func() { +// app := &Application{} +// app.initialize() +// So(app.servers, ShouldNotBeNil) +// So(app.workers, ShouldNotBeNil) +// So(app.logger, ShouldNotBeNil) +// So(app.cycle, ShouldNotBeNil) +// }) +// } + +// func TestApplication_Startup(t *testing.T) { +// Convey("test application startup error", t, func() { +// app := &Application{} +// startUpErr := errors.New("throw startup error") +// err := app.Startup(func() error { +// return startUpErr +// }) +// So(err, ShouldEqual, startUpErr) +// }) + +// Convey("test application startup nil", t, func() { +// app := &Application{} +// err := app.Startup(func() error { +// return nil +// }) +// So(err, ShouldBeNil) +// }) +// } + +// type stopInfo struct { +// state bool +// } + +// func (info *stopInfo) Stop() error { +// info.state = true +// return nil +// } + +// func TestApplication_BeforeStop(t *testing.T) { +// Convey("test application before stop", t, func(c C) { +// si := &stopInfo{} +// app := &Application{} +// app.initialize() +// app.RegisterHooks(StageBeforeStop, si.Stop) +// go func(si *stopInfo) { +// time.Sleep(time.Microsecond * 100) +// err := app.Stop() +// c.So(err, ShouldBeNil) +// c.So(si.state, ShouldEqual, true) +// }(si) +// err := app.Run() +// c.So(err, ShouldBeNil) +// }) +// } +// func TestApplication_EmptyRun(t *testing.T) { +// Convey("test application empty run", t, func(c C) { +// app := &Application{} +// app.initialize() +// go func() { +// app.cycle.DoneAndClose() +// }() +// err := app.Run() +// c.So(err, ShouldBeNil) +// }) +// } + +// func TestApplication_AfterStop(t *testing.T) { +// Convey("test application after stop", t, func() { +// si := &stopInfo{} +// app := &Application{} +// app.initialize() +// app.RegisterHooks(StageAfterStop, si.Stop) +// go func() { +// app.Stop() +// }() +// err := app.Run() +// So(err, ShouldBeNil) +// So(si.state, ShouldEqual, true) +// }) +// } + +// func TestApplication_Serve(t *testing.T) { +// Convey("test application serve throw wrong ip", t, func(c C) { +// app := &Application{} +// grpcConfig := xgrpc.DefaultConfig() +// grpcConfig.Port = 0 +// app.initialize() +// err := app.Serve(grpcConfig.MustBuild()) +// So(err, ShouldBeNil) +// go func() { +// // make sure Serve() is called +// time.Sleep(time.Millisecond * 1500) +// err = app.Stop() +// c.So(err, ShouldBeNil) +// }() +// err = app.Run() +// // So(err, ShouldEqual, grpc.ErrServerStopped) +// So(err, ShouldBeNil) +// }) +// } + +// type testWorker struct { +// RunErr error +// StopErr error +// } + +// func (t *testWorker) Run() error { +// return t.RunErr +// } +// func (t *testWorker) Stop() error { +// return t.StopErr +// } +// func Test_Unit_Application_Schedule(t *testing.T) { +// Convey("test unit Application.Schedule", t, func(c C) { +// w := &testWorker{} +// app := &Application{} +// err := app.Schedule(w) +// c.So(err, ShouldBeNil) +// }) +// } +// func Test_Unit_Application_Stop(t *testing.T) { +// Convey("test unit Application.Stop", t, func(c C) { +// app := &Application{} +// app.initialize() +// err := app.Stop() +// c.So(err, ShouldBeNil) +// }) +// } + +// func Test_Unit_Application_GracefulStop(t *testing.T) { +// Convey("test unit Application.GracefulStop", t, func(c C) { +// app := &Application{} +// app.initialize() +// err := app.GracefulStop(context.TODO()) +// c.So(err, ShouldBeNil) +// }) +// } + +// // func Test_Unit_Application_startServers(t *testing.T) { +// // Convey("test unit Application.startServers", t, func(c C) { +// // app := &Application{} +// // app.initialize() +// // err := app.startServers() +// // c.So(err, ShouldBeNil) +// // go func() { +// // time.Sleep(time.Microsecond * 100) +// // app.Stop() +// // }() +// // }) +// // } + +// type nonamedJobRunner struct{} + +// func (t *nonamedJobRunner) Run() {} + +// type namedJobRunner struct{} + +// func (t *namedJobRunner) Run() {} +// func (t *namedJobRunner) GetJobName() string { +// return "namedJobRunner" +// } +// func Test_Unit_Application_Job(t *testing.T) { +// t.Run("no named", func(t *testing.T) { +// j := &nonamedJobRunner{} +// app := &Application{} +// app.initialize() +// err := app.Job(j) +// assert.Nil(t, err) +// }) +// t.Run("named", func(t *testing.T) { +// j := &namedJobRunner{} +// app := &Application{} +// app.initialize() +// err := app.Job(j) +// assert.Nil(t, err, err) +// }) + +// } + +// func Test_Unit_Application_startJobs(t *testing.T) { +// t.Run("without jobs", func(t *testing.T) { +// app := &Application{} +// app.initialize() +// err := app.startJobs() +// assert.Nil(t, err, err) +// }) +// t.Run("with a jobs", func(t *testing.T) { +// app := &Application{} +// app.initialize() +// app.jobs["test"] = &namedJobRunner{} +// err := app.startJobs() +// assert.Nil(t, err, err) +// }) +// } + +// func Test_Unit_Application_startWorkers(t *testing.T) { +// t.Run("without workers", func(t *testing.T) { +// app := &Application{} +// app.initialize() +// err := app.startWorkers() +// assert.Nil(t, err, err) +// }) +// t.Run("with a workers", func(t *testing.T) { +// app := &Application{} +// app.initialize() +// app.workers = append(app.workers, &testWorker{}) +// err := app.startWorkers() +// assert.Nil(t, err, err) +// }) +// } + +// /* + +// func newFakeRegistry() registry.Registry { +// return &fakeRegistry{ +// prefix: "fake_registry", +// store: make(map[string]string), +// } +// } + +// type fakeRegistry struct { +// prefix string +// store map[string]string +// } + +// func (r *fakeRegistry) RegisterService(ctx context.Context, s *server.ServiceInfo) error { +// r.store[registry.GetServiceKey(r.prefix, s)] = registry.GetServiceValue(s) +// return nil +// } +// func (r *fakeRegistry) UnregisterService(ctx context.Context, s *server.ServiceInfo) error { +// delete(r.store, registry.GetServiceKey(r.prefix, s)) +// return nil +// } +// func (r *fakeRegistry) ListServices(ctx context.Context, s1 string, s2 string) ([]*server.ServiceInfo, error) { +// var srvs []*server.ServiceInfo +// for _, v := range r.store { +// srvs = append(srvs, registry.GetService(v)) +// } +// return nil, nil +// } +// func (r *fakeRegistry) WatchServices(ctx context.Context, s1 string, s2 string) (chan registry.Endpoints, error) { +// return nil, nil +// } +// func (r *fakeRegistry) Close() error { +// return nil +// } + +// var _ registry.Registry = (*fakeRegistry)(nil) +// */ +// /* +// func TestRegister(t *testing.T) { +// Convey("test application register", t, func(c C) { +// app := &Application{} +// grpcConfig := xgrpc.DefaultConfig() +// grpcConfig.Port = 0 +// app.initialize() +// grpcServer := grpcConfig.Build() +// err := app.Serve(grpcServer) +// So(err, ShouldBeNil) + +// etcdv3_registryConfig := etcdv3_registry.DefaultConfig() +// etcdv3_registryConfig.Endpoints = []string{"127.0.0.1:2379"} +// etcdConfig := etcdv3.DefaultConfig() +// etcdConfig.Endpoints = []string{"127.0.0.1:2379"} +// etcdctl := etcdConfig.Build() +// app.SetRegistry( +// compound_registry.New( +// etcdv3_registryConfig.Build(), +// ), +// ) +// err = app.RegisterHooks(StageBeforeStop, func() error { +// resp, err := etcdctl.Get(context.Background(), "/jupiter/"+pkg.Name()+"/providers/grpc://", clientv3.WithPrefix()) +// c.So(err, ShouldBeNil) +// c.So(len(resp.Kvs), ShouldEqual, 1) +// for _, value := range resp.Kvs { +// c.So(string(value.Key), ShouldEqual, "/jupiter/"+pkg.Name()+"/providers/grpc://"+grpcServer.Address()) +// c.So(string(value.Value), ShouldContainSubstring, pkg.Name()) +// } +// return nil +// }) +// So(err, ShouldBeNil) + +// err = app.RegisterHooks(StageAfterStop, func() error { +// //resp,err := etcdctl.Get(context.Background(),"/jupiter/"+pkg.Name()+"/providers/grpc://",clientv3.WithPrefix()) +// return nil +// }) +// So(err, ShouldBeNil) + +// go func() { +// // make sure Serve() is called +// time.Sleep(time.Millisecond * 3000) +// err = app.Stop() +// c.So(err, ShouldBeNil) +// }() +// err = app.Run() +// So(err, ShouldBeNil) +// }) +// } + +// func TestResolverAndRegister(t *testing.T) { +// Convey("test application register and client resolver", t, func(c C) { +// app := &Application{} +// grpcConfig := xgrpc.DefaultConfig() +// grpcConfig.Port = 0 +// app.initialize() + +// grpcServer := grpcConfig.Build() +// fooServer := &yell.FooServer{} +// fooServer.SetName("srv1") +// testproto.RegisterGreeterServer(grpcServer.Server, fooServer) +// err := app.Serve(grpcServer) +// So(err, ShouldBeNil) + +// etcdv3_registryConfig := etcdv3_registry.DefaultConfig() +// etcdv3_registryConfig.Endpoints = []string{"127.0.0.1:2379"} +// etcdConfig := etcdv3.DefaultConfig() +// etcdConfig.Endpoints = []string{"127.0.0.1:2379"} +// app.SetRegistry( +// compound_registry.New( +// etcdv3_registryConfig.Build(), +// ), +// ) + +// go func() { +// // make sure Serve() is called +// time.Sleep(time.Millisecond * 3000) + +// resolver.Register("etcd", etcdv3_registryConfig.Build()) +// cfg := grpc.DefaultConfig() +// cfg.Address = "etcd:///" + pkg.Name() +// directClient := testproto.NewGreeterClient(cfg.Build()) +// Convey("test resolver grpc", t, func() { +// ctx := context.Background() +// ctx, cancel := context.WithTimeout(ctx, time.Second) +// defer cancel() +// res, err := directClient.SayHello(ctx, &testproto.HelloRequest{ +// Name: "hello", +// }) +// So(err, ShouldBeNil) +// So(res.Message, ShouldEqual, yell.RespFantasy.Message) +// }) + +// err = app.Stop() +// c.So(err, ShouldBeNil) +// }() +// err = app.Run() +// So(err, ShouldBeNil) +// }) +// } +// */ diff --git a/pkg/autoproc/autoproc.go b/pkg/autoproc/autoproc.go index b337cca8ba..27c4669a2c 100644 --- a/pkg/autoproc/autoproc.go +++ b/pkg/autoproc/autoproc.go @@ -25,13 +25,12 @@ import ( func init() { // 初始化注册中心 + if _, err := maxprocs.Set(); err != nil { + xlog.Panic("auto max procs", xlog.FieldMod(ecode.ModProc), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err)) + } conf.OnLoaded(func(c *conf.Configuration) { if maxProcs := conf.GetInt("maxProc"); maxProcs != 0 { runtime.GOMAXPROCS(maxProcs) - } else { - if _, err := maxprocs.Set(); err != nil { - xlog.Panic("auto max procs", xlog.FieldMod(ecode.ModProc), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err)) - } } xlog.Info("auto max procs", xlog.FieldMod(ecode.ModProc), xlog.Int64("procs", int64(runtime.GOMAXPROCS(-1)))) }) diff --git a/pkg/client/etcdv3/client_test.go b/pkg/client/etcdv3/client_test.go index d212e063ee..c4221d4551 100644 --- a/pkg/client/etcdv3/client_test.go +++ b/pkg/client/etcdv3/client_test.go @@ -2,17 +2,35 @@ package etcdv3 import ( "context" + "fmt" + "log" "testing" "time" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/pkg/mock/mockserver" "github.com/stretchr/testify/assert" ) +func startMockServer() { + ms, err := mockserver.StartMockServers(1) + if err != nil { + log.Fatal(err) + } + + if err := ms.StartAt(0); err != nil { + log.Fatal(err) + } +} + +func TestMain(m *testing.M) { + go startMockServer() +} + func Test_GetKeyValue(t *testing.T) { config := DefaultConfig() - config.Endpoints = []string{"127.0.0.1:2379"} + config.Endpoints = []string{"localhost:0"} config.TTL = 5 etcdCli, err := newClient(config) assert.Nil(t, err) @@ -22,6 +40,7 @@ func Test_GetKeyValue(t *testing.T) { leaseSession, err := etcdCli.GetLeaseSession(ctx, concurrency.WithTTL(int(config.TTL))) assert.Nil(t, err) defer leaseSession.Close() + fmt.Printf("111=%+v\n", 111) _, err = etcdCli.Client.KV.Put(ctx, "/test/key", "{...}", clientv3.WithLease(leaseSession.Lease())) assert.Nil(t, err) @@ -34,7 +53,7 @@ func Test_GetKeyValue(t *testing.T) { func Test_MutexLock(t *testing.T) { config := DefaultConfig() - config.Endpoints = []string{"127.0.0.1:2379"} + config.Endpoints = []string{"localhost:0"} config.TTL = 10 etcdCli, err := newClient(config) assert.Nil(t, err) diff --git a/pkg/client/redis/redis_test.go b/pkg/client/redis/redis_test.go index 6dffc54e62..735cca12dd 100644 --- a/pkg/client/redis/redis_test.go +++ b/pkg/client/redis/redis_test.go @@ -16,17 +16,23 @@ package redis import ( "testing" + + "github.com/alicebob/miniredis/v2" ) func TestRedis(t *testing.T) { // TODO(gorexlv): add redis ci + mr, err := miniredis.Run() + if err != nil { + t.Errorf("redis run failed:%v", err) + } redisConfig := DefaultRedisConfig() - redisConfig.Addrs = []string{"localhost:6379"} + redisConfig.Addrs = []string{mr.Addr()} redisConfig.Mode = StubMode redisClient := redisConfig.Build() - err := redisClient.Client.Ping().Err() - if err != nil { - t.Errorf("redis ping failed:%v", err) + pingErr := redisClient.Client.Ping().Err() + if pingErr != nil { + t.Errorf("redis ping failed:%v", pingErr) } st := redisClient.Stub().PoolStats() t.Logf("running status %+v", st) diff --git a/pkg/component/component.go b/pkg/component/component.go new file mode 100644 index 0000000000..a4b3cd499f --- /dev/null +++ b/pkg/component/component.go @@ -0,0 +1,50 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import "github.com/douyu/jupiter/pkg/metric" + +type Component interface { + // Start blocks until the channel is closed or an error occurs. + // The component will stop running when the channel is closed. + Start(<-chan struct{}) error + + ShouldBeLeader() bool +} + +var _ Component = ComponentFunc(nil) + +type ComponentFunc func(<-chan struct{}) error + +func (f ComponentFunc) Start(stop <-chan struct{}) error { + return f(stop) +} + +func (f ComponentFunc) ShouldBeLeader() bool { + return false +} + +// Component manager, aggregate multiple components to one +type Manager interface { + Component + AddComponent(...Component) error +} + +// Component builder, build component with injecting govern plugin +type Builder interface { + WithComponentManager(Manager) Builder + WithMetrics(metric.Metrics) Builder + Build() Component +} diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go index 584d825894..d9e620dd72 100644 --- a/pkg/conf/conf.go +++ b/pkg/conf/conf.go @@ -19,6 +19,7 @@ import ( "io" "io/ioutil" "log" + "os" "reflect" "strings" "sync" @@ -30,9 +31,6 @@ import ( "github.com/pkg/errors" ) -func init() { -} - // Configuration provides configuration for application. type Configuration struct { mu sync.RWMutex @@ -98,6 +96,19 @@ func (c *Configuration) OnLoaded(fn func(*Configuration)) { c.onLoadeds = append(c.onLoadeds, fn) } +// LoadEnvironments reads os environments with prefix such as APP_ +// PREFIX_FIELD1_FIELD2 will be translated into prefix.field1.field2 +func (c *Configuration) LoadEnvironments(prefix string) { + for _, env := range os.Environ() { + if !strings.HasPrefix(env, prefix) { + continue + } + key := strings.ToLower(strings.ReplaceAll(env, "_", ".")) + val := os.Getenv(env) + c.Set(key, val) + } +} + // LoadFromDataSource ... func (c *Configuration) LoadFromDataSource(ds DataSource, unmarshaller Unmarshaller) error { content, err := ds.ReadConfig() diff --git a/pkg/conf/conf_test.go b/pkg/conf/conf_test.go new file mode 100644 index 0000000000..9605297044 --- /dev/null +++ b/pkg/conf/conf_test.go @@ -0,0 +1,15 @@ +// Copyright 2021 douyu +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conf diff --git a/pkg/conf/datasource.go b/pkg/conf/datasource.go index 606c1a09c1..5684534730 100644 --- a/pkg/conf/datasource.go +++ b/pkg/conf/datasource.go @@ -11,8 +11,8 @@ var ( ErrConfigAddr = errors.New("no config... ") // ErrInvalidDataSource defines an error that the scheme has been registered ErrInvalidDataSource = errors.New("invalid data source, please make sure the scheme has been registered") - datasourceBuilders map[string]DataSourceCreatorFunc - configDecoder map[string]Unmarshaller + datasourceBuilders = make(map[string]DataSourceCreatorFunc) + configDecoder = make(map[string]Unmarshaller) ) // DataSourceCreatorFunc represents a dataSource creator function diff --git a/pkg/conf/init.go b/pkg/conf/init.go index 8a1845948f..d74a42b137 100644 --- a/pkg/conf/init.go +++ b/pkg/conf/init.go @@ -21,8 +21,14 @@ import ( "github.com/douyu/jupiter/pkg/flag" ) +const DefaultEnvPrefix = "APP_" + func init() { - datasourceBuilders = make(map[string]DataSourceCreatorFunc) + flag.Register(&flag.StringFlag{Name: "envPrefix", Usage: "--envPrefix=APP_", Default: DefaultEnvPrefix, Action: func(key string, fs *flag.FlagSet) { + var envPrefix = fs.String(key) + defaultConfiguration.LoadEnvironments(envPrefix) + }}) + flag.Register(&flag.StringFlag{Name: "config", Usage: "--config=config.toml", Action: func(key string, fs *flag.FlagSet) { var configAddr = fs.String(key) log.Printf("read config: %s", configAddr) diff --git a/pkg/elect/elect.go b/pkg/elect/elect.go new file mode 100644 index 0000000000..f8a8c7abb7 --- /dev/null +++ b/pkg/elect/elect.go @@ -0,0 +1,30 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package elect + +type CallbackPhase int + +const ( + CallbackPhasePostStarted CallbackPhase = 1 + CallbackPhasePostStopped CallbackPhase = 2 +) + +type LeaderElectCallback func(CallbackPhase) + +type LeaderElector interface { + Start(stop <-chan struct{}) + IsLeader() bool + AddCallbacks(...LeaderElectCallback) +} diff --git a/pkg/elect/elector.go b/pkg/elect/elector.go new file mode 100644 index 0000000000..69ab8d21d5 --- /dev/null +++ b/pkg/elect/elector.go @@ -0,0 +1,114 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package elect + +import ( + "sync" + + "github.com/douyu/jupiter/pkg/component" +) + +var _ component.Manager = &electorComponent{} + +type electorComponent struct { + components []component.Component + leaderElector LeaderElector +} + +func NewComponent() *electorComponent { + return &electorComponent{ + components: make([]component.Component, 0), + } +} + +func (e *electorComponent) Start(stop <-chan struct{}) error { + errCh := make(chan error) + e.startNonLeaderComponents(stop, errCh) + e.startLeaderComponents(stop, errCh) + + select { + case <-stop: + return nil + case err := <-errCh: + return err + } +} + +func (e *electorComponent) AddComponent(components ...component.Component) error { + e.components = append(e.components, components...) + return nil +} + +func (e *electorComponent) ShouldBeLeader() bool { + return false +} + +func (e *electorComponent) startNonLeaderComponents(stop <-chan struct{}, errCh chan error) { + for _, item := range e.components { + if !item.ShouldBeLeader() { + go func(c component.Component) { + if err := c.Start(stop); err != nil { + errCh <- err + } + }(item) + } + } +} + +func (e *electorComponent) startLeaderComponents(stop <-chan struct{}, errCh chan error) { + var mutex sync.Mutex + stopCh := make(chan struct{}) + closeCh := func() { + mutex.Lock() + defer mutex.Unlock() + select { + case <-stopCh: + default: + close(stopCh) + } + } + + e.leaderElector.AddCallbacks( + func(phase CallbackPhase) { + if phase != CallbackPhasePostStarted { + return + } + mutex.Lock() + defer mutex.Unlock() + stopCh = make(chan struct{}) + for _, item := range e.components { + if item.ShouldBeLeader() { + go func(c component.Component) { + if err := c.Start(stopCh); err != nil { + errCh <- err + } + }(item) + } + } + }, + func(phase CallbackPhase) { + if phase != CallbackPhasePostStopped { + return + } + closeCh() + }, + ) + + go e.leaderElector.Start(stop) + go func() { + <-stop + closeCh() + }() +} diff --git a/pkg/elect/etcdelector/elect.go b/pkg/elect/etcdelector/elect.go new file mode 100644 index 0000000000..42a285f296 --- /dev/null +++ b/pkg/elect/etcdelector/elect.go @@ -0,0 +1,15 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdelector diff --git a/pkg/elect/memelector/elect.go b/pkg/elect/memelector/elect.go new file mode 100644 index 0000000000..27545bc231 --- /dev/null +++ b/pkg/elect/memelector/elect.go @@ -0,0 +1,52 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memelector + +import "github.com/douyu/jupiter/pkg/elect" + +type noopLeaderElector struct { + alwaysLeader bool + callbacks []elect.LeaderElectCallback +} + +var _ elect.LeaderElector = &noopLeaderElector{} + +func NewAlwaysLeaderElector() elect.LeaderElector { + return &noopLeaderElector{ + alwaysLeader: true, + } +} + +func NewNeverLeaderElector() elect.LeaderElector { + return &noopLeaderElector{ + alwaysLeader: false, + } +} + +func (n *noopLeaderElector) AddCallbacks(callbacks ...elect.LeaderElectCallback) { + n.callbacks = append(n.callbacks, callbacks...) +} + +func (n *noopLeaderElector) IsLeader() bool { + return n.alwaysLeader +} + +func (n *noopLeaderElector) Start(stop <-chan struct{}) { + if n.alwaysLeader { + for _, callback := range n.callbacks { + callback(elect.CallbackPhasePostStarted) + } + } +} diff --git a/pkg/elect/pgelector/elect.go b/pkg/elect/pgelector/elect.go new file mode 100644 index 0000000000..10d661c81e --- /dev/null +++ b/pkg/elect/pgelector/elect.go @@ -0,0 +1,109 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgelector + +import ( + "context" + "sync/atomic" + "time" + + "cirello.io/pglock" + "github.com/douyu/jupiter/pkg/elect" + "github.com/douyu/jupiter/pkg/xlog" +) + +var _logger = xlog.DefaultLogger.With(xlog.FieldMod("pgelector")) + +// postgresLeaderElector implements leader election using PostgreSQL DB. +// pglock does not rely on timestamps, which eliminates the problem of clock skews, but the cost is that first leader election can happen only after lease duration +// pglock does optimistic locking under the hood, the alternative would be to use pg_advisory_lock +type postgresLeaderElector struct { + leader int32 + lockClient *pglock.Client + callbacks []elect.LeaderElectCallback + lockName string + backoffTime time.Duration +} + +var _ elect.LeaderElector = &postgresLeaderElector{} + +func New(lockClient *pglock.Client, lockName string, backoffTime time.Duration) elect.LeaderElector { + return &postgresLeaderElector{ + lockClient: lockClient, + lockName: lockName, + backoffTime: backoffTime, + } +} + +func (p *postgresLeaderElector) Start(stop <-chan struct{}) { + _logger.Info("starting Leader Elector") + ctx, cancelFn := context.WithCancel(context.Background()) + go func() { + <-stop + _logger.Info("stopping Leader Elector") + cancelFn() + }() + + for { + _logger.Info("waiting for lock") + if err := p.lockClient.Do(ctx, p.lockName, func(ctx context.Context, lock *pglock.Lock) error { + p.leaderAcquired() + <-ctx.Done() + p.leaderLost() + return nil + }); err != nil { + _logger.Errorw(err.Error(), "error waiting for lock") + } + + select { + case <-stop: + break + default: + } + + time.Sleep(p.backoffTime) + } + _logger.Info("Leader Elector stopped") +} + +func (p *postgresLeaderElector) leaderAcquired() { + p.setLeader(true) + for _, callback := range p.callbacks { + callback(elect.CallbackPhasePostStarted) + } +} + +func (p *postgresLeaderElector) leaderLost() { + p.setLeader(false) + for _, callback := range p.callbacks { + callback(elect.CallbackPhasePostStopped) + } +} + +func (p *postgresLeaderElector) AddCallbacks(callbacks ...elect.LeaderElectCallback) { + p.callbacks = append(p.callbacks, callbacks...) +} + +func (p *postgresLeaderElector) setLeader(leader bool) { + var value int32 = 0 + if leader { + value = 1 + } + atomic.StoreInt32(&p.leader, value) +} + +func (p *postgresLeaderElector) IsLeader() bool { + return atomic.LoadInt32(&(p.leader)) == 1 +} diff --git a/pkg/elect/rediselector/elect.go b/pkg/elect/rediselector/elect.go new file mode 100644 index 0000000000..071f96b024 --- /dev/null +++ b/pkg/elect/rediselector/elect.go @@ -0,0 +1,40 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rediselector + +import "github.com/douyu/jupiter/pkg/elect" + +var _ elect.LeaderElector = &redisLeaderElector{} + +type redisLeaderElector struct { + callbacks []elect.LeaderElectCallback +} + +func New() *redisLeaderElector { + return &redisLeaderElector{ + callbacks: make([]elect.LeaderElectCallback, 0), + } +} + +func (r *redisLeaderElector) IsLeader() bool { + return false +} + +func (r *redisLeaderElector) AddCallbacks(callbacks ...elect.LeaderElectCallback) { + +} + +func (r *redisLeaderElector) Start(stop <-chan struct{}) { +} diff --git a/pkg/flag/flag.go b/pkg/flag/flag.go index 51576f96e6..332eb8e01e 100644 --- a/pkg/flag/flag.go +++ b/pkg/flag/flag.go @@ -107,6 +107,7 @@ func (fs *FlagSet) Parse() error { } fs.FlagSet.Visit(func(f *flag.Flag) { + // do action hook after parse flagset if action, ok := fs.actions[f.Name]; ok && action != nil { action(f.Name, fs) } diff --git a/pkg/metric/metric.go b/pkg/metric/metric.go index 4d089dbc74..15cc3abb6e 100644 --- a/pkg/metric/metric.go +++ b/pkg/metric/metric.go @@ -20,9 +20,17 @@ import ( "github.com/douyu/jupiter/pkg" "github.com/douyu/jupiter/pkg/governor" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) +type Metrics interface { + prometheus.Registerer + prometheus.Gatherer + + BulkRegister(...prometheus.Collector) error +} + var ( // TypeHTTP ... TypeHTTP = "http" diff --git a/pkg/registry/etcdv3/registry_test.go b/pkg/registry/etcdv3/registry_test.go index 1428097584..ebbd0c0f3c 100644 --- a/pkg/registry/etcdv3/registry_test.go +++ b/pkg/registry/etcdv3/registry_test.go @@ -17,10 +17,12 @@ package etcdv3 import ( "context" "fmt" + "log" "testing" "time" "github.com/douyu/jupiter/pkg/constant" + "go.etcd.io/etcd/pkg/mock/mockserver" "github.com/douyu/jupiter/pkg/client/etcdv3" "github.com/douyu/jupiter/pkg/registry" @@ -29,9 +31,24 @@ import ( "github.com/stretchr/testify/assert" ) +func startMockServer() { + ms, err := mockserver.StartMockServers(1) + if err != nil { + log.Fatal(err) + } + + if err := ms.StartAt(0); err != nil { + log.Fatal(err) + } +} + +func TestMain(m *testing.M) { + go startMockServer() +} + func Test_etcdv3Registry(t *testing.T) { etcdConfig := etcdv3.DefaultConfig() - etcdConfig.Endpoints = []string{"127.0.0.1:2379"} + etcdConfig.Endpoints = []string{"localhost:0"} registry, err := newETCDRegistry(&Config{ Config: etcdConfig, ReadTimeout: time.Second * 10, @@ -96,7 +113,7 @@ func Test_etcdv3Registry(t *testing.T) { func Test_etcdv3registry_UpdateAddressList(t *testing.T) { etcdConfig := etcdv3.DefaultConfig() - etcdConfig.Endpoints = []string{"127.0.0.1:2379"} + etcdConfig.Endpoints = []string{"localhost:0"} reg, err := newETCDRegistry(&Config{ Config: etcdConfig, ReadTimeout: time.Second * 10, diff --git a/pkg/registry/init.go b/pkg/registry/init.go index 768233bc30..f892bc88e3 100644 --- a/pkg/registry/init.go +++ b/pkg/registry/init.go @@ -60,6 +60,8 @@ func init() { type Builder func(string) Registry +type BuildFunc func(string) (Registry, error) + func RegisterBuilder(kind string, build Builder) { if _, ok := registryBuilder[kind]; ok { log.Panicf("duplicate register registry builder: %s", kind) diff --git a/pkg/registry/service.go b/pkg/registry/service.go new file mode 100644 index 0000000000..f34b3f8e42 --- /dev/null +++ b/pkg/registry/service.go @@ -0,0 +1,30 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import "github.com/douyu/jupiter/pkg/server" + +type service struct { + server.Server +} + +func Service(srv server.Server) server.Server { + return &service{Server: srv} +} + +func (s *service) Serve() error { + + return s.Server.Serve() +} diff --git a/pkg/server/xgoframe/server.go b/pkg/server/xgoframe/server.go index 86d6740109..93cca362f5 100644 --- a/pkg/server/xgoframe/server.go +++ b/pkg/server/xgoframe/server.go @@ -79,3 +79,9 @@ func (s *Server) Info() *server.ServiceInfo { ) return &info } + +// Healthz +// TODO(roamerlv): +func (s *Server) Healthz() bool { + return true +} diff --git a/pkg/util/xcolor/string_darwin.go b/pkg/util/xcolor/string_darwin.go index d365fc3fa7..0eb189fa7a 100644 --- a/pkg/util/xcolor/string_darwin.go +++ b/pkg/util/xcolor/string_darwin.go @@ -48,6 +48,10 @@ func Blue(msg string, arg ...interface{}) string { func Green(msg string, arg ...interface{}) string { return sprint(GreenColor, msg, arg...) } +// Greenf ... +func Greenf(msg string, arg ...interface{}) string { + return sprint(GreenColor, msg, arg...) +} // sprint func sprint(colorValue int, msg string, arg ...interface{}) string { diff --git a/pkg/util/xcolor/string_linux.go b/pkg/util/xcolor/string_linux.go index 6e182f3334..13a0275134 100644 --- a/pkg/util/xcolor/string_linux.go +++ b/pkg/util/xcolor/string_linux.go @@ -48,6 +48,10 @@ func Blue(msg string, arg ...interface{}) string { func Green(msg string, arg ...interface{}) string { return sprint(GreenColor, msg, arg...) } +// Greenf ... +func Greenf(msg string, arg ...interface{}) string { + return sprint(GreenColor, msg, arg...) +} // sprint func sprint(colorValue int, msg string, arg ...interface{}) string { diff --git a/pkg/util/xcolor/string_windows.go b/pkg/util/xcolor/string_windows.go index da246672c5..c4aa74f2b1 100644 --- a/pkg/util/xcolor/string_windows.go +++ b/pkg/util/xcolor/string_windows.go @@ -49,6 +49,11 @@ func Green(msg string, arg ...interface{}) string { return sprint(msg, arg...) } +// Greenf ... +func Greenf(msg string, arg ...interface{}) string { + return sprint(msg, arg...) +} + // sprint ... func sprint(msg string, arg ...interface{}) string { if arg != nil { diff --git a/pkg/util/xgo/chan.go b/pkg/util/xgo/chan.go new file mode 100644 index 0000000000..2403f7b405 --- /dev/null +++ b/pkg/util/xgo/chan.go @@ -0,0 +1,24 @@ +// Copyright 2021 rex lv +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xgo + +func IsChanClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + } + return false +} diff --git a/tools/ast_codes/main.go b/tools/ast_codes/main.go index f5afabae56..4b05e5cc34 100644 --- a/tools/ast_codes/main.go +++ b/tools/ast_codes/main.go @@ -33,7 +33,7 @@ import ( // "github.com/pelletier/go-toml" "github.com/davecgh/go-spew/spew" - "github.com/douyu/pkg/util/xcast" + "github.com/douyu/jupiter/pkg/util/xcast" ) /*