原创

kafka基础

kafka基础

一、kafka概念

1.什么是kafka

Kafka是一个开源的分布式流处理平台和消息系统,最初由LinkedIn开发并开源。它设计用于处理大规模的实时数据流,并具有高吞吐量、低延迟和高可靠性的特点。

优点:吞吐量高,性能好,伸缩性好,支持在线水平扩展。具有容错性和可靠性,与大数据生态紧密融合。

kafka的消息存在topic中,主题包含多个分区。

常用的消息队列:

Apache Kafka高性能的分布式消息系统和流处理平台,Kafka广泛应用于大规模数据流处理和实时数据分析场景。
RabbitMQ开源的消息队列系统,支持多种消息协议,适用于构建各种消息通信场景。
Apache ActiveMQ流行的开源消息中间件,支持JMS标准,提供了丰富的功能和可靠性。
Amazon SQS亚马逊提供的托管消息队列服务,适用于构建分布式系统和云原生应用。
2.消息队列应用场景
1.异步通信:消息队列可以用于实现系统之间的异步通信,发送方将消息发送到队列中,接收方可以在合适的时间接收并处理这些消息,从而实现解耦和提高系统的可伸缩性。
2.应用解耦:通过引入消息队列,不同的应用程序可以通过消息传递进行通信,而无需直接调用对方的接口,从而实现应用之间的解耦合,降低耦合度。
3.流量削峰:消息队列可以用于平滑处理系统的流量波动。当系统面临高峰期时,消息可以被缓冲在队列中,然后按照系统的处理能力逐步处理,避免系统因突发流量而崩溃。
4.日志处理:将系统产生的日志消息发送到消息队列中,可以实现日志的集中管理和处理。消费者可以从队列中获取日志消息并进行存储、分析或监控。
5.任务调度:将需要执行的任务封装成消息,发送到消息队列中,消费者可以按照一定的规则获取并执行这些任务,实现任务的异步执行和调度。
6.分布式系统协调:在分布式系统中,消息队列可以用于实现不同节点之间的协作和协调,例如实现分布式事务、事件驱动架构等。
3.kafka两种模式

消息队列通常有两种主要的模式:点对点(Point-to-Point)模式和发布/订阅(Publish/Subscribe)模式。

1.点对点(Point-to-Point)模式: 在点对点模式下,消息生产者将消息发送到队列中,而消息消费者从队列中接收消息。每条消息只能被一个消费者接收,即一条消息只有一个消费者消费。如果有多个消费者监听同一个队列,每条消息只会被其中一个消费者接收。这种模式适合于需要确保消息被处理一次且仅一次的场景。
2.发布/订阅(Publish/Subscribe)模式: 在发布/订阅模式下,消息生产者将消息发布到主题(Topic)中,而消息订阅者(消费者)可以订阅感兴趣的主题并接收相关消息。每条消息可以被多个订阅者同时接收,即一条消息可以被多个消费者消费。这种模式适合于需要广播消息给多个消费者的场景。
4.kafka架构

Kafka的架构包括以下几个关键组件:

1.Producer:消息生产者,负责将消息发布到Kafka的主题(Topic)中。
2.Broker:Kafka集群中的每个节点称为Broker,负责存储消息数据、处理生产者和消费者的请求,并进行消息的复制和分发。
3.Topic:消息主题,是消息的逻辑分类单位,生产者将消息发布到特定的主题中,消费者可以订阅感兴趣的主题来接收消息。
4.Partition:每个主题可以分为多个分区(Partition),每个分区在不同的Broker上进行存储和复制,实现数据的分布式存储和负载均衡。
5.Replica:每个分区可以有多个副本(Replica),副本分布在不同的Broker上,用于实现数据的冗余备份和容错性。
6.Consumer:消息消费者,订阅主题并从分区中拉取消息进行处理。
7.Consumer Group:消费者组,多个消费者可以组成一个消费者组,每个分区只能被消费者组中的一个消费者消费,实现消息的负载均衡和并行处理。
8.ZooKeeper:Kafka集群依赖ZooKeeper来进行协调和管理,包括Broker的注册、主题的分配、分区的领导者选举等功能。

