python: Lesen Teilprozess output in threads

Ich habe eine ausführbare Datei, die rufe ich mit subprocess.Popen. Dann will ich ihn ernähren, einige Daten über stdin mit einem thread, liest den Wert aus einer Warteschlange, die später aufgefüllt werden, in einem anderen thread. Die Ausgabe sollte so gelesen werden, mit dem stdout-pipe in einem anderen thread und wieder sortiert werden, in eine Warteschlange.

Soweit ich das verstanden habe aus meiner früheren Forschung, die Verwendung von threads mit Warteschlange ist gute Praxis.

Externe ausführbare Datei, leider wird er so schnell nicht geben Sie mir eine Antwort für jede Zeile, die Pipeline, so dass der einfach zu schreiben, "readline" - Zyklen sind keine option. Die ausführbare Datei implementiert einige interne multithreading, und ich möchte die Ausgabe, sobald es verfügbar ist, wird daher die zusätzliche Leser-thread.

Als ein Beispiel für die Prüfung der ausführbaren Datei wird nur shuffle jeder Zeile (shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
    line = line.strip()

    # shuffle line
    line = list(line)
    shuffle(line)
    line = "".join(line)

    sys.stdout.write("%s\n"%(line))
    sys.stdout.flush() # avoid buffers

Bitte beachten Sie, dass dies bereits als unbuffered wie möglich. Oder ist es nicht? Dies ist meine abgespeckte test-Programm:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
    def __init__(self, p_in, source_queue):
        threading.Thread.__init__(self)
        self.pipe = p_in
        self.source_queue = source_queue

    def run(self):
        while True:
            source = self.source_queue.get()
            print "writing to process: ", repr(source)
            self.pipe.write(source)
            self.pipe.flush()
            self.source_queue.task_done()

class ReadThread(threading.Thread):
    def __init__(self, p_out, target_queue):
        threading.Thread.__init__(self)
        self.pipe = p_out
        self.target_queue = target_queue

    def run(self):
        while True:
            line = self.pipe.readline() # blocking read
            if line == '':
                break
            print "reader read: ", line.rstrip()
            self.target_queue.put(line)

if __name__ == "__main__":

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    source_queue = Queue.Queue()
    target_queue = Queue.Queue()

    writer = WriteThread(proc.stdin, source_queue)
    writer.setDaemon(True)
    writer.start()

    reader = ReadThread(proc.stdout, target_queue)
    reader.setDaemon(True)
    reader.start()

    # populate queue
    for i in range(10):
        source_queue.put("string %s\n" %i)
    source_queue.put("")

    print "source_queue empty: ", source_queue.empty()
    print "target_queue empty: ", target_queue.empty()

    import time
    time.sleep(2) # expect some output from reader thread

    source_queue.join() # wait until all items in source_queue are processed
    proc.stdin.close()  # should end the subprocess
    proc.wait()

diese geben die folgende Ausgabe (python2.7):

writing to process:  'string 0\n'
writing to process:  'string 1\n'
writing to process:  'string 2\n'
writing to process:  'string 3\n'
writing to process:  'string 4\n'
writing to process:  'string 5\n'
writing to process:  'string 6\n'
source_queue empty: writing to process:  'string 7\n'
writing to process:  'string 8\n'
writing to process:  'string 9\n'
writing to process:  ''
 True
target_queue empty:  True

dann nichts für 2 Sekunden ...

reader read:  rgsn0i t
reader read:  nrg1sti
reader read:  tis n2rg
reader read:  snt gri3
reader read:  nsri4 tg
reader read:  stir5 gn
reader read:   gnri6ts
reader read:   ngrits7
reader read:  8nsrt ig
reader read:  sg9 nitr

Das interleaving am Anfang erwartet. Jedoch ist die Ausgabe der Teilprozess wird erst angezeigt, wenn nach der Teilprozess endet. Mit mehr Zeilen geleitet, in der ich eine Ausgabe, so ich vermute ein Cache-problem in den Standardausgabe-pipe. Nach anderen Fragen, die hier gepostet flushing stdout (in der Teilprozess) sollte funktionieren, zumindest auf Linux.

InformationsquelleAutor muckl | 2012-03-21
Schreibe einen Kommentar