Meine Frage ist ähnlich wie "Zusammenführen zweier sortierter Dateien basierend auf Sortierwerten im gleichen Feld", sondern erweitert es auf benannte Pipes.
Angenommen, ich habe zwei Textdateien mit sortierten Ganzzahlen und möchte sie zusammenführen. Ich kann sort -nm file1.txt file2.txt > merged.txt
eine einmalige, nicht blockierende Zusammenführung durchführen.
Nehmen wir nun an, diese Dateien sind eigentlich benannte Pipes (FIFOs), die ich erstelle und dann aus Python heraus befülle. Solange ich abwechselnd in eine Pipe schreibe und dann in die nächste, ist das kein Problem. Dieser Code generiert zwei geordnete Listen von Ganzzahlen, schreibt sie in benannte Pipes, die von einem sort
Unterprozess gelesen werden, der das zusammengeführte Ergebnis in einer einzigen Datei ausgibt:
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()
Ich erhalte ein sortiertes Ergebnis. ABER – jetzt bringen wir die Listen ins Ungleichgewicht, indem wir das in ändern i % 2
. i % 3
Im Original wird einfach in fifo1, dann in fifo2, dann in fifo1, dann in fifo2 usw. gedruckt. In der geänderten Version werden doppelt so viele Zeilen in eine der beiden Pipes gedruckt.
Wenn ich dies ausführe, i % 3
erhalte ich die folgende Ausgabe:
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398
Es bleibt immer an der gleichen Stelle stehen. Mit strace kann ich sehen:
write
Der Python-Prozess ist bei einem Aufruf von Stelle 4 hängen geblieben :write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100
Der Vorgang bleibt jedoch bei einem Aufruf von Stelle 3
sort
hängen :read
read(3,
Ein Blick auf die lsof -n -p
Ausgabe des sort
Prozesses zeigt, dass dieser darauf wartet, dass ein Wert in ankommt fifo1
, während der write
Prozess darauf wartet, einen Wert in zu schreiben fifo2
:
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2
Aus irgendeinem Grund sort
hat der Prozess *aufgehört fifo2
, was dazu geführt hat, dass der Prozess hängen geblieben ist.
Wenn ich jetzt einen separaten Listener einrichte, fifo2
indem ich einfach cat fifo2
... ausgebe, beginnt der Prozess erneut und wird über Tausende von Iterationen fortgesetzt, bis ... er nun an einem anderen zufälligen Punkt (Iteration 53733) stoppt.
Ich glaube, es muss etwas geben, das ich nicht verstehe, was mit den Pufferpipes passiert und wie sort
sich das Lesen von einem Stream zum nächsten ändert. Was ich seltsam finde, ist, dass es deterministisch ist, an genau derselben Stelle fehlschlägt und nur dann fehlschlägt, wenn die Listen nicht ausgeglichen sind.
Gibt es eine Möglichkeit, das zu lösen?
Antwort1
Offensichtlich verursacht Ihr Programm einen Deadlock, wenn Sie unterschiedliche Datenmengen in die beiden benannten Pipes schreiben. Ihr Programm blockiert auf einem write
für Fifo2 (mit vollem Puffer), während der Prozess auf einem für Fifo1 (mit leerem Puffer) sort
blockiert .read
Sie wissen nicht, wie sort
es implementiert wird. Wahrscheinlich möchte es die Dateien in größeren Blöcken lesen und die Daten dann aus Effizienzgründen im Speicher verarbeiten. Das Puffern kann sogar automatisch erfolgen, wenn sort
Funktionen stdio.h
zum Lesen der Daten verwendet werden.
Benannte (und unbenannte) Pipes verwenden einen Puffer für die Daten.
Wenn der Puffer voll ist, wird ein Schreibvorgang blockiert, bis ein Lesevorgang Daten gelesen oder sein Ende geschlossen hat.
Wenn der Puffer leer ist, wird ein Lesevorgang blockiert, bis der Schreibvorgang Daten geschrieben oder sein Ende geschlossen hat.
Wenn Sie in jedem Zyklus eine Zeile in fifo1 und zwei Zeilen in fifo2 schreiben, füllen Sie den Puffer von fifo2, während der Puffer von fifo1 nur zur Hälfte gefüllt ist.
Je nachdem, wie viele Daten Ihr Programm in die FIFOs schreibt und wie viele es sort
lesen möchte, führt dies offensichtlich dazu, dass es sort
etwas aus FIFO1 lesen möchte, das nur einen leeren Puffer hat, während Ihr Programm in FIFO2 mit vollem Puffer schreiben möchte.
Das Ergebnis ist deterministisch, da der Pipe-Puffer eine feste Größe hat und wahrscheinlich auch Ihr Programm sort
feste Puffergrößen zum Lesen oder Schreiben der Daten verwendet.
Den Quellcode von GNU finden Sie sort
unter
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
Zu Beginn versucht es, in einer Schleife über alle Dateien mit der Funktion die Eingabepuffer für alle Eingabedateien zu füllen fillbuf
.
Später wird unter bestimmten Umständen fillbuf
erneut eine Eingabedatei angefordert.
In der Funktion fillbuf
gibt es einen Kommentar
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */
Wählt offenbar sort
eine der Eingabedateien aus und benötigt eine bestimmte Datenmenge. Beim Lesen von Blöcken werden die Eingabedateien nicht gewechselt.
Die Implementierung funktioniert gut für normale Dateien, da ein read
Vorgang nach einiger Zeit entweder einige Daten oder EOF zurückgibt und daher nicht dauerhaft blockiert.
Es ist immer schwierig, Deadlocks zu vermeiden, wenn Sie mehr als eine Sache haben, die zwischen zwei Prozessen/Threads blockieren kann. In Ihrem Fall sollten Sie nur eine Pipe verwenden. Die Verwendung nicht blockierender Operationen kann hilfreich sein, wenn Sie immer Daten zum Schreiben in fifo1 haben, wenn fifo2 blockieren würde oder umgekehrt.
Die Verwendung von zwei Pipes könnte funktionieren, wenn Sie zwei separate Threads/Prozesse zum Schreiben in die Pipes verwenden würden, aber nur, wenn die Threads/Prozesse unabhängig voneinander arbeiten. Es würde nichts helfen, wenn Thread A, der in Pipe1 schreiben soll, irgendwie auf Thread B warten würde, der das Schreiben in Pipe2 einfach blockiert.