Spark Graphx はグラフの最大クリークマイニング、疑似並列アルゴリズムを実装します

Spark Graphx はグラフの最大クリークマイニング、疑似並列アルゴリズムを実装します

[[206073]]

####背景:####

Spark graphxは最大クリークマイニングアルゴリズムを提供していない

現在の最大クリーク アルゴリズムはすべて、Bron-Kerbosch アルゴリズムに基づくシリアル化されたアルゴリズムです。

####考え:####

Spark graphx は、接続されたグラフのアルゴリズムを提供します。接続されたグラフと最大クラスターは、無向グラフの概念です。最大クラスターは、接続されたグラフのサブセットです。

Spark graphx を使用して接続されたグラフを見つけ、シリアル化された最大クリーク アルゴリズムを使用して各接続グラフから最大クリークを見つけます (疑似並列化)

相関関係が強いグラフの場合、見つかった接続グラフは非常に大きくなります。この場合、シリアル化された最大クリークアルゴリズムは依然として長い時間がかかります。ここでは、プルーニングの考え方を使用してサンプルデータの量を減らしますが、大きなグラフの場合、最適化スペースは限られています。

真に並列化された最大クリークアルゴリズムを期待する

####設定ファイル:####

  1. graph_data_path=hdfs://localhost/graph_data
  2. out_path=hdfs://localhost/clique
  3. ck_path=hdfs://localhost/チェックポイント 
  4. numIter=50 剪定回数
  5. count =3 クラスター内の頂点の最大数
  6. algorithm=2 最大クリークアルゴリズム、1: 個人実装 2: jgrapht
  7. percent=90 剪定後の頂点の数(前の数に対する割合)。剪定後にデータの 90% が残っている場合、剪定効率は高くありません。
  8. spark.master=ローカル 
  9. spark.app.name =グラフ
  10. spark.serializer=org.apache.spark.serializer.KryoSerializer
  11. spark.yarn.executor.memoryOverhead=20480
  12. spark.yarn.driver.memoryOverhead=20480
  13. spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
  14. spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
  15. spark.driver.maxResultSize=10g
  16. spark.default.parallelism =60

グラフ

####サンプルデータ:####

{"src":"0","dst":"1"} {"src":"0","dst":"2"} {"src":"0","dst":"3"} {"src":"1","dst":"0"} {"src":"2","dst":"1"} {"src":"3","dst":"5"} {"src":"4","dst":"6"} {"src":"5","dst":"4"} {"src":"6","dst":"5"} {"src":"3","dst":"2"} {"src":"2","dst":"3"} {"src":"6","dst":"4"} {"src":"3","dst":"4"} {"src":"4","dst":"3"} {"src":"2","dst":"6"} {"src":"6","dst":"2"} {"src":"6","dst":"7"} {"src":"7","dst":"6"}

####サンプル画像:####

####出力:####

0,1,2 0,2,3 3,4,5 4,5,6

