Schreiben in eine Datei mit multiprocessing
Ich habe Folgendes problem in python.
Muss ich tun, einige Berechnungen parallel, deren Ergebnisse ich müssen geschrieben werden sequentiell in eine Datei. So habe ich eine Funktion, die Sie empfängt, eine multiprocessing.Queue
und ein Datei-handle, die Berechnung und drucken Sie das Ergebnis in der Datei:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation
# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file
def work(queue, fh):
while True:
try:
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
except:
break
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fh = open("foo", "w")
workQueue = Queue()
parList = # list of conditions for which I want to run doCalculation()
for x in parList:
workQueue.put(x)
processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
for p in processes:
p.start()
for p in processes:
p.join()
fh.close()
Aber die Datei endet leer, nachdem das Skript ausgeführt wird. Ich habe versucht, ändern Sie die Arbeiter () - Funktion:
def work(queue, filename):
while True:
try:
fh = open(filename, "a")
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
fh.close()
except:
break
und übergeben Sie den Dateinamen als parameter. Dann klappt es so wie ich gedacht. Wenn ich versuche zu tun, die gleiche Sache hintereinander, ohne multiprocessing, es funktioniert auch normalerweise.
Warum es nicht funktionierte in der ersten version? Ich sehe nicht das problem.
Außerdem: kann ich garantieren, dass zwei Prozesse nicht versuchen, eine Datei zu schreiben gleichzeitig?
EDIT:
Dank. Ich hab es jetzt. Dies ist die funktionierende version:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform
def doCalculation(par):
t = uniform(0,2)
sleep(t)
return par * par # just to simulate some calculation
def feed(queue, parlist):
for par in parlist:
queue.put(par)
def calc(queueIn, queueOut):
while True:
try:
par = queueIn.get(block = False)
print "dealing with ", par, ""
res = doCalculation(par)
queueOut.put((par,res))
except:
break
def write(queue, fname):
fhandle = open(fname, "w")
while True:
try:
par, res = queue.get(block = False)
print >>fhandle, par, res
except:
break
fhandle.close()
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fname = "foo"
workerQueue = Queue()
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed , args = (workerQueue, parlist))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target = write, args = (writerQueue, fname))
feedProc.start()
for p in calcProc:
p.start()
writProc.start()
feedProc.join ()
for p in calcProc:
p.join()
writProc.join ()
- Bitte konzentrieren. Ein Satz von code - nur. Bitte entfernen Sie veraltete oder irrelevante code. Bitte vermeiden Sie "Bearbeiten". Einfach nur die Frage zu sein, vollkommen klare, vollständige und konsistente, bitte.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Sollten Sie wirklich zwei queues und drei separaten Arten der Verarbeitung.
Put stuff in Queue #1.
Sachen aus Queue #1, und tun Berechnungen, setzen Sachen in der Warteschlange #2. Sie können viele von diesen, da Sie sich von einer Warteschlange aus und legen Sie in eine andere Warteschlange sicher.
Sachen aus Queue #2 und schreiben es in eine Datei. Sie müssen genau 1 von diesen und nicht mehr. Es ist "Eigentümer" der Datei, garantiert der atomic access, und absolut versichert, dass die Datei geschrieben wird sauber und konsequent.
Wenn jemand sucht für einen einfachen Weg, das gleiche zu tun, dies kann Ihnen dabei helfen.
Ich glaube nicht, es gibt keine Nachteile, es zu tun auf diese Weise. Wenn es gibt, bitte lasst es mich wissen.
Quelle: Python: das Schreiben auf eine einzige Datei mit dem queue während der Verwendung von multiprocessing Pool
Es ist ein Fehler in der schreib-Arbeiter-code, wenn der block falsch ist, die Arbeiter werden nie alle Daten. Sollte wie folgt sein:
Können Sie es durch hinzufügen der Zeile
nach der
queueOut.put((par,res))
Mit block=False würden Sie sich immer Zunehmender Länge der Warteschlange, bis es sich füllt, im Gegensatz zu block=True, wobei Sie immer die "1".