The ActiveMQ source connector receives messages from ActiveMQ clusters and writes messages to Pulsar topics.
To install the ActiveMQ source connector, follow these steps.
Download the NAR package of the ActiveMQ source connector from here.
Put the NAR package pulsar-io-activemq-2.5.1.nar
in the pulsar connectors catalog.
cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
Run the ActiveMQ source connector locally.
$PULSAR_HOME/bin/pulsar-admin source localrun --source-config-file activemq-source-config.yaml
The configuration of the ActiveMQ source connector has the following properties.
Name | Type | Required | Default | Description |
---|---|---|---|---|
protocol |
String | true | "tcp" | ActiveMQ protocol |
host |
String | true | " " (empty string) | ActiveMQ host |
port |
int | true | 5672 | ActiveMQ port. |
username |
String | false | " " (empty string) | Username used to authenticate to ActiveMQ |
password |
String | false | " " (empty string) | Password used to authenticate to ActiveMQ. |
queueName |
String | false | " " (empty string) | ActiveMQ queue name that messages should be read from or written to |
topicName |
String | false | " " (empty string) | ActiveMQ topic name that messages should be read from or written to |
Before using the ActiveMQ source connector, you need to create a configuration file through one of the following methods.
JSON
{
"tenant": "public",
"namespace": "default",
"name": "activemq-source",
"topicName": "user-op-queue-topic",
"archive": "connectors/pulsar-io-activemq-2.5.1.nar",
"parallelism": 1,
"configs": {
"protocol": "tcp",
"host": "localhost",
"port": "61616",
"username": "admin",
"password": "admin",
"queueName": "user-op-queue"
}
}
YAML
tenant: "public"
namespace: "default"
name: "activemq-source"
topicName: "user-op-queue-topic"
archive: "connectors/pulsar-io-activemq-2.5.1.nar"
parallelism: 1
configs:
protocol: "tcp"
host: "localhost"
port: "61616"
username: "admin"
password: "admin"
queueName: "user-op-queue"
This example shows how to use the ActiveMQ source connector to receive messages from ActiveMQ clusters and writes messages to Pulsar topics.
Prepare ActiveMQ service.
docker pull rmohr/activemq
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
Put the pulsar-io-activemq-2.5.1.nar
in the pulsar connectors catalog.
cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
Run ActiveMQ source locally.
$PULSAR_HOME/bin/pulsar-admin source localrun --source-config-file activemq-source-config.yaml
Consume Pulsar messages.
$PULSAR_HOME/bin/pulsar-client consume -s "sub-products" public/default/user-op-queue-topic -n 0
Send ActiveMQ messages.
Use the test method sendMessage
of the class org.apache.pulsar.ecosystem.io.activemq.ActiveMQDemo
to send ActiveMQ messages.
public void sendMessage() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
@Cleanup
Connection connection = connectionFactory.createConnection();
connection.start();
@Cleanup
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue("user-op-queue");
@Cleanup
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 10; i++) {
String msgContent = "Hello ActiveMQ - " + i;
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(msgContent);
producer.send(message);
}
}