Apache Spark 集成 Apache Kafka 使用 Scala 实时数据处理

实时数据处理!

Spark 计算或者 Spark SQL 计算, 他们类似的地方是涉及的数据量庞大,计算时间长,典型的场景下一次计算的好事一般是几分钟或者几个小时。但在实际业务场景中,还有一类称作流式计算应用,需要实时对大量数据进行快速处理,最明显的特点就是处理周期短,一般是分钟级,甚至秒级、毫秒级,而且是 7 x 24 小时连续不断地进行计算。—— << Spark 最佳实践 >>

最近需要实现一个实时数据处理的应用,用什么来实现呢?经过搜集资料打算祭出 Apache Kafka 和 Apache Spark 这两个神器。

Spark Streaming 是基于 Apache Spark 核心 API 的一个扩展,他对实时流式数据处理具有可扩展性、高吞吐量、可容错性等特点。类似的其他替代品有 Flink, Storm 等等。

正如上面所讨论的,使用 Spark Streaming 读取并处理流式数据。那使用什么工具来将这些流式数据给到 Spark 呢? 在这种场景下应该使用 Apache Kafka 来完成这项任务。 Challenge accepted !

但是为什要选择 Kafka 而不直接通过 TCP 套接字来实现呢?关键在于并行性(parallelism)—— 数据在 Kafka 中被组织到不同的 topic (“主题”,在 Kafka 系统中,主题是一个数据物理分区)中,topic 又被划分成不同的分区以便并行处理,能实时处理数据流并且能同时并行让消费者得到数据,且容错性好。另外 Kafka 中的 Partition (“分区”,主题可以进一步细分为许多分区)和 Spark 中的 Partition 是一一对应的,这样一来更利于理解和优化。

现在已经足够清楚为什么使用 Spark 和 Kafka 的原因了。现在开始来组合这两个模块来实现一个实时流式数据分析的应用。

我的开发环境

  • Spark 2.2
  • Scala 2.11.12
  • SBT 1.1.1
  • Kafka 2.10-0.10.2.1
  • OS Centos 7 64bit
  • 腾讯云服务器 S2 型

根据 Spark 的开发文档中的叙述 “Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark 2.2.0 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).” 所以为了避免依赖产生的问题最好使用 2.11.x 版本的 Scala。

在开始码代码之前, 最好仔细检查你的 Scala 和 SBT 是否已经安装在你的机器上。

scala -version
cat: /usr/lib/jvm/jre-1.8.0-openjdk/release: No such file or directory
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL
sbt sbtVersion
[info] Loading project definition from /root/project
[info] Set current project to root (in build file:/root/)
[info] 1.1.1

开始使用 Scala 开发一个 Spark 的 APP

注意:随着时间的变化,这个项目中使用到的软件之间的依赖关系也会随之变化,这些依赖关系主要基于部署在你机器上的 Scala 版本,Spark 版本, SBT 版本等等。所以尽量保持上述开发环境或者根据错误日志来调整你的开发环境。

话不多说,让我们开始开始使用 Kafka、Spark 和 Scala 完成一个实时数据处理小目标吧!

目录结构

项目的文件要按照一个特定的目录组织结构来满足 SBT 的支持。文件目录结构如下:

|-- build.sbt
|-- src
|     |-- main
|     |      |-- scala
|     |      |      |-- kafka.scala

build.sbt

这个项目是基于 SBT 来管理的,这是我的 SBT 构建文件 build.sbt 相当于 C/C++ 的 makefile,这个文件主要列出了这个项目里面所有的依赖项:

name := "DirectKafkaWordCount"
version := 0.01"
scalaVersion := "2.11.12"
retrieveManaged := true
fork := true
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion,
  "org.apache.kafka" %% "kafka" % "1.0.1",
  "com.yammer.metrics" % "metrics-core" % "2.2.0",
)

整合 Spark Streaming 和 Apache Kafka

