The AWS Simple Queue Service (SQS) sink connector pulls data from Pulsar topics and persists data to AWS SQS.
You can get the SQS sink connector using one of the following methods:
Download the NAR package from here.
Build it from the source code.
Clone the source code to your machine.
git clone https://github.com/streamnative/pulsar-io-sqs
Assume that PULSAR_IO_SQS_HOME
is the home directory for the pulsar-io-sqs
repo. Build the connector in the ${PULSAR_IO_SQS_HOME}
directory.
mvn clean install -DskipTests
After the connector is successfully built, a NAR
package is generated under the target
directory.
ls target
pulsar-io-sqs-2.7.0.nar
Before using the SQS sink connector, you need to configure it.
You can create a configuration file (JSON or YAML) to set the following properties.
Name | Type | Required | Default | Description |
---|---|---|---|---|
awsEndpoint |
String | false | " " (empty string) | AWS SQS end-point URL. It can be found at here. |
awsRegion |
String | true | " " (empty string) | Supported AWS region. For example, us-west-1, us-west-2. |
awsCredentialPluginName |
String | false | " " (empty string) | Fully-qualified class name of implementation of AwsCredentialProviderPlugin . |
awsCredentialPluginParam |
String | true | " " (empty string) | JSON parameter to initialize AwsCredentialsProviderPlugin . |
queueName |
String | true | " " (empty string) | Name of the SQS queue that messages should be read from or written to. |
Example
JSON
{
"tenant": "public",
"namespace": "default",
"name": "sqs-sink",
"inputs": [
"test-queue-pulsar"
],
"archive": "connectors/pulsar-io-sqs-2.7.0.nar",
"parallelism": 1,
"configs":
{
"awsEndpoint": "https://sqs.us-west-2.amazonaws.com",
"awsRegion": "us-west-2",
"queueName": "test-queue",
"awsCredentialPluginName": "",
"awsCredentialPluginParam": '{"accessKey":"myKey","secretKey":"my-Secret"}'
}
}
YAML
tenant: "public"
namespace: "default"
name: "sqs-sink"
inputs:
- "test-queue-pulsar"
archive: "connectors/pulsar-io-sqs-2.7.0.nar"
parallelism: 1
configs:
awsEndpoint: "https://sqs.us-west-2.amazonaws.com"
awsRegion: "us-west-2"
queueName: "test-queue"
awsCredentialPluginName: ""
awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
You can use the SQS sink connector as a non built-in connector or a built-in connector as below.
If you already have a Pulsar cluster, you can use the SQS sink connector as a non built-in connector directly.
This example shows how to create an SQS sink connector on a Pulsar cluster using the pulsar-admin sinks create
command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--archive pulsar-io-sqs-2.7.0.nar \
--sink-config-file sqs-sink-config.yaml \
--classname org.apache.pulsar.ecosystem.io.sqs.SQSSink \
--name sqs-sink
You can make the SQS sink connector as a built-in connector and use it on a standalone cluster, on-premises cluster, or K8S cluster.
This example describes how to use the SQS sink connector to pull data from Pulsar topics and persist data to SQS in standalone mode.
Prepare SQS service.
For more information, see Getting Started with Amazon SQS.
Copy the NAR package of the SQS connector to the Pulsar connectors directory.
cp pulsar-io-sqs-2.7.0.nar PULSAR_HOME/connectors/pulsar-io-sqs-2.7.0.nar
Start Pulsar in standalone mode.
PULSAR_HOME/bin/pulsar standalone
Run the SQS sink connector locally.
PULSAR_HOME/bin/pulsar-admin sink localrun \
--sink-type sqs \
--sink-config-file sqs-sink-config.yaml
Send messages to Pulsar topics.
PULSAR_HOME/bin/pulsar-client produce public/default/test-queue-pulsar --messages hello -n 10
Consume messages from the SQS queue using the AWS SQS CLI tool.
aws sqs receive-message --queue-url ${QUEUE_URL} --max-number-of-messages 10
Now you can see the messages containing "Hello From Pulsar" from AWS SQS CLI.
This example explains how to create an SQS sink connector in an on-premises cluster.
Copy the NAR package of the SQS connector to the Pulsar connectors directory.
cp pulsar-io-sqs-2.7.0.nar $PULSAR_HOME/connectors/pulsar-io-sqs-2.7.0.nar
Reload all built-in connectors.
PULSAR_HOME/bin/pulsar-admin sinks reload
Check whether the SQS sink connector is available on the list or not.
PULSAR_HOME/bin/pulsar-admin sinks available-sinks
Create an SQS sink connector on a Pulsar cluster using the pulsar-admin sinks create
command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-type sqs \
--sink-config-file sqs-sink-config.yaml \
--name sqs-sink
Build a new image based on the Pulsar image with the SQS sink connector and push the new image to your image registry. This example tags the new image as streamnative/pulsar-sqs:2.7.0
.
FROM apachepulsar/pulsar-all:2.7.0
RUN curl https://github.com/streamnative/pulsar-io-sqs/releases/download/v2.7.0/pulsar-io-sqs-2.7.0.nar -o /pulsar/connectors/pulsar-io-sqs-2.7.0.nar
Extract the previous --set
arguments from K8S to the file pulsar.yaml
.
helm get values <release-name> > pulsar.yaml
Replace the images
section in the pulsar.yaml
file with the images
section of streamnative/pulsar-sqs:2.7.0
.
Upgrade the K8S cluster with the pulsar.yaml
file.
helm upgrade <release-name> streamnative/pulsar \
--version <new version> \
-f pulsar.yaml
Tip
For more information about how to upgrade a Pulsar cluster with Helm, see Upgrade Guide.
Create an SQS sink connector on a Pulsar cluster using the pulsar-admin sinks create
command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-type sqs \
--sink-config-file sqs-sink-config.yaml \
--name sqs-sink