Skip to content

Commit

Permalink
Merge pull request #96 from 0KnowledgeNetwork/perf/pki
Browse files Browse the repository at this point in the history
feat(pki): node appchain registration improvements
  • Loading branch information
xendarboh authored Jan 8, 2025
2 parents 3a528ad + 2deeaed commit 46b9d30
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 74 deletions.
185 changes: 143 additions & 42 deletions pki/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"crypto/hmac"
"errors"
"fmt"
"path/filepath"
Expand All @@ -16,6 +17,8 @@ import (
"github.com/katzenpost/hpqc/hash"
"github.com/katzenpost/hpqc/rand"
"github.com/katzenpost/hpqc/sign"
signpem "github.com/katzenpost/hpqc/sign/pem"
signSchemes "github.com/katzenpost/hpqc/sign/schemes"
"github.com/katzenpost/katzenpost/core/epochtime"
"github.com/katzenpost/katzenpost/core/pki"
"github.com/katzenpost/katzenpost/core/sphinx/constants"
Expand All @@ -27,7 +30,7 @@ import (

const (
stateBootstrap = "bootstrap"
stateWaitBlockDesc = "wait_block_desc"
stateDescriptorSend = "descriptor_send"
stateAcceptDescriptor = "accept_desc"
stateAcceptVote = "accept_vote"
stateConfirmConsensus = "confirm_consensus"
Expand All @@ -45,6 +48,7 @@ var (
AuthorityVoteDeadline = epochtime.Period * 3 / 8
PublishConsensusDeadline = epochtime.Period * 5 / 8 // Do NOT change this
DocGenerationDeadline = epochtime.Period * 7 / 8
RandomCourtessyDelay = epochtime.Period * 1 / 16 // duration to distribute load across synchronized nodes
errGone = errors.New("authority: Requested epoch will never get a Document")
errNotYet = errors.New("authority: Document is not ready yet")
errInvalidTopology = errors.New("authority: Invalid Topology")
Expand All @@ -59,11 +63,12 @@ type state struct {
chainBridge *chainbridge.ChainBridge
ccbor cbor.EncMode // a la katzenpost:core/pki/document.go

// locally registered node(s), only one allowed
// authority authentication for descriptor uploads is limited to this
registeredLocalNodes map[[publicKeyHashSize]byte]bool
// locally registered node, only one allowed
// mix descriptor uploads to this authority are restricted to this node
authorizedNode *chainbridge.Node

documents map[uint64]*pki.Document
documents map[uint64]*pki.Document
descriptors map[uint64]map[[publicKeyHashSize]byte]*pki.MixDescriptor

votingEpoch uint64
genesisEpoch uint64
Expand All @@ -86,6 +91,11 @@ func (s *state) worker() {
}
}

// courtessyDelay picks a randomized wait, scaled by RandomCourtessyDelay, so
// that nodes running on the same epoch schedule do not all act at the same
// instant.
func (s *state) courtessyDelay() time.Duration {
	// Presumably Float64 yields a fraction in [0, 1), as math/rand does —
	// TODO confirm against hpqc/rand.
	fraction := rand.NewMath().Float64()
	ceiling := float64(RandomCourtessyDelay)
	return time.Duration(fraction * ceiling)
}

func (s *state) fsm() <-chan time.Time {
s.Lock()
var sleep time.Duration
Expand All @@ -105,24 +115,30 @@ func (s *state) fsm() <-chan time.Time {
s.state = stateBootstrap
} else {
s.votingEpoch = epoch + 1
s.state = stateWaitBlockDesc
sleep = MixPublishDeadline - elapsed
s.state = stateDescriptorSend
sleep = MixPublishDeadline - elapsed + s.courtessyDelay()
if sleep < 0 {
sleep = 0
}
s.log.Noticef("Bootstrapping for %d", s.votingEpoch)
}
case stateWaitBlockDesc:
// Wait for appchain block production of all registered descriptors
case stateDescriptorSend:
// Send mix descriptor to the appchain
pk := hash.Sum256(s.authorizedNode.IdentityKey)
desc, ok := s.descriptors[s.votingEpoch][pk]
if ok {
s.submitDescriptorToAppchain(desc, s.votingEpoch)
} else {
s.log.Errorf("❌ No descriptor for epoch %d", s.votingEpoch)
}
s.state = stateAcceptDescriptor
sleep = DescriptorBlockDeadline - elapsed
sleep = DescriptorBlockDeadline - elapsed + s.courtessyDelay()
case stateAcceptDescriptor:
doc, err := s.getVote(s.votingEpoch)
if err == nil {
s.log.Noticef("authority: FSM: Sending vote for epoch %d in epoch %d", s.votingEpoch, epoch)
s.sendVoteToAppchain(doc, s.votingEpoch)
} else {
s.log.Errorf("Failed to compute vote for epoch %v: %s", s.votingEpoch, err)
s.log.Errorf("Failed to compute vote for epoch %v: %s", s.votingEpoch, err)
}
s.state = stateAcceptVote
_, nowelapsed, _ := epochtime.Now()
Expand All @@ -136,8 +152,8 @@ func (s *state) fsm() <-chan time.Time {
// See if consensus doc was retrieved from the appchain
_, ok := s.documents[epoch+1]
if ok {
s.state = stateWaitBlockDesc
sleep = MixPublishDeadline + nextEpoch
s.state = stateDescriptorSend
sleep = MixPublishDeadline + nextEpoch + s.courtessyDelay()
s.votingEpoch++
} else {
s.log.Error("No document for epoch %v", epoch+1)
Expand Down Expand Up @@ -189,7 +205,9 @@ func (s *state) getVote(epoch uint64) (*pki.Document, error) {

// sendVoteToAppchain hands doc to chPKISetDocument and logs the outcome.
// Failures are logged rather than returned to the caller; epoch is used only
// to label the log messages.
func (s *state) sendVoteToAppchain(doc *pki.Document, epoch uint64) {
	if err := s.chPKISetDocument(doc); err != nil {
		// Report the failure exactly once.
		s.log.Errorf("❌ sendVoteToAppchain: Error setting document for epoch %d: %v", epoch, err)
		return
	}
	s.log.Noticef("✅ sendVoteToAppchain: Set document for epoch %d", epoch)
}

Expand Down Expand Up @@ -383,19 +401,20 @@ func (s *state) pruneDocuments() {
delete(s.documents, e)
}
}
for e := range s.descriptors {
if e < cmpEpoch {
delete(s.descriptors, e)
}
}
}

// Ensure that the descriptor is from an allowed peer according to the appchain
// Ensure that the descriptor is from the local registered node
func (s *state) isDescriptorAuthorized(desc *pki.MixDescriptor) bool {
node, err := s.chNodesGet(desc.Name)
if err != nil {
s.log.Debugf("state: Failed to retrive node=%s from appchain: %v", desc.Name, err)
return false
}
node := s.authorizedNode

pk := hash.Sum256(desc.IdentityKey)
if pk != hash.Sum256(node.IdentityKey) {
s.log.Debugf("state: IdentityKey mismatch for node %s", desc.Name)
s.log.Debugf("pki: ❌ isDescriptorAuthorized: IdentityKey mismatch for node %s", desc.Name)
return false
}

Expand All @@ -411,29 +430,59 @@ func (s *state) isDescriptorAuthorized(desc *pki.MixDescriptor) bool {
}

func (s *state) onDescriptorUpload(rawDesc []byte, desc *pki.MixDescriptor, epoch uint64) error {
s.log.Noticef("pki: ⭐ onDescriptorUpload; Node name=%v, epoch=%v", desc.Name, epoch)
s.Lock()
defer s.Unlock()

// Note: Caller ensures that the epoch is the current epoch +- 1.
pk := hash.Sum256(desc.IdentityKey)

s.RLock()
doc := s.documents[epoch]
s.RUnlock()
// Get the public key -> descriptor map for the epoch.
_, ok := s.descriptors[epoch]
if !ok {
s.descriptors[epoch] = make(map[[publicKeyHashSize]byte]*pki.MixDescriptor)
}

if doc != nil {
// Check for redundant uploads.
d, ok := s.descriptors[epoch][pk]
if ok {
// If the descriptor changes, then it will be rejected to prevent
// nodes from reneging on uploads.
serialized, err := d.MarshalBinary()
if err != nil {
return err
}
if !hmac.Equal(serialized, rawDesc) {
return fmt.Errorf("state: node %s (%x): Conflicting descriptor for epoch %v", desc.Name, hash.Sum256(desc.IdentityKey), epoch)
}

// Redundant uploads that don't change are harmless.
return nil
}

// Ok, this is a new descriptor.
if s.documents[epoch] != nil {
// If there is a document already, the descriptor is late, and will
// never appear in a document, so reject it.
return fmt.Errorf("pki: ❌ Node %x: Late descriptor upload for epoch %v", pk, epoch)
return fmt.Errorf("state: Node %v: Late descriptor upload for for epoch %v", desc.IdentityKey, epoch)
}

// Store the parsed descriptor
s.descriptors[epoch][pk] = desc

s.log.Noticef("Node %x: Successfully submitted descriptor for epoch %v.", pk, epoch)
return nil
}

// submitDescriptorToAppchain registers desc for the given epoch with the
// appchain via chPKISetMixDescriptor and logs the outcome. The appchain will:
//   - reject redundant descriptors (even those that didn't change)
//   - reject descriptors if a document for the epoch already exists
//
// The function has no result, so failures are reported through the logger
// only.
func (s *state) submitDescriptorToAppchain(desc *pki.MixDescriptor, epoch uint64) {
	if err := s.chPKISetMixDescriptor(desc, epoch); err != nil {
		s.log.Errorf("❌ submitDescriptorToAppchain: Failed to set mix descriptor for node %v, epoch=%v: %v", desc.Name, epoch, err)
		// Stop here so a failed submission is not also reported as a success.
		return
	}

	// Log the epoch we are currently in alongside the epoch the descriptor
	// targets; the two can differ.
	epochCurrent, _, _ := epochtime.Now()
	s.log.Noticef("✅ submitDescriptorToAppchain: Submitted descriptor to appchain for node %v, epoch=%v (in epoch=%v)", desc.Name, epoch, epochCurrent)
}

func (s *state) documentForEpoch(epoch uint64) ([]byte, error) {
Expand Down Expand Up @@ -482,13 +531,24 @@ func newState(s *Server) (*state, error) {
st.s = s
st.log = s.logBackend.GetLogger("state")

// set voting schedule at runtime

st.log.Debugf("State initialized with epoch Period: %s", epochtime.Period)
st.log.Debugf("State initialized with RandomCourtessyDelay: %s", RandomCourtessyDelay)
st.log.Debugf("State initialized with MixPublishDeadline: %s", MixPublishDeadline)
st.log.Debugf("State initialized with DescriptorBlockDeadline: %s", DescriptorBlockDeadline)
st.log.Debugf("State initialized with AuthorityVoteDeadline: %s", AuthorityVoteDeadline)
st.log.Debugf("State initialized with PublishConsensusDeadline: %s", PublishConsensusDeadline)
st.log.Debugf("State initialized with DocGenerationDeadline: %s", DocGenerationDeadline)

ccbor, err := cbor.CanonicalEncOptions().EncMode()
if err != nil {
panic(err)
}
st.ccbor = ccbor

chainBridgeLogger := s.logBackend.GetLogger("state:chainBridge")
// Init AppChain communications (chainbridge)
chainBridgeLogger := s.logBackend.GetLogger("state:chain")
st.chainBridge = chainbridge.NewChainBridge(filepath.Join(s.cfg.Server.DataDir, "appchain.sock"))
st.chainBridge.SetErrorHandler(func(err error) {
chainBridgeLogger.Errorf("Error: %v", err)
Expand All @@ -500,25 +560,66 @@ func newState(s *Server) (*state, error) {
chainBridgeLogger.Fatalf("Error: %v", err)
}

// Initialize the authorized peer tables.
st.registeredLocalNodes = make(map[[publicKeyHashSize]byte]bool)
for _, v := range st.s.cfg.Mixes {
st.chNodesRegister(v, false, false)
// Load the authorized local node from configuration

// return a single node configuration and its node type
extractNodeFromCfg := func() (*config.Node, bool, bool) {
if len(st.s.cfg.GatewayNodes) == 1 {
return st.s.cfg.GatewayNodes[0], true, false
}
if len(st.s.cfg.ServiceNodes) == 1 {
return st.s.cfg.ServiceNodes[0], false, true
}
if len(st.s.cfg.Mixes) == 1 {
return st.s.cfg.Mixes[0], false, false
}
return nil, false, false
}
for _, v := range st.s.cfg.GatewayNodes {
st.chNodesRegister(v, true, false)
v, isGatewayNode, isServiceNode := extractNodeFromCfg()
if v == nil {
st.log.Fatalf("❌ Error: Invalid configuration for a single local node")
}
for _, v := range st.s.cfg.ServiceNodes {
st.chNodesRegister(v, false, true)

pkiSignatureScheme := signSchemes.ByName(st.s.cfg.Server.PKISignatureScheme)
var identityPublicKey sign.PublicKey
if filepath.IsAbs(v.IdentityPublicKeyPem) {
identityPublicKey, err = signpem.FromPublicPEMFile(v.IdentityPublicKeyPem, pkiSignatureScheme)
if err != nil {
panic(err)
}
} else {
pemFilePath := filepath.Join(st.s.cfg.Server.DataDir, v.IdentityPublicKeyPem)
identityPublicKey, err = signpem.FromPublicPEMFile(pemFilePath, pkiSignatureScheme)
if err != nil {
panic(err)
}
}

if len(st.registeredLocalNodes) > 1 {
st.log.Fatalf("Error: Configuration found for more than one local node")
// Node Registration: check if node is already registered before registering and rechecking
st.authorizedNode, err = st.chNodesGet(v.Identifier)
if err != nil {
if err := st.chNodesRegister(v, identityPublicKey, isGatewayNode, isServiceNode); err != nil {
st.log.Fatalf("❌ Error: node registration failed:", err)
}
time.Sleep(time.Duration(1) * time.Second)
st.authorizedNode, err = st.chNodesGet(v.Identifier)
if err != nil {
s.log.Fatalf("❌ Error: Failed to get node=%s from appchain: %v", v.Identifier, err)
}
}

// Ensure node appchain registration matches the local node configuration
pk := hash.Sum256From(identityPublicKey)
if pk != hash.Sum256(st.authorizedNode.IdentityKey) {
s.log.Fatalf("❌ Error: IdentityKey mismatch between node registration and configuration")
}

st.log.Noticef("✅ Node registered with Identifier '%s', Identity key hash '%x'", v.Identifier, pk)

st.log.Debugf("State initialized with epoch Period: %s", epochtime.Period)

st.documents = make(map[uint64]*pki.Document)
st.descriptors = make(map[uint64]map[[publicKeyHashSize]byte]*pki.MixDescriptor)

epoch, elapsed, nextEpoch := epochtime.Now()
st.log.Debugf("Epoch: %d, elapsed: %s, remaining time: %s", epoch, elapsed, nextEpoch)
Expand Down
36 changes: 5 additions & 31 deletions pki/state_chain_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ package main

import (
"fmt"
"path/filepath"

"github.com/fxamacker/cbor/v2"
"github.com/katzenpost/hpqc/hash"
"github.com/katzenpost/hpqc/sign"
signpem "github.com/katzenpost/hpqc/sign/pem"
signSchemes "github.com/katzenpost/hpqc/sign/schemes"
"github.com/katzenpost/katzenpost/core/pki"

"github.com/0KnowledgeNetwork/appchain-agent/clients/go/chainbridge"
Expand All @@ -32,30 +28,11 @@ func (s *state) chNodesGet(name string) (*chainbridge.Node, error) {
return &node, nil
}

func (st *state) chNodesRegister(v *config.Node, isGatewayNode bool, isServiceNode bool) {
pkiSignatureScheme := signSchemes.ByName(st.s.cfg.Server.PKISignatureScheme)

var err error
var identityPublicKey sign.PublicKey
if filepath.IsAbs(v.IdentityPublicKeyPem) {
identityPublicKey, err = signpem.FromPublicPEMFile(v.IdentityPublicKeyPem, pkiSignatureScheme)
if err != nil {
panic(err)
}
} else {
pemFilePath := filepath.Join(st.s.cfg.Server.DataDir, v.IdentityPublicKeyPem)
identityPublicKey, err = signpem.FromPublicPEMFile(pemFilePath, pkiSignatureScheme)
if err != nil {
panic(err)
}
}

func (st *state) chNodesRegister(v *config.Node, identityPublicKey sign.PublicKey, isGatewayNode bool, isServiceNode bool) error {
payload, err := identityPublicKey.MarshalBinary()
if err != nil {
st.log.Errorf("failed to marshal identityPublicKey: %v", err)
return
return fmt.Errorf("failed to marshal identityPublicKey: %v", err)
}
pk := hash.Sum256From(identityPublicKey)
chCommand := fmt.Sprintf(
chainbridge.Cmd_nodes_register,
v.Identifier,
Expand All @@ -64,16 +41,13 @@ func (st *state) chNodesRegister(v *config.Node, isGatewayNode bool, isServiceNo
chResponse, err := st.chainBridge.Command(chCommand, payload)
st.log.Debugf("ChainBridge response (%s): %+v", chCommand, chResponse)
if err != nil {
st.log.Errorf("ChainBridge command error: %v", err)
return
return fmt.Errorf("ChainBridge command error: %v", err)
}
if chResponse.Error != "" && chResponse.Error != chainbridge.Err_nodes_alreadyRegistered {
st.log.Errorf("ChainBridge response error: %v", chResponse.Error)
return
return fmt.Errorf("ChainBridge response error: %v", chResponse.Error)
}

st.registeredLocalNodes[pk] = true
st.log.Noticef("Local node registered with Identifier '%s', Identity key hash '%x'", v.Identifier, pk)
return nil
}

func (s *state) chPKIGetGenesisEpoch() (uint64, error) {
Expand Down
2 changes: 1 addition & 1 deletion pki/wire_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (a *wireAuthenticator) IsPeerValid(creds *wire.PeerCredentials) bool {
pk := [hash.HashSize]byte{}
copy(pk[:], creds.AdditionalData[:hash.HashSize])

_, isRegistered := a.s.state.registeredLocalNodes[pk]
isRegistered := (pk == hash.Sum256(a.s.state.authorizedNode.IdentityKey))
if isRegistered {
a.s.log.Debugf("Accepting authority authentication from locally registered node with public key '%x'", pk)
a.isMix = true // Gateways and service nodes and mixes are all mixes.
Expand Down

0 comments on commit 46b9d30

Please sign in to comment.