Minha pergunta é semelhante a "Mesclar dois arquivos classificados com base na classificação de valores no mesmo campo" mas estendendo-o para pipes nomeados.
Digamos que eu tenha dois arquivos de texto com números inteiros classificados e queira mesclá-los. Posso usar sort -nm file1.txt file2.txt > merged.txt
para fazer uma mesclagem sem bloqueio e de uma passagem.
Agora, digamos que esses arquivos sejam na verdade pipes nomeados (FIFOs) que estou criando e preenchendo a partir do python. Contanto que eu alterne a gravação em um canal e depois no próximo, posso fazer isso perfeitamente. Este código funciona para gerar duas listas ordenadas de inteiros, gravá-las em pipes nomeados lidos por um sort
subprocesso, que gera o resultado mesclado em um único arquivo:
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()
Eu recebo um resultado classificado. MAS - agora vamos desequilibrar as listas alterando o i % 2
para i % 3
. No original, isso imprime apenas em fifo1, depois fifo2, depois fifo1, depois fifo2, etc. Na versão modificada, ele imprime o dobro de linhas em um dos dois tubos.
Executando isso, i % 3
obtenho a seguinte saída:
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398
Ele sempre para no mesmo lugar. Usando strace posso ver:
O processo python desligou em uma write
chamada para o ponto 4:
write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100
Mas o sort
processo é interrompido em uma read
chamada para o spot 3:
read(3,
Observar a lsof -n -p
saída do sort
processo mostra que ele está aguardando a chegada de um valor para fifo1
, enquanto o write
processo aguarda para gravar um valor para fifo2
:
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2
Então, por algum motivo, o sort
processo *parou de ouvir fifo2
, fazendo com que o processo travasse.
Agora, se eu colocar um ouvinte separado fifo2
apenas emitindo cat fifo2
... o processo começa novamente e continua por milhares de iterações, até... parar agora em outro ponto aleatório (iteração 53733).
Acho que deve haver algo que não entendo acontecendo com os pipes de buffer e como sort
está mudando a leitura de um fluxo para outro. O que é estranho para mim é que é determinístico, falhando exatamente no mesmo ponto, e só parece falhar quando as listas estão desequilibradas.
Existe alguma maneira de resolver isso?
Responder1
Obviamente, seu programa cria um impasse quando você grava diferentes quantidades de dados nos dois pipes nomeados. Seu programa bloqueia em umwrite
for one fifo2 (com buffer cheio) enquanto o sort
processo bloqueia em read
for fifo1 (com buffer vazio).
Você não sabe comosort
é implementado. Provavelmente deseja ler os arquivos em blocos maiores e depois processar os dados na memória para obter eficiência. O buffer pode até acontecer automaticamente se sort
usar funções de stdio.h
leitura dos dados.
Pipes nomeados (e não nomeados) usam um buffer para os dados.
Se o buffer estiver cheio, um processo de gravação será bloqueado até que um processo de leitura leia alguns dados ou feche seu final.
Se o buffer estiver vazio, um processo de leitura será bloqueado até que o processo de gravação tenha gravado alguns dados ou encerrado.
Se você escrever uma linha em fifo1 e duas linhas em fifo2 em cada ciclo, você preencherá o buffer do fifo2 enquanto o buffer do fifo1 estará preenchido apenas pela metade.
Dependendo de quantos dados seu programa grava no fifos e quanto sort
deseja ler, isso obviamente acaba em uma situação em que você sort
deseja ler algo do fifo1 que possui apenas um buffer vazio, enquanto seu programa deseja gravar no fifo2 com um buffer cheio .
O resultado é determinístico porque o buffer do pipe tem um tamanho fixo e provavelmente também o seu programa e sort
usa tamanhos de buffer fixos para ler ou gravar os dados.
Você pode ver o código fonte do GNU sort
em
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
No início, ele tenta preencher os buffers de entrada para todos os arquivos de entrada em um loop sobre todos os arquivos usando function fillbuf
.
Mais tarde, sob algumas condições, ele solicita fillbuf
novamente um arquivo de entrada.
Na função fillbuf
há um comentário
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */
Aparentemente sort
seleciona um dos arquivos de entrada e deseja uma certa quantidade de dados. Ele não alterna os arquivos de entrada se estiver lendo blocos.
A implementação funciona bem para arquivos normais porque uma read
operação retornará alguns dados ou EOF após algum tempo, portanto não será bloqueada permanentemente.
É sempre difícil evitar conflitos se você tiver mais de uma coisa que pode bloquear entre dois processos/threads. No seu caso você deve usar apenas um tubo. Usar operações sem bloqueio pode ajudar se você sempre tiver dados para gravar no fifo1 se o fifo2 for bloqueado ou vice-versa.
Usar dois pipes pode funcionar se você usar dois threads/processos separados para gravar nos pipes, mas somente se os threads/processos funcionarem independentemente um do outro. Não ajudaria se o thread A, que deveria gravar no pipe1, esperasse de alguma forma pelo thread B, que apenas bloqueia a gravação no pipe2.