Features

  • Use the Google Cloud Pub/Sub sink connector to sync data from Pulsar

Tags

Pulsar IO

Pub/Sub

Sink

Enterprise Support

StreamNative supported

Author

[ "StreamNative" ]

Google Cloud Pub/Sub Sink

The Google Cloud Pub/Sub sink connector pulls data from Pulsar topics and persists data to Google Cloud Pub/Sub topics.

How to get

This section describes how to get the Google Cloud Pub/Sub sink connector.

Work with Function Worker

If you use Pulsar Function Worker to run connectors in a cluster, you can get the Google Cloud Pub/Sub sink connector as a NAR package built from the source code.

To build the Google Cloud Pub/Sub sink connector from the source code, follow these steps.

  1. Clone the source code to your machine.

    git clone https://github.com/streamnative/pulsar-io-google-pubsub
    
  2. Build the connector in the pulsar-io-google-pubsub directory.

    mvn clean install -DskipTests
    

    After the connector is successfully built, a NAR package is generated under the target directory.

    ls target
    pulsar-io-google-pubsub-2.9.1.2.nar
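
The configuration examples in this document reference the package as connectors/pulsar-io-google-pubsub-2.9.1.2.nar. As a minimal sketch, assuming a local Pulsar installation whose root directory you pass in, the built NAR could be copied into place like this. The function name `install_nar` and its arguments are hypothetical helpers, not part of the connector.

```shell
# Hypothetical helper: copy a built NAR into <pulsar_home>/connectors.
# Both arguments are assumptions about your local layout.
install_nar() {
  local nar_path="$1"      # e.g. target/pulsar-io-google-pubsub-2.9.1.2.nar
  local pulsar_home="$2"   # root directory of the Pulsar installation

  if [ ! -f "$nar_path" ]; then
    echo "NAR not found: $nar_path" >&2
    return 1
  fi

  mkdir -p "$pulsar_home/connectors"
  cp "$nar_path" "$pulsar_home/connectors/"

  # Print the installed path so callers can reuse it.
  echo "$pulsar_home/connectors/$(basename "$nar_path")"
}
```

For example, `install_nar target/pulsar-io-google-pubsub-2.9.1.2.nar /path/to/pulsar` prints the path of the copied package.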
    

Work with Function Mesh

If you use Function Mesh to run the connector, you can pull the Google Cloud Pub/Sub sink connector Docker image from Docker Hub.

How to configure

Before using the Google Cloud Pub/Sub sink connector, you need to configure it. The following table lists the properties and their descriptions.

Name | Type | Required | Default | Description
--- | --- | --- | --- | ---
pubsubEndpoint | String | false | "" (empty string) | The Google Cloud Pub/Sub endpoint URL.
pubsubCredential | String | false | "" (empty string) | The credential (JSON string) for accessing Google Cloud.
pubsubProjectId | String | true | "" (empty string) | The Google Cloud project ID.
pubsubTopicId | String | true | "" (empty string) | The topic ID. It is used to read messages from or write messages to Google Cloud Pub/Sub topics.
pubsubSchemaId | String | false | "" (empty string) | The schema ID. You must set the schema ID when creating a schema for Google Cloud Pub/Sub topics.
pubsubSchemaType | String | false | "" (empty string) | The schema type. You must set the schema type when creating a schema for Google Cloud Pub/Sub topics. Currently, only the AVRO format is supported.
pubsubSchemaEncoding | String | false | "" (empty string) | The encoding of the schema. You must set the schema encoding when creating a schema for Google Cloud Pub/Sub topics. Currently, only the JSON format is supported.
pubsubSchemaDefinition | String | false | "" (empty string) | The definition of the schema. It is used to create a schema for, or parse messages from, Google Cloud Pub/Sub topics.
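
Because pubsubProjectId and pubsubTopicId are required but default to an empty string, it can help to check a configuration file before submitting it. The following is a rough sketch only: `check_required_props` is a hypothetical helper that does a simple text match on a JSON or YAML file and is not a real parser.

```shell
# Hypothetical pre-flight check: verify that the two required properties
# are present with a non-empty value in a JSON or YAML sink config file.
check_required_props() {
  local config_file="$1" missing=0 key

  for key in pubsubProjectId pubsubTopicId; do
    # Matches `key: value` (YAML) or `"key": "value"` (JSON) where the
    # value has at least one character; `key: ""` does not match.
    if ! grep -Eq "\"?${key}\"?[[:space:]]*:[[:space:]]*[\"']?[^\"'[:space:],]" "$config_file"; then
      echo "missing required property: ${key}" >&2
      missing=1
    fi
  done

  return "$missing"
}
```

The function returns 0 when both properties are set and 1 otherwise, so it can gate a deployment script.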

Note

The provided Google Cloud credentials must have permissions to access Google Cloud resources. To use the Google Cloud Pub/Sub sink connector, ensure that the credentials have the following permissions for the Google Cloud Pub/Sub API:

  • projects.topics.create
  • projects.topics.get
  • projects.topics.publish

For more information about Google Cloud Pub/Sub API permissions, see Google Cloud Pub/Sub API permissions: Access control.

Work with Function Worker

You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.

Example

  • JSON

    {
        "tenant": "public",
        "namespace": "default",
        "name": "google-pubsub-sink",
        "inputs": [
            "test-google-pubsub-pulsar"
        ],
        "archive": "connectors/pulsar-io-google-pubsub-2.9.1.2.nar",
        "parallelism": 1,
        "configs": {
            "pubsubCredential": "SECRETS",
            "pubsubProjectId": "pulsar-io-google-pubsub",
            "pubsubTopicId": "test-google-pubsub-sink"
        }
    }
    
  • YAML

    tenant: public
    namespace: default
    name: google-pubsub-sink
    inputs:
      - test-google-pubsub-pulsar
    archive: connectors/pulsar-io-google-pubsub-2.9.1.2.nar
    parallelism: 1
    configs:
      pubsubCredential: 'SECRETS'
      pubsubProjectId: pulsar-io-google-pubsub
      pubsubTopicId: test-google-pubsub-sink
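
In the examples above, 'SECRETS' stands in for the credential JSON string. One possible way to produce the YAML file from a service-account key file is sketched below. The function name `render_sink_config` and its arguments are illustrative assumptions; the tenant, namespace, name, input topic, and archive values are copied from the example above.

```shell
# Sketch: render a sink config (YAML) from a service-account key file.
# The function name and argument order are hypothetical.
render_sink_config() {
  local key_file="$1" project_id="$2" topic_id="$3"
  local credential

  # Collapse the JSON key onto one line, then double any single quotes so
  # the value is valid inside a YAML single-quoted scalar.
  credential="$(tr -d '\n' < "$key_file" | sed "s/'/''/g")"

  cat <<EOF
tenant: public
namespace: default
name: google-pubsub-sink
inputs:
  - test-google-pubsub-pulsar
archive: connectors/pulsar-io-google-pubsub-2.9.1.2.nar
parallelism: 1
configs:
  pubsubCredential: '${credential}'
  pubsubProjectId: ${project_id}
  pubsubTopicId: ${topic_id}
EOF
}
```

For example, `render_sink_config key.json my-project my-topic > google-pubsub-sink-config.yaml` writes a file that can be passed to the sink creation command. The generated file contains the credential in plain text, so treat it as a secret.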
    

Work with Function Mesh

You can use a CustomResourceDefinition (CRD) to create a Google Cloud Pub/Sub sink connector. Using a CRD lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.

You can define a CRD file (YAML) to set the properties as follows.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: google-pubsub-sink-sample
spec:
  image: streamnative/pulsar-io-google-pubsub:2.9.1.2
  className: org.apache.pulsar.ecosystem.io.pubsub.PubsubSink
  replicas: 1
  maxReplicas: 1
  input:
    topics: 
      - persistent://public/default/destination
  sinkConfig:
    pubsubCredential: 'SECRETS'
    pubsubProjectId: pulsar-io-google-pubsub
    pubsubTopicId: test-google-pubsub-sink
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-google-pubsub-2.9.1.2.nar
  clusterName: test-pulsar
  autoAck: true

How to use

You can use the Google Cloud Pub/Sub sink connector with Function Worker or Function Mesh.

Work with Function Worker

You can use the Google Cloud Pub/Sub sink connector as a non built-in connector or a built-in connector.

If you already have a Pulsar cluster, you can use the Google Cloud Pub/Sub sink connector as a non built-in connector directly.

This example shows how to create a Google Cloud Pub/Sub sink connector on a Pulsar cluster using the pulsar-admin sinks create command.

$PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-config-file <google-pubsub-sink-config.yaml>
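
After creating the sink, you may want to confirm that it is running. The sketch below wraps the create command and a follow-up `pulsar-admin sinks status` call in one function. It assumes the tenant, namespace, and name from the configuration example (public, default, google-pubsub-sink); the `PULSAR_ADMIN` variable and the function name are hypothetical conveniences.

```shell
# Path to pulsar-admin; override PULSAR_ADMIN if it lives elsewhere.
PULSAR_ADMIN="${PULSAR_ADMIN:-${PULSAR_HOME:-.}/bin/pulsar-admin}"

# Hypothetical wrapper: create the sink from a config file, then print
# its status. Tenant, namespace, and name follow the example config.
create_sink_and_show_status() {
  local config_file="$1"

  "$PULSAR_ADMIN" sinks create --sink-config-file "$config_file" &&
    "$PULSAR_ADMIN" sinks status \
      --tenant public \
      --namespace default \
      --name google-pubsub-sink
}
```

Call it as `create_sink_and_show_status google-pubsub-sink-config.yaml`. The status call only runs if the create call succeeds.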

Work with Function Mesh

This example describes how to create a Google Cloud Pub/Sub sink connector on a Kubernetes cluster using Function Mesh.

Prerequisites

Steps

  1. Define the Google Cloud Pub/Sub sink connector with a YAML file and save it as sink-sample.yaml.

    This example shows how to publish the Google Cloud Pub/Sub sink connector to Function Mesh with a Docker image.

    apiVersion: compute.functionmesh.io/v1alpha1
    kind: Sink
    metadata:
      name: google-pubsub-sink-sample
    spec:
      image: streamnative/pulsar-io-google-pubsub:2.9.1.2
      className: org.apache.pulsar.ecosystem.io.pubsub.PubsubSink
      replicas: 1
      maxReplicas: 1
      input:
        topics: 
          - persistent://public/default/destination
        typeClassName: "[B"
      sinkConfig:
        pubsubCredential: 'SECRETS'
        pubsubProjectId: pulsar-io-google-pubsub
        pubsubTopicId: test-google-pubsub-sink
      pulsar:
        pulsarConfig: "test-pulsar-sink-config"
      resources:
        limits:
          cpu: "0.2"
          memory: 1.1G
        requests:
          cpu: "0.1"
          memory: 1G
      java:
        jar: connectors/pulsar-io-google-pubsub-2.9.1.2.nar
      clusterName: test-pulsar
      autoAck: true
    
  2. Apply the YAML file to create the Google Cloud Pub/Sub sink connector.

    Input

    kubectl apply -f <path-to-sink-sample.yaml>
    

    Output

    sink.compute.functionmesh.io/google-pubsub-sink-sample created
    
  3. Check whether the Google Cloud Pub/Sub sink connector is created successfully.

    Input

    kubectl get all
    

    Output

    NAME                                         READY   STATUS      RESTARTS   AGE
    pod/google-pubsub-sink-sample-0               1/1    Running     0          77s
    

    Once the connector is running, messages produced to the Pulsar input topic are published to the Google Cloud Pub/Sub topic.
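
The check in step 3 can also be scripted. The following sketch polls the pod phase with `kubectl get pod ... -o jsonpath` until it reports Running. The pod name defaults to the one shown in the sample output; the `KUBECTL` and `POLL_INTERVAL` variables and the function name are assumptions made for illustration.

```shell
# kubectl binary to use; override KUBECTL to point at a different one.
KUBECTL="${KUBECTL:-kubectl}"

# Hypothetical helper: poll until the sink pod reports phase "Running".
#   $1: pod name (defaults to the pod from the sample output)
#   $2: maximum number of checks (defaults to 30)
wait_for_sink_pod() {
  local pod="${1:-google-pubsub-sink-sample-0}" attempts="${2:-30}"
  local phase i=0

  while [ "$i" -lt "$attempts" ]; do
    # An unreachable cluster or missing pod yields an empty phase.
    phase="$("$KUBECTL" get pod "$pod" -o jsonpath='{.status.phase}' 2>/dev/null || true)"
    if [ "$phase" = "Running" ]; then
      echo "pod $pod is Running"
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-2}"
  done

  echo "pod $pod did not reach Running after $attempts checks" >&2
  return 1
}
```

Running `wait_for_sink_pod` with no arguments checks google-pubsub-sink-sample-0 up to 30 times, two seconds apart.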