####コード実装:####

  1. java.util をインポートします。java.util.Properties をインポートします。
  1. org.apache.spark.broadcast.Broadcast をインポートします。
  2. org.apache.spark.graphx.{Edge, Graph} をインポートします。
  3. org.apache.spark.rdd.RDD をインポートします。
  4. org.apache.spark.sql.{Row, SQLContext} をインポートします。
  5. org.apache.spark.storage.StorageLevel をインポートします
  6. org.apache.spark.{SparkConf, SparkContext} をインポートします。
  7. org.jgrapht.alg.BronKerboschCliqueFinder をインポートします。
  8. org.jgrapht.graph.{DefaultEdge, SimpleGraph} をインポートします。
  9.  
  10. scala.collection.JavaConverters._ をインポートします。
  11. scala.collection.mutable をインポートする
  12.  
  13. オブジェクトApplicationTitan {
  14. def main(args: 配列[文字列]) {
  15. val prop = 新しいプロパティ()
  16. prop.load (getClass.getResourceAsStream( "/config.properties" ))
  17.      
  18. val graph_data_path = prop.getProperty( "graph_data_path" )
  19. val out_path = prop.getProperty( "out_path" )
  20. val ck_path = prop.getProperty( "ck_path" )
  21. val count = Integer .parseInt(prop.getProperty( "count" ))
  22. val numIter = Integer .parseInt(prop.getProperty( "numIter" ))
  23. val アルゴリズム = Integer .parseInt(prop.getProperty( "アルゴリズム" ))
  24. val パーセント = Integer .parseInt(prop.getProperty( "パーセント" ))
  25. val conf = 新しい SparkConf()
  26. 試す {
  27. Runtime.getRuntime.exec ( "hdfs dfs -rm -r" + out_path )
  28. // Runtime.getRuntime.exec ( "cmd.exe /C rd /s /q " + out_path)
  29. } キャッチ {
  30. ケース例: 例外 =>
  31. ex.printStackTrace(System.out )
  32. }
  33.      
  34. prop.stringPropertyNames().asScala.foreach(s => {
  35. s.startsWith( "spark" )の場合
  36. conf.set (s、prop.getProperty(s))
  37. }
  38. })
  39. conf.registerKryoClasses(配列(getClass))
  40. val sc = 新しい SparkContext(conf)
  41. sc.setLogLevel( "エラー" )
  42. sc.setCheckpointDir(ck_path)
  43. val sqlc = 新しいSQLContext(sc)
  44. 試す {
  45. val e_df = sqlc.read  
  46. // .json(グラフデータパス)
  47. .parquet(グラフデータパス)
  48.  
  49. var e_rdd = e_df
  50. .mapPartitions(it => {
  51. it.map({
  52. ケースRow(dst: 文字列、src: 文字列) =>
  53. val src_long = src.toLong
  54. val dst_long = dst.toLong
  55. src_long < dst_long の場合 (src_long、dst_long)、それ以外の場合(dst_long、src_long)
  56. })
  57. })。明確な()
  58. e_rdd.persist(ストレージレベル.MEMORY_AND_DISK_SER)
  59.      
  60. var bc: ブロードキャスト[ Set [Long]] = null  
  61. var 反復 = 0
  62. 変数bc_size = 0
  63. //剪定
  64. 繰り返し回数 <= 繰り返し回数) {
  65. val temp = e_rdd
  66. .flatMap(x => リスト((x._1, 1), (x._2, 1)))
  67. .reduceByKey((x, y) => x + y)
  68. .filter(x => x._2 >=カウント- 1)
  69. .mapPartitions(it => it.map(x => x._1))
  70. val bc_value = temp .collect().toSet
  71. bc = sc.broadcast(bc_value)
  72. e_rdd = e_rdd.filter(x => bc.value.contains (x._1) && bc.value.contains ( x._2))
  73. e_rdd.persist(ストレージレベル.MEMORY_AND_DISK_SER)
  74. 反復 += 1
  75. bc_size != 0 && bc_value.size >= bc_size * パーセント / 100 の場合 {
  76. println( "合計反復回数: " + 反復回数)
  77. 反復= Int.MaxValue
  78. }
  79. bc_size = bc_value.サイズ 
  80. }
  81.      
  82. // 構成図
  83. val エッジ: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
  84. val グラフ = Graph.fromEdges(エッジ、0、ストレージレベル.MEMORY_AND_DISK_SER、ストレージレベル.MEMORY_AND_DISK_SER)
  85.      
  86. //接続グラフ
  87. val cc = graph.connectedComponents().vertices
  88. cc.persist(ストレージレベル.MEMORY_AND_DISK_SER)
  89.      
  90. cc.join (e_rdd)
  91. .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
  92. .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
  93. .mapPartitions(it => it.map(x => (x._1.部分文字列(1), x._2)))
  94. .aggregateByKey(リスト[(Long, Long)]())((リスト1, リスト2) => リスト1 ::: リスト2、(リスト3, リスト4) => リスト3 ::: リスト4)
  95. .filter(x => x._2.サイズ>=カウント- 1)
  96. .flatMap(x => {
  97. if (アルゴリズム == 1)
  98. find(x, count )を見つける
  99. それ以外 
  100. find2(x,カウント)
  101. })
  102. .mapPartitions(it => {
  103. it.map({
  104. 場合 設定=>
  105. var temp = ""  
  106. .asScala.foreach(x => temp += x + "," )を設定します
  107. temp .部分文字列(0, temp . 長さ - 1)
  108. ケース_ =>
  109. })
  110. })
  111. // .合体(1)
  112. .saveAsTextFile(出力パス)
  113. }
  114.  
  115. キャッチ{
  116. ケース例: 例外 =>
  117. ex.printStackTrace(System.out )
  118. }
  119. sc.stop()
  120. }
  121. //最大クリークアルゴリズムの独自の実装
  122. def find(x: (String, List[(Long, Long)]), count : Int ): mutable. Set [util. ​​Set [String]] = {
  123. println(x._1 + "|s|" + x._2.size )
  124. println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
  125. val neighbors = 新しい util.HashMap[String, util.Set [String]]
  126. val finder = 新しい CliqueFinder(近隣、カウント)
  127. x._2.foreach(r => {
  128. val v1 = r._1.toString
  129. val v2 = r._2.toString
  130. if (neighbors.containsKey(v1)) {
  131. 隣人.get(v1) .add (v2)
  132. }それ以外{
  133. val temp = 新しい util.HashSet[String]()
  134. 一時追加(v2 )
  135. 近隣住民.put(v1, temp )
  136. }
  137. if (neighbors.containsKey(v2)) {
  138. 隣人.get(v2) .add (v1)
  139. }それ以外{
  140. val temp = 新しい util.HashSet[String]()
  141. 一時追加(v1 )
  142. 近隣住民.put(v2, temp )
  143. }
  144. })
  145. println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
  146. finder.findMaxCliques().asScala
  147. }
  148. //jgrapht の最大クリークアルゴリズム
  149. def find2(x: (String, List[(Long, Long)]), count : Int ): Set [util. ​​Set [String]] = {
  150. println(x._1 + "|s|" + x._2.size )
  151. println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
  152. val to_clique = 新しい SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
  153. x._2.foreach(r => {
  154. val v1 = r._1.toString
  155. val v2 = r._2.toString
  156. to_clique.addVertex(v1)
  157. to_clique.addVertex(v2)
  158. to_clique.addEdge(v1, v2)
  159. })
  160. val ファインダー = 新しい BronKerboschCliqueFinder(to_clique)
  161. val リスト = finder.getAllMaximalCliques.asScala
  162. var result = Set [util. ​​Set [String]]()
  163. リスト.foreach(x => {
  164. ( x.size () >= count )の場合
  165. 結果 = 結果 + x
  166. })
  167. println( "BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
  168. 結果
  169. }
  170. }

