Kafka 消息队列服务安装

2020-05-19 0 By admin

Kafka 服务依赖 JDK 环境和 Zookeeper 服务,所以在安装 Kafka 之前,需要将依赖环境配置完成。

一、直接安装在宿主机上

1.1、软件包下载地址

http://kafka.apache.org/downloads
下载合适版本的软件包,并解压到相应的目录中。

1.2、Kafka目录介绍

  1. /bin 操作kafka的可执行脚本,还包含windows下脚本
  2. /config 配置文件所在目录
  3. /libs 依赖库目录
  4. /logs 日志数据目录,目录kafka把server端日志分为5种类型,分为:server,request,state,log-cleaner,controlle

4.3、配置文件

Kafka 的配置文件在 conf 目录下;
kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect。
kafka server 服务端的配置文件 config/server.properties 参数说明:

Kafka 消息队列配置项说明

4.4、启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties
Kafka 默认端口为:9092

二、Docker 容器安装 Kafka

使用的镜像仓库:wurstmeister/kafka。

2.1、强制设置的几个环境变量

  1. KAFKA_ZOOKEEPER_CONNECT # Zookeeper 的链接地址
  2. KAFKA_ADVERTISED_LISTENERS #使用的主机名
  3. KAFKA_LISTENERS #监听端口设置

新的版本中,部分强制变量已经可以省略。
环境变量补充

  1. KAFKA_BROKER_ID broker 的ID
  2. KAFKA_CREATE_TOPICS 自动创建的topic

2.2、启动命令

docker run --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0
-e KAFKA_ZOOKEEPER_CONNECT=12.168.3.62:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://12.168.3.62:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d wurstmeister/kafka:2.12-2.1.0

2.3、创建一个主题

#docker exec -it kafka01 /bin/bash
#/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.112.150:2181 --replication-factor 1 --partitions 1 --topic my_log_kafka

2.4、发布的主机名(KAFKA_ADVERTISED_HOST_NAME)

您可以通过其他方式配置发布的主机名
1、明确地,使用 KAFKA_ADVERTISED_HOST_NAME
2、通过命令,使用HOSTNAME_COMMAND,例如HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
使用命令时,请确保查看 https://docs.docker.com/compose/compose-file/ 中的“变量替换”部分
3、如果 KAFKA_ADVERTISED_HOST_NAME 指定,则优先于 HOSTNAME_COMMAND

2.5、将HOSTNAME_COMMAND注入配置

如果您需要 HOSTNAME_COMMAND 任何其他 KAFKA_XXX 变量_{HOSTNAME_COMMAND} 中的值,请在变量值中使用字符串,即KAFKA_ADVERTISED_LISTENERS=SSL://_{HOSTNAME_COMMAND}:9093,PLAINTEXT://9092
advertised.listeners 发布到 zookeeper 供客户端使用的监听器,可以简单理解为kafka提供的外部访问地址。

三、Kubernetes 容器编排工具配置

在配置 Kafka 的容器的过程中,遇到一些问题:
1、Kafka 应用启动需要设置对外暴露的链接地址(IP:port),kafka 启动会链接这个地址,进行检测。
2、默认情况下,Kubernetes 中的POD 并不能通过其 service 来链接访问自己。
3、根据上面的情况,就只能设置 POD 对外暴漏的地址为 podIP 地址加端口号了。
4、通过 status.podIP,可以获取到 Pod 启动后的IP的地址。

        env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: KAFKA_LOG_DIRS
            value: /kafka/my-kafka
          - name: KAFKA_LISTENERS
            value: 'PLAINTEXT://$(MY_POD_IP):9092'
          - name: KAFKA_ZOOKEEPER_CONNECT
            value: zookeeper-server:2181
          - name: KAFKA_BROKER_ID
            value: "1"

设置共享存储时,需要设置日志的目录地址,默认会使用 POD 的名称,这样每次 POD 重建日志目录都会改变。