Мой вопрос похож на "Объединить два отсортированных файла на основе сортировки значений в одном и том же поле" но распространяя его на именованные каналы.
Допустим, у меня есть два текстовых файла с отсортированными целыми числами, и я хочу их объединить. Я могу использовать sort -nm file1.txt file2.txt > merged.txt
для однопроходного, неблокирующего объединения.
Теперь предположим, что эти файлы на самом деле являются именованными каналами (FIFO), которые я создаю и заполняю изнутри python. Пока я чередую запись в один канал, а затем в другой, я могу делать это просто отлично. Этот код работает для генерации двух упорядоченных списков целых чисел, записывает их в именованные каналы, считываемые подпроцессом sort
, который выводит объединенный результат в один файл:
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()
Я получаю отсортированный результат. НО — теперь давайте сделаем списки несбалансированными, изменив на i % 2
. i % 3
В оригинале это просто печатает в fifo1, затем fifo2, затем fifo1, затем fifo2 и т. д. В измененной версии это печатает в два раза больше строк в один из двух каналов.
Запустив это, i % 3
я получаю следующий вывод:
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398
Он всегда останавливается в одном и том же месте. Используя strace я могу увидеть:
Процесс Python завис при write
вызове точки 4:
write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100
Но sort
процесс зависает на read
вызове точки 3:
read(3,
Глядя на lsof -n -p
выходные данные sort
процесса, мы видим, что он ожидает, пока значение достигнет fifo1
, в то время как write
процесс ожидает записи значения в fifo2
:
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2
Итак, по какой-то причине sort
процесс *перестал слушать fifo2
, что привело к зависанию процесса.
Теперь, если я включу отдельный прослушиватель, fifo2
просто выполнив cat fifo2
..., процесс начнется снова и продолжится тысячи итераций, пока... он не остановится в другой случайной точке (итерация 53733).
Я думаю, что должно быть что-то, чего я не понимаю, с буферизацией каналов и как это sort
меняется от чтения из одного потока к другому. Что мне кажется странным, так это то, что это детерминировано, дает сбой в одном и том же месте и, кажется, дает сбой только тогда, когда списки несбалансированы.
Можно ли как-то решить эту проблему?
решение1
Очевидно, что ваша программа создает взаимоблокировку, когда вы записываете разные объемы данных в два именованных канала. Ваша программа блокируется на write
for one fifo2 (с полным буфером), в то время как sort
процесс блокируется на read
for fifo1 (с пустым буфером).
Вы не знаете, как sort
это реализовано. Вероятно, он хочет читать файлы большими блоками, а затем обрабатывать данные в памяти для эффективности. Буферизация может даже происходить автоматически, если для чтения данных sort
используются функции из .stdio.h
Именованные (и неименованные) каналы используют буфер для данных.
Если буфер заполнен, процесс записи будет заблокирован до тех пор, пока процесс чтения не прочитает некоторые данные или не закроет свой конец.
Если буфер пуст, процесс чтения будет заблокирован до тех пор, пока процесс записи не запишет некоторые данные или не закроет свой конец.
Если вы запишете одну строку в fifo1 и две строки в fifo2 в каждом цикле, вы заполните буфер fifo2, в то время как буфер fifo1 будет заполнен только наполовину.
В зависимости от того, сколько данных ваша программа записывает в fifo и сколько sort
хочет прочитать, это, очевидно, приводит к ситуации, когда sort
требуется прочитать что-то из fifo1, в котором просто пустой буфер, в то время как ваша программа хочет записать в fifo2 с полным буфером.
Результат является детерминированным, поскольку буфер канала имеет фиксированный размер, и, вероятно, ваша программа также sort
использует фиксированные размеры буфера для чтения или записи данных.
Исходный код GNU можно посмотреть sort
здесь
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
В начале он пытается заполнить входные буферы для всех входных файлов в цикле по всем файлам, используя функцию fillbuf
.
Позже при некоторых условиях он fillbuf
снова запрашивает входной файл.
В функции fillbuf
есть комментарий
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */
Видимо sort
выбирает один из входных файлов и хочет определенный объем данных. Он не переключает входные файлы, если чтение блокирует.
Реализация хорошо работает для обычных файлов, поскольку read
операция либо вернет некоторые данные, либо EOF через некоторое время, поэтому она не будет заблокирована навсегда.
Всегда сложно избежать взаимоблокировок, если у вас есть более одной вещи, которая может блокировать между двумя процессами/потоками. В вашем случае вам следует использовать только один канал. Использование неблокирующих операций может помочь, если у вас всегда есть данные для записи в fifo1, если fifo2 заблокирует или наоборот.
Использование двух каналов может сработать, если вы будете использовать два отдельных потока/процесса для записи в каналы, но только если потоки/процессы работают независимо друг от друга. Не поможет, если поток A, который должен писать в канал1, будет каким-то образом ждать поток B, который просто блокируется при записи в канал2.