//最大クリークアルゴリズムの独自の実装

  1. java.util.* をインポートします。
  2.  
  3. /**
  4. * [@author](https://my.oschina.net/arthor) [email protected]
  5. * [@日付](https://my.oschina.net/u/2504391) 2017/7/31
  6. */
  7. パブリッククラスCliqueFinder {
  8. プライベート Map<String, Set <String>> 近隣ノード;
  9. private Set <String> ノード;
  10. プライベートSet < Set <String>> maxCliques = new HashSet<>();
  11. プライベート整数minSize;
  12.  
  13. パブリックCliqueFinder(Map<String, Set <String>> neighbors, Integer minSize) {
  14. this.neighbors = 隣人;
  15. this.nodes = neighbors.keySet();
  16. 最小サイズ = 最小サイズ;
  17. }
  18.  
  19. private void bk3( <String> クリーク、<String> 候補リスト、<String> 除外リストを設定) {
  20. 候補が空の場合(&&除外が空の場合)
  21. clique.isEmpty() の場合、 clique.size () が minSize 以上であるとき、
  22. maxCliques.add (クリーク);
  23. }
  24. 戻る;
  25. }
  26.  
  27. (文字s : degeneracy_order(候補)) {
  28. List<String> new_candidates = 新しいArrayList<>(candidates);
  29. new_candidates.retainAll(neighbors.get(s));
  30.  
  31. List<String> new_excluded = 新しい ArrayList<>(excluded);
  32. new_excluded.retainAll(近隣ノードを取得(s));
  33. <String>を設定しますnextClique = new HashSet<>(clique);
  34. nextClique.add (s);
  35. bk2(nextClique、新しい候補、新しい除外);
  36. 候補者を削除します。
  37. 除外。追加(s);
  38. }
  39. }
  40.  
  41. private void bk2( <String> クリーク、<String> 候補リスト、<String> 除外リストを設定) {
  42. 候補が空の場合(&&除外が空の場合)
  43. clique.isEmpty() の場合、 clique.size () が minSize 以上であるとき、
  44. maxCliques.add (クリーク);
  45. }
  46. 戻る;
  47. }
  48. 文字列 pivot = pick_random(候補);
  49. ピボットがnull場合
  50. pivot = pick_random(除外);
  51. }
  52. List<String> tempc = new ArrayList<>(候補);
  53. tempc.removeAll(neighbors.get(pivot));
  54.  
  55. (文字列 s : tempc) {
  56. List<String> new_candidates = 新しいArrayList<>(candidates);
  57. new_candidates.retainAll(neighbors.get(s));
  58.  
  59. List<String> new_excluded = 新しい ArrayList<>(excluded);
  60. new_excluded.retainAll(近隣ノードを取得(s));
  61. <String>を設定しますnextClique = new HashSet<>(clique);
  62. nextClique.add (s);
  63. bk2(nextClique、新しい候補、新しい除外);
  64. 候補者を削除します。
  65. 除外。追加(s);
  66. }
  67. }
  68.  
  69. プライベートリスト<String> degenerate_order(リスト<String> innerNodes) {
  70. List<String> 結果 = 新しい ArrayList<>();
  71. Map<String, Integer > deg = new HashMap<>();
  72. (文字列ノード: innerNodes) {
  73. deg.put(node, neighbors.get(node) .size ());
  74. }
  75. while (!deg.isEmpty()) {
  76. 整数  min = Collections.min (deg.values ( ) );
  77. 文字列 minKey = null ;
  78. (文字列キー: deg.keySet()) {
  79. if (deg.get(キー).equals(最小値)) {
  80. minKey =キー;
  81. 壊す;
  82. }
  83. }
  84. 結果にminKey を追加します。
  85. deg.remove(minKey);
  86. (文字列 k : neighbors.get(minKey)) {
  87. (度を含むキー(k))の場合
  88. deg.put(k, deg.get(k) - 1);
  89. }
  90. }
  91.  
  92. }
  93. 結果を返します
  94. }
  95.  
  96.  
  97. プライベート文字列pick_random(List<String> random) {
  98. ランダム != null && !random.isEmpty() の場合 {
  99. random.get(0)を返します
  100. }それ以外{
  101. 戻る ヌル;
  102. }
  103. }
  104.  
  105. 公共 設定<設定 <文字>> findMaxCliques() {
  106. this.bk3(新しいHashSet<>()、新しいArrayList<>(ノード)、新しいArrayList<>());
  107. maxCliquesを返します
  108. }
  109.  
  110. 公共 静的void main(String[] args) {
  111. Map<String, Set <String>> neighbors = new HashMap<>();
  112. neighbors.put( "0" , new HashSet<>(Arrays.asList( "1" , "2" , "3" )));
  113. neighbors.put( "1" , new HashSet<>(Arrays.asList( "0" , "2" )));
  114. neighbors.put( "2" , new HashSet<>(Arrays.asList( "0" , "1" , "3" , "6" )));
  115. neighbors.put( "3" , new HashSet<>(Arrays.asList( "0" , "2" , "4" , "5" )));
  116. neighbors.put( "4" , new HashSet<>(Arrays.asList( "3" , "5" , "6" )));
  117. neighbors.put( "5" , new HashSet<>(Arrays.asList( "3" , "4" , "6" )));
  118. neighbors.put( "6" , new HashSet<>(Arrays.asList( "2" , "4" , "5" )));
  119. neighbors.put( "7" , new HashSet<>(Arrays.asList( "6" )));
  120. CliqueFinder ファインダー = 新しい CliqueFinder(近隣、3);
  121. finder.bk3(新しい HashSet<>()、新しい ArrayList<>(neighbors.keySet())、新しい ArrayList<>());
  122. システム.out.println (finder.maxCliques);
  123. }
  124. }

