AoP (AMQP on Pulsar) supports AMQP0-9-1 protocol and it is backed by Pulsar. AoP is implemented as a Pulsar ProtocolHandler with the protocol name of "amqp". The AoP protocol handler is built as a nar
file and it is loaded when the Pulsar broker starts.
To install the AoP protocol handler, follow these steps:
Download the NAR package of the AoP protocol handler from here.
Install the AoP protocol handler.
Set the configuration of the AoP protocol handler.
Add the following properties and set their values in the Pulsar configuration file, such as conf/broker.conf
or conf/standalone.conf
.
Property | Recommended value | Default value |
---|---|---|
messagingProtocols |
amqp | null |
protocolHandlerDirectory |
./protocols | null |
Example
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
Set the AMQP server listeners.
Note
The hostname in the AMQP server listeners should be the same one as the Pulsar broker's
advertisedAddress
.
Example
amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
Load the AoP protocol handler. After installing the AoP protocol handler on the Pulsar broker, you can restart the Pulsar broker to load the AoP protocol handler.
The following table lists configurations available for the AoP protocol handler.
Item | Description | Default value |
---|---|---|
amqpTenant | Set the AMQP on the Pulsar broker tenant. | public |
amqpListeners | Set the AMQP service port. | amqp://127.0.0.1:5672 |
amqpMaxNoOfChannels | Set the maximum number of channels available concurrently on a connection. | 64 |
amqpMaxFrameSize | Set the maximum frame size on a connection. | 4194304 (4MB) |
amqpHeartBeat | Set the default heartbeat timeout on the Pulsar broker. | 60 (s) |
amqpProxyPort | Set the AMQP proxy service port. | 5682 |
amqpProxyEnable | Enable/Disable the proxy service. | false |
AoP is supported on Pulsar 2.6.0 or later releases.
This example shows how to use the AoP protocol handler in standalone mode.
Download the NAR package of the AoP protocol handler from here.
Install the AoP protocol handler.
Set the configuration of the AoP protocol handler.
Add the following properties and set their values in the Pulsar configuration file, such as conf/broker.conf
or conf/standalone.conf
.
Property | Recommended value | Default value |
---|---|---|
messagingProtocols |
amqp | null |
protocolHandlerDirectory |
Location of the AoP NAR file | ./protocols |
Example
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
Set the AMQP server listeners.
Note
The hostname in the AMQP server listeners should be the same one as the Pulsar broker's
advertisedAddress
.
Example
amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
Load the AoP protocol handler. After installing the AoP protocol handler on the Pulsar broker, you can restart the Pulsar broker to load the AoP protocol handler.
$PULSAR_HOME/bin/pulsar-daemon start standalone
Add the namespace for vhost.
# for example, the vhost name is `vhost`
bin/pulsar-admin namespaces create -b 1 public/vhost1
# set retention for the namespace
bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
Use the RabbitMQ client to test the AoP protocol handler.
# add RabbitMQ client dependency in your project
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
// Java Code
// create connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("vhost1");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "ex";
String queue = "qu";
// exchange declare
channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
// queue declare and bind
channel.queueDeclare(queue, true, false, false, null);
channel.queueBind(queue, exchange, "");
// publish some messages
for (int i = 0; i < 100; i++) {
channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes());
}
// consume messages
CountDownLatch countDownLatch = new CountDownLatch(100);
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive msg: " + new String(body));
countDownLatch.countDown();
}
});
countDownLatch.await();
// release resource
channel.close();
connection.close();