diff --git a/downstream.go b/downstream.go index 17b1d80..e948aee 100644 --- a/downstream.go +++ b/downstream.go @@ -379,7 +379,7 @@ func (dc *downstreamConn) handleCapCommand(cmd string, args []string) error { } } - caps := []string{"message-tags", "server-time"} + caps := []string{"message-tags", "server-time", "echo-message"} if dc.capVersion >= 302 { caps = append(caps, "sasl=PLAIN") @@ -432,7 +432,7 @@ func (dc *downstreamConn) handleCapCommand(cmd string, args []string) error { } switch name { - case "sasl", "message-tags", "server-time": + case "sasl", "message-tags", "server-time", "echo-message": dc.caps[name] = enable default: ack = false @@ -666,6 +666,7 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { // TODO: can't be enabled/disabled on-the-fly msgTagsEnabled := dc.caps["message-tags"] serverTimeEnabled := dc.caps["server-time"] + echoMessageEnabled := dc.caps["echo-message"] consumer, ch := net.ring.NewConsumer(seqPtr) @@ -692,7 +693,7 @@ func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { _, ours := dc.ourMessages[msg] delete(dc.ourMessages, msg) dc.lock.Unlock() - if ours { + if ours && !echoMessageEnabled { // The message comes from our connection, don't echo it // back consumer.Consume()