-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
585 additions
and
541 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,3 +24,4 @@ go-space-thumb* | |
.env | ||
*secret | ||
go-ma-actor | ||
go-home |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package actor | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/bahner/go-ma/entity" | ||
"github.com/bahner/go-ma/key/set" | ||
"github.com/bahner/go-ma/msg" | ||
pubsub "github.com/libp2p/go-libp2p-pubsub" | ||
|
||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
const MESSAGES_BUFFERSIZE = 100 | ||
|
||
type Actor struct { | ||
|
||
// This context is used to cancel the Listen() function. | ||
ctx context.Context | ||
|
||
// ps service ointer | ||
ps *pubsub.PubSub | ||
|
||
// All actors must be entities. | ||
// Ideally they should be the same, but then ma becomes a bit too opinionated. | ||
Entity *entity.Entity | ||
|
||
// Private is the topic where we receive envelopes from other actors. | ||
// It's basically a private channel with the DIDDocument keyAgreement as topic. | ||
Private *pubsub.Topic | ||
|
||
// We basically receive signed messages from the room we're in here. | ||
// It's basically a public channel with the assertionMethod from the DIDDocument of | ||
// the room we're in as topic. | ||
// Others can subscribe to this topic and send us messages, as long as they are signed. | ||
Public *pubsub.Topic | ||
|
||
// Incoming messages from the actor to AssertionMethod topic. It's bascially a broadcast channel. | ||
// But you could use it to send messages to a specific actor or to all actors in a group. | ||
// This is a public channel. There will need to be some generic To (recipients) in the mesage | ||
// for example "broadcast", so that one actor can send a message to everybody in the room. | ||
// That is a TODO. | ||
// We receive the message contents here after verification or decryption. | ||
Messages chan *msg.Message | ||
} | ||
|
||
// Creates a new actor from an entity. | ||
// Takes a pubsub.PubSub service, an entity and a forcePublish flag. | ||
// The forcePublish is to override existing keys in IPFS. | ||
func New(ctx context.Context, ps *pubsub.PubSub, e *entity.Entity, forcePublish bool) (*Actor, error) { | ||
|
||
log.Debugf("actor/new: Setting Actor Entity: %v", e) | ||
|
||
var err error | ||
a := &Actor{} | ||
|
||
// Assign provided resource pointers | ||
a.ctx = ctx | ||
a.ps = ps | ||
|
||
// Firstly create assign entity to actor | ||
a.Entity = e | ||
|
||
// Create topic for incoming envelopes | ||
a.Private, err = ps.Join(a.Entity.Doc.KeyAgreement) | ||
if err != nil { | ||
if err.Error() != "topic already exists" { | ||
return nil, fmt.Errorf("new_actor: Failed to join topic: %v", err) | ||
} | ||
} | ||
|
||
// Create subscription to topic for incoming messages | ||
a.Public, err = ps.Join(a.Entity.Doc.AssertionMethod) | ||
if err != nil { | ||
return nil, fmt.Errorf("new_actor: Failed to join topic: %v", err) | ||
} | ||
|
||
// Set the messages channel | ||
a.Messages = make(chan *msg.Message, MESSAGES_BUFFERSIZE) | ||
|
||
// Publish the entity | ||
err = a.Entity.Publish(forcePublish) | ||
if err != nil { | ||
return nil, fmt.Errorf("new_actor: Failed to publish Entity: %v", err) | ||
} | ||
|
||
log.Debugf("new_actor: Actor initialized: %s", a.Entity.DID.Fragment) | ||
return a, nil | ||
|
||
} | ||
|
||
// Creates a new actor from a keyset. | ||
// Takes a pubsub.PubSub service, a keyset and a forcePublish flag. | ||
func NewFromKeyset(ctx context.Context, ps *pubsub.PubSub, k *set.Keyset, forcePublish bool) (*Actor, error) { | ||
|
||
log.Debugf("Setting Actor Entity: %v", k) | ||
e, err := entity.NewFromKeyset(k) | ||
if err != nil { | ||
return nil, fmt.Errorf("new_actor: Failed to create Entity: %v", err) | ||
} | ||
|
||
return New(ctx, ps, e, forcePublish) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package actor | ||
|
||
import "fmt" | ||
|
||
// Takes a room topic and joins it. The room is the DID of the room actor. | ||
func (a *Actor) Enter(room string) error { | ||
|
||
var err error | ||
|
||
// First close the current subscription | ||
a.Public.Close() | ||
|
||
a.Public, err = a.ps.Join(room) | ||
if err != nil { | ||
return fmt.Errorf("home: %v failed to join topic: %v", a, err) | ||
} | ||
|
||
return nil | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package actor | ||
|
||
import ( | ||
"fmt" | ||
) | ||
|
||
func (a *Actor) Listen(outputChannel chan<- string) error { | ||
// Subscribe to Inbox topic | ||
inboxSub, err := a.Private.Subscribe() | ||
if err != nil { | ||
return fmt.Errorf("failed to subscribe to Inbox topic: %v", err) | ||
} | ||
defer inboxSub.Cancel() | ||
|
||
// Subscribe to Space topic | ||
spaceSub, err := a.Public.Subscribe() | ||
if err != nil { | ||
return fmt.Errorf("failed to subscribe to Space topic: %v", err) | ||
} | ||
defer spaceSub.Cancel() | ||
|
||
// Start a goroutine for Inbox subscription | ||
go a.handlePrivateMessages(inboxSub) | ||
|
||
// Start a goroutine for Space subscription | ||
// Assuming you have a similar function for Space | ||
go a.handlePublicMessages(spaceSub) | ||
|
||
// Wait for context cancellation (or other exit conditions) | ||
<-a.ctx.Done() | ||
return a.ctx.Err() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package actor | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/bahner/go-ma/msg" | ||
"github.com/bahner/go-ma/msg/envelope" | ||
pubsub "github.com/libp2p/go-libp2p-pubsub" | ||
) | ||
|
||
func (a *Actor) receivePrivateEnvelopes(sub *pubsub.Subscription) (*msg.Message, error) { | ||
|
||
msgData, err := sub.Next(a.ctx) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to receive message from inbox: %v", err) | ||
} | ||
|
||
e, err := envelope.UnmarshalFromCBOR(msgData.Data) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to unmarshal envelope from CBOR: %v", err) | ||
} | ||
|
||
message, err := e.Open(a.Entity.Keyset.EncryptionKey.PrivKey) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to open envelope: %v", err) | ||
} | ||
|
||
return message, nil | ||
} | ||
|
||
func (a *Actor) handlePrivateMessages(sub *pubsub.Subscription) { | ||
for { | ||
select { | ||
case <-a.ctx.Done(): | ||
// Exit goroutine when context is cancelled | ||
return | ||
default: | ||
// Read message from Inbox subscription | ||
if msg, err := a.receivePrivateEnvelopes(sub); err == nil { | ||
a.Messages <- msg | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.