k8s быстрое развертывание Kafka 3.3.1
k8s быстрое развертывание Kafka 3.3.1

В повседневной разработке и тестировании хорошим выбором будет использование docker или k8s для быстрого развертывания компонента. Kafka 3.3.1 — первая доступная рабочая версия Kraft. В этой статье рассказывается об использовании k8s для быстрого развертывания Kafka 3.3.1, работающего на Kraft.

Создание образа Кафки

Сначала нам нужно собрать образ Kafka 3.3.1. Проще говоря, нам нужен только файл конфигурации, скрипт запуска и файл Dockerfile.

  • server.properties
  • start-kafka.sh
  • Dockerfile

Чтобы получить доступ к kafka в k8s из внешней сетевой среды, файл конфигурации server.properties необходимо изменить следующим образом:

Язык кода:javascript
копировать
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.  See config/kraft/README.md for details.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@kafka:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners = INTERNAL://kafka:9092,EXTERNAL://kafka:30092,CONTROLLER://kafka:9093
# advertised.listeners = INTERNAL://kafka:9092,EXTERNAL://kafka:30092,CONTROLLER://kafka:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=INTERNAL

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
# advertised.listeners=PLAINTEXT://localhost:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

Содержимое сценария запуска start-kafka.sh:

Язык кода:javascript
копировать
#!/bin/sh

/data/kafka/bin/kafka-storage.sh format \
                    --config /data/kafka/config/kraft/server.properties \
                    --cluster-id $(/data/kafka/bin/kafka-storage.sh random-uuid)

/data/kafka/bin/kafka-server-start.sh /data/kafka/config/kraft/server.properties

Содержимое Dockerfile:

Язык кода:javascript
копировать
FROM centos:centos7.9.2009

WORKDIR /data

COPY start-kafka.sh /data
COPY server.properties /data

ARG ZK_VERSION=3.8.0
ARG KAFKA_SCALA_VERSION=2.12
ARG KAFKA_VERSION=3.3.1

EXPOSE 30092

RUN yum update -y
RUN yum install wget java-1.8.0-openjdk-devel java-1.8.0-openjdk -y

RUN wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz

RUN tar zxvf kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz

RUN ln -s kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION} kafka

RUN rm -rf /data/kafka/config/kraft/server.properties

RUN cp /data/server.properties /data/kafka/config/kraft

RUN mkdir /data/kafka/kafka-logs

RUN chmod a+x /data/start-kafka.sh

CMD ["sh", "/data/start-kafka.sh"]

Сохраните указанные выше файлы в каталоге и затем выполните сборку.

Язык кода:javascript
копировать
docker build . --build-arg KAFKA_VERSION=3.3.1 --build-arg ZK_VERSION=3.8.0 --tag xiaozhongcheng2022/kafka:3.3.1 --no-cache=true

Вы можете получить зеркало:

K8s развертывают Kafka

На основе этого образа мы можем построить развертывание Kafka.

Язык кода:javascript
копировать
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
spec:
  replicas: 1
  revisionHistoryLimit: 2
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: kafka
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: xiaozhongcheng2022/kafka:3.3.1
          imagePullPolicy: IfNotPresent
          name: kafka
          ports:
            - containerPort: 30092
              name: kafka-port
              protocol: TCP
          securityContext:
            privileged: false

Сохраните приведенный выше файл конфигурации как: kafka-deployment.yaml, запустите kafka.

Язык кода:javascript
копировать
kubectl apply -f kafka-deployment.yaml

получать:

Просмотр журналов Кафки:

Разверните службу kafka и откройте службу kafka через NodePort.

Язык кода:javascript
копировать
---
apiVersion: v1
kind: Service
metadata:
  labels:
    expose: "true"
    app: kafka
  name: kafka
spec:
  type: NodePort
  ports:
    - name: kafka-port
      port: 30092
      protocol: TCP
      nodePort: 30092
  selector:
    app: kafka

Сохраните приведенный выше файл как kafka-service.yaml.

Язык кода:javascript
копировать
kubectl apply -f kafka-service.yaml

Java-клиент Kafka подключается к Kafka

Поскольку при развертывании kafka мы устанавливаем имя хоста kafka, а также устанавливаем имя хоста kafka в файле конфигурации, когда мы используем клиент для подключения, нам необходимо настроить локальные хосты и настроить kafka в качестве IP-адреса узла k8s.

