Build Connectors with PDK

What is PDK

PDK stands for Plugin Development Kit. It provides an easier way to develop data source connectors:

  • Database connectors
    • MySQL, Oracle, Postgres, etc.
  • SaaS connectors
    • Facebook, Salesforce, Google Docs, etc.
  • Custom connectors
    • Connect your custom data sources.

PDK connectors provide the data source whose records flow into an iDaaS pipeline to be processed, joined, and so on, and then flow into a target that is also provided by a PDK connector.

Quick Start

Java 8+ and Maven need to be installed.

Generate the Java sample project, then fill in the necessary methods to implement a PDK connector.


@TapConnectorClass("spec.json")
public class SampleConnector extends ConnectorBase {
    
    @Override
    public void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) {
        //TODO Load the schema from the database; connection information is available via connectionContext#getConnectionConfig.
        //The sample code below shows how to define tables with specified fields.
        consumer.accept(list(
                //Define first table
                table("empty-table1")
                        //Define a field named "id" with its origin field type, and mark it as a primary key
                        .add(field("id", "VARCHAR").isPrimaryKey(true))
                        .add(field("description", "TEXT"))
                        .add(field("name", "VARCHAR"))
                        .add(field("age", "DOUBLE"))
        ));
    }

    private void streamRead(TapConnectorContext connectorContext, List<String> tableList, Object streamOffset, int batchSize, StreamReadConsumer consumer) {
        //TODO Use the CDC API or the database log to read stream records, then use consumer#accept to send them to the incremental engine.
        
        //Stream read may start slowly; call the method below to mark that stream read has started.
        consumer.streamReadStarted();
        //The sample code below generates stream records directly.
        while(!isShutDown.get()) {
            String tableId = tableList.get(0);
            List<TapEvent> tapEvents = list();
            for (int i = 0; i < batchSize; i++) {
                TapInsertRecordEvent event = insertRecordEvent(map(
                        entry("id", counter.incrementAndGet()),
                        entry("description", "123"),
                        entry("name", "123"),
                        entry("age", 12)
                ), tableId);
                tapEvents.add(event);
            }

            sleep(1000L);
            consumer.accept(tapEvents, null);
        }
    }

    private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, TapTable table, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) {
        //TODO Write the records into the database.
        //The incremental engine needs to be told the write result.
        writeListResultConsumer.accept(writeListResult()
                .insertedCount(inserted.get())
                .modifiedCount(updated.get())
                .removedCount(deleted.get()));
    }
}
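
The sample above uses isShutDown and counter without showing how they are declared. The following is a minimal, self-contained sketch of the same batching loop, assuming those fields are an AtomicBoolean and an AtomicLong. Plain maps stand in for TapInsertRecordEvent, and the class name, the maxBatches parameter, and the stop() method are hypothetical additions for illustration only, not part of PDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

class StreamReadSketch {
    // Assumed declarations for the fields the sample uses but does not show.
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final AtomicLong counter = new AtomicLong();

    // Builds one batch of insert records as plain maps (stand-ins for TapInsertRecordEvent).
    List<Map<String, Object>> nextBatch(int batchSize) {
        List<Map<String, Object>> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Map<String, Object> record = new LinkedHashMap<>();
            record.put("id", counter.incrementAndGet());
            record.put("description", "123");
            record.put("name", "123");
            record.put("age", 12);
            batch.add(record);
        }
        return batch;
    }

    // Emits batches until stop() has been called or maxBatches batches were sent.
    // maxBatches is a hypothetical bound so that the sketch terminates.
    void streamRead(int batchSize, int maxBatches, Consumer<List<Map<String, Object>>> consumer) {
        int emitted = 0;
        while (!isShutDown.get() && emitted < maxBatches) {
            consumer.accept(nextBatch(batchSize));
            emitted++;
        }
    }

    // Signals the loop to end, as a connector would do when it is stopped.
    void stop() {
        isShutDown.set(true);
    }

    public static void main(String[] args) {
        StreamReadSketch sketch = new StreamReadSketch();
        sketch.streamRead(2, 3, batch -> System.out.println(batch));
    }
}
```

Checking the flag once per batch keeps the loop responsive to shutdown without testing it for every record.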

Each connector needs to provide a JSON file like the one below, referenced in the TapConnectorClass annotation.

spec.json describes three types of information:

  • properties
    • Connector name, icon and id.
  • configOptions (form JSON Schema)
    • Describes the form in which the user enters how to connect and which database to open.
  • dataTypes (data type expressions)
    • Required when the data source needs to create tables with proper types before inserting records. Otherwise, this key can be empty.
    • Describes the type capabilities of the data source.
    • The Incremental Engine generates proper types when table creation is needed.
{
  "properties": {
    "name": "PDK Sample Connector",
    "icon": "icon.jpeg",
    "id": "pdk-sample"
  },
  "configOptions": {
    "connection": {
      "type": "object",
      "properties": {
        "host": {
          "type": "string",
          "title": "Host",
          "required": true,
          "x-decorator": "FormItem",
          "x-component": "Input"
        },
        "port": {
          "type": "number",
          "title": "Port",
          "required": true,
          "x-decorator": "FormItem",
          "x-component": "Input"
        },
        "database": {
          "type": "string",
          "title": "Database",
          "required": true,
          "x-decorator": "FormItem",
          "x-component": "Input"
        }
      }
    }
  },
  "dataTypes": {
    "double": {"bit": 64, "to": "TapNumber"},
    "decimal[($precision,$scale)]": {"precision": [1, 27], "defaultPrecision": 10, "scale": [0, 9], "defaultScale": 0, "to": "TapNumber"}
  }
}
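
To make the decimal entry in dataTypes more concrete, the sketch below shows one way a precision and scale could be resolved against the ranges and defaults declared above. It is an illustration only: the class and method names are hypothetical, and clamping out-of-range values (and capping the scale at the precision) is an assumption, not a description of how the Incremental Engine actually selects types.

```java
class DecimalTypeSketch {
    // Values copied from the "decimal[($precision,$scale)]" entry in the sample spec.json.
    static final int MIN_PRECISION = 1;
    static final int MAX_PRECISION = 27;
    static final int DEFAULT_PRECISION = 10;
    static final int MIN_SCALE = 0;
    static final int MAX_SCALE = 9;
    static final int DEFAULT_SCALE = 0;

    // Hypothetical helper: null arguments fall back to the declared defaults,
    // and out-of-range values are clamped into the declared ranges (an assumption).
    static String render(Integer precision, Integer scale) {
        int p = precision == null
                ? DEFAULT_PRECISION
                : clamp(precision, MIN_PRECISION, MAX_PRECISION);
        // Assumption: the scale is additionally capped at the resolved precision.
        int s = scale == null
                ? DEFAULT_SCALE
                : clamp(scale, MIN_SCALE, Math.min(MAX_SCALE, p));
        return "decimal(" + p + "," + s + ")";
    }

    static int clamp(int value, int low, int high) {
        return Math.max(low, Math.min(high, value));
    }

    public static void main(String[] args) {
        System.out.println(render(null, null)); // defaults from spec.json
        System.out.println(render(5, 2));       // values already within range
        System.out.println(render(40, 12));     // values clamped to the upper bounds
    }
}
```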

How to create a PDK Java project?

Requirements

  • Java 8 or above
  • Maven (https://maven.apache.org/download.cgi)

For Windows

  • Git Bash (https://gitforwindows.org/)
  • Set up Maven (https://mkyong.com/maven/how-to-install-maven-in-windows/)

Install from source code. On Windows, use Git Bash to run the commands below:

git clone https://github.com/tapdata/tapdata.git
cd tapdata
mvn clean install
cd plugin-kit

Use the template to create a project. Assume the “group” is “io.tapdata”, the “name” is “XDB”, and the “version” is “0.0.1”.

./bin/pdk template --group io.tapdata --name XDB --version 0.0.1 --output ../connectors

Open tapdata in IntelliJ IDEA. The new project is named “xdb-connector” and sits under the connectors module.

Test Driven Development

Provide a JSON file that supplies the values for the “configOptions” required by the “spec.json” file.

{
    "connection": {
      "host": "192.168.153.132",
      "port": 9030,
      "database": "test"
    }
}

The JSON file can live outside the Git project, since you may not want to expose your password, IP address, port, etc.
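
Before running TDD, it can help to confirm that the test config supplies every field that spec.json marks as required (host, port and database in the sample). The sketch below illustrates that check, with a plain map standing in for the parsed “connection” object. The class and method names are hypothetical and are not part of PDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TddConfigCheck {
    // Keys marked "required": true under configOptions.connection in the sample spec.json.
    static final List<String> REQUIRED = Arrays.asList("host", "port", "database");

    // Returns the required keys that are absent (or null) in the given connection config.
    static List<String> missingKeys(Map<String, Object> connection) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (connection == null || connection.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Same values as the sample TDD config file.
        Map<String, Object> connection = new HashMap<>();
        connection.put("host", "192.168.153.132");
        connection.put("port", 9030);
        connection.put("database", "test");
        System.out.println("Missing keys: " + missingKeys(connection));
    }
}
```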

To run the TDD command:

./bin/pdk test --testConfig xdb_tdd.json ../connectors/xdb-connector

Once the PDK connector passes the TDD tests, it is ready to work in Tapdata. You can also contribute the PDK connector to the tapdata open source project.

Development Guide

PDK needs the developer to provide the following:

  • Provide data type expressions in spec.json if the database needs to create tables before inserting records.
  • Implement the methods that PDK requires.
  • Use TDD to verify that the methods are implemented correctly.
