Skip to content

Commit

Permalink
Registration API Functionality (#45)
Browse files Browse the repository at this point in the history
* Basic registration API scaffold

* Add registration-api to top-level Makefile

* Look for C2S payload, not TLSDecoySpec

* Add basic ZMQ proxy

* Add zmq-proxy to Makefile

* Add .PHONY targets to Makefile

* Switch application to connecting to ZMQ

* Switch API to binding socket

* Add logging info in /register endpoint

* Expect shared secret, FSP, VSP in payload

* Add default config values for registration.refraction.network

* Add ZMQ auth to registration API

* Auth TODO: done!

* Simplify client-to-API payload

* Use absolute imports

* Set up heartbeat to detect failed connections

* Add additional logging

* Extract ZMQ payload creation from endpoint handler

* Abstract registration publishing, test handler

* Re-work ZMQ architecture

* Clean up detect.c

* Add README for API

* Move flags from FSP to C2S

* Add ZMQ address options
  • Loading branch information
CarsonHoffman authored Aug 6, 2020
1 parent dfa52b3 commit 427e5f9
Show file tree
Hide file tree
Showing 23 changed files with 4,245 additions and 1,882 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ errno = "0.2.3"
radix = { git = "https://github.com/refraction-networking/radix" }
tuntap = { git = "https://github.com/ewust/tuntap.rs" }
ipnetwork = "^0.14.0"
protobuf = "2.6"
protobuf = "2.16.2"
hkdf = "0.7"
sha2 = "0.8.*"
hex = "0.3.*"
Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CFLAGS = -Wall -DENABLE_BPF -DHAVE_PF_RING -DHAVE_PF_RING_ZC -DTAPDANCE_USE_PF_R
PROTO_RS_PATH=src/signalling.rs


all: rust libtd dark-decoy app ${PROTO_RS_PATH}
all: rust libtd dark-decoy app registration-api zmq-proxy ${PROTO_RS_PATH}

rust: ./src/*.rs
cargo build --${DEBUG_OR_RELEASE}
Expand All @@ -29,10 +29,17 @@ libtd:
dark-decoy: detect.c loadkey.c rust_util.c rust libtapdance
${CC} ${CFLAGS} -o $@ detect.c loadkey.c rust_util.c ${LIBS}

registration-api:
cd ./registration-api/ && make

zmq-proxy:
cd ./zmq-proxy/ && make

clean:
cargo clean
rm -f ${TARGETS} *.o *~

${PROTO_RS_PATH}:
cd ./proto/ && make

.PHONY: registration-api zmq-proxy
4 changes: 2 additions & 2 deletions application/lib/proxies.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func MinTransportProxy(regManager *RegistrationManager, clientConn *net.TCPConn,
}
defer covertConn.Close()

if reg.Flags&TdFlagProxyHeader != 0 {
if reg.Flags.GetProxyHeader() {
err = writePROXYHeader(covertConn, clientConn.RemoteAddr().String())
if err != nil {
logger.Printf("failed to send PROXY header to covert: %s", err)
Expand Down Expand Up @@ -212,7 +212,7 @@ func twoWayProxy(reg *DecoyRegistration, clientConn *net.TCPConn, originalDstIP
}
defer covertConn.Close()

if reg.Flags&TdFlagProxyHeader != 0 {
if reg.Flags.GetProxyHeader() {
err = writePROXYHeader(covertConn, clientConn.RemoteAddr().String())
if err != nil {
logger.Printf("failed to send PROXY header to covert: %s", err)
Expand Down
8 changes: 4 additions & 4 deletions application/lib/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewRegistrationManager() *RegistrationManager {
}
}

func (regManager *RegistrationManager) NewRegistration(c2s *pb.ClientToStation, conjureKeys *ConjureSharedKeys, flags [1]byte, includeV6 bool) (*DecoyRegistration, error) {
func (regManager *RegistrationManager) NewRegistration(c2s *pb.ClientToStation, conjureKeys *ConjureSharedKeys, includeV6 bool) (*DecoyRegistration, error) {

phantomAddr, err := regManager.PhantomSelector.Select(
conjureKeys.DarkDecoySeed, uint(c2s.GetDecoyListGeneration()), includeV6)
Expand All @@ -52,7 +52,7 @@ func (regManager *RegistrationManager) NewRegistration(c2s *pb.ClientToStation,
keys: conjureKeys,
Covert: c2s.GetCovertAddress(),
Mask: c2s.GetMaskedDecoyServerName(),
Flags: uint8(flags[0]),
Flags: c2s.Flags,
Transport: uint(c2s.GetTransport()), // hack
DecoyListVersion: c2s.GetDecoyListGeneration(),
RegistrationTime: time.Now(),
Expand Down Expand Up @@ -95,7 +95,7 @@ type DecoyRegistration struct {
DarkDecoy *net.IP
keys *ConjureSharedKeys
Covert, Mask string
Flags uint8
Flags *pb.RegistrationFlags
Transport uint
RegistrationTime time.Time
DecoyListVersion uint32
Expand All @@ -114,7 +114,7 @@ func (reg *DecoyRegistration) String() string {
Phantom string
SharedSecret string
Covert, Mask string
Flags uint8
Flags *pb.RegistrationFlags
Transport uint
RegTime time.Time
DecoyListVersion uint32
Expand Down
58 changes: 17 additions & 41 deletions application/main.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package main

import (
"bytes"
"encoding/binary"
"fmt"
"flag"
"log"
"net"
"os"
"syscall"
"time"

dd "./lib"
"github.com/golang/protobuf/proto"
zmq "github.com/pebbe/zmq4"
dd "github.com/refraction-networking/conjure/application/lib"
pb "github.com/refraction-networking/gotapdance/protobuf"
)

Expand Down Expand Up @@ -62,7 +60,7 @@ func handleNewConn(regManager *dd.RegistrationManager, clientConn *net.TCPConn)
}*/
}

