python: Lesen Teilprozess output in threads
Ich habe eine ausführbare Datei, die rufe ich mit subprocess.Popen. Dann will ich ihn ernähren, einige Daten über stdin mit einem thread, liest den Wert aus einer Warteschlange, die später aufgefüllt werden, in einem anderen thread. Die Ausgabe sollte so gelesen werden, mit dem stdout-pipe in einem anderen thread und wieder sortiert werden, in eine Warteschlange.
Soweit ich das verstanden habe aus meiner früheren Forschung, die Verwendung von threads mit Warteschlange ist gute Praxis.
Externe ausführbare Datei, leider wird er so schnell nicht geben Sie mir eine Antwort für jede Zeile, die Pipeline, so dass der einfach zu schreiben, "readline" - Zyklen sind keine option. Die ausführbare Datei implementiert einige interne multithreading, und ich möchte die Ausgabe, sobald es verfügbar ist, wird daher die zusätzliche Leser-thread.
Als ein Beispiel für die Prüfung der ausführbaren Datei wird nur shuffle jeder Zeile (shuffleline.py):
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
Bitte beachten Sie, dass dies bereits als unbuffered wie möglich. Oder ist es nicht? Dies ist meine abgespeckte test-Programm:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
diese geben die folgende Ausgabe (python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
dann nichts für 2 Sekunden ...
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
Das interleaving am Anfang erwartet. Jedoch ist die Ausgabe der Teilprozess wird erst angezeigt, wenn nach der Teilprozess endet. Mit mehr Zeilen geleitet, in der ich eine Ausgabe, so ich vermute ein Cache-problem in den Standardausgabe-pipe. Nach anderen Fragen, die hier gepostet flushing stdout (in der Teilprozess) sollte funktionieren, zumindest auf Linux.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Dein problem hat nichts damit zu tun, die
subprocess
Modul, oder threads (problematisch, wie Sie sind), oder sogar mischen Teilprozesse und threads (a sehr schlechte Idee, noch schlimmer als die Verwendung von Fäden, mit zu beginnen, es sei denn, du bist mit dem portieren von Python 3.2 s subprocess-Modul, das Sie bekommen können aus code.google.com/p/python-subprocess32) oder den Zugriff auf die gleichen Dinge von mehreren threads (wie Ihrprint
Aussagen machen).Was passiert, ist, dass Ihr
shuffleline.py
Programm Puffer. Nicht im Ausgang, sondern in den Eingang. Es ist zwar nicht sehr offensichtlich, wenn Sie Durchlaufen ein fileobject, Python wird in Blöcken gelesen, in der Regel 8k bytes. Dasys.stdin
ist ein fileobject, Ihrefor
Schleife Puffer, bis EOF oder einem vollen block:Wenn Sie möchten, dies nicht tun, verwenden Sie entweder eine while-Schleife aufrufen
sys.stdin.readline()
(die gibt''
für EOF):oder verwenden Sie die zwei-argument-form
iter()
erzeugt einen iterator, ruft die ersten argument bis zum zweiten argument ("sentinel") zurückgegeben:Ich wäre auch nachlässig, wenn ich nicht schlage vor, nicht mit threads, aber nicht-blockierende I/O auf den Teilprozess der Rohre statt, oder auch so etwas wie
twisted.reactor.spawnProcess
hat viele Möglichkeiten dafür Prozesse und andere Dinge zusammen, wie Konsumenten und Produzenten.