Lab: Install Kafka Connect
Task 1: Install Connect Cluster
Write a simple Kafka Connect cluster CR with 3 replicas using the Strimzi Kafka Connect CRD.
Use version 4.0.0!
It should request 200m of CPU and be limited to 2Gi of RAM.
Use the cluster-1-tls-managed CA cert.
The Connect cluster needs a user to connect to the Kafka cluster.
Use SASL_SSL with scram-sha-512 authentication and a new user.
We want to activate the REST endpoint for Connect, so don't use the strimzi.io/use-connector-resources annotation.
https://strimzi.io/docs/operators/in-development/deploying#kafka-connect-str
The following parameters should be used:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: "false"
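The config.providers entries register Strimzi's KubernetesSecretConfigProvider under the alias secrets, which lets connector configs read values straight out of Kubernetes Secrets instead of hardcoding them. The reference syntax is ${secrets:namespace/secret:key}; a minimal sketch, using the hippo-pguser-connect secret you will need in Task 3:

database.password=${secrets:{your namespace}/hippo-pguser-connect:password}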
BONUS CHALLENGE:
You may work out the ACLs for the topics and the consumer group yourself. If you don't want to take part in this challenge, use the following config:
Topic ACLs:
acls:
  # Kafka Connect's internal topics used to store configuration, offsets, or status
  - resource:
      type: group
      name: connect-cluster
    operations:
      - Read
  - resource:
      type: topic
      name: connect-cluster-configs
    operations:
      - Create
      - Describe
      - Read
      - Write
  - resource:
      type: topic
      name: connect-cluster-status
    operations:
      - Create
      - Describe
      - Read
      - Write
  - resource:
      type: topic
      name: connect-cluster-offsets
    operations:
      - Create
      - Describe
      - Read
      - Write
  - resource:
      type: topic
      name: postgres_cdc.public.airplanedata
    operations:
      - Create
      - Describe
      - Read
      - Write
  - resource:
      type: topic
      name: __debezium-heartbeat.postgres_cdc
    operations:
      - Create
      - Describe
      - Read
      - Write
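Once the KafkaConnect CR and the KafkaUser are written, apply them and wait for the cluster to become ready. A minimal sketch, assuming your manifests are saved as connect.yaml:

kubectl apply -f connect.yaml
kubectl wait kafkaconnect/connect-cluster --for=condition=Ready --timeout=300s
kubectl get pods -l strimzi.io/cluster=connect-cluster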
Hint 1
If you need a good starting point, look into the examples directory of the Strimzi repo; it contains example Connect configs. For the user: the required topics are known to you, and the user needs read and write access to them.
Hint 2
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  #annotations:
  #  strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: {}
  bootstrapServers: {}
  tls:
    trustedCertificates:
      - secretName: {}
        pattern: {}
  resources: {}
  authentication:
    type: scram-sha-512
    {}
  config:
    {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-cluster
  labels:
    strimzi.io/cluster: cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Kafka Connect's internal topics used to store configuration, offsets, or status
      - resource:
          type: group
          name: connect-cluster
        operations:
          - Read
      - resource:
          type: topic
          name: {}
        operations:
          - Create
          - Describe
          - Read
          - Write
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  #annotations:
  #  strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: 3
  bootstrapServers: cluster-1-kafka-bootstrap.{your namespace}.svc:9093
  tls:
    trustedCertificates:
      - secretName: cluster-1-tls-managed
        pattern: "ca.crt"
  resources:
    requests:
      memory: 2Gi
      cpu: "200m"
    limits:
      memory: 2Gi
  authentication:
    type: scram-sha-512
    username: connect-cluster
    passwordSecret:
      secretName: connect-cluster
      password: password
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    #key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    #key.converter.schemas.enable: "false"
    value.converter.schemas.enable: "false"
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-cluster
  labels:
    strimzi.io/cluster: cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Kafka Connect's internal topics used to store configuration, offsets, or status
      - resource:
          type: group
          name: connect-cluster
        operations:
          - Read
      - resource:
          type: topic
          name: connect-cluster-configs
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: connect-cluster-status
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: connect-cluster-offsets
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: postgres_cdc.public.airplanedata
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: __debezium-heartbeat.postgres_cdc
        operations:
          - Create
          - Describe
          - Read
          - Write
Task 2: Install the Connector Plugins
Important!
If you already installed your Connect cluster, please delete it again, since the base image contains no connector implementations.
Install the plugins using the Strimzi Kafka Connect build feature.
The container registry is krassestecontainerreistry.azurecr.io
The repository name is connect
The image name should be connect-postgres
The version should be 1.0.0
The push secret is acr-connect-push
This blog post will help you out:
https://strimzi.io/blog/2021/03/29/connector-build/
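The push secret referenced above has to exist as a docker-registry secret with push rights on the registry. A minimal sketch, with placeholder credentials:

kubectl create secret docker-registry acr-connect-push \
  --docker-server=krassestecontainerreistry.azurecr.io \
  --docker-username={your registry user} \
  --docker-password={your registry password}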
The following plugins should be installed:
# --- Debezium PostgreSQL (CDC) source connector ---
- name: debezium-postgres
  artifacts:
    - type: maven
      group: io.debezium
      artifact: debezium-connector-postgres
      version: 3.2.1.Final
      # repository: https://repo1.maven.org/maven2 # optional; Maven Central is the default
# --- Confluent JDBC (source+sink). We’ll use it for the sink. ---
- name: confluent-jdbc
  artifacts:
    - type: zip
      url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip
# --- PostgreSQL JDBC driver (required by the JDBC connector) ---
- name: postgres-jdbc-driver
  artifacts:
    - type: maven
      group: org.postgresql
      artifact: postgresql
      version: 42.7.3
Additionally, add a Role and a RoleBinding to allow Connect to read the Postgres user secret.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["hippo-pguser-connect"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
subjects:
  - kind: ServiceAccount
    name: connect-cluster-connect
    namespace: {your namespace}
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
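After you apply the updated CR, Strimzi runs the image build in a dedicated pod and pushes the finished image to the registry before rolling out the Connect pods. A quick way to follow the build, assuming Strimzi's default build pod naming:

kubectl get pods | grep connect-build
kubectl logs -f connect-cluster-connect-build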
Hint 1
Add the build block to your existing Connect CR.
Hint 2
build:
  output:
    type: docker
    image: {container registry}/{repository}/{image name}:{version}
    pushSecret: {}
  plugins:
    {}
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  #annotations:
  #  strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: 3
  bootstrapServers: cluster-1-kafka-bootstrap.{your namespace}.svc:9093
  tls:
    trustedCertificates:
      - secretName: cluster-1-tls-managed
        pattern: "ca.crt"
  resources:
    requests:
      memory: 2Gi
      cpu: "200m"
    limits:
      memory: 2Gi
  authentication:
    type: scram-sha-512
    username: connect-cluster
    passwordSecret:
      secretName: connect-cluster
      password: password
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
  build:
    output:
      type: docker
      image: krassestecontainerreistry.azurecr.io/connect/connect-postgres:1.0.0
      pushSecret: acr-connect-push
    plugins:
      # --- Debezium PostgreSQL (CDC) source connector ---
      - name: debezium-postgres
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 3.2.1.Final
            # repository: https://repo1.maven.org/maven2 # optional; Maven Central is the default
      # --- Confluent JDBC (source+sink). We’ll use it for the sink. ---
      - name: confluent-jdbc
        artifacts:
          - type: zip
            url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip
      # --- PostgreSQL JDBC driver (required by the JDBC connector) ---
      - name: postgres-jdbc-driver
        artifacts:
          - type: maven
            group: org.postgresql
            artifact: postgresql
            version: 42.7.3
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-cluster
  labels:
    strimzi.io/cluster: cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Kafka Connect's internal topics used to store configuration, offsets, or status
      - resource:
          type: group
          name: connect-cluster
        operations:
          - Read
      - resource:
          type: topic
          name: connect-cluster-configs
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: connect-cluster-status
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: connect-cluster-offsets
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: postgres_cdc.public.airplanedata
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource:
          type: topic
          name: __debezium-heartbeat.postgres_cdc
        operations:
          - Create
          - Describe
          - Read
          - Write
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["hippo-pguser-connect"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
subjects:
  - kind: ServiceAccount
    name: connect-cluster-connect
    namespace: {your namespace}
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
Task 3: Start a Connector
Start a Debezium source connector (CDC source) using the Lenses UI.
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-example-configuration
The name should be “debezium-connector-postgres”.
It should spin up one task.
It should connect to the Postgres instance in your namespace and read data from the “connect.airplanedata” table.
Therefore, set the database hostname, port, server name, database name, user, password, and the list of tables.
Additionally, set the following parameters:
topic.prefix=postgres_cdc
key.converter=org.apache.kafka.connect.storage.StringConverter
heartbeat.interval.ms=10000
snapshot.fetch.size=10240
slot.name=debezium_slot
snapshot.mode=when_needed
publication.autocreate.mode=filtered
publication.name=debezium_publication
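Since the REST endpoint is active (that is why the use-connector-resources annotation was left off in Task 1), you could also create the connector with curl instead of the Lenses UI. A hedged sketch, assuming Strimzi's default REST service connect-cluster-connect-api on port 8083 and run from a pod inside the cluster; the JSON body takes the full parameter set listed above:

curl -X PUT http://connect-cluster-connect-api:8083/connectors/debezium-connector-postgres/config \
  -H "Content-Type: application/json" \
  -d '{"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "topic.prefix": "postgres_cdc"}'
# ...plus the remaining parameters from the list above in the same JSON body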
Hint 1
Visit the Lenses.io UI -> click on Environment -> click on Connect -> click Add Connector -> configure a Debezium source connector -> click "show optional values" -> add all values that are required -> delete the rest -> apply the config
Hint 2
tasks.max={}
plugin.name={}
database.server.name={}
database.hostname={}
database.port={}
database.user={}
database.password={}
database.dbname={}
table.include.list={}
topic.prefix=postgres_cdc
key.converter=org.apache.kafka.connect.storage.StringConverter
heartbeat.interval.ms=10000
snapshot.fetch.size=10240
slot.name=debezium_slot
snapshot.mode=when_needed
publication.autocreate.mode=filtered
publication.name=debezium_publication
Hint 3
name=debezium-connector-postgres
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=pgoutput
database.server.name=postgres-server
database.hostname=hippo-pods.{your namespace}.svc.cluster.local
database.port=5432
database.user=${secrets:{your namespace}/hippo-pguser-connect:user}
database.password=${secrets:{your namespace}/hippo-pguser-connect:password}
database.dbname=connect
table.include.list=public.airplanedata
schema.include.list=public
topic.prefix=postgres_cdc
key.converter=org.apache.kafka.connect.storage.StringConverter
heartbeat.interval.ms=10000
snapshot.fetch.size=10240
slot.name=debezium_slot
snapshot.mode=when_needed
publication.autocreate.mode=filtered
publication.name=debezium_publication
Task 4: Verify the Data Flow
Check the status of the datagen application.
If it is running, check the status of the connector in Lenses.io.
If that is running too, check whether any data has been written to the airplanes topic.
Hint 1
Use k9s to get the status of the datagen.
Look into the logs of the datagen pod.
Visit the Lenses.io UI and check the airplane topic in your cluster.
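If you prefer plain kubectl over k9s, a minimal sketch (the pod name is a placeholder; look it up first):

kubectl get pods | grep datagen
kubectl logs -f {datagen pod name}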
Hint 2
ask the trainer
Hint 3
ask the trainer
Task 5: Use the Connect REST API to get the status of the connector task
Check the status of the connector using the Connect REST API.
https://kafka.apache.org/documentation.html#connect_rest
Hint 1
- Find the correct REST call to use.
- Log into a pod in your cluster.
- Use curl to get the status.
Hint 2
k9s -> pod -> kafka node -> press "s" for shell access -> run:
curl -s http://{host}:{port}/connectors/{connector name}/status
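For example, assuming Strimzi's default REST service name and port (verify with kubectl get svc):

curl -s http://connect-cluster-connect-api:8083/connectors/debezium-connector-postgres/status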
Hint 3
ask the trainer