サイズの異なる 2 つのソート済み名前付きパイプを結合するにはどうすればよいでしょうか?

サイズの異なる 2 つのソート済み名前付きパイプを結合するにはどうすればよいでしょうか?

私の質問は「同じフィールドのソート値に基づいてソートされた 2 つのファイルを結合する」ですが、これを名前付きパイプに拡張します。

たとえば、整数がソートされた 2 つのテキスト ファイルがあり、それらを結合したいとします。 を使用するsort -nm file1.txt file2.txt > merged.txtと、1 パスの非ブロッキング マージを実行できます。

さて、これらのファイルは実際には私が作成し、Python 内から入力している名前付きパイプ (FIFO) だとします。1 つのパイプに書き込み、次に次のパイプに書き込みを交互に行う限り、問題なく実行できます。このコードは、順序付けられた整数のリストを 2 つ生成し、サブプロセスによって読み取られる名前付きパイプに書き込みsort、マージされた結果を 1 つのファイルに出力します。

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

i % 2ソートされた結果が得られます。しかし、をに変更してリストのバランスを崩してみましょうi % 3。元のバージョンでは、これは単に fifo1、fifo2、fifo1、fifo2 というように印刷されます。変更されたバージョンでは、2 つのパイプの 1 つに 2 倍の行が印刷されます。

これを実行すると、i % 3次の出力が得られます。

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

いつも同じ場所で止まります。strace を使用すると次のことがわかります:

Python プロセスはwriteスポット 4 の呼び出しでハングアップしました: write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

しかし、スポット 3 への呼び出し sortでプロセスが停止します。readread(3,

lsof -n -pプロセスの出力を見ると、プロセスsortが に値が来るのを待っているfifo1一方で、writeプロセスが に値を書き込むのを待っていることがわかりますfifo2

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

そのため、何らかの理由で、sortプロセスは のリッスンを停止しfifo2、プロセスがハングします。

fifo2ここで、次のコマンドを発行して別のリスナーをオンにするとcat fifo2、プロセスが再び開始され、何千回も反復されて、別のランダムなポイント (反復 53733) で停止します。

パイプのバッファリングで何か理解できないことがあり、sort1 つのストリームから次のストリームへの読み取りがどのように変更されるのかがわからないのだと思います。私にとって奇妙に思えるのは、それが決定論的で、まったく同じ場所で失敗し、リストのバランスが取れていない場合にのみ失敗するように見えることです。

これを解決する方法はありますか?

答え1

明らかに、プログラムが 2 つの名前付きパイプに異なる量のデータを書き込むと、デッドロックが発生します。プログラムはwritefifo2 (バッファがいっぱい) でブロックし、sortプロセスは fifo1 (バッファが空) でブロックしますread

の実装方法がわかりませんsort。効率を上げるために、ファイルを大きなブロックで読み取り、メモリ内でデータを処理する必要があると思われます。データの読み取りにsortの関数を使用する場合、バッファリングが自動的に行われることもあります。stdio.h

名前付き (および名前なし) パイプは、データ用にバッファを使用します。
バッファがいっぱいの場合、書き込みプロセスは、読み取りプロセスがデータを読み取るか、その終了を閉じるまでブロックされます。
バッファが空の場合、読み取りプロセスは、書き込みプロセスがデータを書き込むか、その終了を閉じるまでブロックされます。

毎サイクルで fifo1 に 1 行、fifo2 に 2 行書き込むと、fifo2 のバッファはいっぱいになりますが、fifo1 のバッファは半分しかいっぱいになりません。

プログラムが fifo に書き込むデータの量と読み取りたいデータの量に応じてsort、プログラムはバッファーが空である fifo1 から何かを読み取りたい一方で、バッファーがいっぱいの fifo2 に書き込みたいという状況に陥ることは明らかですsort

パイプ バッファーのサイズは固定されており、プログラムでもsortデータの読み取りまたは書き込みに固定のバッファー サイズが使用される可能性が高いため、結果は確定的です。

GNUのソースコードはsort以下で見ることができます。
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

最初は、関数を使用して、すべてのファイルに対するループですべての入力ファイルの入力バッファを埋めようとしますfillbuf

その後、いくつかの条件下では、fillbuf入力ファイルを再度呼び出します。

関数内にfillbufコメントがある

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

どうやらsort入力ファイルの 1 つを選択し、一定量のデータを必要としているようです。ブロックを読み取る場合は入力ファイルを切り替えません。

この実装は通常のファイルでは適切に機能します。read操作によって、しばらくするとデータまたは EOF が返されるため、永続的にブロックされることはありません。


2 つのプロセス/スレッド間でブロックできるものが複数ある場合、デッドロックを回避するのは常に困難です。この場合、パイプは 1 つだけ使用してください。fifo2 がブロックされる場合、または逆の場合に、常に fifo1 に書き込むデータがある場合は、非ブロッキング操作を使用すると役立つ場合があります。

2 つのパイプを使用してパイプに書き込む場合は、2 つの別々のスレッド/プロセスを使用することもできますが、スレッド/プロセスが互いに独立して動作する場合に限られます。パイプ 1 に書き込むスレッド A が、パイプ 2 への書き込みをブロックするスレッド B を何らかの方法で待機する場合は役に立ちません。

関連情報