
Kafka Connect

  • Connectors and tasks
    • Connectors
      • Determining how many tasks will run for the connector
      • Deciding how to split the data-copying work between the tasks
      • Getting configurations for the tasks from the workers and passing them along
    • Tasks
      • Tasks are responsible for actually getting the data in and out of Kafka
  • Workers

Kafka Connect's worker processes are the "container" processes that execute the connectors and tasks.

  • Connectors and tasks are responsible for the "moving data" part of data integrations, while the workers are responsible for the REST API, configuration management, reliability, high availability, scaling, and load balancing

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.

Kafka Connect features

  • A common framework for Kafka connectors - Kafka Connect standardizes integration of other data systems with Kafka, simplifying connector development, deployment, and management
  • Distributed and standalone modes - scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments
  • REST interface - submit and manage connectors to your Kafka Connect cluster via an easy-to-use REST API
  • Automatic offset management - with just a little information from connectors, Kafka Connect can manage the offset commit process automatically, so connector developers do not need to worry about this error-prone part of connector development
  • Distributed and scalable by default - Kafka Connect builds on the existing group management protocol. More workers can be added to scale up a Kafka Connect cluster.
  • Streaming/batch integration - leveraging Kafka's existing capabilities, Kafka Connect is an ideal solution for bridging streaming and batch data systems

Kafka Connect currently supports two types of workers:

1. Standalone (single process)

Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks.

2. Distributed

Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id, and they automatically coordinate to schedule execution of connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers. Note the similarity to a consumer group rebalance: under the covers, Connect workers use the same group management protocol as consumer groups to coordinate and rebalance.
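
As a rough sketch of what "the same group.id" looks like in practice, the snippet below writes a minimal distributed worker properties file. The file name, broker address, group name, and internal topic names are assumptions chosen for illustration; the property keys themselves are standard Kafka Connect worker settings.

```shell
# Hypothetical file name; adjust to your environment.
WORKER_PROPS="${WORKER_PROPS:-connect-distributed.properties}"

cat > "$WORKER_PROPS" <<'EOF'
# Kafka brokers the worker connects to (assumed address)
bootstrap.servers=kafka:9092
# Workers that share this group.id form one Connect cluster
group.id=connect-cluster
# Internal topics where the cluster stores connector configs, source offsets, and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Default converters; individual connectors can override these
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
EOF

# Start one worker per host with the same file (script ships with Apache Kafka):
#   bin/connect-distributed.sh "$WORKER_PROPS"
```

Every worker started with this file joins the same cluster, so adding capacity is a matter of starting another process with an identical group.id.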


Main components

  • Connectors - the high-level abstraction that coordinates data streaming by managing tasks
  • Tasks - the implementation of how data is copied to or from Kafka
  • Workers - the running processes that execute connectors and tasks
  • Converters - the code used to translate data between Connect and the system sending or receiving data
  • Transforms - simple logic to alter each message produced by or sent to a connector

Task


Connect REST API

https://docs.confluent.io/current/connect/references/restapi.html

Connect typically runs in distributed mode and can be managed through REST APIs. The following table shows the common APIs.

CONNECT REST API                  MEANING
GET /connectors                   Return a list of active connectors
POST /connectors                  Create a new connector
GET /connectors/{name}            Get the information of a specific connector
GET /connectors/{name}/config     Get configuration parameters for a specific connector
PUT /connectors/{name}/config     Update configuration parameters for a specific connector
GET /connectors/{name}/status     Get the current status of the connector
- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- GET /connectors/{name}/config - get the configuration parameters for a specific connector
- PUT /connectors/{name}/config - update the configuration parameters for a specific connector
- GET /connectors/{name}/status - get the current status of the connector, including whether it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
- GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
- GET /connectors/{name}/tasks/{taskId}/status - get the current status of the task, including whether it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
- PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
- PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
- POST /connectors/{name}/restart - restart a connector (typically because it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
- DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
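
The endpoints above can be wrapped in a few small shell helpers, which may be convenient for day-to-day operations. This is only a sketch: the function names and the CONNECT_URL variable are made up for illustration, and the default address assumes a worker listening on localhost at the default REST port 8083.

```shell
# Base URL of any worker in the Connect cluster (assumed address)
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the URL for a connector resource, e.g. "connector_url my-conn status".
# The second argument is optional; without it the connector itself is addressed.
connector_url() {
  printf '%s/connectors/%s%s\n' "$CONNECT_URL" "$1" "${2:+/$2}"
}

# Thin wrappers, one per REST call listed above
connector_status()  { curl -s -X GET    "$(connector_url "$1" status)"; }
connector_pause()   { curl -s -X PUT    "$(connector_url "$1" pause)"; }
connector_resume()  { curl -s -X PUT    "$(connector_url "$1" resume)"; }
connector_restart() { curl -s -X POST   "$(connector_url "$1" restart)"; }
connector_delete()  { curl -s -X DELETE "$(connector_url "$1")"; }
```

For example, `connector_status smap-mqtt-source` issues GET /connectors/smap-mqtt-source/status against whichever worker CONNECT_URL points to.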

Connectors

  • Landoop mqtt source connector
  • Confluent mqtt source connector

The connector requires a Confluent enterprise license, which is stored inside Kafka in a topic. The connector must be configured with Kafka client configuration properties so that it can connect to Kafka and validate the license.

  • Evokly open source mqtt connector

https://github.com/evokly/kafka-connect-mqtt

Kafka Connect Query Language (KCQL)

https://github.com/Landoop/kafka-connect-query-language

Commands

# using confluent mqtt source connector
docker exec kafka-connect curl -s -X POST -H "Content-Type: application/json" --data '{"name": "smap-mqtt-source", "config": {"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector", "tasks.max":"1", "mqtt.server.uri":"tcp://emqx:1883", "mqtt.topics":"telemetry/#", "kafka.topic": "smap_telemetry_data", "mqtt.username":"abc_mqtt_client", "mqtt.password":"xitanez123"}}' http://kafka-connect:8082/connectors

errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = smap-mqtt-source
tasks.max = 1
transforms = []
value.converter = null

# using lenses connector
docker exec kafka-connect curl -s -X POST -H "Content-Type: application/json" --data '{"name": "smap-mqtt-source-lenses", "config": {"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector", "tasks.max":"1", "connect.mqtt.hosts":"tcp://emqx:1883", "connect.mqtt.username":"abc_mqtt_client", "connect.mqtt.password":"xitanez123", "connect.mqtt.service.quality":"1", "connect.mqtt.kcql":"INSERT INTO smap_telemetry_data SELECT * FROM telemetry/+/+ WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.BytesConverter`"}}' http://kafka-connect:8082/connectors

# validate configuration values
docker exec kafka-connect curl -s -X PUT -H "Content-Type: application/json" -d '{"name": "smap-mqtt-source", "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector", "tasks.max":"1", "mqtt.server.uri":"tcp://emqx:1883", "mqtt.username":"abc_mqtt_client", "mqtt.password":"xitanez123", "mqtt.topics":"telemetry/#", "kafka.topic": "smap_telemetry_data"}' http://kafka-connect:8082/connector-plugins/MqttSourceConnector/config/validate

# list all connectors and all available connector plugins
docker exec kafka-connect curl -s -X GET http://kafka-connect:8082/connectors/
docker exec kafka-connect curl -s -X GET http://kafka-connect:8082/connector-plugins/

# get status of a connector
docker exec kafka-connect curl -s -X GET http://kafka-connect:8082/connectors/smap-mqtt-source/status

# delete connector
docker exec kafka-connect curl -X DELETE http://kafka-connect:8082/connectors/smap-mqtt-source

# update a connector config
curl -s -X PUT -H "Content-Type:application/json" --data '{"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector", "tasks.max":"1", "connect.mqtt.hosts":"tcp://mqtt.vernemq:1883", "connect.mqtt.username":"abc_mqtt_client", "connect.mqtt.password":"xitanez123", "connect.mqtt.service.quality":"1", "connect.mqtt.clean":"false", "connect.mqtt.kcql":"INSERT INTO smap_telemetry_data SELECT * FROM telemetry/+/+ WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.BytesConverter`"}' http://ke-cp-kafka-connect.kafka:8083/connectors/smap-mqtt-source-lenses/config

Lenses Source Mqtt Connector

Configurations

  • Kafka Connect framework configurations

    • name
    • tasks.max (default - 1)
    • connector.class (com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector)

  • Connector Configurations

    • connect.mqtt.kcql
    • connect.mqtt.hosts

  • Optional Configurations

    • connect.mqtt.service.quality (default - 1)
    • connect.mqtt.username
    • connect.mqtt.password
    • connect.mqtt.client.id
    • connect.mqtt.timeout (default - 3000ms)
    • connect.mqtt.clean (default - true)
    • connect.mqtt.keep.alive (default - 5000)
    • connect.mqtt.converter.throw.on.error (default - false)
    • connect.converter.avro.schemas
    • connect.progress.enabled

Commands

curl -s -X POST -H "Content-Type: application/json" --data '{"name": "bench-test", "config": {"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector", "tasks.max":"1", "connect.mqtt.hosts":"tcp://mqtt.vernemq:1883", "connect.mqtt.username":"example_mqtt_client", "connect.mqtt.password":"xitanez123", "connect.mqtt.client.id":"bench-test-client-id", "connect.mqtt.service.quality":"1", "connect.mqtt.clean":"false", "connect.mqtt.kcql":"INSERT INTO bench_data SELECT * FROM bench/+ WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.BytesConverter`"}}' http://ke-cp-kafka-connect.kafka:8083/connectors

https://docs.lenses.io/connectors/source/mqtt.html

Single Message Transforms (SMT)

These single message transforms (SMTs) are available for use with Kafka Connect:

TRANSFORM            DESCRIPTION
Cast                 Cast fields or the entire key or value to a specific type, e.g. to force an integer field to a smaller width.
ExtractField         Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. Any null values are passed through unmodified.
ExtractTopic         Replace the record topic with a new topic derived from its key or value.
Flatten              Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character.
HoistField           Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data.
InsertField          Insert field using attributes from the record metadata or a configured static value.
MaskField            Mask specified fields with a valid null value for the field type.
RegexRouter          Update the record topic using the configured regular expression and replacement string.
ReplaceField         Filter or rename fields.
SetSchemaMetadata    Set the schema name, version, or both on the record's key or value schema.
TimestampConverter   Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types.
TimestampRouter      Update the record's topic field as a function of the original topic value and the record timestamp.
ValueToKey           Replace the record key with a new key formed from a subset of fields in the record value.
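
Transforms are declared in the connector configuration as a named chain. The sketch below writes a config file that attaches a RegexRouter to the Confluent MQTT source connector used in the commands earlier. The file name, the transform alias "route", and the "_enriched" topic suffix are illustrative assumptions; only the transforms.* keys and the RegexRouter class name come from Kafka Connect itself.

```shell
# Hypothetical output file for the connector configuration
CONFIG_FILE="${CONFIG_FILE:-mqtt-source-with-smt.json}"

# Quoted 'EOF' keeps "$1" literal; it is the regex capture group, not a shell variable
cat > "$CONFIG_FILE" <<'EOF'
{
  "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
  "tasks.max": "1",
  "mqtt.server.uri": "tcp://emqx:1883",
  "mqtt.topics": "telemetry/#",
  "kafka.topic": "smap_telemetry_data",
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "(.*)_data",
  "transforms.route.replacement": "$1_enriched"
}
EOF

# Apply it to an existing connector (the address is an assumption):
#   curl -s -X PUT -H "Content-Type: application/json" --data @"$CONFIG_FILE" \
#     http://kafka-connect:8082/connectors/smap-mqtt-source/config
```

With this chain, records destined for smap_telemetry_data would be rerouted to smap_telemetry_enriched. RegexRouter only touches the topic name, so it works regardless of the record's value format.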

References