[[206073]] ####背景:#### Spark graphxは最大クリークマイニングアルゴリズムを提供していない 現在の最大クリーク アルゴリズムはすべて、Bron-Kerbosch アルゴリズムに基づくシリアル化されたアルゴリズムです。 ####考え:#### Spark graphx は、接続されたグラフのアルゴリズムを提供します。接続されたグラフと最大クラスターは、無向グラフの概念です。最大クラスターは、接続されたグラフのサブセットです。 Spark graphx を使用して接続されたグラフを見つけ、シリアル化された最大クリーク アルゴリズムを使用して各接続グラフから最大クリークを見つけます (疑似並列化) 相関関係が強いグラフの場合、見つかった接続グラフは非常に大きくなります。この場合、シリアル化された最大クリークアルゴリズムは依然として長い時間がかかります。ここでは、プルーニングの考え方を使用してサンプルデータの量を減らしますが、大きなグラフの場合、最適化スペースは限られています。 真に並列化された最大クリークアルゴリズムを期待する ####設定ファイル:#### - graph_data_path=hdfs://localhost/graph_data
- out_path=hdfs://localhost/clique
- ck_path=hdfs://localhost/チェックポイント
- numIter=50 剪定回数
- count =3 クラスター内の頂点の最大数
- algorithm=2 最大クリークアルゴリズム、1: 個人実装 2: jgrapht
- percent=90 剪定後の頂点の数(前の数に対する割合)。剪定後にデータの 90% が残っている場合、剪定効率は高くありません。
- spark.master=ローカル
- spark.app.name =グラフ
- spark.serializer=org.apache.spark.serializer.KryoSerializer
- spark.yarn.executor.memoryOverhead=20480
- spark.yarn.driver.memoryOverhead=20480
- spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
- spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
- spark.driver.maxResultSize=10g
- spark.default.parallelism =60
グラフ ####サンプルデータ:#### {"src":"0","dst":"1"} {"src":"0","dst":"2"} {"src":"0","dst":"3"} {"src":"1","dst":"0"} {"src":"2","dst":"1"} {"src":"3","dst":"5"} {"src":"4","dst":"6"} {"src":"5","dst":"4"} {"src":"6","dst":"5"} {"src":"3","dst":"2"} {"src":"2","dst":"3"} {"src":"6","dst":"4"} {"src":"3","dst":"4"} {"src":"4","dst":"3"} {"src":"2","dst":"6"} {"src":"6","dst":"2"} {"src":"6","dst":"7"} {"src":"7","dst":"6"} ####サンプル画像:#### ####出力:#### 0,1,2 0,2,3 3,4,5 4,5,6 ####コード実装:#### - java.util をインポートします。java.util.Properties をインポートします。
- org.apache.spark.broadcast.Broadcast をインポートします。
- org.apache.spark.graphx.{Edge, Graph} をインポートします。
- org.apache.spark.rdd.RDD をインポートします。
- org.apache.spark.sql.{Row, SQLContext} をインポートします。
- org.apache.spark.storage.StorageLevel をインポートします
- org.apache.spark.{SparkConf, SparkContext} をインポートします。
- org.jgrapht.alg.BronKerboschCliqueFinder をインポートします。
- org.jgrapht.graph.{DefaultEdge, SimpleGraph} をインポートします。
-
- scala.collection.JavaConverters._ をインポートします。
- scala.collection.mutable をインポートする
-
- オブジェクトApplicationTitan {
- def main(args: 配列[文字列]) {
- val prop = 新しいプロパティ()
- prop.load (getClass.getResourceAsStream( "/config.properties" ))
-
- val graph_data_path = prop.getProperty( "graph_data_path" )
- val out_path = prop.getProperty( "out_path" )
- val ck_path = prop.getProperty( "ck_path" )
- val count = Integer .parseInt(prop.getProperty( "count" ))
- val numIter = Integer .parseInt(prop.getProperty( "numIter" ))
- val アルゴリズム = Integer .parseInt(prop.getProperty( "アルゴリズム" ))
- val パーセント = Integer .parseInt(prop.getProperty( "パーセント" ))
- val conf = 新しい SparkConf()
- 試す {
- Runtime.getRuntime.exec ( "hdfs dfs -rm -r" + out_path )
- // Runtime.getRuntime.exec ( "cmd.exe /C rd /s /q " + out_path)
- } キャッチ {
- ケース例: 例外 =>
- ex.printStackTrace(System.out )で
- }
-
- prop.stringPropertyNames().asScala.foreach(s => {
- s.startsWith( "spark" )の場合
- conf.set (s、prop.getProperty(s))
- }
- })
- conf.registerKryoClasses(配列(getClass))
- val sc = 新しい SparkContext(conf)
- sc.setLogLevel( "エラー" )
- sc.setCheckpointDir(ck_path)
- val sqlc = 新しいSQLContext(sc)
- 試す {
- val e_df = sqlc.read
- // .json(グラフデータパス)
- .parquet(グラフデータパス)
-
- var e_rdd = e_df
- .mapPartitions(it => {
- it.map({
- ケースRow(dst: 文字列、src: 文字列) =>
- val src_long = src.toLong
- val dst_long = dst.toLong
- src_long < dst_long の場合 (src_long、dst_long)、それ以外の場合(dst_long、src_long)
- })
- })。明確な()
- e_rdd.persist(ストレージレベル.MEMORY_AND_DISK_SER)
-
- var bc: ブロードキャスト[ Set [Long]] = null
- var 反復 = 0
- 変数bc_size = 0
- //剪定
- 繰り返し回数 <= 繰り返し回数) {
- val temp = e_rdd
- .flatMap(x => リスト((x._1, 1), (x._2, 1)))
- .reduceByKey((x, y) => x + y)
- .filter(x => x._2 >=カウント- 1)
- .mapPartitions(it => it.map(x => x._1))
- val bc_value = temp .collect().toSet
- bc = sc.broadcast(bc_value)
- e_rdd = e_rdd.filter(x => bc.value.contains (x._1) && bc.value.contains ( x._2))
- e_rdd.persist(ストレージレベル.MEMORY_AND_DISK_SER)
- 反復 += 1
- bc_size != 0 && bc_value.size >= bc_size * パーセント / 100 の場合 {
- println( "合計反復回数: " + 反復回数)
- 反復= Int.MaxValue
- }
- bc_size = bc_value.サイズ
- }
-
- // 構成図
- val エッジ: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
- val グラフ = Graph.fromEdges(エッジ、0、ストレージレベル.MEMORY_AND_DISK_SER、ストレージレベル.MEMORY_AND_DISK_SER)
-
- //接続グラフ
- val cc = graph.connectedComponents().vertices
- cc.persist(ストレージレベル.MEMORY_AND_DISK_SER)
-
- cc.join (e_rdd)
- .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
- .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
- .mapPartitions(it => it.map(x => (x._1.部分文字列(1), x._2)))
- .aggregateByKey(リスト[(Long, Long)]())((リスト1, リスト2) => リスト1 ::: リスト2、(リスト3, リスト4) => リスト3 ::: リスト4)
- .filter(x => x._2.サイズ>=カウント- 1)
- .flatMap(x => {
- if (アルゴリズム == 1)
- find(x, count )を見つける
- それ以外
- find2(x,カウント)
- })
- .mapPartitions(it => {
- it.map({
- 場合 設定=>
- var temp = ""
- .asScala.foreach(x => temp += x + "," )を設定します。
- temp .部分文字列(0, temp . 長さ - 1)
- ケース_ =>
- })
- })
- // .合体(1)
- .saveAsTextFile(出力パス)
- }
-
- キャッチ{
- ケース例: 例外 =>
- ex.printStackTrace(System.out )で
- }
- sc.stop()
- }
- //最大クリークアルゴリズムの独自の実装
- def find(x: (String, List[(Long, Long)]), count : Int ): mutable. Set [util. Set [String]] = {
- println(x._1 + "|s|" + x._2.size )
- println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
- val neighbors = 新しい util.HashMap[String, util.Set [String]]
- val finder = 新しい CliqueFinder(近隣、カウント)
- x._2.foreach(r => {
- val v1 = r._1.toString
- val v2 = r._2.toString
- if (neighbors.containsKey(v1)) {
- 隣人.get(v1) .add (v2)
- }それ以外{
- val temp = 新しい util.HashSet[String]()
- 一時追加(v2 )
- 近隣住民.put(v1, temp )
- }
- if (neighbors.containsKey(v2)) {
- 隣人.get(v2) .add (v1)
- }それ以外{
- val temp = 新しい util.HashSet[String]()
- 一時追加(v1 )
- 近隣住民.put(v2, temp )
- }
- })
- println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
- finder.findMaxCliques().asScala
- }
- //jgrapht の最大クリークアルゴリズム
- def find2(x: (String, List[(Long, Long)]), count : Int ): Set [util. Set [String]] = {
- println(x._1 + "|s|" + x._2.size )
- println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
- val to_clique = 新しい SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
- x._2.foreach(r => {
- val v1 = r._1.toString
- val v2 = r._2.toString
- to_clique.addVertex(v1)
- to_clique.addVertex(v2)
- to_clique.addEdge(v1, v2)
- })
- val ファインダー = 新しい BronKerboschCliqueFinder(to_clique)
- val リスト = finder.getAllMaximalCliques.asScala
- var result = Set [util. Set [String]]()
- リスト.foreach(x => {
- ( x.size () >= count )の場合
- 結果 = 結果 + x
- })
- println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
- 結果
- }
- }
//最大クリークアルゴリズムの独自の実装 - java.util.* をインポートします。
-
- /**
- * [@author](https://my.oschina.net/arthor) [email protected]
- * [@日付](https://my.oschina.net/u/2504391) 2017/7/31
- */
- パブリッククラスCliqueFinder {
- プライベート Map<String, Set <String>> 近隣ノード;
- private Set <String> ノード;
- プライベートSet < Set <String>> maxCliques = new HashSet<>();
- プライベート整数minSize;
-
- パブリックCliqueFinder(Map<String, Set <String>> neighbors, Integer minSize) {
- this.neighbors = 隣人;
- this.nodes = neighbors.keySet();
- 最小サイズ = 最小サイズ;
- }
-
- private void bk3( <String> クリーク、<String> 候補リスト、<String> 除外リストを設定) {
- 候補が空の場合(&&除外が空の場合)
- clique.isEmpty() の場合、 clique.size () が minSize 以上であるとき、
- maxCliques.add (クリーク);
- }
- 戻る;
- }
-
- (文字列s : degeneracy_order(候補)) {
- List<String> new_candidates = 新しいArrayList<>(candidates);
- new_candidates.retainAll(neighbors.get(s));
-
- List<String> new_excluded = 新しい ArrayList<>(excluded);
- new_excluded.retainAll(近隣ノードを取得(s));
- <String>を設定しますnextClique = new HashSet<>(clique);
- nextClique.add (s);
- bk2(nextClique、新しい候補、新しい除外);
- 候補者を削除します。
- 除外。追加(s);
- }
- }
-
- private void bk2( <String> クリーク、<String> 候補リスト、<String> 除外リストを設定) {
- 候補が空の場合(&&除外が空の場合)
- clique.isEmpty() の場合、 clique.size () が minSize 以上であるとき、
- maxCliques.add (クリーク);
- }
- 戻る;
- }
- 文字列 pivot = pick_random(候補);
- ピボットがnullの場合
- pivot = pick_random(除外);
- }
- List<String> tempc = new ArrayList<>(候補);
- tempc.removeAll(neighbors.get(pivot));
-
- (文字列 s : tempc) {
- List<String> new_candidates = 新しいArrayList<>(candidates);
- new_candidates.retainAll(neighbors.get(s));
-
- List<String> new_excluded = 新しい ArrayList<>(excluded);
- new_excluded.retainAll(近隣ノードを取得(s));
- <String>を設定しますnextClique = new HashSet<>(clique);
- nextClique.add (s);
- bk2(nextClique、新しい候補、新しい除外);
- 候補者を削除します。
- 除外。追加(s);
- }
- }
-
- プライベートリスト<String> degenerate_order(リスト<String> innerNodes) {
- List<String> 結果 = 新しい ArrayList<>();
- Map<String, Integer > deg = new HashMap<>();
- (文字列ノード: innerNodes) {
- deg.put(node, neighbors.get(node) .size ());
- }
- while (!deg.isEmpty()) {
- 整数 min = Collections.min (deg.values ( ) );
- 文字列 minKey = null ;
- (文字列キー: deg.keySet()) {
- if (deg.get(キー).equals(最小値)) {
- minKey =キー;
- 壊す;
- }
- }
- 結果にminKey を追加します。
- deg.remove(minKey);
- (文字列 k : neighbors.get(minKey)) {
- (度を含むキー(k))の場合
- deg.put(k, deg.get(k) - 1);
- }
- }
-
- }
- 結果を返します。
- }
-
-
- プライベート文字列pick_random(List<String> random) {
- ランダム != null && !random.isEmpty() の場合 {
- random.get(0)を返します。
- }それ以外{
- 戻る ヌル;
- }
- }
-
- 公共 設定<設定 <文字列>> findMaxCliques() {
- this.bk3(新しいHashSet<>()、新しいArrayList<>(ノード)、新しいArrayList<>());
- maxCliquesを返します。
- }
-
- 公共 静的void main(String[] args) {
- Map<String, Set <String>> neighbors = new HashMap<>();
- neighbors.put( "0" , new HashSet<>(Arrays.asList( "1" , "2" , "3" )));
- neighbors.put( "1" , new HashSet<>(Arrays.asList( "0" , "2" )));
- neighbors.put( "2" , new HashSet<>(Arrays.asList( "0" , "1" , "3" , "6" )));
- neighbors.put( "3" , new HashSet<>(Arrays.asList( "0" , "2" , "4" , "5" )));
- neighbors.put( "4" , new HashSet<>(Arrays.asList( "3" , "5" , "6" )));
- neighbors.put( "5" , new HashSet<>(Arrays.asList( "3" , "4" , "6" )));
- neighbors.put( "6" , new HashSet<>(Arrays.asList( "2" , "4" , "5" )));
- neighbors.put( "7" , new HashSet<>(Arrays.asList( "6" )));
- CliqueFinder ファインダー = 新しい CliqueFinder(近隣、3);
- finder.bk3(新しい HashSet<>()、新しい ArrayList<>(neighbors.keySet())、新しい ArrayList<>());
- システム.out.println (finder.maxCliques);
- }
- }
|