// producer
package main
import (
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)
// Connection and routing settings that main passes to publish.
const (
	// uri is the AMQP connection URI.
	uri = "amqp://guest:guest@10.0.0.11:5672/"
	// exchangename is the exchange to publish to. The empty string selects
	// the default exchange, which routes by queue name.
	exchangename = ""
	// queuename is the name of the queue the message is sent to.
	queuename = "test-queues-acknowledgments"
)
// failonerror aborts the program when err is non-nil and does nothing
// otherwise.
//
// The error is wrapped with msg as context ("msg: err") and handed to
// log.Fatal, which logs it and exits the process with status 1.
func failonerror(err error, msg string) {
	if err == nil {
		return
	}
	// log.Fatal never returns, so no statement placed after it would run.
	log.Fatal(fmt.Errorf("%s: %w", msg, err))
}
func main() {
bodymsg := bodyfrom(os.args)
//调用发布消息函数
publish(uri, exchangename, queuename, bodymsg)
log.printf(“published %db ok”, len(bodymsg))
}
// bodyfrom derives the message body from a command-line style argument list.
//
// args[0] is skipped (main passes os.Args, whose first element is the program
// name). If there are no further arguments, or the first of them is empty,
// the default body "hello angel" is returned; otherwise args[1:] are joined
// with single spaces.
func bodyfrom(args []string) string {
	// Inspect the slice that was passed in, not the process-wide os.Args,
	// so the result depends only on the argument.
	if len(args) < 2 || args[1] == "" {
		return "hello angel"
	}
	return strings.Join(args[1:], " ")
}
//发布者的方法
//@amqpuri, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpuri string, exchange string, queue string, body string) {
//建立连接
log.printf(“dialing %q”, amqpuri)
connection, err := amqp.dial(amqpuri)
failonerror(err, “failed to connect to rabbitmq”)
defer connection.close()
//创建一个channel
log.printf(“got connection, getting channel”)
channel, err := connection.channel()
failonerror(err, “failed to open a channel”)
defer channel.close()
log.printf(“got queue, declaring %q”, queue)
//创建一个queue
q, err := channel.queuedeclare(
queuename, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failonerror(err, “failed to declare a queue”)
log.printf(“declared queue, publishing %db body (%q)”, len(body), body)
// producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.publish(
exchange, // exchange
q.name, // routing key
false, // mandatory
false, // immediate
amqp.publishing{
headers: amqp.table{},
contenttype: “text/plain”,
contentencoding: “”,
body: []byte(body),
})
failonerror(err, “failed to publish a message”)
}