Lab: Install Kafka Connect

Task 1: Install Connect Cluster

Write a simple Kafka Connect cluster CR with 3 replicas using the Strimzi Kafka Connect CRD.

Use version 4.0.0.

Limit it to 2Gi of RAM and 200m of CPU.

Use the cluster-1-tls-managed CA cert.

The Connect cluster needs a user to connect to the Kafka cluster.

Use SASL_SSL with scram-sha-512 authentication and a new user.

We want to manage connectors through the Connect REST endpoint, so do not set the strimzi.io/use-connector-resources annotation.

https://strimzi.io/docs/operators/in-development/deploying#kafka-connect-str

Use the following parameters:

    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"

BONUS CHALLENGE:

You can work out the ACLs for the topics and the consumer group yourself. If you do not want to take part in this challenge, use the following configuration:

Topic and group ACLs:
acls:
# Kafka Connects internal topics used to store configuration, offsets or status
- resource:
    type: group
    name: connect-cluster
  operations:
    - Read
- resource:
    type: topic
    name: connect-cluster-configs
  operations:
    - Create
    - Describe
    - Read
    - Write
- resource:
    type: topic
    name: connect-cluster-status
  operations:
    - Create
    - Describe
    - Read
    - Write
- resource:
    type: topic
    name: connect-cluster-offsets
  operations:
    - Create
    - Describe
    - Read
    - Write
- resource:
    type: topic
    name: postgres_cdc.public.airplanedata
  operations:
    - Create
    - Describe
    - Read
    - Write  
- resource:
    type: topic
    name: __debezium-heartbeat.postgres_cdc
  operations:
    - Create
    - Describe
    - Read
    - Write
Hint 1

If you need a starting point, look in the examples directory of the Strimzi repository; it contains example Kafka Connect configurations. For the user: you already know which topics it needs, and it requires read and write access to them.

Hint 2
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  #annotations:
  #  strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: {}
  bootstrapServers: {}
  tls:
    trustedCertificates:
      - secretName: {}
        pattern: {}
  resources: {}
  authentication:
    type: scram-sha-512
    {}
  config:
    {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-cluster
  labels:
    strimzi.io/cluster: cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
    # Kafka Connect's internal topics used to store configuration, offsets or status
    - resource:
        type: group
        name: connect-cluster
      operations:
        - Read
    - resource:
        type: topic
        name: {}
      operations:
        - Create
        - Describe
        - Read
        - Write
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  #annotations:
  #  strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: 3
  bootstrapServers: cluster-1-kafka-bootstrap.{your namespace}.svc:9093
  tls:
    trustedCertificates:
      - secretName: cluster-1-tls-managed
        pattern: "ca.crt"
  resources:
    requests:
      memory: 2Gi
      cpu: "200m"
    limits:
      memory: 2Gi
  authentication:
    type: scram-sha-512
    username: connect-cluster
    passwordSecret:
      secretName: connect-cluster
      password: password
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    #key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    #key.converter.schemas.enable: "false"
    value.converter.schemas.enable: "false"
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-cluster
  labels:
    strimzi.io/cluster: cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
    # Kafka Connects internal topics used to store configuration, offsets or status
    - resource:
        type: group
        name: connect-cluster
      operations:
        - Read
    - resource:
        type: topic
        name: connect-cluster-configs
      operations:
        - Create
        - Describe
        - Read
        - Write
    - resource:
        type: topic
        name: connect-cluster-status
      operations:
        - Create
        - Describe
        - Read
        - Write
    - resource:
        type: topic
        name: connect-cluster-offsets
      operations:
        - Create
        - Describe
        - Read
        - Write
    - resource:
        type: topic
        name: postgres_cdc.public.airplanedata
      operations:
        - Create
        - Describe
        - Read
        - Write  
    - resource:
        type: topic
        name: __debezium-heartbeat.postgres_cdc
      operations:
        - Create
        - Describe
        - Read
        - Write
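
For orientation: the passwordSecret block in the KafkaConnect CR above points at the Secret that the Strimzi User Operator generates for the KafkaUser. The sketch below shows roughly what that generated Secret looks like for a scram-sha-512 user. You do not create it yourself, and the values are placeholders, not real data.

```yaml
# Sketch of the Secret generated for KafkaUser "connect-cluster" (placeholder values).
apiVersion: v1
kind: Secret
metadata:
  name: connect-cluster              # same name as the KafkaUser
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: cluster-1
type: Opaque
data:
  # Key referenced by spec.authentication.passwordSecret.password in the KafkaConnect CR
  password: <base64-encoded generated password>
  sasl.jaas.config: <base64-encoded JAAS configuration>
```

This is why the solution sets secretName: connect-cluster and password: password: the first is the Secret name, the second is the key inside it.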

Task 2: Install the Connector Plugins

Important!

If you have already installed your Connect cluster, delete it again, because the base image contains no connector plugins.

Install the plugins using the Strimzi Kafka Connect build feature.

The container registry is krassestecontainerreistry.azurecr.io

The repository name is connect

The image name should be connect-postgres

The version should be 1.0.0

The push secret is acr-connect-push
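
The push secret has to be a registry credential Secret in the same namespace as the Connect cluster. In this lab it presumably already exists. If you ever need to create one yourself, a minimal sketch could look like this (the .dockerconfigjson value is a placeholder, not real data):

```yaml
# Sketch of a registry push Secret as referenced by build.output.pushSecret.
apiVersion: v1
kind: Secret
metadata:
  name: acr-connect-push
type: kubernetes.io/dockerconfigjson
data:
  # Base64-encoded Docker config JSON holding the credentials for
  # krassestecontainerreistry.azurecr.io (placeholder)
  .dockerconfigjson: <base64-encoded Docker config JSON>
```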

This blog post will help you:

https://strimzi.io/blog/2021/03/29/connector-build/

Install the following plugins:

      # --- Debezium PostgreSQL (CDC) source connector ---
      - name: debezium-postgres
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 3.2.1.Final
            # repository: https://repo1.maven.org/maven2  # optional; Maven Central is default
      # --- Confluent JDBC (source+sink). We’ll use it for the sink. ---
      - name: confluent-jdbc
        artifacts:
          - type: zip
            url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip
      # --- PostgreSQL JDBC driver (required by JDBC connector) ---
      - name: postgres-jdbc-driver
        artifacts:
          - type: maven
            group: org.postgresql
            artifact: postgresql
            version: 42.7.3

Additionally, add a Role and a RoleBinding that allow Connect to read the Postgres user secret.

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["hippo-pguser-connect"]
  verbs: ["get"]
--- 

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
subjects:
- kind: ServiceAccount
  name: connect-cluster-connect
  namespace: { your namespace }
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
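
The Role above matters because the KubernetesSecretConfigProvider configured on the Connect cluster reads Secrets through the Kubernetes API, using the service account of the Connect pods. A connector can then reference a value as ${secrets:{your namespace}/{secret name}:{key}}. As a rough illustration only, assuming the Postgres user Secret carries user and password keys (the real Secret is created by your Postgres setup, and the values below are placeholders):

```yaml
# Illustrative shape of the Secret that the Role grants "get" access to.
apiVersion: v1
kind: Secret
metadata:
  name: hippo-pguser-connect
  namespace: {your namespace}
data:
  # Referenced as ${secrets:{your namespace}/hippo-pguser-connect:user}
  user: <base64-encoded user name>
  # Referenced as ${secrets:{your namespace}/hippo-pguser-connect:password}
  password: <base64-encoded password>
```
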
Hint 1

Add the build block to your existing connect CR.

Hint 2
  build:
    output:
      type: docker
      image: {container registry}/{repository}/{image name}:{version}
      pushSecret: {}
    plugins:
      {}
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  #annotations:
  #  strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: 3
  bootstrapServers: cluster-1-kafka-bootstrap.{your namespace}.svc:9093
  tls:
    trustedCertificates:
      - secretName: cluster-1-tls-managed
        pattern: "ca.crt"
  resources:
    requests:
      memory: 2Gi
      cpu: "200m"
    limits:
      memory: 2Gi
  authentication:
    type: scram-sha-512
    username: connect-cluster
    passwordSecret:
      secretName: connect-cluster
      password: password
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
  build:
    output:
      type: docker
      image: krassestecontainerreistry.azurecr.io/connect/connect-postgres:1.0.0
      pushSecret: acr-connect-push
    plugins:
      # --- Debezium PostgreSQL (CDC) source connector ---
      - name: debezium-postgres
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 3.2.1.Final
            # repository: https://repo1.maven.org/maven2  # optional; Maven Central is default
      # --- Confluent JDBC (source+sink). We’ll use it for the sink. ---
      - name: confluent-jdbc
        artifacts:
          - type: zip
            url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip
      # --- PostgreSQL JDBC driver (required by JDBC connector) ---
      - name: postgres-jdbc-driver
        artifacts:
          - type: maven
            group: org.postgresql
            artifact: postgresql
            version: 42.7.3
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-cluster
  labels:
    strimzi.io/cluster: cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
    # Kafka Connects internal topics used to store configuration, offsets or status
    - resource:
        type: group
        name: connect-cluster
      operations:
        - Read
    - resource:
        type: topic
        name: connect-cluster-configs
      operations:
        - Create
        - Describe
        - Read
        - Write
    - resource:
        type: topic
        name: connect-cluster-status
      operations:
        - Create
        - Describe
        - Read
        - Write
    - resource:
        type: topic
        name: connect-cluster-offsets
      operations:
        - Create
        - Describe
        - Read
        - Write
    # Topics written by the Debezium connector in Task 3 (same ACLs as in Task 1)
    - resource:
        type: topic
        name: postgres_cdc.public.airplanedata
      operations:
        - Create
        - Describe
        - Read
        - Write
    - resource:
        type: topic
        name: __debezium-heartbeat.postgres_cdc
      operations:
        - Create
        - Describe
        - Read
        - Write
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["hippo-pguser-connect"]
  verbs: ["get"]
--- 

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
subjects:
- kind: ServiceAccount
  name: connect-cluster-connect
  namespace: { your namespace }
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io

Task 3: Start a Connector

Start a Debezium source connector (CDC source) using the Lenses UI.

https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-example-configuration

The name should be “debezium-connector-postgres”.

It should spin up one task.

It should connect to the Postgres instance in your namespace and read data from the “connect.airplanedata” table.

To do so, set the database hostname, port, server name, database name, user, password, and the list of tables.

Additionally, set the following parameters:

    topic.prefix=postgres_cdc
    key.converter=org.apache.kafka.connect.storage.StringConverter
    heartbeat.interval.ms=10000
    snapshot.fetch.size=10240
    slot.name=debezium_slot
    snapshot.mode=when_needed
    publication.autocreate.mode=filtered
    publication.name=debezium_publication

Hint 1

Open the Lenses.io UI -> click on Environment -> click on Connect -> click Add Connector -> configure a Debezium source connector -> click Show optional values -> add all required values -> delete the rest -> apply the config

Hint 2
tasks.max={}
plugin.name={}
database.server.name={}
database.hostname={}
database.port={}
database.user={}
database.password={}
database.dbname={}
table.include.list={}
topic.prefix=postgres_cdc
key.converter=org.apache.kafka.connect.storage.StringConverter
heartbeat.interval.ms=10000
snapshot.fetch.size=10240
slot.name=debezium_slot
snapshot.mode=when_needed
publication.autocreate.mode=filtered
publication.name=debezium_publication
Hint 3
name=debezium-connector-postgres
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=pgoutput
database.server.name=postgres-server
database.hostname=hippo-pods.{your namespace}.svc.cluster.local
database.port=5432
database.user=${secrets:{your namespace}/hippo-pguser-connect:user}
database.password=${secrets:{your namespace}/hippo-pguser-connect:password}
database.dbname=connect
table.include.list=public.airplanedata
schema.include.list=public
topic.prefix=postgres_cdc
key.converter=org.apache.kafka.connect.storage.StringConverter
heartbeat.interval.ms=10000
snapshot.fetch.size=10240
slot.name=debezium_slot
snapshot.mode=when_needed
publication.autocreate.mode=filtered
publication.name=debezium_publication

Task 4: Verify the data flow

Check the status of the datagen application.

If it is running, check the status of the connector in Lenses.io.

If the connector is running, check whether any data has been written to the airplane data topic (postgres_cdc.public.airplanedata).

Hint 1

Use k9s to get the status of the datagen application.

Check the logs of the datagen pod.

Open the Lenses.io UI and check the airplane data topic in your cluster.

Hint 2

Ask the trainer.

Hint 3

Ask the trainer.

Task 5: Use the Connect REST API to get the status of the connector task

Check the status of the connector using the Connect REST API.

https://kafka.apache.org/documentation.html#connect_rest

Hint 1
  1. Find the correct REST call to use

  2. Log into a pod in your cluster.

  3. Use curl to get the status

Hint 2
    k9s -> pod -> kafka node -> press "s" for shell access -> run:
    curl -s http://{host}:{port}/connectors/{connector name}/status
Hint 3

Ask the trainer.