Apache Kafka
Предварительные требования:
- 1. Не меньше 4GB RAM на сервере. Инсталляция с меньшим количеством ОЗУ может закончиться с ошибкой "out of memory" от JVM при старте.
- 2. На сервере должен быть установлен OpenJDK не менее 8 версии.
Шаг 1. Создание пользователя
Так как сервер Kafka обрабатывает запросы, получаемые из сети Интернет, необходимо создать специального пользователя. Это сведёт к минимуму ущерб, нанесённый серверу Kafka, в случае компроментации. Мы создадим отдельного пользователя с именем Kafka. Конечно кроме него на сервере неплохо бы иметь и другого пользователя с правами sudo, для выполнения других задач администрирования, не относящихся к обслуживанию сервера Kafka. Установим sudo:
# apt-get install sudo
Разрешим использовать sudo только пользователяем из группы wheel
# control sudowheel enable
Создаём пользователя kafka с домашней директорией:
# useradd kafka -m
Задаём пароль:
# passwd kafka
Добавим пользователя kafka в группу wheel:
# usermod -aG wheel kafka
Ну и собственно логинимся под свежесозданным пользователем:
# su -l kafka
Шаг 2. Начало установки Kafka
В первую очередь установим java:
$ sudo apt-get install java-1.8.0-openjdk
Создадим директорию для скачивания исходников
$ mkdir ~/Downloads && cd ~/Downloads
Скачиваем актуальную версию кафки:
$ wget -O kafka.tgz https://www.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
Создаём каталог с именем kafka, и сразу переходим в него:
$ mkdir ~/kafka && cd ~/kafka
После чего распаковываем в него скачанный архив с Apache Kafka
$ tar -xvzf ~/Downloads/kafka.tgz --strip 1
Шаг 3. Конфигурирование сервера Kafka
По умолчанию сервер Kafka не позволяет нам удалить тему, категории, группы, или имя канала в котором могут публиковаться сообщения. Для исправления такого поведения предлагается отредактировать конфигурационный файл server.properties. Необходимо добавить в конец этого файла настройки, которые позволят удалить темы Kafka:
$ echo "delete.topic.enable = true" >> ~/kafka/config/server.properties
Шаг 4. Создаём systemd unit файл, и запускаем сервер Kafka
На этом шаге необходимо создать файлы модулей sytemd для служб Kafka.
Создание сервиса Zookeeper Поскольку для управления состоянием кластера и конфигурациями Kafka использует службу Zookeeper, то сначала необходимо сконфигурировать сервис systemd для этой службы. Для детального изучения zookeeper лучше посетить сайт с официальной документацией - https://zookeeper.apache.org/doc/current/index.html
$ vim /etc/systemd/system/zookeeper.service
Наполняем следующим содержимым:
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
В секции [Service] определено использование скриптов start и stop для сервиса zookeeper, а так же возможность автоматического рестарта при возникновении аномального поведения Сохраняем файл и выходим из vim
Создание сервиса Kafka
$ sudo vim /etc/systemd/system/kafka.service
С следующим содержимым:
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1' ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
В секции [Unit] определяем зависимость от сервиса zookeeper. В секции [Service] определяем использование скриптов start и stop, и добавляем возможность автоматического рестарта при возникновении аномального поведения
Сохраняемся и выходим.
Запуск сервиса Kafka
$ sudo systemctl start kafka
Проверяем логи старта
$ journalct -u kafka
Если всё успешно, вывод должен быть без ошибок, и последним сообщением должно значится что-то в духе:
Jul 26 14:08:59 kafka-centos systemd[1]: Started kafka.service.
Теперь kafka готова к работе и прослушивает порт 9092, проверить это можно с помощью ss:
$ netstat -tulpn | grep LISTEN
Включим запуск сервиса kafka по умолчанию:
$ sudo systemctl enable kafka
Шаг 5. Тестирование инсталляции
Попробуйте отправить на сервер Kafka сообщение "Hello World", чтобы убедиться в корректном поведении сервера Kafka: Публикация сообщений в kafka требует двух компонентов: 1. Producer - позволяет публиковать записи и данные по темам 2. Consumer - читает сообщения и данные по темам. Создадим первый тестовый topic с названием TutorialTopic
$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
В случае успешного выполнения команды увидим надпись:
Created topic "TutorialTopic".
Далее вы можете создать producer из командной строки с помощью скрипта kafka-console-producer.sh он будет ожидать в качестве аргументов имя хоста, порт и название темы. Опубликуем строку Hello, World в тему TutorialTopic, набрав:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
Далее создаем consumer, используя для этого скрипт kafka-console-consumer.sh . Он так же в качестве аргументов будет ожидать имя хоста, порт и название темы. Следующая команда прочтёт сообщение из TutorialTopic:
- обратите внимание на флаг --from-beginning, который позволяет прочесть сообщения опубликованные раньше, чем
consumer был запущен.
$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
Если всё работает нормально, команда выведет в консоли сообщение:
Hello, World
и будет дальше продолжать работать, пока не будет остановлена принудительно через ctrl^c
На этом основная часть установки сервера Kafka заканчивается. Для повышения удобства администрирования вы можете установить инструмент KafkaT, который позволит упростить выполнение ряда административных задач из командной строки.