Como posso mesclar dois pipes nomeados classificados com tamanhos desiguais?

Como posso mesclar dois pipes nomeados classificados com tamanhos desiguais?

Minha pergunta é semelhante a "Mesclar dois arquivos classificados com base na classificação de valores no mesmo campo" mas estendendo-o para pipes nomeados.

Digamos que eu tenha dois arquivos de texto com números inteiros classificados e queira mesclá-los. Posso usar sort -nm file1.txt file2.txt > merged.txtpara fazer uma mesclagem sem bloqueio e de uma passagem.

Agora, digamos que esses arquivos sejam na verdade pipes nomeados (FIFOs) que estou criando e preenchendo a partir do python. Contanto que eu alterne a gravação em um canal e depois no próximo, posso fazer isso perfeitamente. Este código funciona para gerar duas listas ordenadas de inteiros, gravá-las em pipes nomeados lidos por um sortsubprocesso, que gera o resultado mesclado em um único arquivo:

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

Eu recebo um resultado classificado. MAS - agora vamos desequilibrar as listas alterando o i % 2para i % 3. No original, isso imprime apenas em fifo1, depois fifo2, depois fifo1, depois fifo2, etc. Na versão modificada, ele imprime o dobro de linhas em um dos dois tubos.

Executando isso, i % 3obtenho a seguinte saída:

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

Ele sempre para no mesmo lugar. Usando strace posso ver:

O processo python desligou em uma writechamada para o ponto 4: write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

Mas o sortprocesso é interrompido em uma readchamada para o spot 3: read(3,

Observar a lsof -n -psaída do sortprocesso mostra que ele está aguardando a chegada de um valor para fifo1, enquanto o writeprocesso aguarda para gravar um valor para fifo2:

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

Então, por algum motivo, o sortprocesso *parou de ouvir fifo2, fazendo com que o processo travasse.

Agora, se eu colocar um ouvinte separado fifo2apenas emitindo cat fifo2... o processo começa novamente e continua por milhares de iterações, até... parar agora em outro ponto aleatório (iteração 53733).

Acho que deve haver algo que não entendo acontecendo com os pipes de buffer e como sortestá mudando a leitura de um fluxo para outro. O que é estranho para mim é que é determinístico, falhando exatamente no mesmo ponto, e só parece falhar quando as listas estão desequilibradas.

Existe alguma maneira de resolver isso?

Responder1

Obviamente, seu programa cria um impasse quando você grava diferentes quantidades de dados nos dois pipes nomeados. Seu programa bloqueia em umwrite for one fifo2 (com buffer cheio) enquanto o sortprocesso bloqueia em readfor fifo1 (com buffer vazio).

Você não sabe comosort é implementado. Provavelmente deseja ler os arquivos em blocos maiores e depois processar os dados na memória para obter eficiência. O buffer pode até acontecer automaticamente se sortusar funções de stdio.hleitura dos dados.

Pipes nomeados (e não nomeados) usam um buffer para os dados.
Se o buffer estiver cheio, um processo de gravação será bloqueado até que um processo de leitura leia alguns dados ou feche seu final.
Se o buffer estiver vazio, um processo de leitura será bloqueado até que o processo de gravação tenha gravado alguns dados ou encerrado.

Se você escrever uma linha em fifo1 e duas linhas em fifo2 em cada ciclo, você preencherá o buffer do fifo2 enquanto o buffer do fifo1 estará preenchido apenas pela metade.

Dependendo de quantos dados seu programa grava no fifos e quanto sortdeseja ler, isso obviamente acaba em uma situação em que você sortdeseja ler algo do fifo1 que possui apenas um buffer vazio, enquanto seu programa deseja gravar no fifo2 com um buffer cheio .

O resultado é determinístico porque o buffer do pipe tem um tamanho fixo e provavelmente também o seu programa e sortusa tamanhos de buffer fixos para ler ou gravar os dados.

Você pode ver o código fonte do GNU sortem
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

No início, ele tenta preencher os buffers de entrada para todos os arquivos de entrada em um loop sobre todos os arquivos usando function fillbuf.

Mais tarde, sob algumas condições, ele solicita fillbufnovamente um arquivo de entrada.

Na função fillbufhá um comentário

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

Aparentemente sortseleciona um dos arquivos de entrada e deseja uma certa quantidade de dados. Ele não alterna os arquivos de entrada se estiver lendo blocos.

A implementação funciona bem para arquivos normais porque uma readoperação retornará alguns dados ou EOF após algum tempo, portanto não será bloqueada permanentemente.


É sempre difícil evitar conflitos se você tiver mais de uma coisa que pode bloquear entre dois processos/threads. No seu caso você deve usar apenas um tubo. Usar operações sem bloqueio pode ajudar se você sempre tiver dados para gravar no fifo1 se o fifo2 for bloqueado ou vice-versa.

Usar dois pipes pode funcionar se você usar dois threads/processos separados para gravar nos pipes, mas somente se os threads/processos funcionarem independentemente um do outro. Não ajudaria se o thread A, que deveria gravar no pipe1, esperasse de alguma forma pelo thread B, que apenas bloqueia a gravação no pipe2.

informação relacionada