Skip to content

Commit

Permalink
looking better node
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandre Shinebourne committed Dec 9, 2023
1 parent d89d2aa commit 79bdfdd
Show file tree
Hide file tree
Showing 16 changed files with 190 additions and 213 deletions.
7 changes: 0 additions & 7 deletions .vscode/settings.json

This file was deleted.

7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ decentralised nature. They address many of the pitfalls of centralised architect
single points of failure, resulting in fault-tolerant networks that can behave autonomously. Many of the current efforts
to design peer-to-peer systems use structured elements which improve the performance but re-introduce precariousness
present in centralised systems. Butter is a peer-to-peer (p2p) framework loosely inspired by other project such as
Gnutella, JXTA and libp2p, its goal is to explore what is achievable in unstructured p2p networks.
[Gnutella](https://www.gnu.org/philosophy/gnutella.en.html), [JXTA](https://en.wikipedia.org/wiki/JXTA) and
[libp2p](https://libp2p.io/), its goal is to explore what is achievable in unstructured p2p networks.

Each of the problems in building a p2p middleware corresponds to a module of the framework. Peer discovery is handled
by the `discover` package using a multicast protocol, the NAT traversal and wider internet peer discovery strategy is
Expand All @@ -34,8 +35,8 @@ Butter differentiating factors
maintaining redundant copies in small groups of specially selected 'diverse' nodes while attempting to robustly
maintain data.
- Diversity is a core concept of the design. The idea is premised on the fact that 'diverse' nodes are likely to be less
reliant on common infrastructure and more likely to have different knowledge of the network (e.g. other nodes or
information stored). The diversity metric is hence relative to each node and each node, based on its knowledge,
reliant on common infrastructure and more likely to have different knowledge of the network (e.g. other nodes or
information stored). The diversity metric is hence relative to each node and each node, based on its knowledge,
decides which nodes are more diverse than others. The metric is then used to determine which nodes to enter into PCGs
with, which nodes to query for information and which nodes to maintain relationships with.

Expand Down
134 changes: 101 additions & 33 deletions betterNode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,131 @@
package betterNode

import (
"github.com/butter-network/butter/node"
uuid "github.com/nu7hatch/gouuid"
"fmt"
"github.com/nu7hatch/gouuid"
"github.com/pbnjay/memory"
"log"
"net"
"time"
)

const EOF byte = 26 // EOF code

type Listener interface {
Accept() (Conn, error)
Close() error
Addr() Addr
}

type Conn interface {
read()
write()
}

type CommunicationInterface interface {
Listen() // blocking - continuously open channel listening for incoming connections
Request(CommunicationInterface, []byte, []byte) ([]byte, error) // called every time a new request needs to be made
Listen() (net.Listener, error)
Connect(addr string) (net.Conn, error)
}

type Overlay interface {
Node() *Node
AvailableStorage() uint64
// Add client and server behaviours specific to the overlay
// add storage protocol specific to the overlay
}

type Node struct {
id uuid.UUID
commInterface CommunicationInterface // could make this a slice and allow a node to communicate over many interfaces simultaneously
knownHosts node.KnownHosts
started time.Time
clientBehaviours []func(Overlay) // can only access the Request() method
serverBehaviours map[string]func(Overlay, []byte) []byte // can only access the Request() method
ambassador bool
storageMemoryCap uint64
type KnownHost struct {
addr net.Addr
// TODO: quality metadata for known host optimisation
}

func NewNode(commInterface CommunicationInterface) (Node, error) {
var node Node
type Node struct {
id uuid.UUID
comsInterfaces []CommunicationInterface // Allow a node to communicate over multiple interfaces simultaneously
started time.Time
overlayStack []Overlay
}

func NewNode(commInterfaces []CommunicationInterface, allocatedMemoryMb uint64) (*Node, error) {
u4, err := uuid.NewV4()
if err != nil {
return node, err
return nil, err
}

// Check that there is enough memory available to allocate to the node
allocatedMemoryBytes := allocatedMemoryMb * 1024 * 1024
availableMemoryBytes := memory.FreeMemory()
if allocatedMemoryBytes > availableMemoryBytes {
return nil, fmt.Errorf("allocated memory (%v bytes) is greater than available memory (%v bytes)", allocatedMemoryBytes, availableMemoryBytes)
}

node.id = *u4
node.commInterface = commInterface
node.clientBehaviours = make([]func(Overlay), 0)
node.serverBehaviours = make(map[string]func(Overlay, []byte) []byte)
// FIXME: Allocate 10% of allocated memory to storage of known host data
//knownHostMem := uint64(0.1 * float64(allocatedMemoryBytes))

node := &Node{
id: *u4,
comsInterfaces: commInterfaces,
overlayStack: make([]Overlay, 0),
}

// persistent storage should be part of the overlay network, not the node

return node, nil
}

func (n *Node) Start() {
n.started = time.Now()

n.commInterface.Listen()
// Start listening on all communication interfaces
for _, comsInterface := range n.comsInterfaces {
listener, err := comsInterface.Listen()
if err != nil {
fmt.Printf("Error starting listener: %v\n", err)
continue
}

go n.handleConnections(listener)
}

// Start the node's default background behaviors
// 1. Do peer discovery
}

func (n *Node) handleConnections(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
fmt.Printf("Error accepting connection: %v\n", err)
continue
}
log.Printf("Ready to accept and handle connections accepted on %v\n\n", listener.Addr().String())

// Handle the connection in a separate goroutine
go n.handleConnection(conn)
}
}

func (n *Node) handleConnection(conn net.Conn) {
// Your code to handle communication over this connection goes here
// Unpack the payload and determine the connection route/function to call
// Call the appropriate function and send the response back to the client
}

func main() {
// Create a new node with desired communication interfaces and known hosts
// Initialize TCP communication interface
tcpComm, err := NewTCPCommunication() // Specify your TCP listen address
if err != nil {
fmt.Printf("Error creating TCP communication: %v\n", err)
return
}

// Initialize Pipe communication interface
pipeComm, err := NewPipeCommunication() // Use a unique socket file path
if err != nil {
fmt.Printf("Error creating Pipe communication: %v\n", err)
return
}

commInterfaces := []CommunicationInterface{
tcpComm,
pipeComm,
}

node, err := NewNode(commInterfaces, 2)
if err != nil {
fmt.Printf("Error creating node: %v\n", err)
return
}

// Start the node
node.Start()
}
71 changes: 15 additions & 56 deletions betterNode/pipe.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,29 @@
package betterNode

// TODO: complete the test node (pipe) - server/client behaviour
import "net"

import (
"bufio"
"bytes"
mock_conn "github.com/jordwest/mock-conn"
"net"
)
type PipeCommunication struct{}

type Pipe struct {
//conn mock_conn.Conn
func NewPipeCommunication() (*PipeCommunication, error) {
return &PipeCommunication{}, nil
}

func (p *Pipe) Listen() {
listener := bufio.NewReader(p.conn.Server)
for {
var buffer bytes.Buffer
for {
b, err := listener.ReadByte()
if err != nil {
break
}
if b == EOF {
break
}
buffer.WriteByte(b)
}
// Handle connection
func (pc *PipeCommunication) Listen() (net.Listener, error) {
listener, err := net.Listen("unix", "/tmp/pipe.sock")
if err != nil {
return nil, err
}
}

func (p *Pipe) Request(commInterface CommunicationInterface, route []byte, payload []byte) ([]byte, error) {
//pipeInterface := commInterface.(*Pipe)
// create a new conn
conn := mock_conn.NewConn()
conn.
return nil, nil
}
defer listener.Close()

func Read(conn *net.Conn) ([]byte, error) {
reader := bufio.NewReader(*conn)
var buffer bytes.Buffer
for {
b, err := reader.ReadByte()
if err != nil {
return nil, err
}
if b == EOF {
break
}
buffer.WriteByte(b)
}
return buffer.Bytes(), nil
return listener, nil
}

func Write(conn *net.Conn, packet []byte) error {
writer := bufio.NewWriter(*conn)
appended := append(packet, EOF)
_, err := writer.Write(appended)
func (pc *PipeCommunication) Connect(addr string) (net.Conn, error) {
conn, err := net.Dial("unix", addr)
if err != nil {
return err
return nil, err
}
err = writer.Flush()
if err != nil {
return err
}
return nil

return conn, nil
}
89 changes: 10 additions & 79 deletions betterNode/tcp.go
Original file line number Diff line number Diff line change
@@ -1,98 +1,29 @@
package betterNode

import (
"bufio"
"bytes"
"log"
"net"
)
import "net"

type TCP struct {
addr string
port int
listener net.Listener
}

func (n *TCP) Listen() {
for {
_, err := n.listener.Accept()
if err != nil {
// Avoid fatal errors at all costs - we want to maximise node availability
log.Println("Node is unable to accept incoming connections due to: ", err.Error())
continue // forces next iteration of the loop skipping any code in between
}
type TCPCommunication struct{}

// Pass connection to request handler in a new goroutine - allows a node to handle multiple connections at once
//go node.HandleRequest(conn, overlay)
}
func NewTCPCommunication() (*TCPCommunication, error) {
return &TCPCommunication{}, nil
}

func (n *TCP) Request(commInterface CommunicationInterface, route []byte, payload []byte) ([]byte, error) {
tcpInterface := commInterface.(*TCP)
conn, err := createConnections(tcpInterface.addr, tcpInterface.port)
func (tc *TCPCommunication) Listen() (net.Listener, error) {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
return nil, err
}
defer conn.Close()

packet := append(route, payload...)
packet = append(packet, EOF)

err = Write(&conn, packet)
if err != nil {
return nil, err
}

response, err := Read(&conn)
if err != nil {
return nil, err
}

return response, nil
defer listener.Close()

return listener, nil
}

func createConnections(address string, port int) (net.Conn, error) {
socketAddr := address + ":" + string(port)
tcpAddr, err := net.ResolveTCPAddr("tcp", socketAddr)
if err != nil {
return nil, err
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
func (tc *TCPCommunication) Connect(addr string) (net.Conn, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}

return conn, nil
}

func Read(conn *net.Conn) ([]byte, error) {
reader := bufio.NewReader(*conn)
var buffer bytes.Buffer
for {
b, err := reader.ReadByte()
if err != nil {
return nil, err
}
if b == EOF {
break
}
buffer.WriteByte(b)
}
return buffer.Bytes(), nil
}

func Write(conn *net.Conn, packet []byte) error {
writer := bufio.NewWriter(*conn)
appended := append(packet, EOF)
_, err := writer.Write(appended)
if err != nil {
return err
}
err = writer.Flush()
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit 79bdfdd

Please sign in to comment.