Как объединить два отсортированных именованных канала с неравными размерами?

Как объединить два отсортированных именованных канала с неравными размерами?

Мой вопрос похож на "Объединить два отсортированных файла на основе сортировки значений в одном и том же поле" но распространяя его на именованные каналы.

Допустим, у меня есть два текстовых файла с отсортированными целыми числами, и я хочу их объединить. Я могу использовать sort -nm file1.txt file2.txt > merged.txtдля однопроходного, неблокирующего объединения.

Теперь предположим, что эти файлы на самом деле являются именованными каналами (FIFO), которые я создаю и заполняю изнутри python. Пока я чередую запись в один канал, а затем в другой, я могу делать это просто отлично. Этот код работает для генерации двух упорядоченных списков целых чисел, записывает их в именованные каналы, считываемые подпроцессом sort, который выводит объединенный результат в один файл:

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

Я получаю отсортированный результат. НО — теперь давайте сделаем списки несбалансированными, изменив на i % 2. i % 3В оригинале это просто печатает в fifo1, затем fifo2, затем fifo1, затем fifo2 и т. д. В измененной версии это печатает в два раза больше строк в один из двух каналов.

Запустив это, i % 3я получаю следующий вывод:

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

Он всегда останавливается в одном и том же месте. Используя strace я могу увидеть:

Процесс Python завис при writeвызове точки 4: write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

Но sortпроцесс зависает на readвызове точки 3: read(3,

Глядя на lsof -n -pвыходные данные sortпроцесса, мы видим, что он ожидает, пока значение достигнет fifo1, в то время как writeпроцесс ожидает записи значения в fifo2:

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

Итак, по какой-то причине sortпроцесс *перестал слушать fifo2, что привело к зависанию процесса.

Теперь, если я включу отдельный прослушиватель, fifo2просто выполнив cat fifo2..., процесс начнется снова и продолжится тысячи итераций, пока... он не остановится в другой случайной точке (итерация 53733).

Я думаю, что должно быть что-то, чего я не понимаю, с буферизацией каналов и как это sortменяется от чтения из одного потока к другому. Что мне кажется странным, так это то, что это детерминировано, дает сбой в одном и том же месте и, кажется, дает сбой только тогда, когда списки несбалансированы.

Можно ли как-то решить эту проблему?

решение1

Очевидно, что ваша программа создает взаимоблокировку, когда вы записываете разные объемы данных в два именованных канала. Ваша программа блокируется на writefor one fifo2 (с полным буфером), в то время как sortпроцесс блокируется на readfor fifo1 (с пустым буфером).

Вы не знаете, как sortэто реализовано. Вероятно, он хочет читать файлы большими блоками, а затем обрабатывать данные в памяти для эффективности. Буферизация может даже происходить автоматически, если для чтения данных sortиспользуются функции из .stdio.h

Именованные (и неименованные) каналы используют буфер для данных.
Если буфер заполнен, процесс записи будет заблокирован до тех пор, пока процесс чтения не прочитает некоторые данные или не закроет свой конец.
Если буфер пуст, процесс чтения будет заблокирован до тех пор, пока процесс записи не запишет некоторые данные или не закроет свой конец.

Если вы запишете одну строку в fifo1 и две строки в fifo2 в каждом цикле, вы заполните буфер fifo2, в то время как буфер fifo1 будет заполнен только наполовину.

В зависимости от того, сколько данных ваша программа записывает в fifo и сколько sortхочет прочитать, это, очевидно, приводит к ситуации, когда sortтребуется прочитать что-то из fifo1, в котором просто пустой буфер, в то время как ваша программа хочет записать в fifo2 с полным буфером.

Результат является детерминированным, поскольку буфер канала имеет фиксированный размер, и, вероятно, ваша программа также sortиспользует фиксированные размеры буфера для чтения или записи данных.

Исходный код GNU можно посмотреть sortздесь
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

В начале он пытается заполнить входные буферы для всех входных файлов в цикле по всем файлам, используя функцию fillbuf.

Позже при некоторых условиях он fillbufснова запрашивает входной файл.

В функции fillbufесть комментарий

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

Видимо sortвыбирает один из входных файлов и хочет определенный объем данных. Он не переключает входные файлы, если чтение блокирует.

Реализация хорошо работает для обычных файлов, поскольку readоперация либо вернет некоторые данные, либо EOF через некоторое время, поэтому она не будет заблокирована навсегда.


Всегда сложно избежать взаимоблокировок, если у вас есть более одной вещи, которая может блокировать между двумя процессами/потоками. В вашем случае вам следует использовать только один канал. Использование неблокирующих операций может помочь, если у вас всегда есть данные для записи в fifo1, если fifo2 заблокирует или наоборот.

Использование двух каналов может сработать, если вы будете использовать два отдельных потока/процесса для записи в каналы, но только если потоки/процессы работают независимо друг от друга. Не поможет, если поток A, который должен писать в канал1, будет каким-то образом ждать поток B, который просто блокируется при записи в канал2.

Связанный контент