这里我们的 Spark 应用将从 Kafka 的 topic 中获取数据。如果你对这块知识完全没有了解的话,推荐你先去 Google / duckduckgo 一下什么是 Kafka?它是如何工作的,什么是 Kafka 的 topic?Spark 在这里起什么所用?权当做这是布置给你的课后作业吧~

以下是代码环节:

import org.apache.log4j.{Level, Logger}
import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 * <brokers> is a list of one or more Kafka brokers
 * <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 * $ bin/spark-submit --class "DirectKafkaWordCount" --master spark://localhost:7077 target/scala-2.11/directkafkawordcount_2.11-1.0.jar   broker1-host:port topic1
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
  if (args.length < 2) {
  System.err.println(s"""
  |Usage: DirectKafkaWordCount <brokers> <topics>
  | <brokers> is a list of one or more Kafka brokers
  | <topics> is a list of one or more kafka topics to consume from
  |
  """.stripMargin)
  System.exit(1)
  }

  //StreamingExamples.setStreamingLogLevels()
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  val Array(brokers, topics) = args

  // Create context with 2 second batch interval
  val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  // Create direct kafka stream with brokers and topics
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

  // Get the lines, split them into words, count the words and print
  val lines = messages.map(_._2)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
  wordCounts.print()

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
  }
}

为了使 Spark Streaming 以 standalone 模式工作,推荐多核的机器(>= 2 核),在本文的开发环境上设置 Master URL 为 local[2] 的情况下运行的很好。 以下是几种常用的 Master URL 的一些设置:

  1. local - 运行在本机的 Spark 单 worker 线程(并没有并行)
  2. local[K] - 运行在本机的的 Spark 拥有 K 个 worker 线程(在理想的情况下,设置 K 与机器的 CPU 核数一致)
  3. local[*] - 运行在本机的 Spark 和机器上的逻辑核数量一样多的 worker 线程
  4. spark://HOST:PORT - 连接到指定的 Spark 独立集群的 Master 上

到此为止,已经完成了 Saprk 在流式应用的所有工作。在运行我们的应用前,需要确保已经启动并运行了 Kafka 的一个 Topic,这里命名这个 topic 名为 “test”。如果没有,Spark 应用将不会获取并处理到流式数据。

配置Kafka 的 Broker

Broker (“代理”,是组成 Kafka 集群的物理进程)。如果你没有在你的运行环境安装过 Kafka,请自行 Google, 或者参考下 Blog 的文章。如果你已经安装了 Kafka, 那么现在就要创建一个名为 “test” 的 topic 并启动一个 Kafka producer。

启动 Zookeeper 服务

。如果你现在使用的是 kafka_2.10-0.10.2.0 目录下(或有出入,取决于你的 Kafka 版本),执行以下命令:

bin/zookeeper-server-start.sh config/zookeeper.properties

。启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

。创建 topic 创建一个名为“test” 的topic,单partition 且单 replica (在另外一个终端上) 。查看 topic 是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

运行生产者(Producer)

。这是为了为了测试 “test” 这个 topic(在另外的一个终端上)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

。你可以在终端上输入一些文字消息到 Kafka 集群上。输入的消息会被我们的 Spark 应用获取并处理。

This is a short message

运行消费者(Consumer)

。监听消费者(请在一个单独的终端上操作) 。以下命令将会侦听指定主题的输入并输出到终端上

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

。如果在生产者的终端上输入消息,这些消息将会出现在消费者的终端上。

现在 Kafka 的准备工作也已就绪了。是时候来运行我们的 Spark 应用了!

运行 Spark 应用

激动人心的时刻到了!做了那么多前期准备都是为了能顺利运行我们的 Spark 应用。首先是要打开一个终端并cd 进我们的工程目录下。剩下要做的就是使用 sbt pakeage 命令编译我们的代码。