Язык кода:javascript
копировать
package com.zh.ch.bigdata.examples.kafka;

import com.zh.ch.bigdata.examples.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class KafkaClientExample<K, V> {

    private Producer<K, V> getProducer() throws IOException {
        Properties kafkaConfig = PropertiesUtil.load("kafka/src/main/resources/kafkaConfig.properties");
        return new KafkaProducer<>(kafkaConfig);
    }

    public static void main(String[] args) throws IOException {
        KafkaClientExample<String, String> kafkaClientExample = new KafkaClientExample<>();
        Producer<String, String> producer = kafkaClientExample.getProducer();
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }

}

kafkaConfig.properties

Язык кода:javascript
копировать
bootstrap.servers        =192.168.1.15:30092
linger.ms                =1
key.serializer           =org.apache.kafka.common.serialization.StringSerializer
value.serializer         =org.apache.kafka.common.serialization.StringSerializer

Связанные журналы:

Язык кода:javascript
копировать
[2022-11-24 14:33:14,002] INFO [Producer clientId=producer-1] Instantiated an idempotent producer. (org.apache.kafka.clients.producer.KafkaProducer)
[2022-11-24 14:33:14,771] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-11-24 14:33:14,771] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2022-11-24 14:33:14,771] INFO Kafka startTimeMs: 1669271594768 (org.apache.kafka.common.utils.AppInfoParser)
[2022-11-24 14:33:15,069] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2022-11-24 14:33:15,070] INFO [Producer clientId=producer-1] Cluster ID: EBIPehHLSZOQyWQ8el4N3g (org.apache.kafka.clients.Metadata)
[2022-11-24 14:33:15,129] INFO [Producer clientId=producer-1] ProducerId set to 0 with epoch 0 (org.apache.kafka.clients.producer.internals.TransactionManager)
[2022-11-24 14:33:15,199] INFO [Producer clientId=producer-1] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to UnwEqo7bRN-5vOb0rkA2Cw (org.apache.kafka.clients.Metadata)
[2022-11-24 14:33:15,217] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2022-11-24 14:33:15,320] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-11-24 14:33:15,320] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-11-24 14:33:15,320] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-11-24 14:33:15,320] INFO App info kafka.producer for producer-1 unregistered (org.apache.kafka.common.utils.AppInfoParser)

Проверьте журнал Kafka еще раз, и вы увидите, что моя тема была успешно создана.

сохранение данных Кафки

Если вам нужно сохранить данные Kafka, вы можете использовать nfs для сохранения. Используйте развертывание Helm, если служба nfs уже существует.

Язык кода:javascript
копировать
helm repo add nfs-subdir-external-provisioner https://kubernetes-sigs.github.io/nfs-subdir-external-provisioner/

helm install nfs-subdir-external-provisioner nfs-subdir-external-provisioner/nfs-subdir-external-provisioner \
    --set nfs.server=192.168.49.1 \
    --set nfs.path=/data1/nfs/rootfs

Вы можете получить StorageClass с именем nfs-client.

Создайте pvc с помощью nfs-клиента.

Язык кода:javascript
копировать
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kafka-pvc
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 50Gi

Сохраните приведенный выше файл конфигурации как kafka-pvc.yaml.

Язык кода:javascript
копировать
kubectl apply -f kafka-pvc.yaml

изменитьkafka-deployment.yaml

Язык кода:javascript
копировать
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
spec:
  replicas: 1
  revisionHistoryLimit: 2
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: kafka
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: xiaozhongcheng2022/kafka:3.3.1
          imagePullPolicy: IfNotPresent
          name: kafka
          ports:
            - containerPort: 30092
              name: kafka-port
              protocol: TCP
          securityContext:
            privileged: false
          volumeMounts:
            - mountPath: /data/kafka/kafka-logs
              name: kafka-pvc
      volumes:
        - name: kafka-pvc
          persistentVolumeClaim:
            claimName: kafka-pvc

Просмотрите путь nfs, чтобы увидеть каталог постоянного хранения.

Подвести итог

k8s — хороший выбор для быстрого развертывания приложений для тестирования. В этой статье описывается, как создать с самого начала. образа Кафки и быстрое развертывание на к8с.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose