当前位置: 代码迷 >> MySQL >> R语言与mysql组合处理交通数据及其算法优化
  详细解决方案

R语言与mysql组合处理交通数据及其算法优化

热度:58   发布时间:2016-05-05 17:01:01.0
R语言与mysql结合处理交通数据及其算法优化

一、序言

交通数据处理是智能交通的一个很关键的要素,更好的分析交通数据,可以为市政管理、交通信号管制、道路规划、交通设施建设提供更好的咨询和建议。全国各地政府都在寄期望于智能交通,以缓解城市拥堵,甚至一定程度上解决大城市病或者说是市政建设滞后的问题。同时,诸如百度地图、谷歌地图、高德地图、微软地图都推出了相应的交通应用,以期找到更大的商机。

用好的存储方法和好的算法进行分析,在批处理方面可以更多的分析历史数据,分析和发现问题,为未来进行预测以及公共查询服务;在实时计算方面可以更多的进行交通监控、突发事件处理、甚至是罪犯跟踪。

因此,寻求好的存储策略,好的计算算法,成为非常必要解决的问题。

二、数据概述及存储

目前交通数据有人流量数据、汽车数据。前者对于有大规模地铁,公共交通的城市十分有用,如北京,上海,其必要前提是能通过设备采集到人流信息,对于城市管理者而言,一卡通是最好的工具。因此,在北京,交通一卡通成为了监测地铁,公交车客流信息的主要采集工具,客流信息的采集准确性也相当高,信息数据的格式也容易控制。而后者,目前主要是通过摄像头拍照采集,存在图像识别度不高,设备故障影响,数据格式半结构化,数据缺失(部分信息没有采集到),脏数据存在(拍到人,自行车)。

对于贵阳这样的三线城市,一卡通还没有普及使用,故而监测公交客流人流信息存在诸多困难。但车流信息由于平安城市的建设,变得各个交通路口存在大量的摄像头以采集数据,从而为车流信息分析提供了可能。

对于贵阳车流信息数据,存储的格式是以日志形式传输到文件服务器,并压缩存储,每天大约有1千万条数据。想要处理这些文本,要分析其格式,并转存于数据库中。

在大数据流行的今天,最好的存储和处理方式当然是放在hdfs上,并用相应的nosql分布式数据库进行管理。但作为初步研究使用,分析的内容没有那么多,财力也没有那么大,所以找了一台小型机,16G的内存,CPU8*2.3MHZ。应用的数据信息也只有时间,拍摄地点,车牌号三个字段,故经过解析文本,直接将者三个字段存储到了mysql中,以便处理。

三、数据处理的目标

初步处理数据的目标有三个:

  1. 计算各个路口各个时段的车流量,以五分钟为间隔;
  2. 计算出城市交通通行有向图;
  3. 计算出各个路段各个时段的车流量,以五分钟为间隔;
  4. 根据结果1,画出城市车流信息可视化动态图(全部车流,公交出租车流、本地车流、外地车流,外城车流,柱状图);
  5. 根据结果1,结合gis画出,城市车流可视化图(热力图,柱状图);
  6. 根据结果2,3,画出交通流可视化图;
  7. 根据结果1,2,3,对第二天车流拥堵信息进行预测,给出绕行建议。

四、计算过程

目标1,可以通过sql语句很容易达到;目标2,在前一篇《R语言空间换时间算法、Hash键值对在字符串处理中的应用》已经完成;本篇主要解决目标3。

解决目标3的主要思想是根据同一辆车的运行轨迹,以经过路口为节点,按照到达某一路段的时间,并归结到划分的时间段内,并计数到该路段的某一时刻中。

直接上代码,以下是原始版本,其思想借鉴的前篇博文的算法思想,以空间换时间,遍历900多万条数据,只需要几个小时。

