车联⽹:基于spark的车辆分析
⾃2012年以来,公安部交通管理局在全国范围内推⼴了机动车缉查布控系统(简称卡⼝系统),通过整合共享各地车辆智能监测记录等信息资源,建⽴了横向联⽹、纵向贯通的全国机动车缉查布控系统,实现了⼤范围车辆缉查布控和预警拦截、车辆轨迹、交通流量分析研判、重点车辆布控、交通违法⾏为甄别查处及侦破涉车案件等应⽤。在侦破肇事逃逸案件、查处涉车违法⾏为、治安防控以及反恐维稳等⽅⾯发挥着重要作⽤。
随着联⽹单位和接⼊卡⼝的不断增加,各省市区部署的机动车缉查布控系统积聚了海量的过车数据。截⾄⽬前,全国32个省(区、市)已完成缉查布控系统联⽹⼯作,接⼊卡⼝超过50000个,汇聚机动车通⾏数据总条数超过2000亿条。以⼀个中等规模省市为例,每地市每⽇采集过车信息300万条,每年采集过车信息10亿条,全省每年将汇聚超过200亿条过车信息。如何将如此海量的数据管好、⽤好成为各省市所⾯临的巨⼤挑战。
随着车辆⽹以及汽车卡⼝应⽤的不断扩⼤,车辆数据的不断积累。对于原始数据的存储、处理、查询是⼀个很⼤的考验,为此我们需要⼀个能实时处理、多维度查询的分布式计算的平台。
⼀、关键需求分解
1.  车辆轨迹查询
能够根据输⼊的车牌号,或通过车牌号模糊查询对车辆进⾏状态查询、订单轨迹追踪。过车记录查询,过车轨迹查询,落脚点分析,进⾏轨迹回放。
2.  地理位置检索
能够根据经纬度坐标快速的进⾏经纬度的过滤,如指定⼀个坐标,快速圈定周边10公⾥内的车辆。
3.  多维碰撞, 多维度查询
要求可以有5个条件的维度查询,最常⽤的是时间,终端号,类型。
可以根据多个维度进⾏任意条件的组合过滤,进⾏数据碰撞。
也可以根据多个地理坐标进⾏车辆碰撞分析。
4.  车辆出⾏规律分析,
可以按照⼀辆车,或⼀批车辆进⾏统计分析,了解车辆的出⾏规律,出⾏时间,频繁出⼊地点。
5.  出⾏规律异常车辆分析
选定某⼀区域的,周边陌⽣⼈/车的识别。出⾏规律异常的⼈/车识别。c180k
6.  伴随分析
⼈车轨迹拟合,判断是否有代驾⾏为,有尾随,盯梢识别。
7.  数据碰撞分析
能够根据根据多个地理位置以及时间进⾏数据碰撞,连环时间进⾏数据碰撞分析。
8.  重点车辆分析
根据统计⼀定区域范围内的客运、危险品运输、特殊车辆等重点车辆通⾏数量,研判发现通⾏规律。对在路段内⾏驶时间异常的车辆、⾸次在本路段⾏驶的重点车辆、2到5点仍在道路上⾏驶的客运车辆等进⾏预警提⽰。
9.  车辆出⼊统计分析
挖掘统计⼀段时间内在某⼀个区域内(可设定中⼼城区、地市区域、省市区域、⾼速公路等区域)、进出区域、主要⼲道的经常⾏驶车辆、“候鸟”车辆、过路车辆的数量以及按车辆类型、车辆发证地的分类统计。普桑汽车
⼆、关键技术能⼒要求
1.  数据规模-数据节点数
能够承载⽇均数百亿条增量,数据要可以长久保留
也要⽀撑未来三到五年,每天百亿,甚⾄数千亿条数据增量。
每个数据节点每天能处理20亿的数据量。
2.  查询与统计功能灵活性
根据不同的⼚商,车型,往往在逻辑上有较⼤的区别,他们业务的不同查询逻辑也会有较⼤的区别,故⼀个查询系统要求⾮常灵活,可以处理复杂的业务逻辑,算法,⽽不是⼀些常规的简单的统计。
能⽀持复杂SQL
当业务满⾜不了需求的时候可以拓展SQL,⾃定义开发新的逻辑,udf,udaf,udtf。
迈腾的价格要能⽀持模糊检索
对于邮箱、⼿机号、车牌号码、⽹址、IP地址、程序类名、含有字母与数字的组合之类的数据会匹配不完整,导致数据查不全,因分词导致漏查以及缺失数据,对于模糊检索有精确匹配要求的场景下,
业务存在较⼤的风险
多维分析多维碰撞
要求可以有5个条件的维度查询,最常⽤的是时间,终端号,类型。
3.  检索与并发性能
每次查询在返回100条以内的数据时能在1秒内返回,并发数不少于200(6个节点以内)。对于并发数要做到随着节点数的增加可以按⽐例增加。
4.  数据导⼊与时效性
对数据时效性要求较⾼,要求某⼀车辆在经过产⽣数据后,可达到分钟级别内系统可查可分析。对检索性能要求很⾼,以上典型需求均要求能够在秒级内返回结果及明细。
采⽤SQL⽅式的批量导⼊,也要⽀持kafka的流式导⼊
5.  稳定性-与单点故障
易于部署,易于扩容,易于数据迁移;
多数据副本保护,硬件不怕硬件损坏;
服务异常能⾃动检测及恢复,减轻运维⼈员经常需要半夜起床的痛苦;
系统不能存在任何单点故障,当某个服务器存在问题时不能影响线上业务。
数据过百亿后,不能频繁的OOM,也不能出现节点调⽚的情况。
系统出现异常后,可以⾃动侦探服务异常,并⾃动重启恢复服务,不能每次调⽚都要运维    ⼈员半夜去机房重启。需要服务有⾃动迁移与恢复的特性,⼤幅减少运维⼈员驻场的⼯作量。
提供了导⼊与查询的限流控制,也提供了过载保护控制,甚⾄在极端场景提供了有损查询与有损服务
6.  要有较⾼的排序性能
排序可以说是很多⽇志系统的硬指标(如按照时间逆序排序),如果⼀个⼤数据系统不能进⾏排序,基本上是这个系统属于不可⽤状态,排序算得上是⼤数据系统的⼀个“刚需”,⽆论⼤数据采⽤的是hadoop,还是spark,还是impala,hive,总之排序是必不可少的,排序的性能测试也是必不可少的。
7.  ⽤户接⼝
尽量是SQL接⼝。如果是程序接⼝学习成本与接⼊成本均较⾼。
8.  ⽅便与周边系统的导⼊导出
能与现有的常见系统如hadoop,hive ,传统数据库,kafka等集成,⽅便数据导⼊导出。
⽀持原始数据的任意维度导出
可以全表,也可以通过过滤筛选局部导出
⽀持数据经过各种组合计算过滤后的导出
可以将Y多个表与其他系统的多个表,进⾏组合筛选过滤计算后在导出
可以将多个数据从⼀张表导⼊到、另外⼀张表
可以将数据导出到别的系统⾥⾯(如hive,hbase,数据库等)
也可以将其他系统的数据导⼊到当前系统⾥⾯。
可以导出成⽂件,也可以从⽂件导⼊。
可以从kafka流式导⼊,也可以写插件,导出到kafka。
9. 数据存储与恢复
数据不能存储在本地磁盘,迁移难,恢复也难。
1).磁盘读写没有很好的控速机制,导⼊数据没有良好的流量控制机制,⽆法控制流量,⽽⽣产系统,磁盘控速与流量控速是必须的,不能因为业务⾼峰对系统造成较⼤的冲击,导致磁盘都hang住或挂掉。
2).本地硬盘局部坏点,造成局部数据损坏对于系统来说可能⽆法识别,但是对于索引来说哪怕是仅仅⼀个byte数据的读异常,就会造成索引指针的错乱,导致检索结果数据丢失,甚⾄整个索引废掉,但是本地磁盘不能及时的发现并修正这些错误。
3).数据存储在本地磁盘,⼀旦本地将近20T的存储盘损坏,需要从副本恢复后才能继续服务,恢复时间太长。
要将数据存储在HDFS之上
1).基于HDFS做了磁盘与⽹络做了读写控速逻辑。
2).磁盘局部坏点hdfs配有crc32校验,有坏点会⽴即发现,并不影响服务,会⾃动切换到没有坏点的数据继续读取。
lancer ex
3).本地磁盘损坏,HDFS⾃动恢复数据,不会中断读写,不会有服务中断。
10. 数据迁移
不能采取这样的⽅案:
夸机房搬迁机器,不能让运维⼈员细⼼的进⾏索引1对1复制,这种搬迁⽅案往往要数星期,且⾮常容易出错。
迁移过程中为了保证数据的⼀致性,需要中断服务或者中断数据的实时导⼊,让数据静态化落地后不允许在变化后,才能进⾏迁移,这种⽅案业务中断时间太久。要采取这样的迁移⽅案
1.hdfs通过balance⾃动迁移数据。
2.可以控制迁移过程中的带宽流量。
2.迁移过程中不中断服务,hdfs扩容与移除机器也对服务没影响。
11. 增加主备kafka
采⽤的是KAFKA主备设置,当主个KAFKA出现问题时会⾃动切换到备KAFKA,不影响线上业务。
12. 可扩展性-预警与在线扩容
当系统存储出现瓶颈时能及时报警,可容易的对存储进⾏扩容和数据均衡。在扩容时可以在线扩容。
13. 系统监控
有成熟的系统存储监控平台,可以对平台的运⾏状态进⾏实时监控,⼀旦出现问题可以及时告知监控⼈员.
⼀、业界现有⽅案—优缺点分析
1.  开源⼤数据系统解决⽅案(Hadoop、Spark、Hive、Impala)
数据规模-数据节点数√基于HDFS之上,数据可⽆限拓展,存储PB级的数据很轻松。
查询与统计功能灵活性√1.SQL⽀持较为齐全。
2.与周边系统的集成⾮常⽅便,数据导⼊导出灵活。
3.⽀持JDBC⽅式,可以与常见的报表系统⽆缝集成
检索与并发性能×该类系统并⾮为即席查询⽽设计,⽐较适合离线分析,通常来说⼀个HiveSQL运⾏时间从⼏分钟到⼏⼩时不等,如果是百亿规模的数据分析时间可能会达到数个⼩时,如果以现有XX部门的预算来看,可能需要数天的时间,究其根本原因是该类系统是采⽤暴⼒扫描的⽅式,即如果是100亿条数据,也是采⽤从头遍历到末尾的⽅式扫描,性能可想⽽知,
基本⽆并发性可⾔。单并发就需要数⼩时。
数据导⼊与时效性×HDFS的特性导致数据延迟较⼤,常规应⽤均是T+1数据,即延迟⼀天。稳定性-与单点故障√⽆单点故障,⽐较完善
排序性能×采⽤暴⼒排序⽅式,业界第⼀腾讯采⽤512台机器,也是90多秒响应
⽤户接⼝√采⽤hive jdbc接⼝,⽬前hive为⼤数据SQL的即席标准
⽅便与周边系统的导⼊导出√
由于采⽤了hive接⼝,⽣态圈均基于该⽣态圈开发,与周围⽣态系统集成⾮常⽅便,有⼀系列的⽣态⼯具可⽤,可⽤与常见的系统集
数据存储与恢复√hadoop的长项,硬件损坏,机器宕机后可⾃动迁移任务,不需要⼈⼯⼲预,中间不影响服务。
1.从⼀开始设计之初,Hadoop即假设所有的硬件均不可靠,⼀旦硬件损坏,数据不会丢失,有多份副本可以⾃动恢
复数据。
2.数据迁移以及机器扩容有⽐较完备的⽅案,中间不停服务,动态扩容。
数据迁移√
增加主备kafka×hive不能对接kafka
预警与在线扩容√业界有完完备的⽅案
系统监控√hdp有完完备的⽅案
2.  流计算系统(Storm、Spark Streaming)
数据规模-数据节点数√数据规模可随节点拓展
查询与统计功能灵活性×⽆法查看明细数据,只能看特定粒度的汇总结果,⽽过车记录是⽆法先计算出来的,即⽆法预知那个车有可能会犯罪,那个车会出事故,故⽆法预计算。
检索与并发性能√1.预先将需要查询的数据计算好,查询的时候直接访问预计算好的结果,性能⾮常好。
2.预计算完毕的结果集存储在HBase或传统数据库⾥,因数据规模并不⼤故并发性⽐较好。
数据导⼊与时效性√时效性⾮常好,⼀般与Kafka采⽤消息队列的⽅式导⼊,时效性可达⼏秒可见。稳定性-与单点故障√⽆单点故障,⽐较完善
排序性能√预计算的⽅式,排序结果预先算好,性能⽐较好
⽤户接⼝×java接⼝,有独⽴的API,需要写类似mapreduce的程序
⽅便与周边系统
的导⼊导出
×⽐较难,需要单独独⽴开发对接程序
数据存储与恢复√
损坏的机器会⾃动摘除,进⾏会⾃动迁移,服务不中断。
数据迁移√数据迁移,扩容,容灾均有完善的⽅案,Storm的扩容需要简单的Rebanlance即可。增加主备kafka√可以⽀持
预警与在线扩容√有完完备的⽅案
系统监控√有完完备的⽅案
3.  全⽂检索系统(Solr、ElasticSearch)
数据规模-数据节点数×1.典型使⽤场景在千万级别,如果给予较⼤内存,数据量可上亿。
2.本⾝系统内存的限定,百亿以上将会是巨⼤的挑战-除⾮是512G内存的机器,弄个20~30台左右,且是数据总量百亿,⽽不是每天百亿。
查询与统计功能灵活性×1.为搜索引擎的场景⽽⽣,分析功能较弱。只有最简单的统计功能,⽆法满⾜
过车记录复杂的统计分析需求,⽆法⽀撑复杂SQL,多表关联,嵌套SQL甚⾄⾃定义函数等功能。
2.与周边系统的集成⿇烦,数据导⼊导出太⿇烦,甚⾄不可⾏,第三⽅有SQL引擎插件,但均是简单SQL,且由于Merger server是单节点的问题,很多SQL的查询性能很低,不具备通⽤性。
东风起亚悦达3.⽆法与常见的⽀持jdbc标准的报表系统集成,定制开发代价较⼤。
4. 对于邮箱、⼿机号、车牌号码、⽹址、IP地址、程序类名、含有字母与数字的组合之类的数据会匹配不完整,导致数据查不全,因分词导致漏查以及缺失数据,对于模糊检索有精确匹配要求的场景下,业务存在较⼤的风险。
5. 基于lucene的分词来实现,但并不考虑单词的匹配顺序,也不保证匹配词语的连续性,中间可以穿插其他单词。
6.solr与es中不⽀持多列的group by与统计(原因为⽆法交叉),所谓的实现是通过单列group by后 进⾏的笛卡尔及,按照每个单元格重新进⾏的查询。
检索与并发性能√1.采⽤倒排索引,直接根据索引定位到相关记录,⽽不需要采⽤全表暴⼒扫描的⽅式,检索查询性能特别⾼。
2.在千万级别以下,并且给予较多内存的情况下,并发情况很好。
数据导⼊与时效性×1.⽀持实时导⼊,在千万数据规模下导⼊性能较好。
2.数据过亿后,⽣产系统实时导⼊经常会出现OOM,以及CPU负载太⾼的问题,故过亿数据⽆法实时导⼊数据,⼀般过百亿的系统均采⽤离线创建索引的⽅式,即数据时效性延迟⼀天。
3.没有良好的合并控制策略,系统会发⽣阶段性(⼏分钟)的负载极⾼的情况(索引合并),此时系统资源占⽤特别⾼,前台查询响应速度极慢。
稳定性-与单点故障×1.数据规模⼀旦过百亿,就会频繁的出现OOM,节点调⽚的情况。
2.⼀旦调⽚后⽆法⾃动恢复服务,需要运维⼈员去重启相关服务。
3.系统⽆过载保护,经常是⼀个⼈员做了⼀个复杂的查询,导致集整体宕机,系统崩溃。
lucene在索引合并过程中,每进⾏⼀次commit都要进⾏⼀次全范围的ord关系的重新映射,数据规模⼩的时候整个索引⽂件的映射还没什么,但是当数据量达到亿级别,甚⾄百亿级别后,这种映射关系会占⽤超多的CPU、内存、硬盘资源,所以当数据量过亿后,solr与Es在数据⽐较⼤的情况下,实时索引⼏乎是不可能的,频繁的ord关系映射,会让整个系统不可⽤。
排序性能×采⽤暴⼒全表遍历的⽅式排序,性能较差,经常因为排序导致整个系统瘫痪。
采⽤lucene的Sort接⼝实现,本质是借助docvalues的暴⼒扫描,如果数据量很⼤排序过程耗费⾮常多的内存与IO,并且排序耗时很⾼。
⽤户接⼝×采⽤java API的⽅式,⽤户学习成本⾼。
因不是通⽤的通讯协议,与其他⼤数据系统集成对接⿇烦。
⽅便与周边系统的导⼊导出×
⽐较难,需要单独独⽴开发对接程序,
数据如若想导出到其他系统很难,超过百万级别的导出基本是不可⾏的,没有成型的⾼可⽤的导出⽅案。
全量数据导出基本是不可能的,更别谈经过多表复杂运算后的导出了
数据存储与恢复×索引存储在本地硬盘,恢复难
1.磁盘读写没有很好的控速机制,导⼊数据没有良好的流量控制机制,⽆法控制流量,⽽⽣产系统,磁盘控速与流量控速是必须的,不能因为业务⾼峰对系统造成较⼤的冲击,导致磁盘都hang住或挂掉。
2.本地硬盘局部坏点,造成局部数据损坏对于lucene来说⽆法识别,但是对于索引来说哪怕是仅仅⼀个byte数据的读异常,就会造成索引指针的错乱,导致检索结果数据丢失,甚⾄整个索引废掉,但是solr与es不能及时的发现并修正这些错误。
3.数据存储在本地磁盘,⼀旦本地将近20T的存储盘损坏,需要从副本恢复后才能继续服务,恢复时间太长。
数据迁移×1.如若夸机房搬迁机器,需要运维⼈员细⼼的进⾏索引1对1复制,搬迁⽅案往往要数星期,且⾮常容易出错。
2.迁移过程中为了保证数据的⼀致性,需要中断服务或者中断数据的实时导⼊,让数据静态化落地后不允许在变化后,才能进⾏迁移。
增加字段√⽀持
增加主备kafka×不⽀持,需要业务单独开发导⼊api
预警与在线扩容×分⽚数不可以随意更改,如果要扩分⽚数,需要重建全部的历史索引,也就是传说的reindex,另外出现问题后⽆法⾃动恢复服务,需要运维⼈员去现场恢复服务
系统监控√es本⾝有收费版的监控系统
⼆、最终⽅案-延云YDB混合⽅案集成多个系统的优势
针对上述典型场景,我们最终将多个系统整合,发挥系统的各⾃优势,扬长避短,深度集成。延云YDB作为机动车缉查布控即席分析引擎,已经在10个以上城市的成功部署或测试,取得⾮常好的效果,有的甚⾄超过了客户的预期。
YDB是⼀个基于Hadoop分布式架构下的实时的、多维的、交互式的查询、统计、分析引擎,具有万亿数据规模下的万级维度秒级统计分析能⼒,并具备企业级的稳定可靠表现。
YDB是⼀个细粒度的索引,精确粒度的索引。数据即时导⼊,索引即时⽣成,通过索引⾼效定位到相关数据。YDB与Spark深度集成,Spark直接对YDB检索结果集分析计算,同样场景让Spark性能加快百倍。
延云推荐配置
延云YDB⾼性能配置 (毫秒响应)
1.机器内存:128G
2.磁盘:企业级SSD,600~800G *12个磁盘
3.CPU:32线程(2颗,16核,32线程)
4.万兆⽹卡
延云YDB常规配置 (秒级响应)
1.机器内存:128G
2.磁盘:2T*12的磁盘
3.CPU:24线程(2颗,12核,24线程)
4.千兆⽹卡
指标⽐对
数据规模-数据节点数√1.在腾讯我们做到了53台机器 处理每天1800亿的⽇增量,总量达⼏万亿的数据规模(每条数据1kb左右)
2.在延云推荐的普通机器
以给的⽰例数据预估,每个节点每天实时处理30~50亿的数据⽐较适合。
处理的数据规模以及查询响应速度,根据节点数线性增长。
查询与统计功能灵活性√1.⽀持hive SQL表达,⽀持所有的hive内置函数,可以嵌套SQL,可以多表关联,也可以⾃定义UDF,UDAF
2. 内置的分词类型会确保查询准确度,不会出现漏查,内置的分词类型,很好的解决了lucene默认分词导致的查询数据缺失的问题。另外YDB可以⾃定义拓展任意的luene分词类型。如词库分词,语义分词,拼⾳分词等。
3.能⽀持任意维度的多维查询,多维统计,与分析。
检索与并发性能√常规情况下⽀持200~300的并发查询,持续性压测20天以上。
但是⽬前我的真实⽣产系统,确实没有很⼤的并发,最⼤的并发系统也就是每5分钟由系统触发的100并发的检索查询,但是查询完毕后会有5分钟的休息时间。
数据导⼊与时效性√数据从产⽣约1~2分钟,系统内可查每天千亿增量,总量可达万亿
稳定性-与单点故障√1.采⽤Spark Yarn的⽅式,系统宕机,硬件损坏,服务会⾃动迁移,数据不丢失。
2.有守护进程,⼀旦发现服务异常,⾃动重启服务,不需要运维⼈员亲⾃去机房重启机器。
3.延云YDB只需要部署在⼀台机器上,由Yarn⾃动分发,不需要维护⼀堆机器的配置,改参数很⽅便。易于部署,易于扩容,易于数据迁移;
4.多数据副本保护,硬件不怕硬件损坏;
5.服务异常能⾃动检测及恢复,减轻运维⼈员经常需要半夜起床的痛苦;
系统不能存在任何单点故障,当某个服务器存在问题时不能影响线上业务。
数据过百亿后,不能频繁的OOM,也不能出现节点调⽚的情况。
系统出现异常后,可以⾃动侦探服务异常,并⾃动重启恢复服务,不能每次调⽚都要运维  ⼈员半夜去机房重启。需要服务有⾃动迁移与恢复的特性,⼤幅减少运维⼈员驻场的⼯作量。
6.提供了导⼊与查询的限流控制,也提供了过载保护控制,甚⾄在极端场景提供了有损查询与有损服务
奔驰房车7.我们修正了⼤量的spark的bug,让系统⽐开源系统更稳定。
排序性能√采⽤延云独有的BLOCK SORT 技术,百亿数据秒级排序。技术原理请参考
⽤户接⼝√采⽤SQL的⽅式,⽤户学习陈本低。
⽀持HIVE的JDBC接⼊(编程),可以命令⾏接⼊(定时任务),http⽅式接⼊。
Hive的JDBC协议,已经是⼤数据的事实标准。
与常规⼤数据系统可⽆缝对接(如hive,spark,kafka等),也提供了拓展接⼝。
海量数据导⼊导出灵活⽅便,也可与常见的⽀持jdbc的报表⼯具、SQL可视化⼯具集成。
⽅便与周边系统的导⼊导出√
导出
⽀持原始数据的任意维度导出
可以全表,也可以通过过滤筛选局部导出
⽀持数据经过各种组合计算过滤后的导出
可以将YDB中的多个表与其他系统的多个表,进⾏组合筛选过滤计算后在导出
可以将多个数据从ydb的⼀张表导⼊到YDB的另外⼀张表
可以将YDB⾥⾯的数据导出到别的系统⾥⾯(如hive,hbase,数据库等)
也可以将其他系统的数据导⼊到YDB⾥⾯。
可以导出成⽂件,也可以从⽂件导⼊。
导⼊
采⽤SQL⽅式的批量导⼊,也⽀持kafka的流式导⼊
1.索引的设计实现,不会想solr与es那样将数据全部加载到内种内存中进⾏映射,这⽆论是在导⼊还是在查询过程中
均⼤幅的减少了OOM的风险。
2.在内存与磁盘多个区域不同合并策略,在结合控速逻辑,让导⼊占⽤的性能控制在⼀定范围之内,让系统更平稳,
尽量减少索引合并瞬间产⽣的⼏分钟占据了⼤量的资源的情况,分散资源的占⽤,让前台⽤户的查询更平稳。
3.结合了storm流式处理的优点,采⽤对接消息队列(如kafka)的⽅式,数据导⼊kafka后⼤约1~2分钟即可在
ydb中查到。
将数据存储在HDFS之上
1.YDB基于HDFS做了磁盘与⽹络做了读写控速逻辑。