博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume+Kafka+SparkStreaming+Hbase+可视化(二)
阅读量:7079 次
发布时间:2019-06-28

本文共 5613 字,大约阅读时间需要 18 分钟。

分布式消息缓存Kafka
 
1、消息中间件:生产者和消费者 生产者、消费者、数据流(消息)
  • 发布和订阅消息
  • 容错存储消息记录
  • 处理流数据
Kafka架构:
procedure:生产者
consumer:消费者
broker:容错存储
topic:分类主题、标签
consumer group:一个consumer最多消费一个分区的数据 consumer数量=partitions
磁盘顺序读写,省掉寻道时间,提高性能
零字节拷贝:内核空间和用户空间不直接拷贝、SendFile
/opt/bigdata/kafka_2.11-1.0.0/kafka-log2s/logkafka-0/00000000000000000000.index index的序号就是message在日志文件中的相对offset(偏移量)
offsetIndex是稀疏索引,先根据offset找到对应log文件,计算 offset - (log文件第一个offset -1) 得到相对索引,再到index文件找到消息。如果index找不到,则取最近的,再去log文件对应位置向下查找
ack: 0 :不等待broker返回确认消息,无阻塞
1 :partitions 中的leader 保存成功
-1: partitions 中的leader和follower都成功
 
启动ZK:
启动Kafka:kafkaStart.sh
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server0.properties &
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server1.properties &
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server2.properties &
 
创建Topic:
kafka-topics.sh --create --zookeeper bigdata:2181,bigdata:2182,bigdata:2183 --replication-factor 1 --partitions 1 --topic logkafka
--partitions 可以提高消费并发
 
查看Topic:
kafka-topics.sh --list --zookeeper bigdata:2181
kafka-topics.sh --describe --zookeeper bigdata:2181 --topic test (指定Topic,否则查看所有topic的详细信息)
 
发送消息:
kafka-console-producer.sh --broker-list localhost:9092 --topic logkafka
 
接受消息:
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic logkafka --from-beginning (--from-beginning . 是否从头开始消费消息)
 
停止Kafka:kafkaStop.sh
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server0.properties &
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server1.properties &
$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server2.properties &
 
两种方式连接Kafka:简单理解为:Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据
Receiver:
1、Kafka中topic的partition与Spark中RDD的partition是没有关系的,因此,在KafkaUtils.createStream()中,提高partition的数量,只会增加Receiver的数量,也就是读取Kafka中topic partition的线程数量,不会增加Spark处理数据的并行度。
2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
Direct:
1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream,然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制:基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。由于数据消费偏移量是保存在checkpoint中,因此,如果后续想使用kafka高级API消费数据,需要手动的更新zookeeper中的偏移量
 
2、API操作
<dependency>  
       <groupId>org.apache.kafka</groupId>  
        <artifactId>kafka_2.11</artifactId>  
        <version>1.0.0</version>     
</dependency>  
 
Scala版 Producer :
package com.kafkaimport java.util.HashMapimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}object producer {  def main(args: Array[String]): Unit = {  // 传参  if (args.length < 4){  System.err.println("Usage: producer 
")  System.exit(1)  }  val Array(brokers, topics, messageSec, wordsPerMessage) = args  // ZK 配置  val zkProps = new HashMap[String, Object]()  zkProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)  zkProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")  zkProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")  // Kafka Producer  val producer = new KafkaProducer[String, String](zkProps)  var i = 0  for ( i <- 1 to 10) {    (1 to messageSec.toInt).foreach { messageNum =>      val msg = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")      val msgs = new ProducerRecord[String, String](topics, null, msg)      producer.send(msgs)    }    Thread.sleep(100)   }  }}

  

3、整合Flume:
conf1:exec-mem-avro.conf
# Name the components on this agenta1.sources = exec-sourcea1.channels = memory-channela1.sinks = avro-sink # configure for sourcesa1.sources.exec-source.type = execa1.sources.exec-source.command = tail -F /opt/datas/log-collect-system/log_server.log # configure for channelsa1.channels.memory-channel.type = memorya1.channels.memory-channel.capacity = 1000a1.channels.memory-channel.transactionCapacity = 100 # configure for sinksa1.sinks.avro-sink.type = avroa1.sinks.avro-sink.hostname = localhosta1.sinks.avro-sink.port = 44444 # configurea1.sinks.avro-sink.channel = memory-channela1.sources.exec-source.channels = memory-channel

 

Kafka conf:exec-memory-kafka.cnf
# Name the components on this agenta1.sources = avro-sourcea1.channels = memory-channela1.sinks = logger-sink # configure for sourcesa1.sources.avro-source.type = avroa1.sources.avro-source.bind = localhosta1.sources.avro-source.port = 44444 # configure for channelsa1.channels.memory-channel.type = memorya1.channels.memory-channel.capacity = 1000a1.channels.memory-channel.transactionCapacity = 100 # configure for sinksa1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink#a1.sinks.kafka-sink.bootstrap.servers = bigdata:9092,bigdata:9093,bigdata:9094a1.sinks.kafka-sink.brokerList = bigdata:9092,bigdata:9093,bigdata:9094a1.sinks.kafka-sink.topic = logkafka # configurea1.sinks.kafka-sink.channel = memory-channela1.sources.avro-source.channels = memory-channel

  

转载于:https://www.cnblogs.com/mlxx9527/p/9367543.html

你可能感兴趣的文章
文件系统中的目录查找
查看>>
[转]优化数据库大幅度提高Oracle的性能
查看>>
openwrt-智能路由器hack技术(1)---"DNS劫持"
查看>>
第十二章 数据备份与还原
查看>>
[redis] Redis 配置文件置参数详解
查看>>
Java 多线程程序设计
查看>>
浅谈TypeScript
查看>>
REST API出错响应的设计(转)
查看>>
js弹出层学习
查看>>
Oracle配置和使用闪回
查看>>
thinkphp中的AJAX返回ajaxReturn()
查看>>
BZOJ4347 : [POI2016]Nim z utrudnieniem
查看>>
jquery validate自定义checkbox验证规则和样式
查看>>
WPF自定义控件与样式(14)-轻量MVVM模式实践
查看>>
EF Code First学习系列
查看>>
Memcache服务器端参数说明
查看>>
SQLServer 复制和数据库镜像 具体配置部署
查看>>
ASP.NET MVC Model绑定的简单应用
查看>>
长期演进技术(LTE,Long Term Evolution)
查看>>
数学之路-python计算实战(5)-初识numpy以及pypy下执行numpy
查看>>