导言
根据Gartner的数据,到2020年全球将有208亿个连接产品投入使用。Uber机器学习负责人Danny Lange的目标是将机器学习待到Uber业务的每个角落。连接事物的示例包括联网汽车和设备以及医疗保健、电信、制造、零售和金融的应用。利用来自这些设备的大量数据、实时处理事件,应用机器学习来增加价值,以及可扩展的快速存储。这些类型的应用程序的体系结构通常是事件驱动的微服务体系结构。
这是系列文章的最后一篇,该应用程序将流数据与机器学习相结合,对Uber汽车聚集的地点和时间进行实时分析和可视化,从而预测和可视化最受欢迎的Uber地点。
本系列的第一部分讨论如何创建机器学习模型,使用Apache Spark K-means算法按位置对Uber数据进行聚类。
在第二篇文章讨论了使用保存的K-mean与数据流模型做的时候尤伯杯车都聚集在那里,并实时分析。
在第三后讨论建立一个实时仪表板直观显示谷歌地图上的集群数据。下图描绘了数据管道:

本文将在Spark Streaming写入到MAPR-DB使用Spark HBase和MAPR-DB二进制接口以及使用SQL Spark和DataFrames从MAPR-DB解读。下图描绘了数据管道:
Uber旅行数据使用Kafka API发布到MapR Streams主题;
订阅第一个主题的Spark Streaming应用程序使用群集位置丰富事件,并将结果以JSON格式发布到另一个主题;
订阅第二个主题的Vert.x Web应用程序在热图中显示Uber旅行群集;
订阅第二个主题的Spark Streaming应用程序使用Spark HBase和MapR-DB二进制连接器将数据存储在MapR-DB中;
Spark批处理应用程序使用Spark HBase和MapR-DB二进制连接器使用Spark SQL查询MapR-DB;

Spark和MapR-DB
处理大量流数据时遇到的挑战之一是确定要存储它的位置。使用MapR-DB(HBase API或JSON API),表按键范围自动按群集分区,提供按行键可扩展和快速读写。

Spark HBase和MapR-DB二进制连接器利用Spark DataSource API。连接器体系结构再每个Spark Executor中都有一个HConnection对象,允许使用MapR-DB进行分布式并行写入、读取或扫描。

Spark Streaming写入MapR-DB

在第二篇文章中你可以阅读有关MapR Streams Spark Streaming代码的信息。本文专注于MapR-DB部分。来自MapR-DB主题的消息采用JSON格式,并且每个Uber行程包含以下内容:群集ID、行程日期时间、纬度和经度等。示例如下:
1{"cid":18, "dt":"2014-08-01 08:51:00", "lat":40.6858, "lon":-73.9923, "base":"B02682", "clat":40.67462874550765, "clon":-73.98667466026531}
在下面的代码中,使用HBaseContext对象创建了一个HBaseConfiguration 对象。将HBaseContext 配置广播信息携带到HConnections 执行器中。

在下面的代码中:
从消息键值对中获取消息值;
调用
HBaseContext streamBulkPut方法,传递消息值DStream,TableName写入,以及将Dstream值转换为HBase put记录的函数;
该
convertToPut函数解析JSON字符串并创建HBasePUT对象。

Spark Streaming允许大量并行发送存入HBase。

Spark SQL和DataFrames
Spark HBase和MapR-DB二进制连接器使用户能够使用Spark DataFrame在MapR-DB之上执行复杂的SQL查询,同时应用分区修建、列修建、谓词下推和数据局部性等关键技术。
要使用Spark HBase和MapR-DB二进制连接器,需要为HBase和Spark表之间的模式映射定义Catalog。以下是存储Uber数据的模式:
复合行键包含集群ID,基数,数据和时间,由下划线分隔。
存在用于存储所有数据的列族数据和用于统计汇总的列族统计数据。
有两列,一列用于纬度,一列用于每次旅行的经度。

Catalog定义了HBase和Spark表之间的映射。该目录有两个关键部分。一个是行键定义;另一个是Spark中的表列与HBase中的咧族和列名之间的映射。以下示例为MapR-DB表定义Catalog模式,其名称为/ user / user01 / db / uber,行和列分别为lat和lon。

将数据从MapR-DB加载到Spark DataFrame
在withCatalog 下面的功能中:
该
SQLContext读返回一个DataFrameReader可用于在数据帧读取数据;options函数将基础数据源的输入选项添加到
DataFrameReader;format函数指定的输入数据源格式
DataFrameReader;该
load()函数将输入加载为DataFrame。 函数df返回的前20行数据帧withCatalog随输出df.show;

输出 df.show 如下所示:

在下面的示例中, df.filter 使用给定的SQL表达式过滤器的行来过滤簇ID(行密钥的开始)>=9. select选择一组列:key 、lat 和lon 。

结果 df.show 如下所示:

以上讨论的用例体系结构的所有组件都可以与MapR融合数据平台在同一个集群上运行。
长按订阅更多精彩▼
