前提: 对应的 Scala 版本已安装, ZooKeeper 已启动.
Kafka 安装
解压压缩文件到 ~/workspace
下
修改环境变量
|
|
配置生效:1source ~/.bashrc
修改 Kafka 配置
~/workspace/kafka_2.11-0.8.2.2/config
123456789$ vim server.properties #添加配置内容broker.id=1 # 对应集群中唯一数字port=9092host.name=h16 # hostnameadvertised.host.name=h16log.dirs=/home/data/kafka/kafka-logsnum.partitions=2zookeeper.connect=h16:2181,h17:2181,h18:2181
Kafka 启动
在 h16 上启动 Kafka12bin/kafka-server-start.sh config/server.properties (正常启动)bin/kafka-server-start.sh -daemon config/server.properties (后台启动)
验证启动情况, 在服务器上查看:123jps # h1611013 Kafka561 Jps
运行示例
创建topic:
1bin/kafka-topics.sh --create --zookeeper h16::2181 --replication-factor 3 --partitions 1 --topic mykafka查看Topic:
1bin/kafka-topics.sh --list --zookeeper h16::2181查看详细信息:
1234bin/kafktopics.sh --describe --zookeeper h16:2181Topic:mykafka PartitionCount:1 ReplicationFactor:3 Configs:Topic: mykafka Partition: 0 Leader: 133 Replicas: 133,134,132 Isr: 134发送消息:
1234bin/kafka-console-producer.sh --broker-list 192.168.40.134:9092 --topic mykafka23423bin/kafka-console-producer.sh --brokelist 192.168.40.134:9092 --topic mykafka4533如果出现以下信息
123SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.则需要下载slftj-nop-1.5.jar, 并将其cp至kafka的libs目录下。
接收消息(注意–zookeeper配置与server.properties保持一致, 使用域名方式):
1bin/kafka-console-consumer.sh --zookeeper h16:2181 --topic mykafka --from-beginning更改topic过期时间,alter只对Topic-level级别的配置有效参考
123bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic withdata --config retention.ms=86400000 (一天)bin/kafka-topics.sh --zookeeper localhost:2181 --describe -topic withdatabin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic withdata --deleteConfig retention.ms