Processing JanusGraph Data with GraphX: An Implementation
Published: 2019-06-23



Disclaimer: this is a fallback devised after the approach of having Spark execute Gremlin queries directly ran into obstacles. It involves no work secrets and poses no risk of disclosure; it is purely my own thinking, shared in the hope of prompting better ideas.

Plan: convert the results of the Gremlin query into star graphs and write them to HDFS as star-graph JSON; Spark then reads that JSON from HDFS and builds a graph that GraphX can use, after which GraphX's rich set of graph algorithms becomes available. This achieves the goal of operating on JanusGraph data with GraphX.
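For orientation, each line of the star-graph JSON describes one vertex together with its incident edges, roughly in the shape of TinkerPop's adjacency GraphSON; the ids, labels, and property values below are invented purely for illustration:

{"id":1,"label":"person","outE":{"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}}]},"inE":{"knows":[{"id":9,"outV":3,"properties":{"weight":0.4}}]},"properties":{"name":"marko"}}

The generator in step 1 builds a hand-rolled approximation of this shape, one document per (vertex, edge) pair taken from the path results.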

1. Convert the Gremlin query results into star-graph JSON

Because the GraphSON format produced by org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONWriter does not meet the requirements, the vertex and edge data returned by path queries is converted by hand into a single-vertex (star graph) JSON structure using the method below, and then saved to HDFS; the storage step is not covered here.

// Builds one star-graph JSON record for `vertex`, attaching the single incident `edge`.
// concatEdgeProperties / concatVertexProperties are helper methods (not shown in the post)
// that render properties as JSON key/value pairs.
public StringBuilder generatorStarGraphJson(Vertex vertex, Edge edge, StringBuilder starGraphJson) {
    String inVId = edge.inVertex().id().toString();
    String outVId = edge.outVertex().id().toString();
    String vId = vertex.id().toString();

    starGraphJson.append("{")
            .append("\"id\":" + vertex.id() + ",\"label\":\"" + vertex.label() + "\",");

    if (inVId.equalsIgnoreCase(vId)) {
        // The center vertex is the edge's inVertex, so the edge belongs under "inE"
        // and references the neighbour through "outV".
        starGraphJson.append("\"inE\":{").append("\"" + edge.label() + "\":[{")
                .append("\"id\":\"" + edge.id() + "\",")
                .append("\"outV\":" + edge.outVertex().id() + ",")
                .append("\"properties\":{" + concatEdgeProperties(edge) + "}}]},");
    } else if (outVId.equalsIgnoreCase(vId)) {
        // The center vertex is the edge's outVertex, so the edge belongs under "outE"
        // and references the neighbour through "inV".
        starGraphJson.append("\"outE\":{").append("\"" + edge.label() + "\":[{")
                .append("\"id\":\"" + edge.id() + "\",")
                .append("\"inV\":" + edge.inVertex().id() + ",")
                .append("\"properties\":{" + concatEdgeProperties(edge) + "}}]},");
    } else {
        throw new IllegalArgumentException("Vertex/edge mismatch: the vertex is not an endpoint of the edge");
    }

    // Append the vertex's own properties.
    starGraphJson.append("\"properties\":{").append(concatVertexProperties(vertex)).append("}}");
    return starGraphJson;
}
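For completeness, here is a minimal driver sketch showing how the generator above could be fed from a Gremlin path query. It is written in Scala against the plain TinkerPop API; the class name StarGraphJsonGenerator, the one-hop traversal, and the surrounding wiring are my own illustration (the post does not show this part), and the actual write to HDFS is still omitted, as in the post.

// Sketch only: assumes the Java method above lives in a class named
// StarGraphJsonGenerator (hypothetical name) and that `g` is a
// GraphTraversalSource already connected to JanusGraph.
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
import org.apache.tinkerpop.gremlin.structure.{Edge, Vertex}

import scala.collection.JavaConverters._

def buildStarGraphJsonLines(g: GraphTraversalSource, gen: StarGraphJsonGenerator): Seq[String] = {
  // One-hop paths of the form [outVertex, edge, inVertex]; adjust the traversal to the data you need.
  val paths = g.V().outE().inV().path().toList.asScala
  paths.flatMap { path =>
    val outV = path.get[Vertex](0)
    val e    = path.get[Edge](1)
    val inV  = path.get[Vertex](2)
    // Emit one star-graph line per endpoint, so every vertex gets its own JSON record.
    Seq(outV, inV).map(v => gen.generatorStarGraphJson(v, e, new java.lang.StringBuilder()).toString)
  }
}
// Each returned line would then be written to HDFS, one JSON document per line (omitted here).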

2. Spark reads the star graph from the given path and converts it into a GraphX graph

// Imports assume Spark 2.x with GraphX plus TinkerPop's hadoop-gremlin / spark-gremlin
// modules on the classpath; adjust packages to the versions you actually use.
import java.util

import org.apache.commons.configuration.BaseConfiguration
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
import org.apache.spark.graphx
import org.apache.spark.graphx.{VertexId, lib}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.tinkerpop.gremlin.hadoop.Constants
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD
import org.apache.tinkerpop.gremlin.structure.{Edge, Property}
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph

import scala.collection.mutable.ArrayBuffer

class GraphSon2GraphXRDD() extends Serializable {

  // Hadoop-Gremlin configuration pointing GraphSONInputFormat at the star-graph JSON on HDFS.
  def getGraphConf(HDFSFilePath: String): BaseConfiguration = {
    val inputGraphConf = new BaseConfiguration
    inputGraphConf.setProperty("gremlin.graph", classOf[HadoopGraph].getName)
    inputGraphConf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[GraphSONInputFormat].getName)
    inputGraphConf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, HDFSFilePath)
    inputGraphConf.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, HDFSFilePath)
    inputGraphConf
  }

  def getSc(sparkHost: String, isRemote: Boolean): SparkContext = {
    val sparkConf = new SparkConf()
    if (isRemote) {
      // to be completed for a remote cluster
    } else {
      sparkConf.setMaster("local[*]").setAppName("GraphSon2GraphX")
    }
    new SparkContext(sparkConf)
  }

  // Reads the star-graph JSON into a JavaPairRDD of (vertex id, VertexWritable).
  def getJavaRDD(conf: BaseConfiguration, sc: SparkContext): JavaPairRDD[AnyRef, VertexWritable] = {
    val jsc = JavaSparkContext.fromSparkContext(sc)
    val graphRDDInput = new InputFormatRDD
    graphRDDInput.readGraphRDD(conf, jsc)
  }

  def getVertexRDD(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable]): RDD[(Long, util.HashMap[String, java.io.Serializable])] = {
    vertexWritableJavaPairRDD.rdd.map((tuple2: Tuple2[AnyRef, VertexWritable]) => {
      // Get the center vertex of the star graph
      val v = tuple2._2.get
      val g = StarGraph.of(v)
      // In case the vertex id in TinkerGraph is not long type
      // val vid = convertStringIDToLongID(v.id().toString)
      val vid = v.id().toString.toLong
      // Pass the vertex properties to the GraphX vertex value map and keep the original vertex id
      val graphxValueMap: util.HashMap[String, java.io.Serializable] = new util.HashMap[String, java.io.Serializable]()
      graphxValueMap.put("originalID", v.id().toString)
      graphxValueMap.putAll(g.traversal.V(v.id).valueMap().next(1).get(0))
      (vid, graphxValueMap)
    })
  }

  def getEdgeRDD(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable]): RDD[graphx.Edge[util.HashMap[String, java.io.Serializable]]] = {
    val edge = vertexWritableJavaPairRDD.rdd.flatMap((tuple2: Tuple2[AnyRef, VertexWritable]) => {
      val v = tuple2._2.get
      val g = StarGraph.of(v)
      // Put all outgoing edges of the center vertex into the list
      val edgelist: util.List[Edge] = g.traversal.V(v.id).outE().toList
      val list = new ArrayBuffer[graphx.Edge[util.HashMap[String, java.io.Serializable]]]()
      for (x <- 0 until edgelist.size()) {
        // GraphX's srcId corresponds to the Gremlin outVertex and dstId to the inVertex
        val srcId = edgelist.get(x).outVertex.id().toString
        val dstId = edgelist.get(x).inVertex.id().toString
        // val md1 = convertStringIDToLongID(srcId)
        // val md2 = convertStringIDToLongID(dstId)
        val md1 = srcId.toLong
        val md2 = dstId.toLong
        // Get the properties of the edge
        val edgeAttr = new util.HashMap[String, java.io.Serializable]()
        val properties: util.Iterator[Property[Nothing]] = edgelist.get(x).properties()
        while (properties.hasNext) {
          val property = properties.next()
          edgeAttr.put(property.key(), property.value().toString)
        }
        list.append(graphx.Edge(md1, md2, edgeAttr))
      }
      list
    })
    edge.distinct()
  }

  // Label propagation: build the graph and run LPA for the given number of iterations.
  def doLAP(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable], iterationNum: Int): Array[Array[String]] = {
    val vertexRDD = getVertexRDD(vertexWritableJavaPairRDD)
    val edgeRDD = getEdgeRDD(vertexWritableJavaPairRDD)
    val graph = graphx.Graph[util.HashMap[String, java.io.Serializable], util.HashMap[String, java.io.Serializable]](
      vertexRDD, edgeRDD, new util.HashMap[String, java.io.Serializable]())
    val LVMRsult = lib.LabelPropagation.run(graph, iterationNum).vertices.collect.sortWith(_._1 < _._1).map(f => {
      println(f.toString())
      f
    })
    getFinalCommunit(LVMRsult)
  }

  // Group vertices by the community label assigned by label propagation.
  def getFinalCommunit(LVMRsult: Array[(Long, Long)]): Array[Array[String]] = {
    val result = new Array[Array[String]](LVMRsult.length)
    val tmp = new ArrayBuffer[String]
    for (i <- 0 until LVMRsult.length) {
      var k = 0
      val array = new ArrayBuffer[String]
      // Community containing more than one vertex
      for (j <- (i + 1) until LVMRsult.length) {
        if (LVMRsult(i)._2.equals(LVMRsult(j)._2)) {
          if (!tmp.contains(LVMRsult(i)._1.toString)) {
            array += LVMRsult(i)._1.toString
            tmp += LVMRsult(i)._1.toString
          }
          if (!tmp.contains(LVMRsult(j)._1.toString)) {
            array += LVMRsult(j)._1.toString
            tmp += LVMRsult(j)._1.toString
          }
          k = k + 1
        }
      }
      // The vertex forms a community on its own
      if (k == 0) {
        if (!tmp.contains(LVMRsult(i)._1.toString)) {
          array += LVMRsult(i)._1.toString
          tmp += LVMRsult(i)._1.toString
        }
      }
      if (array.length > 0) {
        result.update(i, array.toArray.distinct)
      }
    }
    result.filter(f => {
      if (f != null && f.length > 0) println(f.mkString("(", ",", ")"))
      f != null
    })
  }

  // PageRank until the per-vertex change drops below stopThreshold; returns (id, rank) pairs sorted by rank.
  def doPageRank(vertexWritableJavaPairRDD: JavaPairRDD[AnyRef, VertexWritable], stopThreshold: Double): Array[Array[Any]] = {
    val vertexRDD: RDD[(Long, util.HashMap[String, java.io.Serializable])] = getVertexRDD(vertexWritableJavaPairRDD)
    val edgeRDD = getEdgeRDD(vertexWritableJavaPairRDD)
    val graph = graphx.Graph[util.HashMap[String, java.io.Serializable], util.HashMap[String, java.io.Serializable]](
      vertexRDD, edgeRDD, new util.HashMap[String, java.io.Serializable]())
    val gpgraph = graph.pageRank(stopThreshold).cache()
    val titleAndPrGraph = graph.outerJoinVertices(gpgraph.vertices) {
      (v, title, rank) => (rank.getOrElse(0.0), title)
    }
    // sortBy: ascending = true, descending = false
    // titleAndPrGraph.vertices.sortBy((entry: (VertexId, (Double, Object))) => entry._2._1, false).foreach(f => println(f._1 + ":" + f._2._1))
    val pageRank = titleAndPrGraph.vertices.sortBy((entry: (VertexId, (Double, Object))) => entry._2._1, false).map(f => {
      println(f._1 + ":" + f._2._1)
      Array(f._1.toString, f._2._1)
    })
    pageRank.collect()
  }
}
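To tie the two halves together, a usage sketch of the class above might look as follows. The HDFS path, the iteration count for label propagation, and the PageRank tolerance are placeholder values of my own, not taken from the post.

object GraphSon2GraphXDemo {
  def main(args: Array[String]): Unit = {
    val helper = new GraphSon2GraphXRDD()
    // Placeholder HDFS directory containing the star-graph JSON produced in step 1.
    val conf = helper.getGraphConf("hdfs://namenode:8020/user/janus/stargraph")
    val sc   = helper.getSc("local", isRemote = false)
    val rdd  = helper.getJavaRDD(conf, sc)

    // Community detection via label propagation (5 iterations, purely illustrative).
    val communities = helper.doLAP(rdd, 5)
    communities.foreach(c => println(c.mkString("(", ",", ")")))

    // PageRank until the per-vertex change drops below the tolerance (0.001 here).
    val ranks = helper.doPageRank(rdd, 0.001)
    ranks.take(10).foreach(r => println(r.mkString(":")))

    sc.stop()
  }
}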

This bridges JanusGraph and GraphX, so GraphX's rich graph-computation features can be used without obstruction. The implementation is admittedly a bit rough; I hope it can serve as a starting point for better ideas.

Reposted from: https://my.oschina.net/zhouwang93/blog/3052345