sbt package
[info] Loading project definition from /data/saki/spark/project_DirectKafkaWordCount/project
[info] Loading settings from build.sbt ...
[info] Set current project to DirectKafkaWordCount (in build file:/data/saki/spark/project_DirectKafkaWordCount/)
[info] Compiling 1 Scala source to /data/saki/spark/project_DirectKafkaWordCount/target/scala-2.11/classes ...
[info] Done compiling.
[info] Packaging /data/saki/spark/project_DirectKafkaWordCount/target/scala-2.11/directkafkawordcount_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 17 s, completed Mar 18, 2018 11:22:03 AM

切换到 Kafka 生产者的终端上输入一些文字消息。

./start_kafka_ConsoleProducer.sh test
This is a short message

先让程序继续运行,再切换回运行着 Spark Streaming 的终端上并敲出 sbt run 命令运行我们程序。如无意外,就可以看到刚刚在生产者中断上输入的消息都被打印出来了。

sbt run

-------------------------------------------
Time: 1521343144000 ms
-------------------------------------------
(is,1)
(short,1)
(This,1)
(a,1)
(message,1)

终于完成了!Spark Streaming 现在连接上了 Apache Kafka 上并以每 2 秒一次惊人速度处理我们的流式数据。

彩蛋! Spark 还提供了一个 web UI (如无意外是 http://localhost:4040/)来获取更多的信息,可以更直观的观察你的 Spark 任务。

恭喜你又完成了一个小目标~

总结

构建这个项目中出现问题可能有两处

  • 依赖关系没有引入正确

  • 代码中引入的包有问题

中途遇到的问题记录

  • sbt package 报错:
[info]   Compilation completed in 8.631 s
/usr/local/sbt/bin/sbt-launch-lib.bash: line 58:    57 Killed

原因是内存好耗尽,把 Kafka 等耗内存的进程先关掉再重新执行 sbt package

  • sbt run 时报错:
Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics

submit 时引入依赖包,此问题是缺少kafka_2.11-0.8.2.1.jar 和 metrics-core-2.2.0.jar 下载后放置到 $SPARK_HOME/jars

  • spark 运行时错误:
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

在 Spark 的 conf 目录下,把 log4j.properties.template 修改为 log4j.properties

Done.

本文链接:/2018/03/18/apache-spark-apache-kafka-scala/
请尊重作者的劳动成果,转载请注明出处!Sakishum 博客保留对文章的所有权利。

Pingbacks已关闭。

评论
  1. ClieeHus ClieeHus 发布于 2018年9月15日 03:24 #

    Can you need something super new? Open and look at this link. Only there the choice of young girls for every unique guy and completely free! They are responsible slaves, they will and want perform anything you command !
    http://gov.shortcm.li/kings1#C35

  2. ClieeHus ClieeHus 发布于 2018年10月10日 07:17 #

    Why you! prefer something new? Open and look at this website. Only there the choice of hot pussy girls for every taste and completely free! They are hardcore slaves, they will and want do anything you say !
    https://rebrand.ly/governy#Q35

  3. Сialis Сialis 发布于 2019年2月14日 04:02 #

    Hello, yup this piece of writing is truly pleasant and I have learned lot of things from it on the topic of blogging.
    thanks.

  4. 바카라추천 바카라추천 发布于 2019年2月15日 12:18 #

    Eⲭcellent post. Ι used to be checking сonstantly this weblog and I am impressed!

    Very useful information specіfically the ultimate
    section :) I maintain ѕuch info much. I was looking
    for this certain іnfo for a lⲟng time. Thanks and best of
    luⅽk.

  5. 바카라추천 바카라추천 发布于 2019年2月15日 12:19 #

    Excellеnt post. I usеd to be checking constantly this weblog and I am
    impressed! Very usefᥙl infοrmation specifically tһe ultimate section :) I maintain such info much.

    I was looking for this certain info for a long time.
    Thanks and best of luck.

  6. athletic-store.ru athletic-store.ru 发布于 2019年2月15日 12:44 #

    Gօ᧐d day! Ɗo уou қnow if they make аny plugins to
    assist with SEO? I'm trying to get my bloց to rank for some targeted
    keywоrds but I'm not seeing very gоod resultѕ. Ιf you know of any please shaгe.
    Thanks!

  7. athletic-store.ru athletic-store.ru 发布于 2019年2月15日 12:44 #

    Good dаy! Do you knoѡ if they make any plugins to assist with SEO?
    I'm trying to get my blog to rank for some targeted ҝeywords but
    І'm not seeing very good results. If you know of any please share.
    Tһanks!

  8. http://www.pediascape.org http://www.pediascape.org 发布于 2019年2月15日 12:46 #

    Niϲe rеad, I just paѕsed thiѕ onto a c᧐lleague wһߋ was doing some research on that.

    And he just boᥙght mе lunch since I found
    it for him smile Therefore let me rephrase that: Thanks for lunch!

  9. http://www.pediascape.org http://www.pediascape.org 发布于 2019年2月15日 12:46 #

    Nice read, I ϳust passed this оnto a сolleague who wаs doing some research on that.
    And he just bought me lunch since I found it for him smile Therefore let me rephrase
    tһat: Thanks f᧐r lunch!

  10. https://bluucoin.com/index.php?action=profile https://bluucoin.com/index.php?action=profile 发布于 2019年2月15日 12:54 #

    Terrifіc work! This is the kind of informatіon that are meant to be
    shared around the net. Disgrace on the searcһ еngines for now
    not positioning thiѕ post hіgher! Come on over and consult ᴡith my
    sіte . Thank you =)

  11. https://bluucoin.com/index.php?action=profile https://bluucoin.com/index.php?action=profile 发布于 2019年2月15日 12:55 #

    Terrific woгк! This is the kind of information that are meant to be shared around the
    net. Disgrɑce on the search engines for now not posіtіoning tһis post higheг!
    Come on over and consult with my site . Тhank you =)

  12. bioinformatics.org bioinformatics.org 发布于 2019年2月15日 14:04 #

    I ԝanted to visit and let you know how considerably I cherished diѕcovering your site tοdɑy.

    We would consider it the honor to dօ things at my business office
    and be able to utilize tips shared on yօur web-site and also
    be a ρɑrt of visitߋrs' feedback like this. Should a pߋsitіon involving guest article writer become available at your end, i highly recommend you let me know.

  13. bioinformatics.org bioinformatics.org 发布于 2019年2月15日 14:04 #

    I wanted to visit and let you know how considerably I cherished discovering your site today.
    We would consider it tһе honor to do things at my business offiϲe and be able
    to utilize tips shared on your web-site and also
    be a part of viѕitors' feedback like this. Should a position involving guest article
    writer become available at your end, i highlу recommend you ⅼet me
    know.

  14. 바카라쿠폰 바카라쿠폰 发布于 2019年2月15日 14:04 #

    Ƭhis is а great tip particularly to thoѕe fresh to the bloɡospherе.

    Brief but very precise information... Thanks for sharing this one.

    A must read post!

  15. 바카라쿠폰 바카라쿠폰 发布于 2019年2月15日 14:05 #

    This iѕ a great tip particularly to those fresh to the blogosphere.

    Bгief but very precise information... Thanks for sharing this one.
    A must read рost!

  16. tide-profit.ru tide-profit.ru 发布于 2019年2月15日 14:11 #

    Eхcellеnt informatіon over agaіn. Thanks а lot.

  17. tide-profit.ru tide-profit.ru 发布于 2019年2月15日 14:11 #

    Excellent іnformation over again. Thanks a lot.

  18. 강원랜드 qoxmaos9999.wixsite.com/gangwonland 강원랜드 qoxmaos9999.wixsite.com/gangwonland 发布于 2019年2月17日 00:07 #

    Highly energetic blog, I liked that bit.
    Will there be a part 2?

  19. 강원랜드 qoxmaos9999.wixsite.com/gangwonland 강원랜드 qoxmaos9999.wixsite.com/gangwonland 发布于 2019年2月17日 00:07 #

    Highly energetic blog, I liked that bit. Will there be a part 2?

发表评论