The AWS Lambda sink connector is a Pulsar IO connector for pulling messages from Pulsar topics to AWS Lambda to invoke Lambda functions.
This section describes how to build the AWS Lambda sink connector.
You can get the AWS Lambda sink connector using one of the following methods if you use Pulsar Function Worker to run connectors in a cluster.
Download the NAR package from the download page.
Build it from the source code.
To build the AWS Lambda sink connector from the source code, follow these steps.
Clone the source code to your machine.
git clone https://github.com/streamnative/pulsar-io-aws-lambda
Build the connector in the pulsar-io-aws-lambda directory.
mvn clean install -DskipTests
After the connector is successfully built, a NAR package is generated under the target directory.
ls target
pulsar-io-aws-lambda-{{connector:version}}.nar
If you use Function Mesh to run the connector, you can pull the AWS Lambda sink connector Docker image from Docker Hub.
Before using the AWS Lambda sink connector, you need to configure it. The following table lists the properties and their descriptions.
Name | Type | Required | Default | Description |
---|---|---|---|---|
awsEndpoint | String | false | "" (empty string) | The AWS Lambda endpoint URL. It can be found at AWS Lambda endpoints and quotas. |
awsRegion | String | true | "" (empty string) | The supported AWS region. For example, us-west-1, us-west-2. |
awsCredentialPluginName | String | false | "" (empty string) | The fully-qualified class name of the AwsCredentialProviderPlugin implementation. |
awsCredentialPluginParam | String | true | "" (empty string) | The JSON parameter to initialize AwsCredentialProviderPlugin. |
lambdaFunctionName | String | true | "" (empty string) | The Lambda function that should be invoked by the messages. |
synchronousInvocation | Boolean | true | true | true: invoke the Lambda function synchronously. false: invoke the Lambda function asynchronously. |
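Before submitting a configuration, it can help to check it against the table above. The following is a minimal Python sketch of such a pre-flight check. It is not the connector's own validation logic: the `PROPERTIES` mapping is transcribed from the table, and the `check_sink_config` helper and `EXAMPLE_CONFIGS` values are hypothetical names and sample data used only for illustration.

```python
# Pre-flight check for an AWS Lambda sink config, based only on the
# properties table above. Illustrative helper, not part of the connector.

# Property name -> (expected Python type, required flag), from the table.
PROPERTIES = {
    "awsEndpoint": (str, False),
    "awsRegion": (str, True),
    "awsCredentialPluginName": (str, False),
    "awsCredentialPluginParam": (str, True),
    "lambdaFunctionName": (str, True),
    "synchronousInvocation": (bool, True),
}


def check_sink_config(configs):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    for name, (expected_type, required) in PROPERTIES.items():
        if name not in configs:
            if required:
                problems.append(f"missing required property: {name}")
            continue
        if not isinstance(configs[name], expected_type):
            problems.append(f"{name} should be of type {expected_type.__name__}")
    # Flag keys that the table does not list (often a typo in the name).
    for name in configs:
        if name not in PROPERTIES:
            problems.append(f"unknown property: {name}")
    return problems


# Sample values only; replace them with your own settings.
EXAMPLE_CONFIGS = {
    "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
    "awsRegion": "us-west-2",
    "lambdaFunctionName": "test-function",
    "awsCredentialPluginName": "",
    "awsCredentialPluginParam": '{"accessKey":"myKey","secretKey":"my-Secret"}',
    "synchronousInvocation": True,
}

if __name__ == "__main__":
    for problem in check_sink_config(EXAMPLE_CONFIGS):
        print(problem)
```

A check like this only catches missing, misspelled, or mistyped properties; it does not verify that the region, endpoint, or credentials are valid on the AWS side.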
You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.
Example
JSON
{
  "tenant": "public",
  "namespace": "default",
  "name": "aws-lambda-sink",
  "inputs": [
    "test-aws-lambda-topic"
  ],
  "archive": "connectors/pulsar-io-aws-lambda-{{connector:version}}.nar",
  "parallelism": 1,
  "configs": {
    "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
    "awsRegion": "us-west-2",
    "lambdaFunctionName": "test-function",
    "awsCredentialPluginName": "",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "synchronousInvocation": true
  }
}
YAML
tenant: "public"
namespace: "default"
name: "aws-lambda-sink"
inputs:
  - "test-aws-lambda-topic"
archive: "connectors/pulsar-io-aws-lambda-{{connector:version}}.nar"
parallelism: 1
configs:
  awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
  awsRegion: "us-west-2"
  lambdaFunctionName: "test-function"
  awsCredentialPluginName: ""
  awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
  synchronousInvocation: true
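Because awsCredentialPluginParam holds a JSON document as a string value, writing the JSON configuration file by hand means escaping the inner quotes correctly. One way to avoid that is to generate the file. The sketch below uses Python's standard `json` module with the values from the example above. The `build_sink_config` and `write_sink_config` helpers are hypothetical, and `{{connector:version}}` is kept as the same placeholder used throughout this document.

```python
import json


def build_sink_config(access_key, secret_key, version="{{connector:version}}"):
    """Build the example sink configuration as a Python dict."""
    # The credential parameter is itself JSON, stored as a string value.
    credential_param = json.dumps(
        {"accessKey": access_key, "secretKey": secret_key},
        separators=(",", ":"),
    )
    return {
        "tenant": "public",
        "namespace": "default",
        "name": "aws-lambda-sink",
        "inputs": ["test-aws-lambda-topic"],
        "archive": f"connectors/pulsar-io-aws-lambda-{version}.nar",
        "parallelism": 1,
        "configs": {
            "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
            "awsRegion": "us-west-2",
            "lambdaFunctionName": "test-function",
            "awsCredentialPluginName": "",
            "awsCredentialPluginParam": credential_param,
            "synchronousInvocation": True,
        },
    }


def write_sink_config(path, config):
    """Write the config as JSON; json.dump escapes the nested JSON string."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(config, handle, indent=2)
        handle.write("\n")
```

Calling `write_sink_config("aws-lambda-sink.json", build_sink_config("myKey", "my-Secret"))` would produce a file equivalent to the JSON example; the file name here is only an assumption.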
You can create a CustomResourceDefinition (CRD) to create an AWS Lambda sink connector. Using a CRD lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.
You can define a CRD file (YAML) to set the properties as below.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: aws-lambda-sink-sample
spec:
  image: streamnative/pulsar-io-aws-lambda:{{connector:version}}
  className: org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink
  replicas: 1
  input:
    topics:
      - persistent://public/default/destination
    typeClassName: "[B"
  sinkConfig:
    awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
    awsRegion: "us-west-2"
    lambdaFunctionName: "test-function"
    awsCredentialPluginName: ""
    awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
    synchronousInvocation: true
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-aws-lambda-{{connector:version}}.nar
  clusterName: test-pulsar
  autoAck: true
You can use the AWS Lambda sink connector with Function Worker or Function Mesh.
You can use the AWS Lambda sink connector as a non built-in connector or a built-in connector.
This example describes how to create an AWS Lambda sink connector for a Kubernetes cluster using Function Mesh.
Create and connect to a Kubernetes cluster.
Create a Pulsar cluster in the Kubernetes cluster.
Install the Function Mesh Operator and CRD into the Kubernetes cluster.
Define the AWS Lambda sink connector with a YAML file and save it as sink-sample.yaml.
This example shows how to publish the AWS Lambda sink connector to Function Mesh with a Docker image.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: aws-lambda-sink-sample
spec:
  image: streamnative/pulsar-io-aws-lambda:{{connector:version}}
  className: org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink
  replicas: 1
  input:
    topics:
      - persistent://public/default/destination
    typeClassName: "[B"
  sinkConfig:
    awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
    awsRegion: "us-west-2"
    lambdaFunctionName: "test-function"
    awsCredentialPluginName: ""
    awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
    synchronousInvocation: true
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-aws-lambda-{{connector:version}}.nar
  clusterName: test-pulsar
  autoAck: true
Apply the YAML file to create the AWS Lambda sink connector.
Input
kubectl apply -f <path-to-sink-sample.yaml>
Output
sink.compute.functionmesh.io/aws-lambda-sink-sample created
Check whether the AWS Lambda sink connector is created successfully.
Input
kubectl get all
Output
NAME READY STATUS RESTARTS AGE
pod/aws-lambda-sink-sample-0 1/1 Running 0 77s
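For scripted checks, the same information can be read from the command output. The sketch below parses text in the column layout shown above and reports whether a given pod is ready and running. It assumes the default `kubectl get` table format with NAME, READY, and STATUS as the first three columns; the `pod_is_running` helper is hypothetical, and in practice the text would come from capturing the output of `kubectl get all`.

```python
def pod_is_running(kubectl_output, pod_name):
    """Return True if pod_name has all containers ready and STATUS Running."""
    for line in kubectl_output.splitlines():
        fields = line.split()
        # Expect at least the NAME, READY, and STATUS columns.
        if len(fields) < 3:
            continue
        name, ready, status = fields[0], fields[1], fields[2]
        # `kubectl get all` prefixes pod names with "pod/".
        if name not in (pod_name, f"pod/{pod_name}"):
            continue
        # READY looks like "1/1": ready containers / total containers.
        ready_count, _, total = ready.partition("/")
        return status == "Running" and total != "" and ready_count == total
    return False


# The sample output from this section, used here as test input.
SAMPLE_OUTPUT = """\
NAME                           READY   STATUS    RESTARTS   AGE
pod/aws-lambda-sink-sample-0   1/1     Running   0          77s
"""

if __name__ == "__main__":
    print(pod_is_running(SAMPLE_OUTPUT, "aws-lambda-sink-sample-0"))
```

Parsing the human-readable table keeps the sketch dependency-free, but it is sensitive to column order; a production check would more likely rely on structured output from the Kubernetes API.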
After that, you can use the AWS Lambda sink connector to export Pulsar messages to AWS Lambda to invoke Lambda functions.