The Google Cloud BigQuery source connector reads data from Google Cloud BigQuery tables and writes it to Pulsar topics.
This section describes how to build the Google Cloud BigQuery source connector.
You can get the Google Cloud BigQuery source connector using one of the following methods:
Clone the source code to your machine.

git clone https://github.com/streamnative/pulsar-io-bigquery

Build the connector in the pulsar-io-bigquery directory.

mvn clean install -DskipTests

After the connector is successfully built, a JAR package is generated under the target directory.

ls target
pulsar-io-bigquery-{{connector:version}}.jar
You can pull the Google Cloud BigQuery source connector Docker image from the Docker Hub if you use Function Mesh to run the connector.
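For example, the following command pulls the image used in the Function Mesh examples later in this section; the tag is assumed to match the connector version you intend to deploy.

# Pull the connector image from Docker Hub.
docker pull streamnative/pulsar-io-bigquery:{{connector:version}}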
Before using the Google Cloud BigQuery source connector, you need to configure it. This table lists the properties and the descriptions.
Name | Type | Required | Default | Description |
---|---|---|---|---|
projectId | String | Yes | "" (empty string) | The Google BigQuery project ID. |
datasetName | String | Yes | "" (empty string) | The Google BigQuery dataset name. |
tableName | String | Yes | "" (empty string) | The Google BigQuery table name. |
credentialJsonString | String | No | "" (empty string) | The authentication JSON key. When credentialJsonString is set to an empty string, set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. For details, see the Google documentation. |
queueSize | int | No | 10000 | The buffer queue size of the source. It is used for storing records before they are sent to Pulsar topics. |
snapshotTime | long | No | -1 | The snapshot time of the table. If it is not set, it is interpreted as now. |
sql | String | No | "" (empty string) | The SQL query to run on BigQuery. The computed result is saved in a temporary table with a configurable expiration time, and the BigQuery source connector deletes the temporary table automatically when the data is completely transferred. The projectId and datasetName are taken from the connector configuration, and the tableName is generated by UUID. |
expirationTimeInMinutes | int | No | 1440 | The time in minutes until the temporary table expires and is auto-deleted. |
maxParallelism | int | No | 1 | The maximum parallelism for reading. The actual parallelism may be lower if the BigQuery source connector deems the data small enough. |
selectedFields | String | No | "" (empty string) | Names of the fields in the table that should be read. |
filters | String | No | "" (empty string) | A list of clauses that filter the result of the table. |
checkpointIntervalSeconds | int | No | 60 | The checkpoint interval in seconds. |
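As a minimal sketch of the credentials fallback described above, assuming a service account key stored at /path/to/service-account.json (a hypothetical path), you can leave credentialJsonString empty and point the environment variable at the key file before starting the connector.

# Used only when credentialJsonString is empty.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json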
Note
The provided Google Cloud credentials must have permission to access Google Cloud resources. To use the Google Cloud BigQuery source connector, ensure the Google Cloud credentials have the following permissions for the Google BigQuery API:
- bigquery.jobs.create
- bigquery.tables.create
- bigquery.tables.get
- bigquery.tables.getData
- bigquery.tables.list
- bigquery.tables.update
- bigquery.tables.updateData
For more information about Google BigQuery API permissions, see Google Cloud BigQuery API permissions: Access control.
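As one way to satisfy these requirements (an assumption, not the only option), you can bind a predefined role that covers them to the connector's service account. The roles/bigquery.admin role is a superset of the permissions listed above; PROJECT_ID and SA_EMAIL are placeholders for your own values.

# Grant a role that includes the required bigquery.* permissions.
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:SA_EMAIL" \
  --role="roles/bigquery.admin"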
You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.
Example
YAML
name: google-bigquery-source
className: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource
namespace: default
topicName: google-bigquery-source-test
parallelism: 1
archive: connectors/pulsar-io-bigquery-source.jar
batchSourceConfig:
  discoveryTriggererClassName: org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger
configs:
  # projectId is the BigQuery project ID.
  #
  # This field is *required*.
  #
  projectId: bigquery-dev-001
  # datasetName is the BigQuery dataset name.
  #
  # This field is *required*.
  #
  datasetName: babynames
  # tableName is the BigQuery table name.
  #
  # This field is *required*.
  #
  tableName: names2021
  credentialJsonString: SECRETS
JSON
{
  "name": "source-test-source",
  "className": "org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource",
  "namespace": "default",
  "topicName": "google-bigquery-source-test",
  "parallelism": 1,
  "archive": "connectors/pulsar-io-bigquery-source.jar",
  "batchSourceConfig": {
    "discoveryTriggererClassName": "org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger"
  },
  "configs": {
    "projectId": "bigquery-dev-001",
    "datasetName": "babynames",
    "tableName": "names2021",
    "credentialJsonString": "SECRETS"
  }
}
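Rather than pasting the service account key into the file by hand, a small sketch like the following can embed it for you; the key path and output file name here are hypothetical.

# Replace the SECRETS placeholder with the actual key file contents.
jq --arg cred "$(cat /path/to/service-account.json)" \
  '.configs.credentialJsonString = $cred' \
  google-bigquery-source-config.json > google-bigquery-source-config.final.json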
You can create a CustomResourceDefinition (CRD) to create a Google Cloud BigQuery source connector. Using CRDs lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar IO source CRD configurations, see source CRD configurations.
You can define a CRD file (YAML) to set the properties as below.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Source
metadata:
  name: google-bigquery-source-sample
spec:
  image: streamnative/pulsar-io-bigquery:{{connector:version}}
  className: org.apache.pulsar.functions.source.batch.BatchSourceExecutor
  replicas: 1
  maxReplicas: 1
  output:
    producerConf:
      maxPendingMessages: 1000
      maxPendingMessagesAcrossPartitions: 50000
      useThreadLocalProducers: true
    topic: persistent://public/default/google-bigquery-pulsar-source
  sourceConfig:
    __BATCHSOURCECLASSNAME__: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource
    __BATCHSOURCECONFIGS__: '{"discoveryTriggererClassName":"org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger"}'
    projectId: SECRETS
    datasetName: pulsar-io-google-bigquery
    tableName: test-google-bigquery-source
    credentialJsonString: SECRETS
  pulsar:
    pulsarConfig: "test-pulsar-source-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-bigquery-{{connector:version}}.jar
  clusterName: pulsar
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: test-pulsar-source-config
data:
  webServiceURL: http://pulsar-broker.default.svc.cluster.local:8080
  brokerServiceURL: pulsar://pulsar-broker.default.svc.cluster.local:6650
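The SECRETS placeholders above stand in for real values. One hedged alternative to hard-coding the key is to keep it in a Kubernetes Secret and inject it at deploy time; the Secret name below is hypothetical, and wiring it into the Source spec depends on your Function Mesh setup.

# Store the service-account key out of band instead of inline in the CRD.
kubectl create secret generic bigquery-credentials \
  --from-file=credential.json=/path/to/service-account.json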
You can use the Google Cloud BigQuery source connector with Function Worker or Function Mesh.
You can use the Google Cloud BigQuery source connector either as a non-built-in connector or as a built-in connector, as described below.
If you already have a Pulsar cluster, you can use the Google Cloud BigQuery source connector as a non-built-in connector directly.
This example shows how to create a Google Cloud BigQuery source connector on a Pulsar cluster using the pulsar-admin sources create command.
PULSAR_HOME/bin/pulsar-admin sources create \
    --source-config-file <google-bigquery-source-config.yaml>
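After the source is created, you can check its state. The tenant, namespace, and source name below are assumptions based on the earlier YAML example (name: google-bigquery-source).

# Inspect the running source instance.
PULSAR_HOME/bin/pulsar-admin sources status \
    --tenant public --namespace default --name google-bigquery-source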
You can use the Google Cloud BigQuery source connector as a built-in connector on a standalone cluster or an on-premises cluster.
This example describes how to use the Google Cloud BigQuery source connector to feed data from Google Cloud BigQuery and write data to Pulsar topics in the standalone mode.
Install the gcloud CLI tool. For details, see installing Cloud SDK.
Load data into a Google Cloud BigQuery table. Create a dataset named babynames and load the sample data.
bq mk babynames
bq load babynames.names2021 yob2021.txt name:string,gender:string,count:integer
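If you do not already have yob2021.txt (needed by the bq load command above), it comes from the US Social Security Administration baby-names dataset used in the Google quickstart; the URL below is an assumption and may change, so fall back to the quickstart instructions if it does.

# Download and extract the sample data (URL assumed from the SSA dataset).
curl -LO https://www.ssa.gov/oact/babynames/names.zip
unzip names.zip yob2021.txt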
For more information, see Getting Started with Google Cloud BigQuery by the command-line tool.
Copy the JAR package to the Pulsar connectors directory.
cp pulsar-io-google-bigquery-{{connector:version}}.jar PULSAR_HOME/connectors/pulsar-io-google-bigquery-{{connector:version}}.jar
Start Pulsar in standalone mode.
PULSAR_HOME/bin/pulsar standalone
Consume the message from the Pulsar topic.
PULSAR_HOME/bin/pulsar-client consume -s topic-sub -st auto_consume -n 0 public/default/google-bigquery-source-test
Run the Google Cloud BigQuery source connector locally.
PULSAR_HOME/bin/pulsar-admin sources localrun --source-config-file <google-bigquery-source-config.yaml>
Now you can see messages like the following from the Pulsar consumer.
----- got message -----
key:[null], properties:[], content:{gender=F, name=Evelyn, count=9434}
This example explains how to create a Google Cloud BigQuery source connector in an on-premises cluster.
Copy the JAR package of the Google Cloud BigQuery connector to the Pulsar connectors directory.
cp pulsar-io-google-bigquery-{{connector:version}}.jar $PULSAR_HOME/connectors/pulsar-io-google-bigquery-{{connector:version}}.jar
Reload all built-in connectors.
PULSAR_HOME/bin/pulsar-admin sources reload
Check whether the Google Cloud BigQuery source connector is available on the list or not.
PULSAR_HOME/bin/pulsar-admin sources available-sources
Create a Google Cloud BigQuery source connector on a Pulsar cluster using the pulsar-admin sources create command.
PULSAR_HOME/bin/pulsar-admin sources create \
--source-config-file <google-bigquery-source-config.yaml>
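To confirm that the source was registered, you can list the sources in the namespace; public/default is assumed here.

# List running sources in the namespace.
PULSAR_HOME/bin/pulsar-admin sources list --tenant public --namespace default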
This example describes how to create a Google Cloud BigQuery source connector for a Kubernetes cluster using Function Mesh.
Create and connect to a Kubernetes cluster.
Create a Pulsar cluster in the Kubernetes cluster.
Install the Function Mesh Operator and CRD into the Kubernetes cluster.
Prepare Google Cloud BigQuery service. For details, see Getting Started with Google Cloud BigQuery by the command-line tool.
Define the Google Cloud BigQuery source connector with a YAML file and save it as source-sample.yaml.
This example shows how to publish the Google Cloud BigQuery source connector to Function Mesh with a Docker image.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Source
metadata:
  name: google-bigquery-source-sample
spec:
  image: streamnative/pulsar-io-bigquery:{{connector:version}}
  className: org.apache.pulsar.functions.source.batch.BatchSourceExecutor
  replicas: 1
  maxReplicas: 1
  output:
    producerConf:
      maxPendingMessages: 1000
      maxPendingMessagesAcrossPartitions: 50000
      useThreadLocalProducers: true
    topic: persistent://public/default/google-bigquery-pulsar-source
  sourceConfig:
    __BATCHSOURCECLASSNAME__: org.apache.pulsar.ecosystem.io.bigquery.BigQuerySource
    __BATCHSOURCECONFIGS__: '{"discoveryTriggererClassName":"org.apache.pulsar.ecosystem.io.bigquery.source.BigQueryOnceTrigger"}'
    projectId: SECRETS
    datasetName: pulsar-io-google-bigquery
    tableName: test-google-bigquery-source
    credentialJsonString: SECRETS
  pulsar:
    pulsarConfig: "test-pulsar-source-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-bigquery-{{connector:version}}.jar
  clusterName: pulsar
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: test-pulsar-source-config
data:
  webServiceURL: http://pulsar-broker.default.svc.cluster.local:8080
  brokerServiceURL: pulsar://pulsar-broker.default.svc.cluster.local:6650
Apply the YAML file to create the Google Cloud BigQuery source connector.
Input
kubectl apply -f <path-to-source-sample.yaml>
Output
source.compute.functionmesh.io/google-bigquery-source-sample created
Check whether the Google Cloud BigQuery source connector is created successfully.
Input
kubectl get all
Output
NAME READY STATUS RESTARTS AGE
pod/google-bigquery-source-sample-0 1/1 Running 0 77s
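If the pod is not Running, the connector logs are the first place to look; the pod name below matches the output above.

# Tail the connector logs for startup errors.
kubectl logs -f pod/google-bigquery-source-sample-0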