Skip to content

Commit

Permalink
Merge pull request neilalexander#31 from neilalexander/quic
Browse files Browse the repository at this point in the history
QUIC transport
  • Loading branch information
neilalexander authored Nov 3, 2023
2 parents 50182bf + 98b69b5 commit 8bed266
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 262 deletions.
35 changes: 35 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: "Docker"
on:
push:

env:
PLATFORMS: linux/amd64,linux/arm64,linux/arm/v7

jobs:
monolith:
name: Docker image
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to GitHub Containers
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build Docker image
uses: docker/build-push-action@v3
with:
context: .
platforms: ${{ env.PLATFORMS }}
push: true
tags: |
ghcr.io/${{ github.repository_owner }}/yggmail:${{ github.ref_name }}
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ EXPOSE 1143/tcp
EXPOSE 1025/tcp
VOLUME /etc/yggmail

ENTRYPOINT ["/usr/bin/yggmail", "-database=/etc/yggmail/yggmail.db"]
ENTRYPOINT ["/usr/bin/yggmail", "-smtp=:1025", "-imap=:1143", "-database=/etc/yggmail/yggmail.db"]
9 changes: 2 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/fatih/color v1.15.0
github.com/gologme/log v1.3.0
github.com/mattn/go-sqlite3 v1.14.17
github.com/neilalexander/utp v0.1.1-0.20210705212447-691f29ad692b
github.com/quic-go/quic-go v0.40.0
github.com/yggdrasil-network/yggdrasil-go v0.5.1
go.uber.org/atomic v1.11.0
golang.org/x/crypto v0.14.0
Expand All @@ -21,22 +21,17 @@ require (

require (
github.com/Arceliar/phony v0.0.0-20220903101357-530938a4b13d // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/perf v1.0.0 // indirect
github.com/anacrolix/missinggo/v2 v2.5.1 // indirect
github.com/anacrolix/sync v0.5.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/bits-and-blooms/bloom/v3 v3.6.0 // indirect
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20231101202521-4ca4178f5c7a // indirect
github.com/hjson/hjson-go/v4 v4.3.1 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/quic-go/qtls-go1-20 v0.4.1 // indirect
github.com/quic-go/quic-go v0.40.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
go.uber.org/mock v0.3.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
Expand Down
234 changes: 0 additions & 234 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion internal/imapserver/imap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewIMAPServer(backend *Backend, addr string, insecure bool) (*IMAPServer, *
s.server.AllowInsecureAuth = insecure
//s.server.Debug = os.Stdout
s.server.Enable(idle.NewExtension())
s.server.Enable(notify)
//s.server.Enable(notify)
s.server.EnableAuth(sasl.Login, func(conn server.Conn) sasl.Server {
return sasl.NewLoginServer(func(username, password string) error {
_, err := s.backend.Login(nil, username, password)
Expand Down
6 changes: 3 additions & 3 deletions internal/smtpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewQueues(config *config.Config, log *log.Logger, transport transport.Trans
Transport: transport,
Storage: storage,
}
go qs.manager()
time.AfterFunc(time.Second*5, qs.manager)
return qs
}

Expand All @@ -51,7 +51,7 @@ func (qs *Queues) manager() {
for _, destination := range destinations {
_, _ = qs.queueFor(destination)
}
time.AfterFunc(time.Minute*10, qs.manager)
time.AfterFunc(time.Minute, qs.manager)
}

func (qs *Queues) QueueFor(from string, rcpts []string, content []byte) error {
Expand Down Expand Up @@ -90,7 +90,7 @@ func (qs *Queues) queueFor(server string) (*Queue, error) {
if !ok {
return nil, fmt.Errorf("type assertion error")
}
if q.running.CAS(false, true) {
if q.running.CompareAndSwap(false, true) {
go q.run()
}
return q, nil
Expand Down
170 changes: 154 additions & 16 deletions internal/transport/yggdrasil.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,45 @@
package transport

import (
"context"
"crypto/ed25519"
"crypto/tls"
"encoding/hex"
"fmt"
"log"
"net"
"regexp"
"sync"
"time"

iwt "github.com/Arceliar/ironwood/types"
"github.com/fatih/color"
gologme "github.com/gologme/log"
"github.com/neilalexander/utp"
"github.com/quic-go/quic-go"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/core"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast"
)

type YggdrasilTransport struct {
Sessions *utp.Socket
listener *quic.Listener
yggdrasil net.PacketConn
transport *quic.Transport
tlsConfig *tls.Config
quicConfig *quic.Config
incoming chan *yggdrasilSession
sessions sync.Map // string -> quic.Connection
dials sync.Map // string -> *yggdrasilDial
}

type yggdrasilSession struct {
quic.Connection
quic.Stream
}

type yggdrasilDial struct {
context.Context
context.CancelFunc
}

func NewYggdrasilTransport(log *log.Logger, sk ed25519.PrivateKey, pk ed25519.PublicKey, peers []string, mcast bool) (*YggdrasilTransport, error) {
Expand All @@ -38,7 +59,9 @@ func NewYggdrasilTransport(log *log.Logger, sk ed25519.PrivateKey, pk ed25519.Pu

cfg := config.GenerateConfig()
copy(cfg.PrivateKey, sk)
cfg.GenerateSelfSignedCertificate()
if err := cfg.GenerateSelfSignedCertificate(); err != nil {
return nil, err
}

var ygg *core.Core
var err error
Expand Down Expand Up @@ -73,25 +96,140 @@ func NewYggdrasilTransport(log *log.Logger, sk ed25519.PrivateKey, pk ed25519.Pu
}
}

us, err := utp.NewSocketFromPacketConnNoClose(ygg)
if err != nil {
return nil, fmt.Errorf("utp.NewSocketFromPacketConnNoClose: %w", err)
tr := &YggdrasilTransport{
tlsConfig: &tls.Config{
ServerName: hex.EncodeToString(ygg.PublicKey()),
Certificates: []tls.Certificate{
*cfg.Certificate,
},
InsecureSkipVerify: true,
},
quicConfig: &quic.Config{
HandshakeIdleTimeout: time.Second * 5,
MaxIdleTimeout: time.Second * 60,
},
transport: &quic.Transport{
Conn: ygg,
},
yggdrasil: ygg,
incoming: make(chan *yggdrasilSession, 1),
}

if tr.listener, err = tr.transport.Listen(tr.tlsConfig, tr.quicConfig); err != nil {
return nil, fmt.Errorf("quic.Listen: %w", err)
}

go tr.connectionAcceptLoop()
return tr, nil
}

func (t *YggdrasilTransport) connectionAcceptLoop() {
for {
qc, err := t.listener.Accept(context.TODO())
if err != nil {
return
}

host := qc.RemoteAddr().String()
if eqc, ok := t.sessions.LoadAndDelete(host); ok {
eqc := eqc.(quic.Connection)
_ = eqc.CloseWithError(0, "Connection replaced")
}
t.sessions.Store(host, qc)
if dial, ok := t.dials.LoadAndDelete(host); ok {
dial := dial.(*yggdrasilDial)
dial.CancelFunc()
}

go t.streamAcceptLoop(qc)
}
}

func (t *YggdrasilTransport) streamAcceptLoop(qc quic.Connection) {
host := qc.RemoteAddr().String()

defer qc.CloseWithError(0, "Timed out") // nolint:errcheck
defer t.sessions.Delete(host)

for {
qs, err := qc.AcceptStream(context.Background())
if err != nil {
break
}
t.incoming <- &yggdrasilSession{qc, qs}
}
return &YggdrasilTransport{
Sessions: us,
}, nil
}

func (t *YggdrasilTransport) Dial(host string) (net.Conn, error) {
addr := make(iwt.Addr, ed25519.PublicKeySize)
k, err := hex.DecodeString(host)
if err != nil {
return nil, err
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
var retry bool
retry:
qc, ok := t.sessions.Load(host)
if !ok {
if dial, ok := t.dials.Load(host); ok {
<-dial.(*yggdrasilDial).Done()
}
if qc, ok = t.sessions.Load(host); !ok {
dialctx, dialcancel := context.WithCancel(ctx)
defer dialcancel()

t.dials.Store(host, &yggdrasilDial{dialctx, dialcancel})
defer t.dials.Delete(host)

addr := make(iwt.Addr, ed25519.PublicKeySize)
k, err := hex.DecodeString(host)
if err != nil {
return nil, err
}
copy(addr, k)

if qc, err = t.transport.Dial(dialctx, addr, t.tlsConfig, t.quicConfig); err != nil {
return nil, err
}

qc := qc.(quic.Connection)
t.sessions.Store(host, qc)
go t.streamAcceptLoop(qc)
}
}
if qc == nil {
return nil, net.ErrClosed
} else {
qc := qc.(quic.Connection)
qs, err := qc.OpenStreamSync(ctx)
if err != nil {
if !retry {
retry = true
goto retry
}
return nil, err
}
// For some reason this is needed to kick the stream
_, err = qs.Write([]byte(" "))
return &yggdrasilSession{qc, qs}, err
}
copy(addr, k)
return t.Sessions.DialAddr(addr)
}

func (t *YggdrasilTransport) Listener() net.Listener {
return t.Sessions
return &yggdrasilListener{t}
}

type yggdrasilListener struct {
*YggdrasilTransport
}

func (t *yggdrasilListener) Accept() (net.Conn, error) {
return <-t.incoming, nil
}

func (t *yggdrasilListener) Addr() net.Addr {
return t.listener.Addr()
}

func (t *yggdrasilListener) Close() error {
if err := t.listener.Close(); err != nil {
return err
}
return t.yggdrasil.Close()
}

0 comments on commit 8bed266

Please sign in to comment.