forked from gocardless/amqpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
103 lines (86 loc) · 2.55 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// Producer bundles the AMQP connection and channel that NewProducer sets up,
// together with the tag the producer was created with.
type Producer struct {
	// connection is the broker connection obtained from amqp.Dial.
	connection *amqp.Connection

	// channel is the channel opened on connection; Publish sends through it.
	channel *amqp.Channel

	// tag holds the ctag value handed to NewProducer.
	tag string

	// done is created (unbuffered) by NewProducer; nothing in this file
	// sends on or receives from it.
	done chan error
}
// NewProducer dials the AMQP broker at amqpURI, opens a channel on the new
// connection and declares the exchange, returning a Producer ready to publish.
//
// exchange and exchangeType are passed to ExchangeDeclare as the exchange
// name and type, and durable selects whether the exchange is declared
// durable (auto-delete, internal and noWait are always false). ctag is stored
// as the producer's tag. key and reliable are accepted but currently unused:
// publisher-confirm support is commented out below.
//
// On failure a nil Producer and an error naming the failed step are
// returned. If the connection had already been opened it is closed before
// returning so that it does not leak.
func NewProducer(amqpURI, exchange, exchangeType, key, ctag string, reliable bool, durable bool) (*Producer, error) {
	p := &Producer{
		tag:  ctag,
		done: make(chan error),
	}

	var err error

	log.Printf("Connecting to %s", amqpURI)
	p.connection, err = amqp.Dial(amqpURI)
	if err != nil {
		return nil, fmt.Errorf("Dial: %s", err)
	}

	log.Printf("Getting Channel ")
	p.channel, err = p.connection.Channel()
	if err != nil {
		// Best-effort cleanup: the Channel error is the one worth reporting,
		// so a failure to close is deliberately ignored.
		_ = p.connection.Close()
		return nil, fmt.Errorf("Channel: %s", err)
	}

	log.Printf("Declaring Exchange (%s)", exchange)
	if err := p.channel.ExchangeDeclare(
		exchange,     // name
		exchangeType, // type
		durable,      // durable
		false,        // auto-deleted
		false,        // internal
		false,        // noWait
		nil,          // arguments
	); err != nil {
		// Best-effort cleanup, as above; closing the connection is expected
		// to tear down the channel opened on it as well.
		_ = p.connection.Close()
		return nil, fmt.Errorf("Exchange Declare: %s", err)
	}

	// Reliable publisher confirms require confirm.select support from the
	// connection. Disabled for now:
	//
	// if reliable {
	// 	if err := p.channel.Confirm(false); err != nil {
	// 		return nil, fmt.Errorf("Channel could not be put into confirm mode: %s", err)
	// 	}
	// 	ack, nack := p.channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
	// 	// defer confirmOne(ack, nack)
	// }

	return p, nil
}
// Publish sends body as a single text/plain message to exchange with the
// given routingKey over the producer's channel.
//
// The message is published with the mandatory and immediate flags both
// false, transient (non-persistent) delivery mode, priority 0 and an empty
// header table. The body and its length in bytes are logged before sending.
//
// It returns nil on success, or an error prefixed with "Exchange Publish: "
// that includes the underlying publish failure.
func (p *Producer) Publish(exchange, routingKey, body string) error {
	log.Printf("Publishing %s (%dB)", body, len(body))

	msg := amqp.Publishing{
		Headers:         amqp.Table{},
		ContentType:     "text/plain",
		ContentEncoding: "",
		Body:            []byte(body),
		DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
		Priority:        0,              // 0-9
		// a bunch of application/implementation-specific fields
	}

	if err := p.channel.Publish(
		exchange,   // publish to an exchange
		routingKey, // routing to 0 or more queues
		false,      // mandatory
		false,      // immediate
		msg,
	); err != nil {
		// The original format string had no verb, so the cause was rendered
		// as "%!(EXTRA ...)" instead of being part of the message.
		return fmt.Errorf("Exchange Publish: %s", err)
	}
	return nil
}
// confirmOne blocks until a single delivery tag arrives on either ack or
// nack, then logs whether that publishing was confirmed or failed.
//
// This handles exactly one confirmation. A real publisher would typically
// keep a channel of publishings, a sequence number, and a set of
// unacknowledged sequence numbers, looping until the publishing channel is
// closed.
func confirmOne(ack, nack chan uint64) {
	log.Printf("waiting for confirmation of one publishing")

	var (
		seq       uint64
		confirmed bool
	)

	// Whichever channel delivers first decides the outcome.
	select {
	case seq = <-ack:
		confirmed = true
	case seq = <-nack:
		confirmed = false
	}

	if confirmed {
		log.Printf("confirmed delivery with delivery tag: %d", seq)
		return
	}
	log.Printf("failed delivery of delivery tag: %d", seq)
}