Usage
If you did not install the operator using Helm, you must create the CRD for this operator after installation:
kubectl apply -f /etc/stackable/kafka-operator/crd/kafkacluster.crd.yaml
To create an Apache Kafka cluster named simple-kafka, assuming that you already have a ZooKeeper cluster named simple-zk, apply the following resources:
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
  name: simple-kafka-znode
spec:
  clusterRef:
    name: simple-zk
    namespace: default
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1
If you want to integrate with Open Policy Agent (OPA) and already have an OPA cluster, add an opa field that points to the OPA cluster discovery ConfigMap and the required package. The package is optional and defaults to the metadata.name field:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa
        package: kafka
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1
You can change some OPA cache properties by overriding them in server.properties:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa
        package: kafka
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    configOverrides:
      server.properties:
        opa.authorizer.cache.initial.capacity: "100"
        opa.authorizer.cache.maximum.size: "100"
        opa.authorizer.cache.expire.after.seconds: "10"
    roleGroups:
      default:
        replicas: 1
A full list of settings and their respective defaults can be found here.
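Because the package is optional, the opa block can be reduced to the ConfigMap reference alone. The fragment below is a minimal sketch that reuses the names from the examples above. With no package given, the package should fall back to metadata.name, which is simple-kafka here:

```yaml
spec:
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa  # OPA cluster discovery ConfigMap, as in the examples above
        # no package given: defaults to metadata.name (simple-kafka in this example)
    zookeeperConfigMapName: simple-kafka-znode
```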
Monitoring
The managed Kafka instances are automatically configured to export Prometheus metrics. See Monitoring for more details.
Provide log4j.properties
By default, the log4j.properties from the Kafka package is used. However, you can provide your own log4j.properties via the custom resource:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    log4j: |-
      log4j.rootLogger=INFO, stdout, kafkaAppender
      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
      log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
      log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
      log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
      log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
  brokers:
    roleGroups:
      default:
        replicas: 3
Encryption
The internal and client communication can be encrypted with TLS. This requires the Secret Operator to be present in order to provide certificates. The certificates used can be changed in a top-level config.
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: tls (1)
      internalSecretClass: kafka-internal-tls (2)
  brokers:
    roleGroups:
      default:
        replicas: 3
| 1 | The spec.clusterConfig.tls.serverSecretClass refers to the client-to-server encryption. Defaults to the tls secret. Can be deactivated by setting serverSecretClass to null. |
| 2 | The spec.clusterConfig.tls.internalSecretClass refers to the broker-to-broker internal encryption. This must be explicitly set or defaults to tls. May be disabled by setting internalSecretClass to null. |
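As a minimal sketch of the deactivation described in both callouts, the following fragment sets both fields to null. It only shows the relevant part of the KafkaCluster spec and reuses the ZooKeeper ConfigMap name from the examples above:

```yaml
spec:
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: null    # deactivates client-to-server encryption
      internalSecretClass: null  # disables broker-to-broker encryption
```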
The tls secret is deployed by the Secret Operator and looks like this:
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: tls
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-ca
          namespace: default
        autoGenerate: true
You can create your own SecretClass resources and reference them, for example in spec.clusterConfig.tls.serverSecretClass or spec.clusterConfig.tls.internalSecretClass, to use different certificates.
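The following sketch shows what such a setup could look like. The SecretClass name kafka-server-tls and the CA Secret name are hypothetical and chosen only for illustration. The structure mirrors the tls SecretClass shown above:

```yaml
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: kafka-server-tls  # hypothetical name
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-kafka-server-tls-ca  # hypothetical CA Secret name
          namespace: default
        autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: kafka-server-tls  # client-to-server certificates come from the SecretClass above
  brokers:
    roleGroups:
      default:
        replicas: 3
```

Since internalSecretClass is not set in this sketch, the broker-to-broker encryption keeps its default.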
Authentication
The internal (broker-to-broker) communication is authenticated via TLS. To enforce TLS authentication for client-to-server communication as well, reference an AuthenticationClass (a custom resource provided by the Commons Operator) in the KafkaCluster resource.
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
  name: kafka-client-tls (2)
spec:
  provider:
    tls:
      clientCertSecretClass: kafka-client-auth-secret (3)
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: kafka-client-auth-secret (4)
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-kafka-client-ca
          namespace: default
        autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: 0.3.0
  clusterConfig:
    authentication:
      - authenticationClass: kafka-client-tls (1)
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 3
| 1 | The clusterConfig.authentication.authenticationClass can be set to use TLS for authentication. This is optional. |
| 2 | The referenced AuthenticationClass that references a SecretClass to provide certificates. |
| 3 | The reference to a SecretClass. |
| 4 | The SecretClass that is referenced by the AuthenticationClass in order to provide certificates. |
Configuration & Environment Overrides
The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role).
| Overriding certain properties that are set by the operator (such as the ports) can interfere with the operator and lead to problems. |
Configuration Properties
For a role or role group, at the same level as config, you can specify configOverrides for server.properties. For example, to disable automatic topic creation, set auto.create.topics.enable to "false" in the KafkaCluster resource like so:
brokers:
  roleGroups:
    default:
      configOverrides:
        server.properties:
          auto.create.topics.enable: "false"
      replicas: 1
Just as for the config, it is possible to specify this at role level as well:
brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"
  roleGroups:
    default:
      replicas: 1
All override property values must be strings.
For a full list of configuration options, see the Apache Kafka Configuration Reference.
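To illustrate the precedence rule, the following sketch combines a role-level override with a role group override of the same property. The role group name auto-create is hypothetical. Brokers in the default group keep the role-level value, while brokers in auto-create use the role group value:

```yaml
brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"  # role level: applies to all role groups
  roleGroups:
    default:
      replicas: 1  # inherits "false" from the role
    auto-create:   # hypothetical role group name
      configOverrides:
        server.properties:
          auto.create.topics.enable: "true"  # role group level: takes precedence over the role
      replicas: 1
```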
Environment Variables
In a similar fashion, environment variables can be (over)written. For example per role group:
brokers:
  roleGroups:
    default:
      envOverrides:
        MY_ENV_VAR: "MY_VALUE"
      replicas: 1
or per role:
brokers:
  envOverrides:
    MY_ENV_VAR: "MY_VALUE"
  roleGroups:
    default:
      replicas: 1
Storage for data volumes
You can mount volumes where data is stored by specifying PersistentVolumeClaims for each individual role group:
brokers:
  roleGroups:
    default:
      config:
        resources:
          storage:
            data:
              capacity: 2Gi
In the above example, all Kafka brokers in the default group will store data (the location set by the log.dirs property) on a 2Gi volume.
By default, if nothing is configured in the custom resource for a role group, each Pod gets a 1Gi local volume mounted at the data location.
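Since storage is configured per role group, different groups can use different volume sizes. The sketch below assumes the same data key as the example above and a hypothetical second role group named large. The default group configures nothing and therefore falls back to the 1Gi default:

```yaml
brokers:
  roleGroups:
    default:
      replicas: 3  # no storage configured: each Pod gets the 1Gi default volume
    large:         # hypothetical role group name
      replicas: 3
      config:
        resources:
          storage:
            data:
              capacity: 10Gi  # each Pod in this group gets a 10Gi data volume
```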
Resource Requests
Stackable operators handle resource requests in a slightly different manner than Kubernetes. Resource requests are defined on role or role group level. See Roles and role groups for details on these concepts. On role level this means that, for example, all workers will use the same resource requests and limits. This can be further refined on role group level (which takes priority over the role level) to apply different resources.
This is an example of how to specify CPU and memory resources using the Stackable Custom Resources:
---
apiVersion: example.stackable.tech/v1alpha1
kind: ExampleCluster
metadata:
  name: example
spec:
  workers: # role-level
    config:
      resources:
        cpu:
          min: 300m
          max: 600m
        memory:
          limit: 3Gi
    roleGroups: # role-group-level
      resources-from-role: # role-group 1
        replicas: 1
      resources-from-role-group: # role-group 2
        replicas: 1
        config:
          resources:
            cpu:
              min: 400m
              max: 800m
            memory:
              limit: 4Gi
In this case, the role group resources-from-role inherits the resources specified on the role level, resulting in a maximum of 3Gi memory and 600m CPU resources.
The role group resources-from-role-group has a maximum of 4Gi memory and 800m CPU resources (which overrides the role-level resources).
| For Java products the heap memory actually used is lower than the specified memory limit, because other processes in the Container require memory to run as well. Currently, 80% of the specified memory limit is passed to the JVM. |
For memory, only a limit can be specified, which will be set as both memory request and limit in the Container. This always guarantees a Container the full amount of memory during Kubernetes scheduling.
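As a worked example of these two rules, the following sketch sets a memory limit of 4Gi for the default role group. The value is chosen only for illustration, and the comments spell out the resulting numbers under the 80% rule stated above:

```yaml
brokers:
  roleGroups:
    default:
      config:
        resources:
          memory:
            limit: 4Gi  # used as both the memory request and the memory limit of the Container
            # JVM share at 80%: 0.8 * 4096Mi = 3276.8Mi, roughly 3.2Gi
```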
If no resource requests are configured explicitly, the Kafka operator uses the following defaults:
brokers:
  roleGroups:
    default:
      config:
        resources:
          memory:
            limit: '2Gi'
          cpu:
            min: '500m'
            max: '4'
          storage:
            log_dirs:
              capacity: 1Gi