Ray で効率的なディープラーニング データ パイプラインを作成する

Ray で効率的なディープラーニング データ パイプラインを作成する

ディープラーニング モデルのトレーニングに使用される GPU は強力ですが、高価です。 GPU を効果的に活用するには、開発者は、GPU が次のトレーニング ステップを計算する準備ができたときに、できるだけ早くデータを GPU に転送するための効率的なデータ パイプラインが必要です。 Ray を使用すると、データ パイプラインの効率が大幅に向上します。

1. トレーニングデータパイプラインの構造

まず、次のモデルトレーニング疑似コードを検討してください。

 for step in range(num_steps): sample, target = next(dataset) # 步骤1 train_step(sample, target) # 步骤2

ステップ 1 では、サンプルとラベルの次のミニバッチを取得します。ステップ 2 では、それらは train_step 関数に渡され、GPU にコピーされ、前方パスと後方パスを実行して損失と勾配を計算し、オプティマイザーの重みを更新します。

ステップ 1 を詳しく見てみましょう。データセットが大きすぎてメモリに収まらない場合、ステップ 1 ではディスクまたはネットワークから次のデータのミニバッチを取得します。さらに、ステップ 1 にはある程度の前処理が含まれます。つまり、入力データは、モデルに入力される前に数値テンソルまたはテンソルのコレクションに変換する必要があります。場合によっては、テンソルをモデルに渡す前に追加の変換が適用されます (正規化、回転、ランダム化など)。

ワークフローが厳密に順次的で、最初にステップ 1 を実行してからステップ 2 を実行する場合、モデルは常に次の入力、出力、および前処理操作のバッチを待機する必要があります。 GPU は効率的に使用されず、次のデータのミニバッチをロードしている間はアイドル状態になります。

この問題に対処するには、データ パイプラインをプロデューサーとコンシューマーの問題として考えると便利です。データ パイプラインは小さなデータ バッチを生成し、それを境界付きバッファーに書き込みます。モデル/GPU は、バッファからデータのミニバッチを消費し、前方/後方計算を実行し、モデルの重みを更新します。データ パイプラインが、モデル/GPU が消費できる速度と同じ速さで小さなバッチのデータを生成することができれば、トレーニング プロセスは非常に効率的になります。

写真

2. Tensorflow tf.data API

Tensorflow tf.data API は、バックグラウンド スレッドを使用して小さなバッチのデータを取得し、モデルが待機する必要がないようにすることで、データ パイプラインを効率的に作成するための豊富な関数セットを提供します。単にデータをプリフェッチするだけでは不十分です。データのミニバッチを生成する速度が GPU がデータを消費できる速度よりも遅い場合は、並列化を使用してデータの読み取りと変換を高速化する必要があります。この目的のために、Tensorflow は、複数のスレッドを利用してデータを並列に読み取るインターリーブ関数と、複数のスレッドを使用して小さなデータ バッチを変換する並列マップ関数を提供します。

これらの API はマルチスレッドであるため、Python グローバル インタープリター ロック (GIL) によって制限される可能性があります。 Python GIL は、Python インタープリターが一度に 1 つのスレッドのバイトコードを実行するように制限します。パイプラインで純粋な TensorFlow コードを使用する場合、TensorFlow コア実行エンジンは GIL の範囲外で動作するため、通常はこの制限に直面することはありません。ただし、GIL を解放しないサードパーティのライブラリを使用している場合、または Python で大量の計算を実行している場合は、マルチスレッドに依存してパイプラインを並列化することは現実的ではありません。

3. マルチプロセスを使用してデータパイプラインを並列化する

次のジェネレータ関数を検討してください。これは、データ サンプルとラベルのミニバッチを生成するために、読み込みと計算の実行をシミュレートします。

 def data_generator(): for _ in range(10): # 模拟获取# 从磁盘/网络time.sleep(0.5) # 模拟计算for _ in range(10000): pass yield ( np.random.random((4, 1000000, 3)).astype(np.float32), np.random.random((4, 1)).astype(np.float32) )

次に、このジェネレーターを架空のトレーニング パイプラインで使用し、データのミニバッチを生成するのにかかる平均時間を測定します。

 generator_dataset = tf.data.Dataset.from_generator( data_generator, output_types=(tf.float64, tf.float64), output_shapes=((4, 1000000, 3), (4, 1)) ).prefetch(tf.data.experimental.AUTOTUNE) st = time.perf_counter() times = [] for _ in generator_dataset: en = time.perf_counter() times.append(en - st) # 模拟训练步骤time.sleep(0.1) st = time.perf_counter() print(np.mean(times))

観測された平均時間は約 0.57 秒でした (Intel Core i7 プロセッサを搭載した Mac ラップトップで測定)。これが実際のトレーニング ループであれば、GPU の使用率は非常に低くなり、計算に 0.1 秒しかかからず、その後 0.57 秒間アイドル状態になって次のデータ バッチを待つことになります。