func get_zmq_updates(regManager *dd.RegistrationManager) {
func get_zmq_updates(connectAddr string, regManager *dd.RegistrationManager) {
logger := log.New(os.Stdout, "[ZMQ] ", log.Ldate|log.Lmicroseconds)
sub, err := zmq.NewSocket(zmq.SUB)
if err != nil {
Expand All @@ -71,11 +69,10 @@ func get_zmq_updates(regManager *dd.RegistrationManager) {
}
defer sub.Close()

bindAddr := "tcp://*:5591"
sub.Bind(bindAddr)
sub.Connect(connectAddr)
sub.SetSubscribe("")

logger.Printf("ZMQ listening on %v\n", bindAddr)
logger.Printf("ZMQ connected to %v\n", connectAddr)

for {

Expand Down Expand Up @@ -103,51 +100,26 @@ func get_zmq_updates(regManager *dd.RegistrationManager) {
}

func recieve_zmq_message(sub *zmq.Socket, regManager *dd.RegistrationManager) ([]*dd.DecoyRegistration, error) {
// var ipAddr []byte
// var covertAddrLen, maskedAddrLen [1]byte

var sharedSecret [32]byte
var fixedSizePayload [6]byte
var flags [1]byte
minMsgLen := 32 + 6 + 1 // + 16

msg, err := sub.RecvBytes(0)
if err != nil {
logger.Printf("error reading from ZMQ socket: %v\n", err)
return nil, err
}
if len(msg) < minMsgLen {
logger.Printf("short message of size %v\n", len(msg))
return nil, fmt.Errorf("short message of size %v", len(msg))
}

msgReader := bytes.NewReader(msg)

msgReader.Read(sharedSecret[:])
msgReader.Read(fixedSizePayload[:])

vspSize := binary.BigEndian.Uint16(fixedSizePayload[0:2]) - 16
flags = [1]byte{fixedSizePayload[2]}

clientToStationBytes := make([]byte, vspSize)

msgReader.Read(clientToStationBytes)

// parse c2s
clientToStation := &pb.ClientToStation{}
err = proto.Unmarshal(clientToStationBytes, clientToStation)
parsed := &pb.ZMQPayload{}
err = proto.Unmarshal(msg, parsed)
if err != nil {
logger.Printf("Failed to unmarshall ClientToStation: %v", err)
return nil, err
}

conjureKeys, err := dd.GenSharedKeys(sharedSecret[:])
conjureKeys, err := dd.GenSharedKeys(parsed.SharedSecret)

// Register one or both of v4 and v6 based on support specified by the client
var newRegs []*dd.DecoyRegistration

if clientToStation.GetV4Support() {
reg, err := regManager.NewRegistration(clientToStation, &conjureKeys, flags, false)
if parsed.RegistrationPayload.GetV4Support() {
reg, err := regManager.NewRegistration(parsed.RegistrationPayload, &conjureKeys, false)
if err != nil {
logger.Printf("Failed to create registration: %v", err)
return nil, err
Expand All @@ -159,8 +131,8 @@ func recieve_zmq_message(sub *zmq.Socket, regManager *dd.RegistrationManager) ([
newRegs = append(newRegs, reg)
}

if clientToStation.GetV6Support() {
reg, err := regManager.NewRegistration(clientToStation, &conjureKeys, flags, true)
if parsed.RegistrationPayload.GetV6Support() {
reg, err := regManager.NewRegistration(parsed.RegistrationPayload, &conjureKeys, true)
if err != nil {
logger.Printf("Failed to create registration: %v", err)
return nil, err
Expand All @@ -177,9 +149,13 @@ func recieve_zmq_message(sub *zmq.Socket, regManager *dd.RegistrationManager) ([
var logger *log.Logger

func main() {
var zmqAddress string
flag.StringVar(&zmqAddress, "zmq-address", "ipc://@zmq-proxy", "Address of ZMQ proxy")
flag.Parse()

regManager := dd.NewRegistrationManager()
logger = regManager.Logger
go get_zmq_updates(regManager)
go get_zmq_updates(zmqAddress, regManager)

go func() {
for {
Expand Down
99 changes: 62 additions & 37 deletions detect.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <time.h>
#include <pthread.h>
#include <unistd.h>
//#include <zmq.h>
#include <zmq.h>

// The Makefile in this directory provides a `make tapdance` and a
// `make zc_tapdance` rule. The latter causes the following #define to happen.
Expand Down Expand Up @@ -68,21 +68,13 @@ void* g_rust_failed_map = 0;
int g_update_cli_conf_when_convenient = 0;
int g_update_overloaded_decoys_when_convenient = 0;

//void *g_zmq_ctx;
//void *g_zmq_socket;

#define TIMESPEC_DIFF(a, b) ((a.tv_sec - b.tv_sec)*1000000000LL + \
((int64_t)a.tv_nsec - (int64_t)b.tv_nsec))

void the_program(uint8_t core_id, unsigned int log_interval,
uint8_t* station_key, char* zmq_listener)
uint8_t* station_key, char* workers_socket_addr)
{
struct RustGlobalsStruct rust_globals = rust_detect_init(core_id, station_key);

// init zeromq
//g_zmq_ctx = zmq_ctx_new();
//g_zmq_socket = zmq_socket(g_zmq_ctx, ZMQ_PUB);
//zmq_connect(g_zmq_socket, zmq_listener);
struct RustGlobalsStruct rust_globals = rust_detect_init(core_id, station_key, workers_socket_addr);

//g_rust_failed_map = rust_globals.fail_map;
//g_rust_cli_conf_proto_ptr = rust_globals.cli_conf;
Expand Down Expand Up @@ -333,7 +325,7 @@ void startup_pfring_maybezc(unsigned int cluster_id, int proc_ind)

pid_t start_tapdance_process(int core_affinity, unsigned int cluster_id,
int proc_ind, unsigned int log_interval,
uint8_t* station_key, char* zmq_listener)
uint8_t* station_key, char* workers_socket_addr)
{
pid_t the_pid = fork();
if(the_pid == 0)
Expand All @@ -345,7 +337,7 @@ pid_t start_tapdance_process(int core_affinity, unsigned int cluster_id,
signal(SIGINT, sigproc_child);
signal(SIGTERM, sigproc_child);
signal(SIGPIPE, ignore_sigpipe);
the_program(proc_ind, log_interval, station_key, zmq_listener);
the_program(proc_ind, log_interval, station_key, workers_socket_addr);
}
printf("Core %d: PID %d, lcore %d\n", proc_ind, the_pid, core_affinity);
return the_pid;
Expand Down Expand Up @@ -378,7 +370,8 @@ struct cmd_options
uint8_t* public_key; // the public key, used only for diagnostic
// (all nuls if not provided)
int skip_core; // -1 if not skipping any core, otherwise the core to skip
char* zmq_address;
char* zmq_address; // address of output ZMQ socket to bind
char* zmq_worker_address; // address of ZMQ socket to bind for communication between threads
};

static uint8_t station_key[TD_KEYLEN_BYTES] = {
Expand All @@ -399,14 +392,16 @@ void parse_cmd_args(int argc, char* argv[], struct cmd_options* options)
options->pfring_offset = 0;
options->log_interval = 1000; // milliseconds
int skip_core = -1; // If >0, skip this core when incrementing
options->zmq_address = "ipc://@detector";
options->zmq_worker_address = "ipc://@detector-workers";

char* keyfile_name = 0;

options->station_key = station_key;
options->public_key = public_key;

char c;
while ((c = getopt(argc,argv,"i:n:c:o:l:K:s:a:z:")) != -1)
while ((c = getopt(argc,argv,"i:n:c:o:l:K:s:a:w:z:")) != -1)
{
switch (c)
{
Expand Down Expand Up @@ -435,10 +430,14 @@ void parse_cmd_args(int argc, char* argv[], struct cmd_options* options)
case 's':
skip_core = atoi(optarg);
break;
case 'a':
options->zmq_address = malloc(strlen(optarg));
strcpy(options->zmq_address, optarg);
break;
case 'a':
options->zmq_address = malloc(strlen(optarg));
strcpy(options->zmq_address, optarg);
break;
case 'w':
options->zmq_worker_address = malloc(strlen(optarg));
strcpy(options->zmq_worker_address, optarg);
break;
case 'z':
options->pfring_offset = atoi(optarg);
break;
Expand Down Expand Up @@ -520,25 +519,49 @@ void parse_cmd_args(int argc, char* argv[], struct cmd_options* options)
fflush(stdout);
}

// id is a 1-byte identifier that uniquely determines which proxy process it will go to
// should be the first byte of the session (alternatively, hash of the client IP)
//extern void *g_zmq_socket; // Provided by zc_tapdance
int send_packet_to_proxy(uint8_t id, uint8_t *pkt, size_t len)
// Start a new process to proxy messages between
// the worker threads and the outgoing ZMQ socket.
int handle_zmq_proxy(char *socket_addr, char *workers_socket_addr)
{
/*
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, len+1);
if (rc != 0) {
return rc;
int pid = fork();
if (pid == 0) {
// Set up ZMQ sockets, one for publishing to the proxy and one for taking in
// messages from other threads
void *ctx = zmq_ctx_new();
void *pub = zmq_socket(ctx, ZMQ_PUB);

// Bind the socket for publishing to the proxy
printf("binding zmq socket to %s\n", socket_addr);
int rc = zmq_bind(pub, socket_addr);
if (rc != 0) {
printf("bind on pub socket failed: %s\n", zmq_strerror(errno));
return rc;
}

void *sub = zmq_socket(ctx, ZMQ_SUB);
rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, NULL, 0);
if (rc != 0) {
printf("failed to set sock opt: %s\n", zmq_strerror(errno));
return rc;
}

// Bind the socket for communication between worker threads
// (they're actually set up as separate processes, so we
// need to use IPC rather than inproc communication)
printf("binding zmq worker socket to %s\n", workers_socket_addr);
rc = zmq_bind(sub, workers_socket_addr);
if (rc != 0) {
printf("bind on sub socket failed: %s\n", zmq_strerror(errno));
return rc;
}

// Proxy traffic between worker threads and the outgoing socket
rc = zmq_proxy(sub, pub, NULL);
if (rc != 0) {
printf("proxy returned error: %s\n", zmq_strerror(errno));
return rc;
}
}
// TODO: can we do this without memcpy? Maybe setup msg
// or even just use zmq_send directly?
memcpy(zmq_msg_data(&msg), &id, 1);
memcpy(&(((char*)zmq_msg_data(&msg))[1]), pkt, len);
// TODO: use ZMQ_DONTWAIT to make this non-blocking
return zmq_msg_send(&msg, g_zmq_socket, 0);
*/
// Not implemented
return 0;
}

Expand Down Expand Up @@ -569,6 +592,8 @@ int main(int argc, char* argv[])
sa2.sa_sigaction = notify_overloaded_decoys_file_update;
sigaction(SIGUSR2, &sa2, NULL);

handle_zmq_proxy(options.zmq_address, options.zmq_worker_address);

int i;
int core_num = options.core_affinity_offset;
for (i=0; i<g_num_worker_procs; i++)
Expand All @@ -578,7 +603,7 @@ int main(int argc, char* argv[])
g_forked_pids[i] =
start_tapdance_process(core_num,
options.cluster_id, i+pfring_offset, options.log_interval,
options.station_key, options.zmq_address);
options.station_key, options.zmq_worker_address);
core_num++;
}
signal(SIGINT, sigproc_parent);
Expand Down
Loading

0 comments on commit 427e5f9

Please sign in to comment.