Wie kann ich zwei sortierte Named Pipes mit unterschiedlichen Größen zusammenführen?

Wie kann ich zwei sortierte Named Pipes mit unterschiedlichen Größen zusammenführen?

Meine Frage ist ähnlich wie "Zusammenführen zweier sortierter Dateien basierend auf Sortierwerten im gleichen Feld", sondern erweitert es auf benannte Pipes.

Angenommen, ich habe zwei Textdateien mit sortierten Ganzzahlen und möchte sie zusammenführen. Ich kann sort -nm file1.txt file2.txt > merged.txteine einmalige, nicht blockierende Zusammenführung durchführen.

Nehmen wir nun an, diese Dateien sind eigentlich benannte Pipes (FIFOs), die ich erstelle und dann aus Python heraus befülle. Solange ich abwechselnd in eine Pipe schreibe und dann in die nächste, ist das kein Problem. Dieser Code generiert zwei geordnete Listen von Ganzzahlen, schreibt sie in benannte Pipes, die von einem sortUnterprozess gelesen werden, der das zusammengeführte Ergebnis in einer einzigen Datei ausgibt:

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

Ich erhalte ein sortiertes Ergebnis. ABER – jetzt bringen wir die Listen ins Ungleichgewicht, indem wir das in ändern i % 2. i % 3Im Original wird einfach in fifo1, dann in fifo2, dann in fifo1, dann in fifo2 usw. gedruckt. In der geänderten Version werden doppelt so viele Zeilen in eine der beiden Pipes gedruckt.

Wenn ich dies ausführe, i % 3erhalte ich die folgende Ausgabe:

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

Es bleibt immer an der gleichen Stelle stehen. Mit strace kann ich sehen:

writeDer Python-Prozess ist bei einem Aufruf von Stelle 4 hängen geblieben :write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

Der Vorgang bleibt jedoch bei einem Aufruf von Stelle 3 sorthängen :readread(3,

Ein Blick auf die lsof -n -pAusgabe des sortProzesses zeigt, dass dieser darauf wartet, dass ein Wert in ankommt fifo1, während der writeProzess darauf wartet, einen Wert in zu schreiben fifo2:

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

Aus irgendeinem Grund sorthat der Prozess *aufgehört fifo2, was dazu geführt hat, dass der Prozess hängen geblieben ist.

Wenn ich jetzt einen separaten Listener einrichte, fifo2indem ich einfach cat fifo2... ausgebe, beginnt der Prozess erneut und wird über Tausende von Iterationen fortgesetzt, bis ... er nun an einem anderen zufälligen Punkt (Iteration 53733) stoppt.

Ich glaube, es muss etwas geben, das ich nicht verstehe, was mit den Pufferpipes passiert und wie sortsich das Lesen von einem Stream zum nächsten ändert. Was ich seltsam finde, ist, dass es deterministisch ist, an genau derselben Stelle fehlschlägt und nur dann fehlschlägt, wenn die Listen nicht ausgeglichen sind.

Gibt es eine Möglichkeit, das zu lösen?

Antwort1

Offensichtlich verursacht Ihr Programm einen Deadlock, wenn Sie unterschiedliche Datenmengen in die beiden benannten Pipes schreiben. Ihr Programm blockiert auf einem writefür Fifo2 (mit vollem Puffer), während der Prozess auf einem für Fifo1 (mit leerem Puffer) sortblockiert .read

Sie wissen nicht, wie sortes implementiert wird. Wahrscheinlich möchte es die Dateien in größeren Blöcken lesen und die Daten dann aus Effizienzgründen im Speicher verarbeiten. Das Puffern kann sogar automatisch erfolgen, wenn sortFunktionen stdio.hzum Lesen der Daten verwendet werden.

Benannte (und unbenannte) Pipes verwenden einen Puffer für die Daten.
Wenn der Puffer voll ist, wird ein Schreibvorgang blockiert, bis ein Lesevorgang Daten gelesen oder sein Ende geschlossen hat.
Wenn der Puffer leer ist, wird ein Lesevorgang blockiert, bis der Schreibvorgang Daten geschrieben oder sein Ende geschlossen hat.

Wenn Sie in jedem Zyklus eine Zeile in fifo1 und zwei Zeilen in fifo2 schreiben, füllen Sie den Puffer von fifo2, während der Puffer von fifo1 nur zur Hälfte gefüllt ist.

Je nachdem, wie viele Daten Ihr Programm in die FIFOs schreibt und wie viele es sortlesen möchte, führt dies offensichtlich dazu, dass es sortetwas aus FIFO1 lesen möchte, das nur einen leeren Puffer hat, während Ihr Programm in FIFO2 mit vollem Puffer schreiben möchte.

Das Ergebnis ist deterministisch, da der Pipe-Puffer eine feste Größe hat und wahrscheinlich auch Ihr Programm sortfeste Puffergrößen zum Lesen oder Schreiben der Daten verwendet.

Den Quellcode von GNU finden Sie sortunter
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

Zu Beginn versucht es, in einer Schleife über alle Dateien mit der Funktion die Eingabepuffer für alle Eingabedateien zu füllen fillbuf.

Später wird unter bestimmten Umständen fillbuferneut eine Eingabedatei angefordert.

In der Funktion fillbufgibt es einen Kommentar

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

Wählt offenbar sorteine der Eingabedateien aus und benötigt eine bestimmte Datenmenge. Beim Lesen von Blöcken werden die Eingabedateien nicht gewechselt.

Die Implementierung funktioniert gut für normale Dateien, da ein readVorgang nach einiger Zeit entweder einige Daten oder EOF zurückgibt und daher nicht dauerhaft blockiert.


Es ist immer schwierig, Deadlocks zu vermeiden, wenn Sie mehr als eine Sache haben, die zwischen zwei Prozessen/Threads blockieren kann. In Ihrem Fall sollten Sie nur eine Pipe verwenden. Die Verwendung nicht blockierender Operationen kann hilfreich sein, wenn Sie immer Daten zum Schreiben in fifo1 haben, wenn fifo2 blockieren würde oder umgekehrt.

Die Verwendung von zwei Pipes könnte funktionieren, wenn Sie zwei separate Threads/Prozesse zum Schreiben in die Pipes verwenden würden, aber nur, wenn die Threads/Prozesse unabhängig voneinander arbeiten. Es würde nichts helfen, wenn Thread A, der in Pipe1 schreiben soll, irgendwie auf Thread B warten würde, der das Schreiben in Pipe2 einfach blockiert.

verwandte Informationen