Skip to content

Lab: Setup Mirror Maker

Task 1: Create Mirror Maker base config

Setup the Mirror Maker CR config.

https://strimzi.io/docs/operators/0.29.0/configuring#proc-mirrormaker-replication-str

It should limit to 2Gi of RAM and 500m of CPU.

It should use 1 replica. The connectCluster is ccloud since it needs to be the target. For the source alias:

Use your strimzi cluster.

You need to setup tls and authentication. Therefore you also need a user.

For the target alias:

Use Confluent Cloud.

Access information will be provided by the trainer.

To configure tls, use: tls: {}

To configure authentication use type plain and the API Key as username and the API Secret as password. You may need to create a secret.

For the source connector: Use strimzi as source and confluent as target.

Use 1 Task.

Set following config: 
```yaml
config:
  sync.topic.acls.enabled: "false"
  sync.group.acls.enabled: "false"
  sync.cluster.acls.enabled: "false"
  sync.topic.configs.enabled: "false"
  replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
  refresh.topics.interval.seconds: 60
```

For the hearbeat connector:

Use 1 task.

For the checkpoint connector:

Use 1 task.

Set following config:
```yaml
config:
    emit.checkpoints.enabled: "true"
    sync.group.offsets.enabled: "true"
```

For the topics pattern, use: “postgres.*” for the groups pattern use groupsPattern: “”.

Hint 1

If you need a good starting point look into the examples directory of the strimzi repo. It contains example MIRROR-MAKER configs. Also look in the security folder of the examples. For the User: give the user access to all topics and consumer groups and the cluster and the transactional IDs. Use a k8s secret to store the api key secret.

Hint 2
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
    name: mm2-to-ccloud
spec:
    version: 4.0.0
replicas: 1
connectCluster: ccloud
clusters:
    # === SOURCE: Strimzi (SASL/SCRAM) ===
    - alias: src
      bootstrapServers: {TODO}
      authentication:
        type: {TODO}
        username: {TODO}
        passwordSecret:
            secretName: {TODO}
            password: {TODO}
      tls:
        trustedCertificates:
            - secretName: {TODO}
              pattern: {TODO}
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1

    # === TARGET: Confluent Cloud (SASL_SSL + PLAIN) ===
    - alias: ccloud
      bootstrapServers: {given bootstrap}
      tls: {}
      authentication:
        type: plain
        username: {given api key}
        passwordSecret:
            secretName: ccloud-api
            password: password
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        ssl.endpoint.identification.algorithm: "https"

mirrors:
    - sourceCluster: src
      targetCluster: ccloud
      sourceConnector:
        tasksMax: {TODO}
        config:
        {TODO}
      heartbeatConnector:
        {TODO}
      checkpointConnector:
        {TODO}
      topicsPattern: ""
      groupsPattern: ""
---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
    name: mirror-maker
    labels:
        strimzi.io/cluster: kafka-cluster
    spec:
    authentication:
        type: scram-sha-512
    authorization:
        type: simple
        acls:
        # Full cluster-level operations
        - resource:
            type: cluster
            name: kafka-cluster
            patternType: literal
            operation: All

        # All operations on all topics
        - resource:
            type: topic
            name: "*"
            patternType: literal
            operation: All

        # Allow use of any consumer group
        - resource:
            type: group
            name: "*"
            patternType: literal
            operation: All

        # Access to all transactional IDs (for producers using transactions)
        - resource:
            type: transactionalId
            name: "*"
            patternType: literal
            operation: All
---
apiVersion: v1
kind: Secret
metadata:
    name: ccloud-api
