作者:--
发布时间:2019-11-20
评论:0
阅读:0
在本章中,将讨论如何将apache kafka与spark streaming api集成。
spark是什么?
spark streaming api支持实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从kafka,flume,twitter等许多来源获取,并且可以使用复杂算法进行处理,例如:映射,缩小,连接和窗口等高级功能。 最后,处理后的数据可以推送到文件系统,数据库和现场仪表板上。 弹性分布式数据集(rdd)是spark的基础数据结构。 它是一个不可变的分布式对象集合。 rdd中的每个数据集都被划分为逻辑分区,这些分区可以在集群的不同节点上进行计算。
与spark整合
kafka是spark流媒体的潜在消息传递和集成平台。 kafka充当实时数据流的中心枢纽,并使用spark streaming中的复杂算法进行处理。 数据处理完成后,spark streaming可以将结果发布到hdfs,数据库或仪表板中的另一个kafka主题中。 下图描述了概念流程。
![]()
现在,详细介绍一下kafka-spark api。
sparkconf api
它代表spark应用程序的配置。 用于将各种spark参数设置为键值对。
sparkconf类具有以下方法 -
set(string key, string value) − 设置配置变量。remove(string key) − 从配置中删除键。setappname(string name) − 为应用程序设置应用程序名称。get(string key) − 获得键。
streamingcontext api
这是spark功能的主要入口点。 sparkcontext表示与spark群集的连接,并且可用于在群集上创建rdd,累加器和广播变量。 签名的定义如下所示。
public streamingcontext(string master, string appname, duration batchduration,
string sparkhome, scala.collection.seq<string> jars,
scala.collection.map<string,string> environment)
- master - 要连接的群集url(例如,
mesos://host:port,spark://host:port,local [4])。 - appname - 作业的名称,以显示在集群web ui上。
- batchduration - 流数据将被分成批次的时间间隔。
public streamingcontext(sparkconf conf, duration batchduration)
通过提供新的sparkcontext所需的配置来创建streamingcontext。
- conf - spark参数。
- batchduration - 流数据将被分成批次的时间间隔。
kafkautils api
kafkautils api用于将kafka集群连接到spark流。 该api具有如下定义的重要方法createstream签名。
public static receiverinputdstream<scala.tuple2<string,string>> createstream(
streamingcontext ssc, string zkquorum, string groupid,
scala.collection.immutable.map<string,object> topics, storagelevel storagelevel)
上面显示的方法用于创建从kafka brokers中提取消息的输入流。
ssc - streamingcontext对象。zkquorum - zookeeper仲裁。groupid - 此消费者的组id。topics - 返回要消费的主题地图。storagelevel - 用于存储接收对象的存储级别。
kafkautils api还有另一种方法createdirectstream,它用于创建一个输入流,直接从kafka brokers中提取消息而不使用任何接收器。 此流可以保证来自kafka的每条消息都只包含在一次转换中。
示例应用程序在scala中完成。 要编译应用程序,请下载并安装sbt,scala构建工具(与maven类似)。 主应用程序代码如下所示。
import java.util.hashmap
import org.apache.kafka.clients.producer.{kafkaproducer, producerconfig, produc-errecord}
import org.apache.spark.sparkconf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object kafkawordcount {
def main(args: array[string]) {
if (args.length < 4) {
system.err.println("usage: kafkawordcount <zkquorum><group> <topics> <numthreads>")
system.exit(1)
}
val array(zkquorum, group, topics, numthreads) = args
val sparkconf = new sparkconf().setappname("kafkawordcount")
val ssc = new streamingcontext(sparkconf, seconds(2))
ssc.checkpoint("checkpoint")
val topicmap = topics.split(",").map((_, numthreads.toint)).tomap
val lines = kafkautils.createstream(ssc, zkquorum, group, topicmap).map(_._2)
val words = lines.flatmap(_.split(" "))
val wordcounts = words.map(x => (x, 1l))
.reducebykeyandwindow(_ + _, _ - _, minutes(10), seconds(2), 2)
wordcounts.print()
ssc.start()
ssc.awaittermination()
}
}
构建脚本
spark-kafka集成取决于spark,spark流和spark kafka集成jar。 创建一个新的文件build.sbt并指定应用程序的详细信息及其依赖关系。 sbt将在编译和打包应用程序时下载必要的jar。
name := "spark kafka project"
version := "1.0"
scalaversion := "2.10.5"
librarydependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
librarydependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
librarydependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
编译/包
运行以下命令来编译和打包应用程序的jar文件。 需要将jar文件提交到spark控制台来运行应用程序。
sbt package
提交给spark
启动kafka producer cli(在前一章中介绍),创建一个名称为my-first-topic的新主题,并提供一些示例消息,如下所示。
another spark test message
运行以下命令将应用程序提交到 spark 控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "kafkawordcount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
这个应用程序的输出示例如下所示。
spark console messages ..
(test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..