如何合併兩個大小不均勻的排序命名管道?

如何合併兩個大小不均勻的排序命名管道?

我的問題類似於“根據同一欄位中的排序值合併兩個排序文件「但將其擴展到命名管道。

假設我有兩個帶有排序整數的文字文件,我想合併它們。我可以用來sort -nm file1.txt file2.txt > merged.txt進行一次性、非阻塞合併。

現在,假設這些檔案實際上是我正在製作然後從 python 中填充的命名管道 (FIFO)。只要我交替寫入一個管道,然後寫入下一個管道,我就可以很好地做到這一點。此程式碼用於產生兩個有序的整數列表,將它們寫入sort子進程讀取的命名管道,子進程將合併結果輸出到單個檔案:

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

我得到一個排序結果。但是 - 現在讓我們透過將 更改i % 2為來使列表不平衡i % 3。在原來的版本中,這只是印到 fifo1,然後是 fifo2,然後是 fifo1,然後是 fifo2,等等。

運行它i % 3我得到以下輸出:

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

它總是停在同一個地方。使用 strace 我可以看到:

writepython 進程在呼叫點 4 時掛起:write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

但該進程在呼叫點 3 sort時掛起:readread(3,

查看進程lsof -n -p的輸出sort表示它正在等待值到達fifo1,而write進程正在等待寫入值fifo2

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

因此由於某種原因,sort進程 * 停止監聽fifo2,導致進程掛起。

fifo2現在,如果我透過發出...來放置一個單獨的偵聽器cat fifo2,該過程將再次開始並繼續數千次迭代,直到...它現在在另一個隨機點停止(迭代 53733)。

我認為一定有一些我不明白的事情發生在緩衝管道上,以及sort從一個流的讀取到下一個流的讀取是如何變化的。對我來說奇怪的是,它是確定性的,在完全相同的位置失敗,並且只有當清單不平衡時才會失敗。

我有什麼辦法可以解決這個問題嗎?

答案1

顯然,當您向兩個命名管道寫入不同數量的資料時,您的程式會產生死鎖。您的程式在writefor 1 fifo2 上阻塞(緩衝區已滿),而sort進程在 for fifo1 上阻塞read(緩衝區為空)。

你不知道如何sort實現。它可能希望以更大的區塊讀取文件,然後處理記憶體中的資料以提高效率。如果使用讀取資料的sort函數,緩衝甚至可能會自動發生。stdio.h

命名(和未命名)管道使用資料緩衝區。
如果緩衝區已滿,寫入程序將阻塞,直到讀取程序讀取了一些資料或關閉其末尾。
如果緩衝區為空,則讀取程序將阻塞,直到寫入程序寫入一些資料或結束為止。

如果在每個週期中向 fifo1 寫入一行,向 fifo2 寫入兩行,則會填滿 fifo2 的緩衝區,而 fifo1 的緩衝區僅填滿一半。

根據您的程式向 fifo 寫入的資料量以及sort想要讀取的資料量,這顯然會導致這樣的情況:您sort想要從fifo1 讀取某些內容,而程式只有一個空緩衝區,而您的程式想要寫入完整緩衝區的fifo2 。

結果是確定性的,因為管道緩衝區具有固定大小,而且您的程式也可能具有固定大小並sort使用固定緩衝區大小來讀取或寫入資料。

您可以查看 GNU 的源代碼sort
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

一開始,它嘗試使用 function 在所有檔案的循環中填入所有輸入檔案的輸入緩衝區fillbuf

稍後在某些情況下它會fillbuf再次調用輸入檔。

函數中fillbuf有一條註釋

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

顯然sort選擇了一個輸入檔並需要一定量的資料。如果讀取阻塞,它不會切換輸入檔。

實作對於普通文件效果很好,因為read操作會在一段時間後返回一些資料或 EOF,因此它不會永久阻塞。


如果有不只一件事可以在兩個進程/執行緒之間阻塞,那麼總是很難避免死鎖。在您的情況下,您應該只使用一根管道。如果您總是有資料要寫入 fifo1(如果 fifo2 會阻塞),則使用非阻塞操作可能會有所幫助,反之亦然。

如果您使用兩個單獨的執行緒/進程寫入管道,則使用兩個管道可能會起作用,但前提是執行緒/進程彼此獨立工作。如果應該寫入 pipeline1 的線程 A 以某種方式等待線程 B(該線程 B 只在寫入 pipeline2 時阻塞),這將無濟於事。

相關內容