type: Opaque
stringData:
    password: {TODO}
Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-to-ccloud
spec:
  version: 4.0.0
  replicas: 1
  connectCluster: ccloud
  clusters:
    # === SOURCE: Strimzi (SASL/SCRAM) ===
    - alias: src
      bootstrapServers: kafka-cluster-kafka-bootstrap.strimzi:9093
      authentication:
        type: scram-sha-512
        username: mirror-maker
        passwordSecret:
          secretName: mirror-maker
          password: password
      tls:
        trustedCertificates:
            - secretName: kafka-cluster-cluster-ca-cert
              pattern: "*.crt"
            - secretName: cluster-tls-managed
              pattern: "ca.crt"
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1

    # === TARGET: Confluent Cloud (SASL_SSL + PLAIN) ===
    - alias: ccloud
      bootstrapServers: {}.azure.confluent.cloud:9092
      tls: {}
      authentication:
        type: plain
        username: "{}"
        passwordSecret:
          secretName: ccloud-api
          password: password
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        ssl.endpoint.identification.algorithm: "https"

  mirrors:
    - sourceCluster: src
      targetCluster: ccloud
      sourceConnector:
        tasksMax: 1
        config:
          sync.topic.acls.enabled: "false"
          sync.group.acls.enabled: "false"
          sync.cluster.acls.enabled: "false"
          sync.topic.configs.enabled: "false"
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
          refresh.topics.interval.seconds: 60
      heartbeatConnector:
        tasksMax: 1
      checkpointConnector:
        tasksMax: 1
        config:
          emit.checkpoints.enabled: "true"
          sync.group.offsets.enabled: "true"
      topicsPattern: "postgres.*"
      groupsPattern: ""
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: mirror-maker
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Full cluster-level operations
      - resource:
          type: cluster
          name: kafka-cluster
          patternType: literal
        operation: All

      # All operations on all topics
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All

      # Allow use of any consumer group
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All

      # Access to all transactional IDs (for producers using transactions)
      - resource:
          type: transactionalId
          name: "*"
          patternType: literal
        operation: All
---
apiVersion: v1
kind: Secret
metadata:
  name: ccloud-api
type: Opaque
stringData:
  password: ''

Task 2: Install MM2

Install MM2 and verify that it is running.

Apply the config, view the pod in the cluster. Verify by looking at the logs and also into the target cluster.

Hint 1

Apply the mm2 CR to you namespace.

View the pod logs.

Look for the data in Lenses Confluent Cloud Environment.

Hint 2

kubectl apply -n {your namespace} mm2.yml k9s -> pods -> mm2 -> log

Lenses -> Confluent Cloud Cluster -> View Your Topic for data

Hint 3

ask the trainer

Task 3: Add Monitoring

Add Monitoring config to the CR.

Use the metricsCOnfig with the jmxPrometheusExporter.

View the scaped metrics in grafana.

Use following config map for Monitoring.

https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/metrics/kafka-mirror-maker-2-metrics.yaml

Hint 1

Use:

metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
    configMapKeyRef:
        name: mirror-maker-2-metrics
        key: metrics-config.yml
Hint 2

Use the config map from the repo in https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/metrics/kafka-mirror-maker-2-metrics.yaml

Hint 3
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-to-ccloud
spec:
  version: 4.0.0
  replicas: 1
  connectCluster: ccloud
  clusters:
    # === SOURCE: Strimzi (SASL/SCRAM) ===
    - alias: src
      bootstrapServers: kafka-cluster-kafka-bootstrap.strimzi:9093
      authentication:
        type: scram-sha-512
        username: mirror-maker
        passwordSecret:
          secretName: mirror-maker
          password: password
      tls:
        trustedCertificates:
            - secretName: kafka-cluster-cluster-ca-cert
              pattern: "*.crt"
            - secretName: cluster-tls-managed
              pattern: "ca.crt"
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1

    # === TARGET: Confluent Cloud (SASL_SSL + PLAIN) ===
    - alias: ccloud
      bootstrapServers: {}.azure.confluent.cloud:9092
      tls: {}
      authentication:
        type: plain
        username: "{}"
        passwordSecret:
          secretName: ccloud-api
          password: password
      config:
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
        ssl.endpoint.identification.algorithm: "https"
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: mirror-maker-2-metrics
        key: metrics-config.yml
  mirrors:
    - sourceCluster: src
      targetCluster: ccloud
      sourceConnector:
        tasksMax: 1
        config:
          sync.topic.acls.enabled: "false"
          sync.group.acls.enabled: "false"
          sync.cluster.acls.enabled: "false"
          sync.topic.configs.enabled: "false"
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
          refresh.topics.interval.seconds: 60
      heartbeatConnector:
        tasksMax: 1
      checkpointConnector:
        tasksMax: 1
        config:
          emit.checkpoints.enabled: "true"
          sync.group.offsets.enabled: "true"
      topicsPattern: "postgres.*"
      groupsPattern: ""
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: mirror-maker
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Full cluster-level operations
      - resource:
          type: cluster
          name: kafka-cluster
          patternType: literal
        operation: All

      # All operations on all topics
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All

      # Allow use of any consumer group
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All

      # Access to all transactional IDs (for producers using transactions)
      - resource:
          type: transactionalId
          name: "*"
          patternType: literal
        operation: All
