基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)

导言


本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例。在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber数据。本文将讨论使用已保存的K-means模型和流数据实时分析Uber汽车何时何地聚集。

下图描绘了数据管道的体系结构:

  1. Uber旅行数据使用Kafka API发布到MapR Streams主题;

  2. Spark流应用程序订阅了第一个主题:

    1. 摄取Uber旅行事件流;

    2. 标识与Uber旅程的纬度和经度相对应的位置群集;

    3. 将集群位置添加到事件,并将结果以JSON格式发布到另一个主题;

  3. Spark流媒体应用程序订阅了第二个主题:

    1. 分析按日期和时间流行的优步旅行地点群集

用例数据示例


示例数据集是Uber旅行数据,在第一篇文章中已经详细的介绍。输入数据是CSV格式,以下显示一个示例:

1date/time, latitude,longitude,base
22014-08-01 00:00:00,40.729,-73.9422,B02598


丰富的数据记录采用JSON格式。如下所示:

Spark Kafka消费者、生产者代码




解析数据集记录

Scala Uber实例类定义与CSV记录对应的架构。该parseUber函数将逗号分隔值解析为Uber案例类。

加载K-Means模型

Spark KMeansModel类用于加载安装在历史Uber数据上的已保存的K-means模型。

模型的输出clusterCenters

群集中心下方显示在Google地图上:

Spark流媒体代码


这些是Spark Streaming Consumer Producer代码的基本步骤:

  1. 配置Kafka Consumer Producer属性;

  2. 初始化Spark  StreamingContext 对象。使用此上下文,创建一个从主题中读取消息的DStream;

  3. 应用转换(创建新的DStream);

  4. 将转换后的DStream中的消息写入主题;

  5. 开始接收数据和处理。等待处理停止;

1、配置Kafka Consumer Producer属性

第一步是设置KafkaConsumer KafkaProducer 配置属性,稍后将使用它们创建用于接收/发送消息到主题的DStream。需要设置以下参数:

  • 键和值反序列化器:用于反序列化消息;

  • 自动偏移重置:从最早或最晚的消息开始读取;

  • 引导服务器:这可以设置为虚拟主机:端口,因为MapR Streams实际上不使用代理地址;


有关配置参数的更多信息,可以参考MapR Streams文档。


2、初始化Spark  StreamingContext 对象

ConsumerStrategies.Subscribe,如下所示,用于设置主题和Kafka配置参数。我们使用KafkaUtils createDirectStream带有StreamingContext消费者和位置策略的方法,从MapR Streams主题创建输入流。将创建一个表示传入数据流的DStream,其中每一条消息都是一个键值对、我们使用DStream映射转换来创建带有消息值的DStream。


3、应用转换(创建新的DStream)

使用DStream foreachRDD方法将处理应用于此DStream中的每个RDD。Uber使用DStream上的map操作将消息值解析为对象。然后将RDD转换为DataFrame,它允许对流数据使用DataFrames和SQL操作。


以下是来自示例的输出df.show


VectorAssembler用于转换并返回带有向量列中纬度和经度要素列的新DataFrame。


然后,使用模型转换方法获取特征的聚类,该方法返回具有聚类预测的DataFrame。


输出categories.show如下:


然后将DataFrame注册为表,以便可以在SQL语句中使用它。SQL查询输出如下所示:


4、将转换后的DStream中的消息写入主题



查询的数据集结果将转换为JSON RDD字符串,然后RDD sendToKafka 方法用于将JSON键值对消息发送到主题(在这种情况下,键为null)。


示例消息值(输出temp.take(2))如下所示:

1{"dt":"2014-08-01 00:00:00","lat":40.729,"lon":-73.9422,"base":"B02598","cluster":7}
2{"dt":"2014-08-01 00:00:00","lat":40.7406,"lon":-73.9902,"base":"B02598","cluster":7}


5、开始接收数据处理它,然后等待处理停止

要开始接收数据,必须显示调start()用上StreamingContext,然后调用awaitTermination等待流计算来完成。

Spark Kafka消费者代码


接下来,我们介绍JSON丰富消息的Spark流代码。


使用Spark Structype指定模式:

以下是代码:

  • 创建直接Kafka流;

  • 将JSON消息值转换为 与架构一起Dataset[Row] 使用  spark.read.json

  • 为后续SQL查询创建两个临时视图;

  • 使用ssc.remember 缓存的查询数据;



现在我们可以查询流数据,查询哪些小时接单次数最多?

1spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")



每个群集中发生了多少次接单?

1df.groupBy("cluster").count().show()


...要么:

1spark .sql(“select cluster,count(cluster)as uber group by cluster”)



一天中的哪个小时和哪个集群的接单次数最多?

1spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")



显示Uber旅行的日期时间和群集计数:

1%sql select cluster, dt, count(cluster) as count from uber group by dt, cluster order by dt, cluster


结语


在本文中,学习了如何在Spark Streaming应用程序中使用Spark机器学习模型,以及如何将Spark Streaming与MapR Streams集成以使用Kafka API使用和生成消息。

参考信息:

  • 将Spark与MapR Streams文档集成

  • 免费在线培训MapR Streams,Spark at learn.mapr.com

  • Apache Spark Streaming编程指南

  • 使用Apache API的实时流数据管道:Kafka,Spark Streaming和HBase

  • Apache Kafka和MapR Streams:术语,技术和新设计


长按订阅↓