Kafka入门 专题
您的位置:Kafka > Kafka入门 专题 > Apache Kafka 与Spark的集成
Apache Kafka 与Spark的集成
作者:--    发布时间:2019-11-20

在本章中,我们将讨论如何将apache kafka与spark streaming api集成。

关于spark

spark streaming api支持实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从诸如kafka,flume,twitter等许多源中提取,并且可以使用复杂的算法来处理,例如地图,缩小,连接和窗口等高级功能。 最后,处理的数据可以推送到文件系统,数据库和活动仪表板。 弹性分布式数据集(rdd)是spark的基本数据结构。 它是一个不可变的分布式对象集合。 rdd中的每个数据集划分为逻辑分区,可以在集群的不同节点上计算。

与spark集成

kafka是spark流式传输的潜在消息传递和集成平台。 kafka充当实时数据流的中心枢纽,并使用spark streaming中的复杂算法进行处理。 一旦数据被处理,spark streaming可以将结果发布到另一个kafka主题或存储在hdfs,数据库或仪表板中。 下图描述了概念流程。

integration with spark

现在,让我们详细了解kafka-spark api。

sparkconf api

它表示spark应用程序的配置。 用于将各种spark参数设置为键值对。

sparkconf 类有以下方法 -

  • set(string key,string value) - 设置配置变量。

  • remove(string key) - 从配置中移除密钥。

  • setappname(string name) - 设置应用程序的应用程序名称。

  • get(string key) - get key

streamingcontext api

这是spark功能的主要入口点。 sparkcontext表示到spark集群的连接,可用于在集群上创建rdd,累加器和广播变量。 签名的定义如下所示。

public streamingcontext(string master, string appname, duration batchduration, 
   string sparkhome, scala.collection.seq<string> jars, 
   scala.collection.map<string,string> environment)
  • - 要连接的群集网址(例如mesos:// host:port,spark:// host:port,local [4])。

  • appname - 作业的名称,以显示在集群web ui上

  • batchduration - 流式数据将被分成批次的时间间隔

public streamingcontext(sparkconf conf, duration batchduration)

通过提供新的sparkcontext所需的配置创建streamingcontext。

  • conf - spark参数

  • batchduration - 流式数据将被分成批次的时间间隔

kafkautils api

kafkautils api用于将kafka集群连接到spark流。 此api具有如下定义的显着方法 createstream

public static receiverinputdstream<scala.tuple2<string,string>> createstream(
   streamingcontext ssc, string zkquorum, string groupid,
   scala.collection.immutable.map<string,object> topics, storagelevel storagelevel)

上面显示的方法用于创建从kafka brokers提取消息的输入流。

  • ssc - streamingcontext对象。

  • zkquorum - zookeeper quorum。

  • groupid - 此消费者的组id。

  • 主题 - 返回要消费的主题的地图。

  • storagelevel - 用于存储接收的对象的存储级别。

kafkautils api有另一个方法createdirectstream,用于创建一个输入流,直接从kafka brokers拉取消息,而不使用任何接收器。 这个流可以保证来自kafka的每个消息都包含在转换中一次。

示例应用程序在scala中完成。 要编译应用程序,请下载并安装 sbt ,scala构建工具(类似于maven)。 主要应用程序代码如下所示。

import java.util.hashmap

import org.apache.kafka.clients.producer.{kafkaproducer, producerconfig, produc-errecord}
import org.apache.spark.sparkconf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object kafkawordcount {
   def main(args: array[string]) {
      if (args.length < 4) {
         system.err.println("usage: kafkawordcount <zkquorum><group> <topics> <numthreads>")
         system.exit(1)
      }

      val array(zkquorum, group, topics, numthreads) = args
      val sparkconf = new sparkconf().setappname("kafkawordcount")
      val ssc = new streamingcontext(sparkconf, seconds(2))
      ssc.checkpoint("checkpoint")

      val topicmap = topics.split(",").map((_, numthreads.toint)).tomap
      val lines = kafkautils.createstream(ssc, zkquorum, group, topicmap).map(_._2)
      val words = lines.flatmap(_.split(" "))
      val wordcounts = words.map(x => (x, 1l))
         .reducebykeyandwindow(_ &plus; _, _ - _, minutes(10), seconds(2), 2)
      wordcounts.print()

      ssc.start()
      ssc.awaittermination()
   }
}

构建脚本

spark-kafka集成取决于spark,spark流和spark与kafka的集成jar。 创建一个新文件 build.sbt ,并指定应用程序详细信息及其依赖关系。 在编译和打包应用程序时, sbt 将下载所需的jar。

name := "spark kafka project"
version := "1.0"
scalaversion := "2.10.5"

librarydependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
librarydependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
librarydependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

编译/包装

运行以下命令以编译和打包应用程序的jar文件。 我们需要将jar文件提交到spark控制台以运行应用程序。

sbt package

提交到spark

启动kafka producer cli(在上一章中解释),创建一个名为 my-first-topic 的新主题,并提供一些样本消息,如下所示。

another spark test message

运行以下命令将应用程序提交到spark控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "kafkawordcount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

此应用程序的示例输出如下所示。

spark console messages ..
(test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

网站声明:
本站部分内容来自网络,如您发现本站内容
侵害到您的利益,请联系本站管理员处理。
联系站长
373515719@qq.com
关于本站:
编程参考手册