Real-Time Message Push with RabbitMQ

RabbitMQ is a message-queue middleware (a message broker). Plugins can be installed to add support for additional messaging protocols.

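This post tests real-time message push using the STOMP plugins: rabbitmq_stomp, and rabbitmq_web_stomp, which exposes STOMP over WebSocket to browsers.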

RabbitMQ is installed with docker-compose. The Dockerfile is as follows:

FROM rabbitmq:alpine

RUN rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp

EXPOSE 4369 5671 5672 15671 15672 15674 25672

docker-compose.yml

rabbitmq:
    build: docker/rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
    restart: always
    ports:
      - "15672:15672"
      - "15674:15674"
      - "5672:5672"
    container_name: rabbitmq

The producer is written in PHP

First install the AMQP client library with Composer:

composer require php-amqplib/php-amqplib

Producer code

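<?php
require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Credentials match RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS in docker-compose.yml
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');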
$channel = $connection->channel();

$exchangeName = 'topic_exchange';
$exchangeType = 'topic';
$queueName = 'topic_queue';
$routingKey = 'topic.routing_key';

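// Arguments: name, type, passive, durable, auto_delete
$channel->exchange_declare($exchangeName, $exchangeType, false, true, false);
// Arguments: name, passive, durable, exclusive, auto_delete
$channel->queue_declare($queueName, false, true, false, false);
// Route messages published with $routingKey from the exchange into the queue
$channel->queue_bind($queueName, $exchangeName, $routingKey);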

// Publish the current timestamp once per second, 1000 times
for ($i = 0; $i < 1000; $i++) {
    $data = date('Y-m-d H:i:s');
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, $exchangeName, $routingKey);
    sleep(1);
}

$channel->close();
$connection->close();
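
The producer binds its queue to a topic exchange with the exact key topic.routing_key. Topic bindings may also contain wildcards: * stands for exactly one dot-separated word and # for zero or more words. The following is a minimal sketch of those matching rules, written for illustration only; topicMatches is a hypothetical helper, not part of RabbitMQ or php-amqplib, and it does not reflect how the broker implements matching internally.

```javascript
// Illustrative only: mimics AMQP topic-exchange matching rules.
// topicMatches is a hypothetical helper, not a RabbitMQ API.
function topicMatches(bindingKey, routingKey) {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  // Try to match pattern[p..] against words[w..]
  function match(p, w) {
    if (p === pattern.length) {
      return w === words.length;
    }
    if (pattern[p] === '#') {
      // '#' may absorb zero or more words
      for (let k = w; k <= words.length; k++) {
        if (match(p + 1, k)) return true;
      }
      return false;
    }
    if (w === words.length) {
      return false;
    }
    if (pattern[p] === '*' || pattern[p] === words[w]) {
      return match(p + 1, w + 1);
    }
    return false;
  }

  return match(0, 0);
}
```

With the binding used above, topicMatches('topic.routing_key', 'topic.routing_key') is true, and a binding such as 'topic.*' would accept the same routing key as well.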

Front-end JavaScript

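The page must include stomp.js. The snippet also uses jQuery (as $) and expects two containers on the page, #first and #second, each holding a div for output; #first additionally holds a form with an input.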

<script>
    var has_had_focus = false;
    var pipe = function(el_name, send) {
        var div  = $(el_name + ' div');
        var inp  = $(el_name + ' input');
        var form = $(el_name + ' form');

        var print = function(m, p) {
            p = (p === undefined) ? '' : JSON.stringify(p);
            div.append($("<code>").text(m + ' ' + p));
            div.scrollTop(div.scrollTop() + 10000);
        };

        if (send) {
            form.submit(function() {
                send(inp.val());
                inp.val('');
                return false;
            });
        }
        return print;
    };

    // Stomp.js boilerplate
    var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws');
    client.debug = pipe('#second');

    var print_first = pipe('#first', function(data) {
        client.send('/amq/queue/topic_queue', {"content-type":"text/plain"}, data);
    });
    var on_connect = function(x) {
        // Subscribe to the queue declared by the PHP producer
        var id = client.subscribe("/amq/queue/topic_queue", function(d) {
            print_first(d.body);
        });
    };
    var on_error =  function() {
        console.log('error');
    };
    // Log in with the user configured in docker-compose.yml; '/' is the virtual host
    client.connect('admin', 'admin', on_connect, on_error, '/');

    $('#first input').focus(function() {
        if (!has_had_focus) {
            has_had_focus = true;
            $(this).val("");
        }
    });
</script>
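
For readers unfamiliar with STOMP, it may help to see roughly what client.send puts on the WebSocket. A STOMP frame is a command line, header lines of the form name:value, a blank line, the body, and a terminating NUL byte. The sketch below builds such a frame by hand, using the topic_queue queue declared by the producer as the destination; buildStompFrame is a hypothetical helper for illustration, and a real client may add further headers such as content-length.

```javascript
// Illustrative only: assembles a STOMP frame as text.
// buildStompFrame is a hypothetical helper, not part of stomp.js.
function buildStompFrame(command, headers, body) {
  const lines = [command];
  for (const [name, value] of Object.entries(headers)) {
    lines.push(name + ':' + value);
  }
  // A blank line separates headers from body; a NUL byte ends the frame
  return lines.join('\n') + '\n\n' + (body || '') + '\0';
}

const frame = buildStompFrame(
  'SEND',
  { destination: '/amq/queue/topic_queue', 'content-type': 'text/plain' },
  'hello'
);
```

Setting client.debug, as the script above does for #second, is a convenient way to watch the frames actually exchanged with the broker.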

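Issue found: with this publish/subscribe setup, when several front ends subscribe to the same queue at the same time, each message published by the back end is delivered to only one of them. Once a front end has received a message, it is removed from the queue, so the other subscribers never see it.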
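
One possible way around this, sketched here under assumptions rather than tested in this post: the RabbitMQ STOMP plugin also accepts destinations of the form /exchange/<name>/<pattern>. For a subscription to such a destination the broker creates an exclusive, auto-deleted queue bound to the exchange, so each front end gets its own copy of every matching message. The helpers exchangeDestination and subscribeBroadcast below are hypothetical names, and client is assumed to be an already connected stomp.js client.

```javascript
// Builds a RabbitMQ STOMP destination: /exchange/<name>[/<pattern>]
// (hypothetical helper, for illustration)
function exchangeDestination(exchange, pattern) {
  return '/exchange/' + exchange + (pattern ? '/' + pattern : '');
}

// `client` is assumed to be a connected stomp.js client, i.e. any object
// exposing subscribe(destination, callback). Each call results in a
// separate broker-side queue, so subscribers do not compete for messages.
function subscribeBroadcast(client, exchange, pattern, onMessage) {
  return client.subscribe(exchangeDestination(exchange, pattern), function (frame) {
    onMessage(frame.body);
  });
}
```

In the page above, this would mean calling subscribeBroadcast(client, 'topic_exchange', 'topic.routing_key', print_first) inside on_connect instead of subscribing to the shared queue. Messages published while a front end is disconnected would not be kept for it, since its queue is deleted when the subscription ends.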