---
apiVersion: v1
kind: Secret
metadata:
  name: ccloud-api
type: Opaque
stringData:
  password: ''
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: mirror-maker-2-metrics
  labels:
    app: strimzi
data:
  metrics-config.yml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
    #kafka.connect:type=app-info,client-id="{clientid}"
    #kafka.consumer:type=app-info,client-id="{clientid}"
    #kafka.producer:type=app-info,client-id="{clientid}"
    - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
      name: kafka_$1_start_time_seconds
      labels:
        clientId: "$2"
      help: "Kafka $1 JMX metric start time seconds"
      type: GAUGE
      valueFactor: 0.001
    - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
      name: kafka_$1_$3_info
      value: 1
      labels:
        clientId: "$2"
        $3: "$4"
      help: "Kafka $1 JMX metric info version and commit-id"
      type: UNTYPED

    #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
    #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total)
      name: kafka_$2_$6
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
      name: kafka_$2_$6
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE

    #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
    #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        topic: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(compression-rate|.+-avg)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        topic: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE

    #kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
    #kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        nodeId: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-avg)
      name: kafka_$2_$5
      labels:
        clientId: "$3"
        nodeId: "$4"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE

    #kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
    #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
    #kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
    #kafka.consumer:type=consumer-metrics,client-id="{clientid}"
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total)
      name: kafka_$2_$4
      labels:
        clientId: "$3"
      help: "Kafka $1 JMX metric type $2"
      type: COUNTER
    - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
      name: kafka_$2_$4
      labels:
        clientId: "$3"
      help: "Kafka $1 JMX metric type $2"
      type: GAUGE

    #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}<> status"
    - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
      name: kafka_connect_connector_status
      value: 1
      labels:
        connector: "$1"
        task: "$2"
        status: "$3"
      help: "Kafka Connect JMX Connector status"
      type: GAUGE

    #kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
    #kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
    #kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
    #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
    - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total)
      name: kafka_connect_$1_$4
      labels:
        connector: "$2"
        task: "$3"
      help: "Kafka Connect JMX metric type $1"
      type: COUNTER
    - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
      name: kafka_connect_$1_$4
      labels:
        connector: "$2"
        task: "$3"
      help: "Kafka Connect JMX metric type $1"
      type: GAUGE

    #kafka.connect:type=connector-metrics,connector="{connector}"
    #kafka.connect:type=connect-worker-metrics,connector="{connector}"
    - pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
      name: kafka_connect_worker_$2
      labels:
        connector: "$1"
      help: "Kafka Connect JMX metric $1"
      type: GAUGE

    #kafka.connect:type=connect-worker-metrics
    - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+-total)
      name: kafka_connect_worker_$1
      help: "Kafka Connect JMX metric worker"
      type: COUNTER
    - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
      name: kafka_connect_worker_$1
      help: "Kafka Connect JMX metric worker"
      type: GAUGE

    #kafka.connect:type=connect-worker-rebalance-metrics
    - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+-total)
      name: kafka_connect_worker_rebalance_$1
      help: "Kafka Connect JMX metric rebalance information"
      type: COUNTER
    - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
      name: kafka_connect_worker_rebalance_$1
      help: "Kafka Connect JMX metric rebalance information"
      type: GAUGE

    #kafka.connect:type=MirrorSourceConnector
    - pattern: kafka.connect.mirror<type=MirrorSourceConnector, target=(.+), topic=(.+), partition=(.+)><>([a-z-_]+)
      name: kafka_connect_mirror_mirrorsourceconnector_$4
      labels:
        target: "$1"
        topic: "$2"
        partition: "$3"
      help: "Kafka Mirror Maker 2 Source Connector metrics"
      type: GAUGE

    #kafka.connect:type=MirrorCheckpointConnector
    - pattern: kafka.connect.mirror<type=MirrorCheckpointConnector, source=(.+), target=(.+), group=(.+), topic=(.+), partition=(.+)><>([a-z-_]+)
      name: kafka_connect_mirror_mirrorcheckpointconnector_$6
      labels:
        source: "$1"
        target: "$2"
        group: "$3"
        topic: "$4"
        partition: "$5"
      help: "Kafka Mirror Maker 2 Checkpoint Connector metrics"
      type: GAUGE