@@ -14,30 +14,33 @@ import (
1414 "github.com/libp2p/go-libp2p/core/peerstore"
1515 "github.com/libp2p/go-libp2p/core/protocol"
1616 "github.com/libp2p/go-libp2p/core/record"
17- "github.com/libp2p/go-libp2p/p2p/host/eventbus"
18-
1917 logging "github.com/libp2p/go-libp2p/gologshim"
18+ "github.com/libp2p/go-libp2p/p2p/host/eventbus"
2019
2120 ma "github.com/multiformats/go-multiaddr"
2221 mstream "github.com/multiformats/go-multistream"
2322)
2423
2524var log = logging .Logger ("blankhost" )
2625
27- // BlankHost is the thinnest implementation of the host.Host interface
26+ // BlankHost is a thin implementation of the host.Host interface
2827type BlankHost struct {
29- n network.Network
30- mux * mstream.MultistreamMuxer [protocol.ID ]
31- cmgr connmgr.ConnManager
32- eventbus event.Bus
33- emitters struct {
28+ N network.Network
29+ M * mstream.MultistreamMuxer [protocol.ID ]
30+ E event.Bus
31+ ConnMgr connmgr.ConnManager
32+ // SkipInitSignedRecord is a flag to skip the initialization of a signed record for the host
33+ SkipInitSignedRecord bool
34+ emitters struct {
3435 evtLocalProtocolsUpdated event.Emitter
3536 }
37+ onStop []func () error
3638}
3739
3840type config struct {
39- cmgr connmgr.ConnManager
40- eventBus event.Bus
41+ cmgr connmgr.ConnManager
42+ eventBus event.Bus
43+ skipInitSignedRecord bool
4144}
4245
4346type Option = func (cfg * config )
@@ -54,6 +57,12 @@ func WithEventBus(eventBus event.Bus) Option {
5457 }
5558}
5659
60+ func SkipInitSignedRecord () Option {
61+ return func (cfg * config ) {
62+ cfg .skipInitSignedRecord = true
63+ }
64+ }
65+
5766func NewBlankHost (n network.Network , options ... Option ) * BlankHost {
5867 cfg := config {
5968 cmgr : & connmgr.NullConnMgr {},
@@ -63,36 +72,72 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
6372 }
6473
6574 bh := & BlankHost {
66- n : n ,
67- cmgr : cfg .cmgr ,
68- mux : mstream .NewMultistreamMuxer [protocol.ID ](),
69- eventbus : cfg .eventBus ,
75+ N : n ,
76+ ConnMgr : cfg .cmgr ,
77+ M : mstream .NewMultistreamMuxer [protocol.ID ](),
78+ E : cfg .eventBus ,
79+
80+ SkipInitSignedRecord : cfg .skipInitSignedRecord ,
7081 }
71- if bh .eventbus == nil {
72- bh .eventbus = eventbus .NewBus (eventbus .WithMetricsTracer (eventbus .NewMetricsTracer ()))
82+
83+ if err := bh .Start (); err != nil {
84+ log .Error ("error creating blank host" , "err" , err )
85+ return nil
86+ }
87+
88+ return bh
89+ }
90+
91+ func (bh * BlankHost ) Start () error {
92+ if bh .E == nil {
93+ bh .E = eventbus .NewBus (eventbus .WithMetricsTracer (eventbus .NewMetricsTracer ()))
7394 }
7495
7596 // subscribe the connection manager to network notifications (has no effect with NullConnMgr)
76- n .Notify (bh .cmgr .Notifee ())
97+ notifee := bh .ConnMgr .Notifee ()
98+ bh .N .Notify (notifee )
99+ bh .onStop = append (bh .onStop , func () error {
100+ bh .N .StopNotify (notifee )
101+ return nil
102+ })
77103
78104 var err error
79- if bh .emitters .evtLocalProtocolsUpdated , err = bh .eventbus .Emitter (& event.EvtLocalProtocolsUpdated {}); err != nil {
80- return nil
105+ if bh .emitters .evtLocalProtocolsUpdated , err = bh .E .Emitter (& event.EvtLocalProtocolsUpdated {}); err != nil {
106+ return err
81107 }
108+ bh .onStop = append (bh .onStop , func () error {
109+ bh .emitters .evtLocalProtocolsUpdated .Close ()
110+ return nil
111+ })
82112
83- n .SetStreamHandler (bh .newStreamHandler )
113+ bh .N .SetStreamHandler (bh .newStreamHandler )
114+ bh .onStop = append (bh .onStop , func () error {
115+ bh .N .SetStreamHandler (func (s network.Stream ) { s .Reset () })
116+ return nil
117+ })
84118
85119 // persist a signed peer record for self to the peerstore.
86- if err := bh .initSignedRecord (); err != nil {
87- log .Error ("error creating blank host" , "err" , err )
88- return nil
120+ if ! bh .SkipInitSignedRecord {
121+ if err := bh .initSignedRecord (); err != nil {
122+ log .Error ("error creating blank host" , "err" , err )
123+ return err
124+ }
89125 }
90126
91- return bh
127+ return nil
128+ }
129+
130+ func (bh * BlankHost ) Stop () error {
131+ var err error
132+ for _ , f := range bh .onStop {
133+ err = errors .Join (err , f ())
134+ }
135+ bh .onStop = nil
136+ return err
92137}
93138
94139func (bh * BlankHost ) initSignedRecord () error {
95- cab , ok := peerstore .GetCertifiedAddrBook (bh .n .Peerstore ())
140+ cab , ok := peerstore .GetCertifiedAddrBook (bh .N .Peerstore ())
96141 if ! ok {
97142 log .Error ("peerstore does not support signed records" )
98143 return errors .New ("peerstore does not support signed records" )
@@ -114,7 +159,7 @@ func (bh *BlankHost) initSignedRecord() error {
114159var _ host.Host = (* BlankHost )(nil )
115160
116161func (bh * BlankHost ) Addrs () []ma.Multiaddr {
117- addrs , err := bh .n .InterfaceListenAddresses ()
162+ addrs , err := bh .N .InterfaceListenAddresses ()
118163 if err != nil {
119164 log .Debug ("error retrieving network interface addrs" , "err" , err )
120165 return nil
@@ -124,14 +169,18 @@ func (bh *BlankHost) Addrs() []ma.Multiaddr {
124169}
125170
126171func (bh * BlankHost ) Close () error {
127- return bh .n .Close ()
172+ var err error
173+ if bh .onStop != nil {
174+ err = bh .Stop ()
175+ }
176+ return errors .Join (err , bh .N .Close ())
128177}
129178
130179func (bh * BlankHost ) Connect (ctx context.Context , ai peer.AddrInfo ) error {
131180 // absorb addresses into peerstore
132181 bh .Peerstore ().AddAddrs (ai .ID , ai .Addrs , peerstore .TempAddrTTL )
133182
134- cs := bh .n .ConnsToPeer (ai .ID )
183+ cs := bh .N .ConnsToPeer (ai .ID )
135184 if len (cs ) > 0 {
136185 return nil
137186 }
@@ -144,15 +193,15 @@ func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error {
144193}
145194
146195func (bh * BlankHost ) Peerstore () peerstore.Peerstore {
147- return bh .n .Peerstore ()
196+ return bh .N .Peerstore ()
148197}
149198
150199func (bh * BlankHost ) ID () peer.ID {
151- return bh .n .LocalPeer ()
200+ return bh .N .LocalPeer ()
152201}
153202
154203func (bh * BlankHost ) NewStream (ctx context.Context , p peer.ID , protos ... protocol.ID ) (network.Stream , error ) {
155- s , err := bh .n .NewStream (ctx , p )
204+ s , err := bh .N .NewStream (ctx , p )
156205 if err != nil {
157206 return nil , fmt .Errorf ("failed to open stream: %w" , err )
158207 }
@@ -204,7 +253,7 @@ func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(protocol.ID)
204253func (bh * BlankHost ) newStreamHandler (s network.Stream ) {
205254 protoID , handle , err := bh .Mux ().Negotiate (s )
206255 if err != nil {
207- log .Info ("protocol negotiation failed" , "err" , err )
256+ log .Error ("protocol negotiation failed" , "err" , err )
208257 s .Reset ()
209258 return
210259 }
@@ -216,18 +265,18 @@ func (bh *BlankHost) newStreamHandler(s network.Stream) {
216265
217266// TODO: i'm not sure this really needs to be here
218267func (bh * BlankHost ) Mux () protocol.Switch {
219- return bh .mux
268+ return bh .M
220269}
221270
222271// TODO: also not sure this fits... Might be better ways around this (leaky abstractions)
223272func (bh * BlankHost ) Network () network.Network {
224- return bh .n
273+ return bh .N
225274}
226275
227276func (bh * BlankHost ) ConnManager () connmgr.ConnManager {
228- return bh .cmgr
277+ return bh .ConnMgr
229278}
230279
231280func (bh * BlankHost ) EventBus () event.Bus {
232- return bh .eventbus
281+ return bh .E
233282}
0 commit comments