rm(list=ls(all=TRUE))gc()library(RODBC)library(hash)# 读取文件中排序好的路口地址数据address_file <-file("/home/wanglinlin/transport/address.txt","r") sorted_address <-readLines(address_file)sorted_address_hash_pairs<-hash(sorted_address,1:269)close(address_file)#矩阵的列表示形式transection_code_length <-length(sorted_address)*length(sorted_address)transection_code_matrix <- matrix(0,transection_code_length,1)k=1;for(i in 1:269){  for(j in 1:269){    transection_code_matrix[k]<- i*1000+j    k=k+1  }}transection_code_hash<-hash(transection_code_matrix,1:transection_code_length)#transection_code_hashtime<-file("/home/wanglinlin/transport/time.txt","r")traffic_time<-readLines(time)close(time)length(traffic_time)traffic_time_hash<-hash(traffic_time,1:length(traffic_time))traffic_time_hash[["1409501100"]]trafic_flow <- matrix(0,nrow=length(traffic_time),ncol=transection_code_length)channel=odbcConnect("transport-connector-R", uid="transport", pwd="transport")  #连接mysql test 数据库sqlTables(channel)  # 显示test数据库中的表格trafic_flow_data<-sqlQuery(channel,"select unix_timestamp(time),plate,address from transport20140901 order by plate,time")trafic_flow_count<-sqlQuery(channel,"select count(*) from transport20140901")trafic_flow_data<-as.matrix(trafic_flow_data)#找出所有车牌号,并散列化,形成键值对表plates<-sqlQuery(channel,"select distinct plate from transport20140901")odbcClose(channel)plate_list=(as.matrix(plates))[,1]plate_count=length(plate_list)plate_hash_pairs=hash(plate_list,1:plate_count)#各路段车流计数transection_code_hash[[toString(sorted_address_hash_pairs[[trafic_flow_data[1000,3]]] * 1000 +sorted_address_hash_pairs[[trafic_flow_data[1001,3]]])]]trafic_flow_count_number<-as.numeric(trafic_flow_count)-1for(i in 1:trafic_flow_count_number){    if(plate_hash_pairs[[trafic_flow_data[i,2]]]==plate_hash_pairs[[trafic_flow_data[i+1,2]]]){      start_time_stamp <- as.numeric(trafic_flow_data[i,1])-as.numeric(trafic_flow_data[i,1])%%300      count_times <- ceiling((as.numeric(trafic_flow_data[i+1,1])-start_time_stamp)/300)      col_index_code <- sorted_address_hash_pairs[[trafic_flow_data[i,3]]] * 1000 +sorted_address_hash_pairs[[trafic_flow_data[i+1,3]]]      col_index <- transection_code_hash[[toString(col_index_code)]]      for(j in 1:count_times){        timestamp <- start_time_stamp + 300 * j        row_index <- traffic_time_hash[[toString(timestamp)]]        trafic_flow[row_index,col_index] <- trafic_flow[row_index,col_index] + 1      }    }}write.table(trafic_flow,"/home/wanglinlin/transport/trafic_flow.txt",row.names = FALSE,col.names = FALSE)#把trafic_flow中的数据读取第一个五分钟车流量到矩阵中five_minues_trafic_flow <- matrix(0,269,269)for(i in 1:269){  for (j in 1:269){    five_minues_trafic_flow[i,j]=trafic_flow[1,transection_code_hash[[toString(i*1000+j)]]]  }}write.table(five_minues_trafic_flow,"/home/wanglinlin/transport/five_minues_trafic_flow.txt",row.names = FALSE,col.names = FALSE)
上述代码中,最终的各路段车流信息存储于一个矩阵中,每行代表所有路段的一个时间段的车流信息。借助了对路口地址的编码和hash散列。建立编码和散列的过程一方面消耗了时间,另一方面也增加了内存开销。除此之外,将某一时段的车流信息以正规的方式显示,任然依赖于编码和散列,对后续不利。故可进一步优化这一部分,以三维数组存储车流信息,x,y平面刻画各路口的信息,z轴刻画时间变化。代码如下:

rm(list=ls(all=TRUE))gc()library(RODBC)library(hash)# 读取文件中按照拼音排序好的路口地址数据address_file <-file("/home/wanglinlin/transport/address.txt","r") sorted_address <-readLines(address_file)sorted_address_hash_pairs<-hash(sorted_address,1:269) #生成路口地址hash键值对,便于通过路口地址名称找序号close(address_file)#sorted_address#class(sorted_address_hash_pairs[["黔灵西路(合群路口-威清路口)"]])time<-file("/home/wanglinlin/transport/time.txt","r")traffic_time<-readLines(time)close(time)length(traffic_time)traffic_time_hash<-hash(traffic_time,1:length(traffic_time))traffic_time_hash[["1409501100"]]channel=odbcConnect("transport-connector-R", uid="transport", pwd="transport")  #连接mysql test 数据库sqlTables(channel)  # 显示test数据库中的表格trafic_flow_data<-sqlQuery(channel,"select unix_timestamp(time),plate,address from transport20140901 order by plate,time")trafic_flow_count<-sqlQuery(channel,"select count(*) from transport20140901")trafic_flow_data<-as.matrix(trafic_flow_data)#找出所有车牌号,并散列化,形成键值对表plates<-sqlQuery(channel,"select distinct plate from transport20140901")odbcClose(channel)plate_list=(as.matrix(plates))[,1]plate_count=length(plate_list)plate_hash_pairs=hash(plate_list,1:plate_count)trafic_flow <- array(0, dim=c(269,269,length(traffic_time))) trafic_flow_count_number<-as.numeric(trafic_flow_count)-1for(i in 1:trafic_flow_count_number){  if(plate_hash_pairs[[trafic_flow_data[i,2]]]==plate_hash_pairs[[trafic_flow_data[i+1,2]]]){    start_time_stamp <- as.numeric(trafic_flow_data[i,1])-as.numeric(trafic_flow_data[i,1])%%300    count_times <- ceiling((as.numeric(trafic_flow_data[i+1,1])-start_time_stamp)/300)    x_index <- sorted_address_hash_pairs[[trafic_flow_data[i,3]]]     y_index <- sorted_address_hash_pairs[[trafic_flow_data[i+1,3]]]    for(j in 1:count_times){      timestamp <- start_time_stamp + 300 * j      z_index <- traffic_time_hash[[toString(timestamp)]]      trafic_flow[x_index,y_index,z_index] <- trafic_flow[x_index,y_index,z_index] + 1    }  }}write.table(trafic_flow,"/home/wanglinlin/transport/trafic_flow_roads.txt",row.names = FALSE,col.names = FALSE)#把trafic_flow中的数据读取第一个五分钟车流量到矩阵中#write.table(trafic_flow[,,1],"/home/wanglinlin/transport/five_minues_trafic_flow.txt",row.names = FALSE,col.names = FALSE)

上述代码,解决了前述的问题,但时间戳还用到了hash,这个hash也可以去掉,并将其统一化,用函数的方式将功能改的更加通用化,代码还需进一步优化。

可以看到的是,算法通过优化,具备了更好的额性能,只有通过不断优化,才会更好。

五、总结

解决了一个计算目标,下次也算迈出了新的一步,如果有对算法的最终优化版,到时候再贴上来。



==================================关于原创的声明==================================

本博主的所有原创文章,非本人书面授权,严禁转载。

可以应用于一切非商用的内部学习和讨论研究。

欢迎通过留言或email进行广泛交流。





  相关解决方案