¿Cómo puedo fusionar dos tuberías con nombres ordenados con tamaños desiguales?

¿Cómo puedo fusionar dos tuberías con nombres ordenados con tamaños desiguales?

Mi pregunta es similar a "Fusionar dos archivos ordenados según los valores de clasificación en el mismo campo" pero extendiéndolo a canalizaciones con nombre.

Digamos que tengo dos archivos de texto con números enteros ordenados y quiero fusionarlos. Puedo usarlo sort -nm file1.txt file2.txt > merged.txtpara realizar una fusión sin bloqueo de una sola pasada.

Ahora, digamos que estos archivos en realidad son canalizaciones con nombre (FIFO) que estoy creando y luego completando desde Python. Siempre que alterne la escritura en una tubería y luego en la siguiente, puedo hacerlo bien. Este código funciona para generar dos listas ordenadas de números enteros, escribirlas en canalizaciones con nombre leídas por un sortsubproceso, que genera el resultado combinado en un solo archivo:

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

Obtengo un resultado ordenado. PERO, ahora hagamos que las listas se desequilibren cambiando i % 2a i % 3. En el original, esto simplemente imprime en fifo1, luego en fifo2, luego en fifo1, luego en fifo2, etc. En la versión modificada, imprime el doble de líneas en una de las dos tuberías.

Al ejecutar esto i % 3obtengo el siguiente resultado:

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

Siempre se detiene en el mismo lugar. Usando strace puedo ver:

El proceso de Python se colgó en una writellamada al punto 4: write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

Pero el sortproceso se bloquea ante una readllamada al punto 3: read(3,

Al observar el lsof -n -presultado del sortproceso, se muestra que está esperando que llegue un valor fifo1, mientras que el writeproceso espera escribir un valor en fifo2:

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

Entonces, por alguna razón, el sortproceso *dejó de escuchar fifo2, lo que provocó que el proceso se bloqueara.

Ahora, si pongo un oyente separado fifo2simplemente emitiendo cat fifo2... el proceso comienza de nuevo y continúa durante miles de iteraciones, hasta... ahora se detiene en otro punto aleatorio (iteración 53733).

Creo que debe haber algo que no entiendo que sucede con las tuberías de almacenamiento en búfer y cómo sortse cambia de la lectura de una secuencia a la siguiente. Lo que me resulta extraño es que es determinista, falla exactamente en el mismo lugar y sólo parece fallar cuando las listas están desequilibradas.

¿Hay alguna manera de que pueda solucionar esto?

Respuesta1

Obviamente, su programa crea un punto muerto cuando escribe diferentes cantidades de datos en las dos canalizaciones con nombre. Su programa se bloquea en a writefor one fifo2 (con el búfer lleno) mientras que el sortproceso se bloquea en a readfor fifo1 (con el búfer vacío).

No sabes cómo sortse implementa. Probablemente quiera leer los archivos en bloques más grandes y luego procesar los datos en la memoria para mayor eficiencia. El almacenamiento en búfer puede incluso ocurrir automáticamente si sortse utilizan funciones stdio.hpara leer los datos.

Las canalizaciones con nombre (y sin nombre) utilizan un búfer para los datos.
Si el búfer está lleno, un proceso de escritura se bloqueará hasta que un proceso de lectura haya leído algunos datos o haya cerrado su final.
Si el búfer está vacío, un proceso de lectura se bloqueará hasta que el proceso de escritura haya escrito algunos datos o haya cerrado su final.

Si escribe una línea en fifo1 y dos líneas en fifo2 en cada ciclo, llenará el búfer de fifo2 mientras que el búfer de fifo1 solo está medio lleno.

Dependiendo de la cantidad de datos que su programa escriba en fifos y de cuánto sortquiera leer, esto obviamente termina en una situación en la que sortquiere leer algo de fifo1 que solo tiene un búfer vacío mientras su programa quiere escribir en fifo2 con un búfer lleno. .

El resultado es determinista porque el búfer de canalización tiene un tamaño fijo y probablemente también su programa sortutiliza tamaños de búfer fijos para leer o escribir los datos.

Puedes consultar el código fuente de GNU sorten
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

Al principio intenta llenar los buffers de entrada para todos los archivos de entrada en un bucle sobre todos los archivos usando la función fillbuf.

Más tarde, bajo algunas condiciones, vuelve a solicitar fillbufun archivo de entrada.

En función fillbufhay un comentario.

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

Aparentemente sortselecciona uno de los archivos de entrada y quiere una cierta cantidad de datos. No cambia los archivos de entrada si lee bloques.

La implementación funciona bien para archivos normales porque una readoperación devolverá algunos datos o EOF después de un tiempo, por lo que no se bloqueará permanentemente.


Siempre es difícil evitar puntos muertos si tiene más de una cosa que puede bloquearse entre dos procesos/hilos. En tu caso sólo deberías utilizar una tubería. El uso de operaciones sin bloqueo puede ser útil si siempre tiene datos para escribir en fifo1 si fifo2 se bloqueara o viceversa.

El uso de dos tuberías podría funcionar si utilizara dos subprocesos/procesos separados para escribir en las tuberías, pero solo si los subprocesos/procesos funcionan de forma independiente entre sí. No ayudaría si el subproceso A, que debería escribir en la tubería1, esperara de alguna manera al subproceso B, que simplemente bloquea la escritura en la tubería2.

información relacionada