<<:  せっかちなGoogleのハードウェアから、中国と米国がAI商業化の問題をそれぞれどのように解決できるかまで

>>:  簡単な議論: モノのインターネット、クラウド コンピューティング、ビッグ データ、人工知能をどのように区別し、関連付けるか?

ブログ    
ブログ    

推薦する

...

人工知能が税務業界を変える7つの方法

[[313080]]政府は、医療、輸送、防衛、国家安全保障など、多くの分野で AI とロボット工学を...

Word2vec の作者が明かす: seq2seq は私のアイデア、GloVe はスキルを盗用、反撃が迫る

NeurIPS 2023の受賞論文が発表され、10年前の単語埋め込み技術word2vecが当然の「T...

シンプルな人工ニューラル ネットワークをゼロから構築する: 1 つの隠れ層

[51CTO.com クイック翻訳] 前回の記事「人工ニューラルネットワークをゼロから構築する(パー...

自動運転がどんどん近づき、高精度地図の実用化も加速

近年、自動運転技術の急速な発展とインテリジェントコネクテッドカーの導入が進む中、鍵となる高精度地図の...

企業セキュリティのための AI 生体認証

生体認証技術は、市場に登場した最新の AI イノベーションのおかげで、特に 2021 年には長年にわ...

アメリカの科学者が新技術を開発:ロボットが行動する前によく考えさせる

カリフォルニア大学バークレー校の新しい研究によると、ロボットはビデオ認識技術を通じて物体を移動させる...

7つの予測ストレージ分析ツールの比較

人工知能技術は、機械学習、計算統計、さまざまなディープラーニングモデルの使用を通じて主流になりました...

AI主導のサイバーセキュリティチームが人間の能力拡張に取り組む

サイバー脅威の範囲は、企業資産や選挙から健康データや物理インフラまで拡大しており、新興技術の予期せぬ...

...

AI基盤を強化し、業界の実践に注力する---WOTグローバル人工知能技術サミット機械学習実践フォーラムの記録

[51CTO.comよりオリジナル記事] 6月21日、51CTO主催のWOT2019グローバル人工知...

...

...

ネイチャー誌に「LK-99は超伝導体ではない」という記事が掲載された。

長年続いていた室温超伝導の謎が解明されたようだ。昨日、ネイチャー誌は「LK-99は室温超伝導体ではな...

DAMOアカデミー物流ロボットQA

1. 物流ロボットとは?物流ロボット「Xiaomanlu」は、ターミナル物流シナリオ向けに設計され...