データの読み込みを高速化するには、マルチプロセス ジェネレーターを使用できます。

 from multiprocessing import Queue, cpu_count, Process def mp_data_generator(): def producer(q): for _ in range(10): # 模拟获取# 从磁盘/网络time.sleep(0.5) # 模拟计算for _ in range(10000000): pass q.put(( np.random.random((4, 1000000, 3)).astype(np.float32), np.random.random((4, 1)).astype(np.float32) )) q.put("DONE") queue = Queue(cpu_count()*2) num_parallel_processes = cpu_count() producers = [] for _ in range(num_parallel_processes): p = Process(target=producer, args=(queue,)) p.start() producers.append(p) done_counts = 0 while done_counts < num_parallel_processes: msg = queue.get() if msg == "DONE": done_counts += 1 else: yield msg queue.join()

ここで、次のデータのミニバッチを待つのに費やされた時間を測定すると、平均 0.08 秒になります。速度は7倍近くまで上がりましたが、理想としてはこの時間を0に近づけたいところです。

分析してみると、デシリアライズのためのデータの準備にかなりの時間がかかっていることがわかります。マルチプロセス ジェネレーターでは、プロデューサー プロセスは、メイン プロセスで準備してデシリアライズする必要がある大きな NumPy 配列を返します。プロセス間で大きな配列を渡す際の効率を改善できますか?

4. Rayを使用してデータパイプラインを並列化する

ここでレイが登場します。 Ray は、Python で分散計算を実行するためのフレームワークです。異なるプロセス間でオブジェクトを効率的に転送できる共有メモリ オブジェクト ストアが付属しています。特に、オブジェクト ストア内の Numpy 配列は、シリアル化やデシリアル化を行わずに、同じノード上のワーカー間で共有できます。 Ray を使用すると、複数のマシン間でのデータ読み込みのスケーリングも簡単になり、Apache Arrow を使用して大規模な配列を効率的にシリアル化および逆シリアル化できます。

Ray には並列イテレータを作成できるユーティリティ関数 from_iterators が付属しており、開発者はこれを使用して data_generator ジェネレータ関数をラップできます。

 import ray def ray_generator(): num_parallel_processes = cpu_count() return ray.util.iter.from_iterators( [data_generator]*num_parallel_processes ).gather_async()

ray_generator を使用すると、次のデータのミニバッチを待機する時間は 0.02 秒と測定され、これは multiprocessing を使用する場合よりも 4 倍高速です。

<<: 

>>:  シーメンスとマイクロソフトが製造業向け AI アシスタントを発表 - Siemens Industry Copilot

ブログ    

推薦する

知識をグラフに変換するには、いくつのステップが必要ですか?インターネット上で最も包括的な清華ナレッジグラフレポートの89ページ

ナレッジグラフは、人工知能の重要な分野技術です。2012年にGoogleによって提案され、大規模な知...

考えてみると恐ろしいですね!人工知能は、成功率70%で人間の行動を操作することを学習したと疑われている。

人工知能に関しては、多くの人が懸念を表明しています。例えば、人類開発の最前線にいるホーキング博士とマ...

...

[ビッグガイがやってくるエピソード8] 電子商取引リスク管理ツール - モバイルデバイス向けの信頼できるID

[51CTO.com からのオリジナル記事] ライブショー「ビッグネームがやってくる」の今回のエピ...

顔認識は終わったのか?最初の「顔ハイジャック」型バンキングトロイの木馬が誕生

各人の顔、指紋、虹彩の情報はそれぞれ固有であり偽造が困難であるため、生体認証は長年にわたり究極の本人...

...

人工知能クロニクル | これら 10 大イベントは、人工知能の 64 年間の発展を記録しています

1956 年の夏、アメリカの小さな町ハノーバーの静かなダートマス大学に、ジョン・マッカーシー (Li...

あなたは人工知能/機械学習についてどれくらい知っていますか?

[[188835]]クイズ番組やマンマシン囲碁で人間に勝ったり、広告で人種差別的な偏見を示したとし...

...

劉強東:人工知能の時代が来ています。このチャンスをつかめば、あなたは豊かになれます。

劉強東は言った。「この世で働かずに得られる唯一のものは貧困であり、無から創造できる唯一のものは夢であ...

AIを使ってAIを攻撃する?敵対的機械学習に対する脅威と防御

人工知能 (AI) や機械学習 (ML) プロジェクトを適用する組織が増えるにつれて、これらのプロジ...

私の国はAI医療機器の標準化を加速しています

今年は、新たに改訂された「医療機器監督管理条例」の実施初年度であり、企業の主な責任がより顕著になり、...

ChatGPTのモバイル収益は9月に460万ドルという過去最高を記録し、成長疲れが現れ始めている。

10月10日、人工知能チャットボットChatGPTのモバイル分野での取り組みは大きな成果をもたらし...

人工知能の登場により、私たちは仕事を維持できるのでしょうか?

週末にニュースを見て衝撃を受けました。Google は最近、同社が開発したロボット (AI) システ...

...