크기가 고르지 않은 두 개의 정렬된 명명된 파이프를 어떻게 병합할 수 있나요?

크기가 고르지 않은 두 개의 정렬된 명명된 파이프를 어떻게 병합할 수 있나요?

내 질문은 "동일한 필드의 정렬 값을 기준으로 두 개의 정렬된 파일을 병합합니다."를 명명된 파이프로 확장합니다.

정렬된 정수가 포함된 두 개의 텍스트 파일이 있고 이를 병합하고 싶다고 가정해 보겠습니다. sort -nm file1.txt file2.txt > merged.txt원패스 비차단 병합을 수행하는 데 사용할 수 있습니다 .

이제 이 파일이 실제로 제가 만들고 Python 내에서 채우는 명명된 파이프(FIFO)라고 가정해 보겠습니다. 한 파이프에 번갈아 쓰고 다음 파이프에 쓰는 한 이 작업은 잘 수행할 수 있습니다. 이 코드는 두 개의 정렬된 정수 목록을 생성하고 sort하위 프로세스에서 읽은 명명된 파이프에 기록하여 병합된 결과를 단일 파일로 출력하는 데 사용됩니다.

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

정리된 결과를 얻었습니다. 하지만 이제 i % 2를 로 변경하여 목록 불균형을 만들어 보겠습니다 i % 3. 원본에서는 fifo1, fifo2, fifo1, fifo2 등으로 인쇄합니다. 수정된 버전에서는 두 파이프 중 하나에 두 배 많은 라인을 인쇄합니다.

이것을 실행하면 i % 3다음과 같은 결과가 나타납니다.

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

항상 같은 자리에 멈춥니다. strace를 사용하면 다음을 볼 수 있습니다.

Python 프로세스가 write스팟 4에 대한 호출로 인해 중단되었습니다. write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

그러나 프로세스는 스팟 3에 대한 호출 sort로 인해 중단됩니다 .readread(3,

lsof -n -p프로세스 의 출력을 보면 sort에 값이 오기를 기다리고 있는 fifo1반면 write프로세스는 에 값을 쓰기를 기다리고 있음 을 알 수 있습니다 fifo2.

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

따라서 어떤 이유로 sort프로세스가 *듣기를 중지하여 fifo2프로세스가 중단되었습니다.

fifo2이제, 그냥 실행하여 별도의 리스너를 켜면 cat fifo2... 프로세스가 다시 시작되어 수천 번의 반복을 거쳐... 이제 다른 임의 지점(반복 53733)에서 중지됩니다.

sort버퍼링 파이프에서 진행되는 작업과 한 스트림에서 다음 스트림으로 읽는 방법이 어떻게 바뀌는지 이해하지 못하는 것이 있을 것 같습니다 . 나에게 이상한 점은 결정론적이며 정확히 같은 지점에서 실패하고 목록의 균형이 맞지 않을 때만 실패하는 것처럼 보인다는 것입니다.

이 문제를 해결할 수 있는 방법이 있나요?

답변1

분명히 두 개의 명명된 파이프에 서로 다른 양의 데이터를 쓰면 프로그램에서 교착 상태가 발생합니다. 귀하의 프로그램은 writefifo2(버퍼가 가득 찬 상태)에 대해 차단되는 반면 sort프로세스는 fifo1(버퍼가 비어 있는 상태)에 대해 차단됩니다 read.

sort구현 방법을 모릅니다 . 아마도 더 큰 블록의 파일을 읽은 다음 효율성을 위해 메모리에서 데이터를 처리하려고 할 것입니다. 데이터를 읽기 위해 sort의 기능을 사용하는 경우 버퍼링이 자동으로 발생할 수도 있습니다 .stdio.h

명명된(및 명명되지 않은) 파이프는 데이터에 대한 버퍼를 사용합니다.
버퍼가 가득 차면 읽기 프로세스가 일부 데이터를 읽거나 끝을 닫을 때까지 쓰기 프로세스가 차단됩니다.
버퍼가 비어 있으면 쓰기 프로세스가 일부 데이터를 쓰거나 끝을 닫을 때까지 읽기 프로세스가 차단됩니다.

매 사이클마다 fifo1에 한 줄, fifo2에 두 줄을 쓰면 fifo1의 버퍼는 절반만 채워지는 반면 fifo2의 버퍼는 채워지게 됩니다.

프로그램이 fifo에 쓰는 데이터의 양과 읽고 싶은 양에 따라 프로그램이 전체 버퍼로 fifo2에 쓰려고 하는 동안 빈 버퍼만 있는 fifo1에서 무언가를 읽으려는 sort상황이 발생합니다. sort.

sort파이프 버퍼의 크기가 고정되어 있고 프로그램도 고정되어 있으며 데이터를 읽거나 쓰는 데 고정된 버퍼 크기를 사용하기 때문에 결과는 결정적입니다.

sort다음에서 GNU의 소스 코드를 볼 수 있습니다 .
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

처음에는 function 을 사용하여 모든 파일에 대한 루프를 통해 모든 입력 파일에 대한 입력 버퍼를 채우려고 합니다 fillbuf.

나중에 일부 조건에서는 fillbuf입력 파일을 다시 호출합니다.

기능에 fillbuf주석이 있습니다

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

분명히 sort입력 파일 중 하나를 선택하고 일정량의 데이터를 원합니다. 블록을 읽는 경우 입력 파일을 전환하지 않습니다.

read이 구현은 일정 시간이 지나면 작업이 일부 데이터나 EOF를 반환하므로 영구적으로 차단되지 않으므로 일반 파일에 잘 작동합니다 .


두 프로세스/스레드 사이를 차단할 수 있는 것이 두 개 이상인 경우 교착 상태를 피하는 것은 항상 어렵습니다. 귀하의 경우 하나의 파이프만 사용해야 합니다. fifo2가 차단되거나 그 반대의 경우 항상 fifo1에 쓸 데이터가 있는 경우 비차단 작업을 사용하면 도움이 될 수 있습니다.

파이프에 쓰기 위해 두 개의 개별 스레드/프로세스를 사용하려는 경우 두 개의 파이프를 사용하는 것이 작동할 수 있지만 스레드/프로세스가 서로 독립적으로 작동하는 경우에만 가능합니다. 파이프1에 써야 하는 스레드 A가 파이프2에 쓰기를 차단하는 스레드 B를 어떻게든 기다리는 경우에는 도움이 되지 않습니다.

관련 정보