二、kafka部署

1.环境准备

准备3台机器用于部署kafka,一般kafka集群至少需要3台机器

kafka1192.168.xx.xx
kafka2192.168.xx.xx
kafka3192.168.xx.xx
2.下载kafka
#官网地址:https://kafka.apache.org/downloads.html
wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
#解压
tar -zxvf kafka_2.12-2.3.1.tgz -C /usr/local/kafka
3.配置zookeeper集群

在搭建kafka集群之前需要先安装zookeeper,kafka已经自带zookeeper只需要解压安装即可。

vim /usr/local/kafka/config/zookeeper.properties
dataDir=/usr/local/kafka/data/zookeeper	#数据目录
clientPort=2181	#端口
tickTime=2000	#维护心跳间隔时间
initLimit=20	#主从之间心跳数量(连接)
syncLimit=10	#主从之间心跳数量(请求)
server.1=ip:2888:3888
server.2=ip:2888:3888
server.3=ip:2888:3888
maxClientCnxns=0	#如果不设置0,则每个ip连接zookeeper时的连接数没有限制
#准备zookeeper需要的目录
mkdir -p /usr/local/kafka/data/zookeeper
#创建myid文件,没有这个没法启动,另外两台分别是3,4
echo 2 > /usr/local/kafka/data/zookeeper/myid
4.kafka配置
vim /usr/local/kafka/config/server.properties
broker.id=0	#此id每一台不能相同,另外两台分别为1,2
prot=9092	#端口
host.name=ip	#主机名或者ip
log.dir=/usr/local/kafka/data/kafka-logs	#日志位置
zookeeper.connect=ip:2181,ip:2181,ip:2181	#集群zookeeper的机器
num.partitions=16	#信息分区
log.dirs=/usr/local/kafka/data/kafka-logs	#数据目录
log.retention.hours=168	#按需求保留过期时间
5.启动
#先启动zookeeper再启动kafka
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
#检查zookeeper端口
netstat -anp|grep -E "2181|2888|3888"
#启动kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

三、kafka常用命令

1.查询
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper ip:2181
#查看具体信息
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper ip:2181 --topic test
2.创建
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper ip:2181  --replication-factor 2 --partitions 2 --topic test
选项解释:
		--replication-factor	#定义副本数
		--partitions	#定义分区数
		--topic	#定义topic名

3.删除
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper ip:2181 --delete --topic test

4.修改分区数
/usr/local/kafka/bin/kafka-topics.sh --zookeeper ip:2181 --alter --topic test --partitions 2

5.控制台生产者
/usr/local/kafka/bin/kafka-console-producer.sh --topic test --broker-list ip:9092

6.控制台消费者
/usr/local/kafka/bin/kafka-console-consumer.sh --topic test --bootstrap-seer ip:9092 --from-beginning

四、案列

本案列使用elk来做

vim /etc/logstash/conf.d/kafka.conf
input{
file{
path=>"/var/log/messages"
type=>"system"
start_position=>"beginning"
}
}
output{
kafka{
bootstrap_servers=>"192.168.92.129:9092,192.168.92.130:9092,192.168.92.131:9092"
topic_id=>"system-messages"
compression_type=>"snappy"
}
}
#启用
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf --configtest --verbose

在kafka上安装logstash将kafka收集的信息传给es

vim /etc/logstash/conf.d/kafka-ec.conf
input{
kafka{
zk_connect=>"192.168.92.129:2181,192.168.92.130:2181,192.168.92.131:2181"
topic_id=>"system-messages"
codec=>plain
reset_beginning=>false
consumer_threads=>5
decorate_events=>true
}
}
output{
elasticsearch{
hosts=>["192.168.92.129:9200","192.168.92.130:9200"]
index=>"test-system-messages-%{+YYYY.MM.dd}"
}
}
#启用
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-ec.conf --configtest --verbose

kafka
  • 作者:shi(联系作者)
  • 发表时间:2024-07-12 17:05:11
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论