Lab: Set Up Mirror Maker
Task 1: Create Mirror Maker base config
Set up the Mirror Maker custom resource (CR) config.
https://strimzi.io/docs/operators/0.29.0/configuring#proc-mirrormaker-replication-str
It should be limited to 2Gi of RAM and 500m of CPU.
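As a minimal sketch of how such a limit can be expressed: Strimzi custom resources take CPU and memory settings under `spec.resources`. Only the limits named in the task are shown here; whether you also set `requests` is your choice and not part of the task.

```yaml
# Fragment of the KafkaMirrorMaker2 CR; merge it into your existing spec.
spec:
  resources:
    limits:
      memory: 2Gi   # upper bound for RAM
      cpu: 500m     # half a CPU core
```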
It should use 1 replica. Set connectCluster to ccloud, since the Connect cluster needs to be the target.
For the source alias:
Use your strimzi cluster.
You need to set up TLS and authentication, so you also need a user.
For the target alias:
Use Confluent Cloud.
Access information will be provided by the trainer.
To configure tls, use: tls: {}
To configure authentication use type plain and the API Key as username and the API Secret as password. You may need to create a secret.
For the source connector:
Use Strimzi as the source and Confluent Cloud as the target.
Use 1 task.
Set the following config:
```yaml
config:
  sync.topic.acls.enabled: "false"
  sync.group.acls.enabled: "false"
  sync.cluster.acls.enabled: "false"
  sync.topic.configs.enabled: "false"
  replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
  refresh.topics.interval.seconds: 60
```
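For context on the replication policy setting, here is a hedged comparison. By default, MirrorMaker 2 prefixes replicated topic names with the source cluster alias, while IdentityReplicationPolicy keeps the original name. The topic name `postgres.public.users` and the alias `src` are illustrative assumptions only.

```yaml
config:
  # Default policy: topic postgres.public.users from alias src
  # would appear on the target as src.postgres.public.users.
  # replication.policy.class: "org.apache.kafka.connect.mirror.DefaultReplicationPolicy"

  # Identity policy (used in this lab): the topic keeps its name,
  # postgres.public.users, on the target.
  replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
```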
For the heartbeat connector:
Use 1 task.
For the checkpoint connector:
Use 1 task.
Set the following config:
```yaml
config:
  emit.checkpoints.enabled: "true"
  sync.group.offsets.enabled: "true"
```
For the topics pattern, use topicsPattern: "postgres.*". For the groups pattern, use groupsPattern: "".
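To illustrate where these two settings live: both patterns are set on the mirror entry, and `topicsPattern` is a regular expression matched against topic names on the source cluster. In this sketch the aliases `src` and `ccloud` follow the hints below, and the topic names in the comments are made up.

```yaml
mirrors:
  - sourceCluster: src
    targetCluster: ccloud
    # Regular expression: matches topics such as postgres.public.users
    # (hypothetical topic name), but not a topic named orders.
    topicsPattern: "postgres.*"
    # Empty pattern, as given in the task.
    groupsPattern: ""
```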
Hint 1
If you need a good starting point, look into the examples directory of the Strimzi repo. It contains example Mirror Maker configs. Also look in the security folder of the examples. For the user: give it access to all topics, all consumer groups, the cluster, and all transactional IDs. Use a Kubernetes secret to store the API secret.
Hint 2
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-to-ccloud
spec:
  version: 4.0.0
  replicas: 1
  connectCluster: ccloud
  clusters:
    # === SOURCE: Strimzi (SASL/SCRAM) ===
    - alias: src
      bootstrapServers: {TODO}
      authentication:
        type: {TODO}
        username: {TODO}
        passwordSecret:
          secretName: {TODO}
          password: {TODO}
      tls:
        trustedCertificates:
          - secretName: {TODO}
            pattern: {TODO}
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
    # === TARGET: Confluent Cloud (SASL_SSL + PLAIN) ===
    - alias: ccloud
      bootstrapServers: {given bootstrap}
      tls: {}
      authentication:
        type: plain
        username: {given api key}
        passwordSecret:
          secretName: ccloud-api
          password: password
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        ssl.endpoint.identification.algorithm: "https"
  mirrors:
    - sourceCluster: src
      targetCluster: ccloud
      sourceConnector:
        tasksMax: {TODO}
        config:
          {TODO}
      heartbeatConnector:
        {TODO}
      checkpointConnector:
        {TODO}
      topicsPattern: ""
      groupsPattern: ""
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: mirror-maker
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Full cluster-level operations
      - resource:
          type: cluster
          name: kafka-cluster
          patternType: literal
        operation: All
      # All operations on all topics
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All
      # Allow use of any consumer group
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All
      # Access to all transactional IDs (for producers using transactions)
      - resource:
          type: transactionalId
          name: "*"
          patternType: literal
        operation: All
---
apiVersion: v1
kind: Secret
metadata:
  name: ccloud-api
type: Opaque
stringData:
  password: {TODO}
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-to-ccloud
spec:
  version: 4.0.0
  replicas: 1
  connectCluster: ccloud
  clusters:
    # === SOURCE: Strimzi (SASL/SCRAM) ===
    - alias: src
      bootstrapServers: kafka-cluster-kafka-bootstrap.strimzi:9093
      authentication:
        type: scram-sha-512
        username: mirror-maker
        passwordSecret:
          secretName: mirror-maker
          password: password
      tls:
        trustedCertificates:
          - secretName: kafka-cluster-cluster-ca-cert
            pattern: "*.crt"
          - secretName: cluster-tls-managed
            pattern: "ca.crt"
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
    # === TARGET: Confluent Cloud (SASL_SSL + PLAIN) ===
    - alias: ccloud
      bootstrapServers: {}.azure.confluent.cloud:9092
      tls: {}
      authentication:
        type: plain
        username: "{}"
        passwordSecret:
          secretName: ccloud-api
          password: password
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        ssl.endpoint.identification.algorithm: "https"
  mirrors:
    - sourceCluster: src
      targetCluster: ccloud
      sourceConnector:
        tasksMax: 1
        config:
          sync.topic.acls.enabled: "false"
          sync.group.acls.enabled: "false"
          sync.cluster.acls.enabled: "false"
          sync.topic.configs.enabled: "false"
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
          refresh.topics.interval.seconds: 60
      heartbeatConnector:
        tasksMax: 1
      checkpointConnector:
        tasksMax: 1
        config:
          emit.checkpoints.enabled: "true"
          sync.group.offsets.enabled: "true"
      topicsPattern: "postgres.*"
      groupsPattern: ""
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: mirror-maker
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Full cluster-level operations
      - resource:
          type: cluster
          name: kafka-cluster
          patternType: literal
        operation: All
      # All operations on all topics
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All
      # Allow use of any consumer group
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All
      # Access to all transactional IDs (for producers using transactions)
      - resource:
          type: transactionalId
          name: "*"
          patternType: literal
        operation: All
---
apiVersion: v1
kind: Secret
metadata:
  name: ccloud-api
type: Opaque
stringData:
  password: ''
Task 2: Install MM2
Install MM2 and verify that it is running.
Apply the config and view the pod in the cluster. Verify by checking the logs and by looking into the target cluster.
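Besides the logs, the status of the custom resource can help when verifying. The excerpt below is a sketch of what the status section may contain once the operator reports the resource as ready, for example when viewing the CR with `kubectl get kafkamirrormaker2 mm2-to-ccloud -o yaml` (the name is taken from the hints). The exact fields vary by Strimzi version, so treat it as illustrative only.

```yaml
# Illustrative excerpt of the CR status; not something to apply.
status:
  conditions:
    - type: Ready
      status: "True"
```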
Hint 1
Apply the MM2 CR to your namespace.
View the pod logs.
Look for the data in the Lenses Confluent Cloud environment.
Hint 2
kubectl apply -n {your namespace} -f mm2.yml
k9s -> pods -> mm2 -> log
Lenses -> Confluent Cloud Cluster -> View Your Topic for data
Hint 3
Ask the trainer.
Task 3: Add Monitoring
Add monitoring config to the CR.
Use metricsConfig with the jmxPrometheusExporter type.
View the scraped metrics in Grafana.
Use the following config map for monitoring.
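For Grafana to show anything, Prometheus has to scrape the MM2 pod. If the training cluster does not already have this in place, a PodMonitor along the lines of the Strimzi metrics examples could be used. This is a sketch that assumes the Prometheus Operator is installed; the resource name and the namespace placeholder are assumptions.

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: mm2-metrics            # hypothetical name
  labels:
    app: strimzi
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind"
        operator: In
        values: ["KafkaMirrorMaker2"]
  namespaceSelector:
    matchNames:
      - your-namespace         # placeholder: replace with your namespace
  podMetricsEndpoints:
    - path: /metrics
      port: tcp-prometheus     # metrics port name used by Strimzi pods
```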
Hint 1
Use:
metricsConfig:
  type: jmxPrometheusExporter
  valueFrom:
    configMapKeyRef:
      name: mirror-maker-2-metrics
      key: metrics-config.yml
Hint 2
Use the config map from the repo in https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/metrics/kafka-mirror-maker-2-metrics.yaml
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-to-ccloud
spec:
  version: 4.0.0
  replicas: 1
  connectCluster: ccloud
  clusters:
    # === SOURCE: Strimzi (SASL/SCRAM) ===
    - alias: src
      bootstrapServers: kafka-cluster-kafka-bootstrap.strimzi:9093
      authentication:
        type: scram-sha-512
        username: mirror-maker
        passwordSecret:
          secretName: mirror-maker
          password: password
      tls:
        trustedCertificates:
          - secretName: kafka-cluster-cluster-ca-cert
            pattern: "*.crt"
          - secretName: cluster-tls-managed
            pattern: "ca.crt"
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
    # === TARGET: Confluent Cloud (SASL_SSL + PLAIN) ===
    - alias: ccloud
      bootstrapServers: {}.azure.confluent.cloud:9092
      tls: {}
      authentication:
        type: plain
        username: "{}"
        passwordSecret:
          secretName: ccloud-api
          password: password
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        ssl.endpoint.identification.algorithm: "https"
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: mirror-maker-2-metrics
        key: metrics-config.yml
  mirrors:
    - sourceCluster: src
      targetCluster: ccloud
      sourceConnector:
        tasksMax: 1
        config:
          sync.topic.acls.enabled: "false"
          sync.group.acls.enabled: "false"
          sync.cluster.acls.enabled: "false"
          sync.topic.configs.enabled: "false"
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
          refresh.topics.interval.seconds: 60
      heartbeatConnector:
        tasksMax: 1
      checkpointConnector:
        tasksMax: 1
        config:
          emit.checkpoints.enabled: "true"
          sync.group.offsets.enabled: "true"
      topicsPattern: "postgres.*"
      groupsPattern: ""
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: mirror-maker
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Full cluster-level operations
      - resource:
          type: cluster
          name: kafka-cluster
          patternType: literal
        operation: All
      # All operations on all topics
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All
      # Allow use of any consumer group
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All
      # Access to all transactional IDs (for producers using transactions)
      - resource:
          type: transactionalId
          name: "*"
          patternType: literal
        operation: All
---
apiVersion: v1
kind: Secret
metadata:
  name: ccloud-api
type: Opaque
stringData:
  password: ''
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: mirror-maker-2-metrics
  labels:
    app: strimzi
data:
  metrics-config.yml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
    #kafka.connect:type=app-info,client-id="{clientid}"
    #kafka.consumer:type=app-info,client-id="{clientid}"
    #kafka.producer:type=app-info,client-id="{clientid}"
    - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
      name: kafka_$1_start_time_seconds
      labels:
        clientId: "$2"
      help: "Kafka $1 JMX metric start time seconds"
      type: GAUGE
      valueFactor: 0.001
    - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
      name: kafka_$1_$3_info
      value: 1
      labels:
        clientId: "$2"
        $3: "$4"
      help: "Kafka $1 JMX metric info version and commit-id"
      type: UNTYPED
    #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
    #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total)
      name: kafka_$2_$6
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
      name: kafka_$2_$6
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE
    #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
    #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        topic: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(compression-rate|.+-avg)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        topic: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE
    #kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
    #kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        nodeId: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-avg)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        nodeId: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE
    #kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
    #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
    #kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
    #kafka.consumer:type=consumer-metrics,client-id="{clientid}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total)
      name: kafka_$2_$4
      labels:
        clientId: "$3"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
      name: kafka_$2_$4
      labels:
        clientId: "$3"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE
    #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}<> status"
    - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
      name: kafka_connect_connector_status
      value: 1
      labels:
        connector: "$1"
        task: "$2"
        status: "$3"
      help: "Kafka Connect JMX Connector status"
      type: GAUGE
    #kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
    #kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
    #kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
    #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
    - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total)
      name: kafka_connect_$1_$4
      labels:
        connector: "$2"
        task: "$3"
      help: "Kafka Connect JMX metric type $1"
      type: COUNTER
    - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
      name: kafka_connect_$1_$4
      labels:
        connector: "$2"
        task: "$3"
      help: "Kafka Connect JMX metric type $1"
      type: GAUGE
    #kafka.connect:type=connector-metrics,connector="{connector}"
    #kafka.connect:type=connect-worker-metrics,connector="{connector}"
    - pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
      name: kafka_connect_worker_$2
      labels:
        connector: "$1"
      help: "Kafka Connect JMX metric $1"
      type: GAUGE
    #kafka.connect:type=connect-worker-metrics
    - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+-total)
      name: kafka_connect_worker_$1
      help: "Kafka Connect JMX metric worker"
      type: COUNTER
    - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
      name: kafka_connect_worker_$1
      help: "Kafka Connect JMX metric worker"
      type: GAUGE
    #kafka.connect:type=connect-worker-rebalance-metrics
    - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+-total)
      name: kafka_connect_worker_rebalance_$1
      help: "Kafka Connect JMX metric rebalance information"
      type: COUNTER
    - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
      name: kafka_connect_worker_rebalance_$1
      help: "Kafka Connect JMX metric rebalance information"
      type: GAUGE
    #kafka.connect:type=MirrorSourceConnector
    - pattern: kafka.connect.mirror<type=MirrorSourceConnector, target=(.+), topic=(.+), partition=(.+)><>([a-z-_]+)
      name: kafka_connect_mirror_mirrorsourceconnector_$4
      labels:
        target: "$1"
        topic: "$2"
        partition: "$3"
      help: "Kafka Mirror Maker 2 Source Connector metrics"
      type: GAUGE
    #kafka.connect:type=MirrorCheckpointConnector
    - pattern: kafka.connect.mirror<type=MirrorCheckpointConnector, source=(.+), target=(.+), group=(.+), topic=(.+), partition=(.+)><>([a-z-_]+)
      name: kafka_connect_mirror_mirrorcheckpointconnector_$6
      labels:
        source: "$1"
        target: "$2"
        group: "$3"
        topic: "$4"
        partition: "$5"
      help: "Kafka Mirror Maker 2 Checkpoint Connector metrics"
      type: GAUGE