Skip to content

Commit 6a56c7a

Browse files
committed
[core] Add support for manually configured outgoing channels
1 parent f30d6e8 commit 6a56c7a

3 files changed

Lines changed: 46 additions & 17 deletions

File tree

core/task/channel/inbound.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ chans.data1.0.type = push
5050
chans.data1.numSockets = 1
5151
*/
5252

53-
func (inbound Inbound) ToFMQMap(port uint64) (pm controlcommands.PropertyMap) {
53+
func (inbound *Inbound) ToFMQMap(port uint64) (pm controlcommands.PropertyMap) {
54+
return inbound.buildFMQMap(fmt.Sprintf("tcp://*:%d", port))
55+
}
56+
57+
func (inbound *Inbound) buildFMQMap(address string) (pm controlcommands.PropertyMap) {
5458
pm = make(controlcommands.PropertyMap)
5559
const chans = "chans"
5660
chName := inbound.Name
@@ -59,7 +63,7 @@ func (inbound Inbound) ToFMQMap(port uint64) (pm controlcommands.PropertyMap) {
5963
prefix := strings.Join([]string{chans, chName, "0"}, ".")
6064

6165
chanProps := controlcommands.PropertyMap{
62-
"address": fmt.Sprintf("tcp://*:%d", port),
66+
"address": address,
6367
"method": "bind",
6468
"rateLogging": strconv.Itoa(inbound.RateLogging),
6569
"rcvBufSize": strconv.Itoa(inbound.RcvBufSize),

core/task/channel/outbound.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,37 @@ chans.data1.0.type = pull
7979
chans.data1.numSockets = 1
8080
*/
8181

82-
func (outbound *Outbound) ToFMQMap(endpoint Endpoint) (pm controlcommands.PropertyMap) {
82+
func (outbound *Outbound) ToFMQMap(bindMap BindMap) (pm controlcommands.PropertyMap) {
83+
if outbound == nil {
84+
return
85+
}
86+
87+
var address string
88+
// If an explicit target was provided, we use it
89+
if strings.HasPrefix(outbound.Target, "tcp://") ||
90+
strings.HasPrefix(outbound.Target, "ipc://") {
91+
address = outbound.Target
92+
} else {
93+
// we don't need class.Bind data for this one, only task.bindPorts after resolving paths!
94+
for chPath, endpoint := range bindMap {
95+
// FIXME: implement more sophisticated channel matching here
96+
if outbound.Target == chPath {
97+
98+
// We have a match, so we generate a resolved target address and break
99+
address = fmt.Sprintf("tcp://%s:%d", endpoint.Host, endpoint.Port)
100+
break
101+
}
102+
}
103+
}
104+
105+
if len(address) == 0 {
106+
return
107+
}
108+
109+
return outbound.buildFMQMap(address)
110+
}
111+
112+
func (outbound *Outbound) buildFMQMap(address string) (pm controlcommands.PropertyMap) {
83113
pm = make(controlcommands.PropertyMap)
84114
const chans = "chans"
85115
chName := outbound.Name
@@ -88,7 +118,7 @@ func (outbound *Outbound) ToFMQMap(endpoint Endpoint) (pm controlcommands.Proper
88118
prefix := strings.Join([]string{chans, chName, "0"}, ".")
89119

90120
chanProps := controlcommands.PropertyMap{
91-
"address": fmt.Sprintf("tcp://%s:%d", endpoint.Host, endpoint.Port),
121+
"address": address,
92122
"method": "connect",
93123
"rateLogging": strconv.Itoa(outbound.RateLogging),
94124
"rcvBufSize": strconv.Itoa(outbound.RcvBufSize),

core/task/task.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -222,19 +222,14 @@ func (t Task) BuildPropertyMap(bindMap channel.BindMap) controlcommands.Property
222222
}
223223
}
224224

225-
for _, outbCh := range t.parent.CollectOutboundChannels() {
226-
// we don't need class.Bind data for this one, only task.bindPorts after resolving paths!
227-
for chPath, endpoint := range bindMap {
228-
// FIXME: implement more sophisticated channel matching here
229-
if outbCh.Target == chPath {
230-
231-
// We get the FairMQ-formatted propertyMap from the outbound channel spec
232-
chanProps := outbCh.ToFMQMap(endpoint)
233-
234-
// And we copy it into the task's propertyMap
235-
for k, v := range chanProps {
236-
propMap[k] = v
237-
}
225+
for _, outboundCh := range t.parent.CollectOutboundChannels() {
226+
// We get the FairMQ-formatted propertyMap from the outbound channel spec
227+
chanProps := outboundCh.ToFMQMap(bindMap)
228+
229+
// And if valid, we copy it into the task's propertyMap
230+
if len(chanProps) > 0 {
231+
for k, v := range chanProps {
232+
propMap[k] = v
238233
}
239234
}
240235
}

0 commit comments

Comments
 (0)