私の質問は「同じフィールドのソート値に基づいてソートされた 2 つのファイルを結合する」ですが、これを名前付きパイプに拡張します。
たとえば、整数がソートされた 2 つのテキスト ファイルがあり、それらを結合したいとします。 を使用するsort -nm file1.txt file2.txt > merged.txt
と、1 パスの非ブロッキング マージを実行できます。
さて、これらのファイルは実際には私が作成し、Python 内から入力している名前付きパイプ (FIFO) だとします。1 つのパイプに書き込み、次に次のパイプに書き込みを交互に行う限り、問題なく実行できます。このコードは、順序付けられた整数のリストを 2 つ生成し、サブプロセスによって読み取られる名前付きパイプに書き込みsort
、マージされた結果を 1 つのファイルに出力します。
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()
i % 2
ソートされた結果が得られます。しかし、をに変更してリストのバランスを崩してみましょうi % 3
。元のバージョンでは、これは単に fifo1、fifo2、fifo1、fifo2 というように印刷されます。変更されたバージョンでは、2 つのパイプの 1 つに 2 倍の行が印刷されます。
これを実行すると、i % 3
次の出力が得られます。
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398
いつも同じ場所で止まります。strace を使用すると次のことがわかります:
Python プロセスはwrite
スポット 4 の呼び出しでハングアップしました:
write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100
しかし、スポット 3 への呼び出し
sort
でプロセスが停止します。read
read(3,
lsof -n -p
プロセスの出力を見ると、プロセスsort
が に値が来るのを待っているfifo1
一方で、write
プロセスが に値を書き込むのを待っていることがわかりますfifo2
。
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2
そのため、何らかの理由で、sort
プロセスは のリッスンを停止しfifo2
、プロセスがハングします。
fifo2
ここで、次のコマンドを発行して別のリスナーをオンにするとcat fifo2
、プロセスが再び開始され、何千回も反復されて、別のランダムなポイント (反復 53733) で停止します。
パイプのバッファリングで何か理解できないことがあり、sort
1 つのストリームから次のストリームへの読み取りがどのように変更されるのかがわからないのだと思います。私にとって奇妙に思えるのは、それが決定論的で、まったく同じ場所で失敗し、リストのバランスが取れていない場合にのみ失敗するように見えることです。
これを解決する方法はありますか?
答え1
明らかに、プログラムが 2 つの名前付きパイプに異なる量のデータを書き込むと、デッドロックが発生します。プログラムはwrite
fifo2 (バッファがいっぱい) でブロックし、sort
プロセスは fifo1 (バッファが空) でブロックしますread
。
の実装方法がわかりませんsort
。効率を上げるために、ファイルを大きなブロックで読み取り、メモリ内でデータを処理する必要があると思われます。データの読み取りにsort
の関数を使用する場合、バッファリングが自動的に行われることもあります。stdio.h
名前付き (および名前なし) パイプは、データ用にバッファを使用します。
バッファがいっぱいの場合、書き込みプロセスは、読み取りプロセスがデータを読み取るか、その終了を閉じるまでブロックされます。
バッファが空の場合、読み取りプロセスは、書き込みプロセスがデータを書き込むか、その終了を閉じるまでブロックされます。
毎サイクルで fifo1 に 1 行、fifo2 に 2 行書き込むと、fifo2 のバッファはいっぱいになりますが、fifo1 のバッファは半分しかいっぱいになりません。
プログラムが fifo に書き込むデータの量と読み取りたいデータの量に応じてsort
、プログラムはバッファーが空である fifo1 から何かを読み取りたい一方で、バッファーがいっぱいの fifo2 に書き込みたいという状況に陥ることは明らかですsort
。
パイプ バッファーのサイズは固定されており、プログラムでもsort
データの読み取りまたは書き込みに固定のバッファー サイズが使用される可能性が高いため、結果は確定的です。
GNUのソースコードはsort
以下で見ることができます。
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
最初は、関数を使用して、すべてのファイルに対するループですべての入力ファイルの入力バッファを埋めようとしますfillbuf
。
その後、いくつかの条件下では、fillbuf
入力ファイルを再度呼び出します。
関数内にfillbuf
コメントがある
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */
どうやらsort
入力ファイルの 1 つを選択し、一定量のデータを必要としているようです。ブロックを読み取る場合は入力ファイルを切り替えません。
この実装は通常のファイルでは適切に機能します。read
操作によって、しばらくするとデータまたは EOF が返されるため、永続的にブロックされることはありません。
2 つのプロセス/スレッド間でブロックできるものが複数ある場合、デッドロックを回避するのは常に困難です。この場合、パイプは 1 つだけ使用してください。fifo2 がブロックされる場合、または逆の場合に、常に fifo1 に書き込むデータがある場合は、非ブロッキング操作を使用すると役立つ場合があります。
2 つのパイプを使用してパイプに書き込む場合は、2 つの別々のスレッド/プロセスを使用することもできますが、スレッド/プロセスが互いに独立して動作する場合に限られます。パイプ 1 に書き込むスレッド A が、パイプ 2 への書き込みをブロックするスレッド B を何らかの方法で待機する場合は役に立ちません。