• Use Canal source connector to sync data to Pulsar


Pulsar IO




Enterprise Support

StreamNative supported


[ "ASF" ]

Canal Source

The Canal source connector pulls messages from MySQL to Pulsar topics.


The configuration of Canal source connector has the following properties.


Name Required Default Description
username true None Canal server account (not MySQL).
password true None Canal server password (not MySQL).
destination true None Source destination that Canal source connector connects to.
singleHostname false None Canal server address.
singlePort false None Canal server port.
cluster true false Whether to enable cluster mode based on Canal server configuration or not.

  • true: cluster mode.
    If set to true, it talks to zkServers to figure out the actual database host.

  • false: standalone mode.
    If set to false, it connects to the database specified by singleHostname and singlePort.
  • zkServers true None Address and port of the Zookeeper that Canal source connector talks to figure out the actual database host.
    batchSize false 1000 Batch size to fetch from Canal.


    Before using the Canal connector, you can create a configuration file through one of the following methods.

    • JSON

          "zkServers": "",
          "batchSize": "5120",
          "destination": "example",
          "username": "",
          "password": "",
          "cluster": false,
          "singleHostname": "",
          "singlePort": "11111",
    • YAML

      You can create a YAML file and copy the contents below to your YAML file.

          zkServers: ""
          batchSize: 5120
          destination: "example"
          username: ""
          password: ""
          cluster: false
          singleHostname: ""
          singlePort: 11111


    Here is an example of storing MySQL data using the configuration file as above.

    1. Start a MySQL server.

      $ docker pull mysql:5.7
      $ docker run -d -it --rm --name pulsar-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=canal -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw mysql:5.7
    2. Create a configuration file mysqld.cnf.

      pid-file    = /var/run/mysqld/
      socket      = /var/run/mysqld/mysqld.sock
      datadir     = /var/lib/mysql
      #log-error  = /var/log/mysql/error.log
      # By default we only accept connections from localhost
      #bind-address   =
      # Disabling symbolic-links is recommended to prevent assorted security risks
    3. Copy the configuration file mysqld.cnf to MySQL server.

      $ docker cp mysqld.cnf pulsar-mysql:/etc/mysql/mysql.conf.d/
    4. Restart the MySQL server.

      $ docker restart pulsar-mysql
    5. Create a test database in MySQL server.

      $ docker exec -it pulsar-mysql /bin/bash
      $ mysql -h -uroot -pcanal -e 'create database test;'
    6. Start a Canal server and connect to MySQL server.

      $ docker pull canal/canal-server:v1.1.2
      $ docker run -d -it --link pulsar-mysql -e -e canal.destinations=test -e canal.instance.master.address=pulsar-mysql:3306 -e canal.instance.dbUsername=root -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=pulsar-canal-server -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 -m 4096m canal/canal-server:v1.1.2
    7. Start Pulsar standalone.

      $ docker pull apachepulsar/pulsar:2.3.0
      $ docker run -d -it --link pulsar-canal-server -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone
    8. Modify the configuration file canal-mysql-source-config.yaml.

          zkServers: ""
          batchSize: "5120"
          destination: "test"
          username: ""
          password: ""
          cluster: false
          singleHostname: "pulsar-canal-server"
          singlePort: "11111"
    9. Create a consumer file

      import pulsar
      client = pulsar.Client('pulsar://localhost:6650')
      consumer = client.subscribe('my-topic',
      while True:
          msg = consumer.receive()
          print("Received message: '%s'" %
    10. Copy the configuration file canal-mysql-source-config.yaml and the consumer file to Pulsar server.

      $ docker cp canal-mysql-source-config.yaml pulsar-standalone:/pulsar/conf/
      $ docker cp pulsar-standalone:/pulsar/
    11. Download a Canal connector and start it.

      $ docker exec -it pulsar-standalone /bin/bash
      $ wget -P connectors
      $ ./bin/pulsar-admin source localrun \
      --archive ./connectors/pulsar-io-canal-2.3.0.nar \
      --classname \
      --tenant public \
      --namespace default \
      --name canal \
      --destination-topic-name my-topic \
      --source-config-file /pulsar/conf/canal-mysql-source-config.yaml \
      --parallelism 1
    12. Consume data from MySQL.

      $ docker exec -it pulsar-standalone /bin/bash
      $ python
    13. Open another window to log in MySQL server.

      $ docker exec -it pulsar-mysql /bin/bash
      $ mysql -h -uroot -pcanal
    14. Create a table, and insert, delete, and update data in MySQL server.

      mysql> use test;
      mysql> show tables;
      mysql> CREATE TABLE IF NOT EXISTS `test_table`(`test_id` INT UNSIGNED AUTO_INCREMENT,`test_title` VARCHAR(100) NOT NULL,
      `test_author` VARCHAR(40) NOT NULL,
      `test_date` DATE,PRIMARY KEY ( `test_id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
      mysql> INSERT INTO test_table (test_title, test_author, test_date) VALUES("a", "b", NOW());
      mysql> UPDATE test_table SET test_title='c' WHERE test_title='a';
      mysql> DELETE FROM test_table WHERE test_title='c';