The Pulsar Flink connector integrates Apache Pulsar with Apache Flink (a data processing engine), allowing Flink to read data from and write data to Pulsar.
The Pulsar Flink connector consists of the Pulsar DataStream connector and the Pulsar SQL connector.
The Pulsar DataStream connector is maintained as a part of the official Flink library. For details about the Pulsar DataStream connector, see Flink's documentation.
Currently, the Pulsar SQL connector is not delivered as a part of the official Flink binary. It is maintained by StreamNative.
The Pulsar SQL connector allows you to query data from or write data into Pulsar topics using simple SQL queries or the Flink Table API.
This table outlines the Maven dependency for the Pulsar SQL connector.

Maven Dependency | SQL JAR |
---|---|
io.streamnative.connectors : flink-sql-connector-pulsar | https://search.maven.org/artifact/io.streamnative.connectors/flink-sql-connector-pulsar |
This table outlines options for the Pulsar SQL connector.
Key | Default | Type | Description |
---|---|---|---|
admin-url | (none) | String | The Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS. |
explicit | true | Boolean | Indicates whether the table is an explicit Flink table. |
key.fields | (none) | List<String> | An explicit list of physical columns from the table schema that are decoded/encoded from the key bytes of a Pulsar message. By default, this list is empty and thus a key is undefined. |
key.format | (none) | String | The format used to deserialize and serialize the key bytes of Pulsar messages. The format identifier is used to discover a suitable format factory. |
service-url | (none) | String | The service URL for the Pulsar cluster. To connect to a Pulsar cluster using a client library, you need to specify a Pulsar protocol URL. For example: pulsar://localhost:6650 for localhost; pulsar://localhost:6550,localhost:6651,localhost:6652 if you have multiple brokers; pulsar://pulsar.us-west.example.com:6650 for a production cluster; pulsar+ssl://pulsar.us-west.example.com:6651 when TLS authentication is enabled. |
sink.custom-topic-router | (none) | String | (Optional) The full class name of the custom topic router to use in the Pulsar DataStream sink connector. If this option is provided, the sink.topic-routing-mode option is ignored. |
sink.message-delay-interval | 0 ms | Duration | (Optional) The message delay delivery interval to use in the Pulsar DataStream sink connector. |
sink.topic-routing-mode | round-robin | Enum | (Optional) The topic routing mode. Available options are round-robin and message-key-hash. If you want to use a custom topic router, set the sink.custom-topic-router option instead. round-robin: the producer publishes messages across all partitions in a round-robin fashion to achieve maximum throughput; round-robin is applied not per individual message but on the boundary of the pulsar.producer.batchingMaxMessages option, so that batching takes effect. message-key-hash: if no key is provided, the partitioned producer randomly picks a single topic partition and publishes all messages to that partition; if a key is provided for a message, the partitioned producer hashes the key and assigns the message to a particular partition. |
source.start.message-id | (none) | String | (Optional) The message ID that specifies a starting point for the Pulsar DataStream source connector to consume data. Available options are earliest, latest, or a message ID in ledgerId:entryId:partitionId format, such as 12:2:-1. |
source.start.publish-time | (none) | Long | (Optional) The publish timestamp that specifies a starting point for the Pulsar DataStream source connector to consume data. |
source.subscription-name | flink-sql-connector-pulsar | String | The subscription name of the consumer used by the Pulsar DataStream source connector at runtime. This argument is required for constructing the consumer. |
source.subscription-type | Exclusive | Enum | The subscription type supported by the Pulsar DataStream source connector. Currently, only the Exclusive and Shared subscription types are supported. |
source.stop.at-message-id | (none) | String | (Optional) The message ID that specifies a stop cursor for the unbounded SQL source. Use never, latest, or a message ID in ledgerId:entryId:partitionId format, such as 12:2:-1. |
source.stop.at-publish-time | (none) | Long | (Optional) The publish timestamp that specifies a stop cursor for the unbounded SQL source. |
source.stop.after-message-id | (none) | String | (Optional) The message ID that specifies a stop position for the unbounded SQL source, including the given message in the consuming result. Pass a message ID in ledgerId:entryId:partitionId format, such as 12:2:-1. |
topics | (none) | List<String> | The topic name(s) the table reads data from. It can be a single topic name or a list of topic names separated by a semicolon (;), such as topic-1;topic-2. |
Besides these Pulsar SQL connector options, you can configure the underlying Pulsar DataStream connector using the connector options defined in Apache Pulsar Source Connector and Apache Pulsar Sink Connector by passing them in the WITH clause.
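For instance, here is a minimal sketch that passes an underlying DataStream connector option through the `WITH` clause. The table layout and URLs reuse the `users` examples later in this section; the `pulsar.producer.batchingMaxMessages` option is the one referenced in the routing-mode description above, and the value `500` is purely illustrative.

```sql
CREATE TABLE users (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json',
  -- pass-through option consumed by the underlying Pulsar DataStream sink connector
  'pulsar.producer.batchingMaxMessages' = '500'
);
```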
This section describes some significant features of the Pulsar SQL connector.
Connector metadata is metadata specific to the external system in use. Flink SQL supports using metadata from the external system, or from its decoding/encoding formats, as table columns.
This table outlines the Pulsar topic metadata that can be mapped by the Pulsar SQL connector to Flink table fields.
Note

- The `R/W` column defines whether a metadata field is readable from Pulsar topics (`R`) and/or writable to Pulsar topics (`W`).
- `R` alone means that a metadata field is read-only. Read-only metadata must be declared `VIRTUAL` to exclude it during an `INSERT INTO` operation.
Key | Data Type | R/W |
---|---|---|
topic | STRING NOT NULL | R |
message_size | INT NOT NULL | R |
producer_name | STRING NOT NULL | R |
message_id | BYTES NOT NULL | R |
sequenceId | BIGINT NOT NULL | R |
publish_time | TIMESTAMP_LTZ(3) NOT NULL | R |
event_time | TIMESTAMP_LTZ(3) NOT NULL | R/W |
properties | MAP<STRING, STRING> NOT NULL | R/W |
The extended `CREATE TABLE` example demonstrates the syntax for exposing the `publish_time`, `producer_name`, and `topic` metadata fields.
```sql
CREATE TABLE `user` (
  `publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL,
  `producer_name` STRING METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/user',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'source.start.message-id' = 'earliest',
  'format' = 'json'
);
```
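As a usage sketch, the metadata columns of the table above can then be queried like ordinary columns (the read-only fields are available on read because they are declared `VIRTUAL`):

```sql
SELECT `topic`, `producer_name`, `publish_time`, `uid`, `item_id`
FROM `user`;
```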
You can serialize or deserialize the key and the value of a Pulsar message to or from raw bytes using one of the Flink official formats. For details about how to configure the key and value formats, see configure key and value formats.
If the key of a Pulsar message is absent or ignored, you should use the `format` option (a synonym for `value.format`) to designate a decoded/encoded format for the Pulsar message. For details, see the formats documentation. All format options are prefixed with the format identifier, such as `json.ignore-parse-errors`.
```sql
CREATE TABLE users (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  ...
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);
```
You can use the value format to deserialize a Pulsar message to the following Flink data type:

```sql
ROW<`uid` BIGINT, `item_id` BIGINT, `description` STRING>
```

The deserialized or serialized key of a Pulsar message can be mapped to Flink table fields. The key format includes the fields listed in the `key.fields` option; use the semicolon (`;`) to separate multiple fields. Thus, the key format is configured with the following data type:

```sql
ROW<`uid` BIGINT, `item_id` BIGINT>
```

And all the key fields are excluded from the value format's data type:

```sql
ROW<`description` STRING>
```
The `topics` and `topic-pattern` options specify the topics or topic pattern for the Pulsar DataStream connector to consume or produce data.

- `topics`: a list of topics separated by semicolons (`;`), like `topic-1;topic-2`.
- `topic-pattern`: a regular expression used to discover the matching topics. For example, if `topic-pattern` is set to `test-topic-[0-9]`, all topics whose names match the expression (starting with `test-topic-` and ending with a single digit) are subscribed to by the consumer when the query job runs, as shown in the sketch below.
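A minimal sketch of a table subscribing via `topic-pattern` (the pattern value comes from the example above; the single `payload` column and the `raw` format are assumptions for illustration):

```sql
CREATE TABLE test_topics (
  `payload` STRING
) WITH (
  'connector' = 'pulsar',
  -- topic-pattern is used instead of the topics option
  'topic-pattern' = 'test-topic-[0-9]',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'raw'
);
```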
By default, the Pulsar DataStream source connector checks for topic changes (such as newly added topics or topic partitions) every 30 seconds. To disable automatic topic and partition discovery, set the `pulsar.source.partitionDiscoveryIntervalMs` option to a negative value. For details, see the Pulsar DataStream connector documentation.
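For example, a sketch that disables discovery by passing the underlying option through the `WITH` clause (the table layout reuses the earlier `users` examples):

```sql
CREATE TABLE users (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json',
  -- any negative value disables topic and partition discovery
  'pulsar.source.partitionDiscoveryIntervalMs' = '-1'
);
```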
You can use the `source.start.message-id` or `source.start.publish-time` option to designate the starting position for the Pulsar DataStream source connector.

You can configure one of the following values for the `source.start.message-id` option:

- `earliest`
- `latest`
- a message ID (in `<ledgerId>:<entryId>:<partitionIndex>` format)

You can configure the `source.start.publish-time` option with a Long-type timestamp value.

The `source.start.message-id` and `source.start.publish-time` options are mutually exclusive. If both options are configured, validation fails. By default, the Pulsar DataStream source connector uses the `StartCursor.earliest()` option.
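A sketch that starts consuming from a specific message ID (the ID value `12:2:-1` reuses the sample from the options table; everything else mirrors the earlier examples):

```sql
CREATE TABLE users (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json',
  -- start from ledgerId 12, entryId 2; partition index -1 denotes a non-partitioned topic
  'source.start.message-id' = '12:2:-1'
);
```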
The Pulsar DataStream connector supports designating a topic router. In the Pulsar SQL connector, you can configure the sink topic router using the `sink.topic-routing-mode` or `sink.custom-topic-router` option.

- `sink.topic-routing-mode`: the built-in topic router implementation. Available values are `round-robin` and `message-key-hash`.
- `sink.custom-topic-router`: the full class name of the custom topic router. Make sure that the custom topic router implementation is on the classpath.

The `sink.topic-routing-mode` and `sink.custom-topic-router` options are mutually exclusive. If both options are configured, validation fails. See the sketch below for an example.
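A sketch of a sink table that routes by message key hash. The topic and URLs mirror the earlier examples; since `message-key-hash` needs a message key, the sketch assumes the key is derived from the `uid` field via the key format options described above.

```sql
CREATE TABLE users_sink (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'key.format' = 'json',
  'key.fields' = 'uid',
  'value.format' = 'json',
  -- hash the message key to pick the target partition
  'sink.topic-routing-mode' = 'message-key-hash'
);
```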
Pulsar Schema describes how to deserialize data from and serialize data to a message stored in Pulsar topics. Flink SQL also has its own data types as well as the deserialization and serialization framework. The Pulsar SQL connector provides multiple choices to map a Pulsar schema to Flink data types to transfer data between Flink SQL and Pulsar.
Flink SQL uses formats to decode and encode data from external systems, such as flink-csv, flink-json, and flink-avro. If you want to read a Pulsar topic with a predefined Pulsar schema (JSON, CSV, or Avro), you can use the related Flink format.

This table outlines the recommended Flink format for each Pulsar schema. To use the `avro` and `json` formats, add `flink-avro` and `flink-json` to your dependencies if they are not added yet. (By default, the JAR package of the Pulsar SQL connector is bundled with them.)
Pulsar schema | Flink format |
---|---|
AVRO | avro |
JSON | json |
PROTOBUF | Not supported yet |
PROTOBUF_NATIVE | Not supported yet |
AUTO_CONSUME | Not supported yet |
AUTO_PUBLISH | Not supported yet |
NONE/BYTES | raw |
BOOLEAN | raw |
STRING | raw |
DOUBLE | raw |
FLOAT | raw |
INT8 | raw |
INT16 | raw |
INT32 | raw |
INT64 | raw |
LOCAL_DATE | Not supported yet |
LOCAL_TIME | Not supported yet |
LOCAL_DATE_TIME | Not supported yet |
Note

When Flink writes data to a Pulsar topic with the Avro format, Pulsar consumers cannot consume the data with the Avro schema.
PulsarCatalog supports configuring the Pulsar cluster as the metadata storage for Flink tables.
PulsarCatalog defines two different kinds of tables: `explicit` tables and `native` tables.

- An `explicit` table is a table explicitly created using `CREATE` statements or the Table API. This is the common usage pattern in other SQL connectors: you create a table, and then query data from or write data to it.
- A `native` table is created automatically by PulsarCatalog. PulsarCatalog scans all non-system topics in a Pulsar cluster and maps each topic into a Flink table without any `CREATE` statement.

PulsarCatalog uses the Pulsar `SchemaInfo` to store the Flink schema bytes for an `explicit` table.
For each `explicit` table, PulsarCatalog creates a placeholder topic under a preconfigured tenant in the Pulsar cluster. The default tenant is `__flink_catalog`, but you can use the `catalog-tenant` option to specify a different tenant name. The Flink database is then mapped to a namespace with the same name. Finally, the placeholder topic that saves the Flink table schema information is named `table_<FLINK_TABLE_NAME>`.
For example, if you create a table `users` under the `testdb` database, a topic named `table_users` is created in the `testdb` namespace under the `__flink_catalog` tenant.
Topics like `table_users` are called placeholder topics because they have no producers or consumers, so no data flows in or out of them. The schema information of such topics is used to store the Flink table metadata.
For the JSON format, the Flink table schema is serialized and stored under the `schema` key. Other table options from `CREATE TABLE xxx WITH ()` statements are stored under the `properties` key.
You can use the `pulsar-admin` CLI tool to retrieve the JSON representation of a topic schema:

```shell
pulsar-admin schemas get persistent://<tenant>/<namespace>/<topic>
```
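For instance, for the `users` table under the `testdb` database from the example above (assuming the default `__flink_catalog` tenant), the command would be `pulsar-admin schemas get persistent://__flink_catalog/testdb/table_users`.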
A `native` table does not have a placeholder topic. Instead, PulsarCatalog maps the Pulsar schema of a `native` table to a Flink table schema when creating a SQL job.
This table outlines the Flink formats that are used for a native table's Pulsar schema.
Pulsar schema | Flink data type | Flink format | Work or not |
---|---|---|---|
AVRO | It is decided by the Avro format. | avro | Yes |
JSON | It is decided by the JSON format. | json | Yes |
PROTOBUF | Not supported yet | / | No |
PROTOBUF_NATIVE | It is decided by the Protobuf definition. | Not supported yet | No |
AUTO_CONSUME | Not supported yet | / | No |
AUTO_PUBLISH | Not supported yet | / | No |
NONE/BYTES | DataTypes.BYTES() | raw | Yes |
BOOLEAN | DataTypes.BOOLEAN() | raw | Yes |
LOCAL_DATE | DataTypes.DATE() | / | No |
LOCAL_TIME | DataTypes.TIME() | / | No |
LOCAL_DATE_TIME | DataTypes.TIMESTAMP(3) | / | No |
STRING | DataTypes.STRING() | raw | Yes |
DOUBLE | DataTypes.DOUBLE() | raw | Yes |
FLOAT | DataTypes.FLOAT() | raw | Yes |
INT8 | DataTypes.TINYINT() | raw | Yes |
INT16 | DataTypes.SMALLINT() | raw | Yes |
INT32 | DataTypes.INT() | raw | Yes |
INT64 | DataTypes.BIGINT() | raw | Yes |
Note

Even though there are corresponding Flink data types for the `LOCAL_DATE`, `LOCAL_TIME`, and `LOCAL_DATE_TIME` schemas, the `raw` format cannot decode messages with these Pulsar schemas. In this case, the automatic schema mapping fails.
After a `native` table is created, you can query data from existing Pulsar topics. PulsarCatalog automatically reads the topic's schema and decides which decoding/encoding format to use. However, `native` tables do not support watermarks and primary keys, so you cannot use a `native` table for window aggregations. A `native` table maps `tenant/namespace` to a database and the topic name to the table name, as illustrated in the sketch below.
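A minimal sketch of querying a `native` table, assuming a catalog registered as `pulsar` (as in the PulsarCatalog examples below) and the `users` topic under `public/default`. Because the database name is the `tenant/namespace` pair, it needs backtick quoting:

```sql
USE CATALOG pulsar;
-- the database name is the tenant/namespace pair, quoted with backticks
SELECT * FROM `public/default`.`users`;
```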
To fully manage a table, use an `explicit` table to define watermark fields, specify metadata fields, and specify a custom format. The usage is similar to creating a Pulsar table in `GenericInMemoryCatalog`. You can bind an `explicit` table to a Pulsar topic, and each Pulsar topic can be bound to multiple Flink tables (including the `native` table).
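For example, here is a sketch of an `explicit` table in PulsarCatalog that declares a watermark on the `event_time` metadata field. The catalog name `pulsar` and database `testdb` come from the examples in this section, the column names reuse the earlier `users` examples, and the 5-second watermark bound is illustrative.

```sql
CREATE TABLE pulsar.`testdb`.`users` (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'event_time',
  -- watermarks and primary keys are only supported on explicit tables
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json'
);
```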
Key | Default | Type | Description | Required |
---|---|---|---|---|
catalog-admin-url | "http://localhost:8080" | String | The admin URL of the Pulsar cluster. | Yes |
catalog-auth-params | (none) | String | The authentication parameters for accessing the Pulsar cluster. | |
catalog-auth-plugin | (none) | String | The name of the authentication plugin for accessing the Pulsar cluster. | |
catalog-service-url | "pulsar://localhost:6650" | String | The service URL of the Pulsar cluster. | Yes |
catalog-tenant | "__flink_catalog" | String | The Pulsar tenant that stores all table information. | |
default-database | "default" | String | The default database when using PulsarCatalog. It is created if it does not exist. | |
You can create PulsarCatalog in any of the following ways:
This example shows how to create PulsarCatalog using a SQL statement.

```sql
CREATE CATALOG pulsar WITH (
    'type' = 'pulsar-catalog',
    'catalog-admin-url' = '<ADMIN_URL>',
    'catalog-service-url' = '<SERVICE_URL>'
);
```
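As a usage note, once the catalog is created you can make it the active catalog with a standard Flink SQL statement:

```sql
USE CATALOG pulsar;
```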
This example shows how to create PulsarCatalog using the Flink Table API.

```java
Catalog pulsarCatalog = new PulsarCatalog();
tableEnv.registerCatalog("pulsar", pulsarCatalog);
```
Note

If you use the SQL Client and previously configured catalogs in the `sql-client-defaults.yaml` file, note that this file was removed as part of FLIP-163 and is no longer available as of Flink 1.15.
This section lists some examples of using the Pulsar SQL connector.
This example shows how to use the Pulsar SQL connector to create a table with the JSON encoding/decoding format.

```sql
CREATE TABLE users (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `description` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'event_time'
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json'
);
```
This example shows how to specify and configure the key and value formats. The format options are prefixed with either the `key` or `value` prefix plus the format identifier.

```sql
CREATE TABLE users (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'key.fields' = 'uid;item_id',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false'
);
```
This example shows how to specify and configure the authentication parameters used by the Pulsar client.

```sql
CREATE TABLE users (
  `uid` BIGINT,
  `item_id` BIGINT,
  `description` STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/users',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'pulsar.client.tlsAllowInsecureConnection' = 'true',
  'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1c2VyMSJ9.2AgtxHe8-2QBV529B5DrRtpuqP6RJjrk21Mhn'
);
```
apache/flink repository and streamnative/flink repository

StreamNative forks the apache/flink repository and maintains the project in the streamnative/flink repository. Compared with the official Flink repository, the StreamNative fork maintains a different Pulsar DataStream connector and Pulsar SQL connector.
Currently, the `apache/flink` repository only contains the Pulsar DataStream connector, while the `streamnative/flink` repository contains both the Pulsar DataStream connector and the Pulsar SQL connector.

The connectors in the `streamnative/flink` repository differ slightly from those in the `apache/flink` repository. Usually, new features and bug fixes first go into the `streamnative/flink` repository, and related PRs are then submitted to the `apache/flink` repository.

For detailed differences and recommendations, see the following sections.
Repository | Pulsar DataStream source | Pulsar DataStream sink | Pulsar SQL source | Pulsar SQL sink |
---|---|---|---|---|
apache/flink 1.14 | Yes | No | No | No |
apache/flink 1.15 | Yes | Yes | No | No |
streamnative/flink 1.15 | Yes | Yes | Yes | Yes |
Repository | Release | Versioning | DataStream connector documentation | SQL connector documentation |
---|---|---|---|---|
apache/flink | Managed by the Flink community, following the Flink release process | Standard semantic versioning, like 1.15.0 | Maintained on the official Flink website | Not available yet |
streamnative/flink | Managed by StreamNative, following a monthly release process | An extra digit on top of the Flink version, such as 1.15.1.1 | Not available yet, but similar to the official Flink website | Maintained in the current documentation |
Use the `streamnative/flink` repository to submit bug reports and feature requests. Once the Pulsar SQL connector is merged back into the official Flink repository, use the Flink JIRA as well.

Repository | Pulsar DataStream connector | Pulsar SQL connector |
---|---|---|
apache/flink | https://search.maven.org/artifact/org.apache.flink/flink-connector-pulsar | Not available yet |
streamnative/flink | https://search.maven.org/artifact/io.streamnative.connectors/flink-connector-pulsar | https://search.maven.org/artifact/io.streamnative.connectors/flink-sql-connector-pulsar |