Schreiben in eine Datei mit multiprocessing

Ich habe Folgendes problem in python.

Muss ich tun, einige Berechnungen parallel, deren Ergebnisse ich müssen geschrieben werden sequentiell in eine Datei. So habe ich eine Funktion, die Sie empfängt, eine multiprocessing.Queue und ein Datei-handle, die Berechnung und drucken Sie das Ergebnis in der Datei:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

Aber die Datei endet leer, nachdem das Skript ausgeführt wird. Ich habe versucht, ändern Sie die Arbeiter () - Funktion:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

und übergeben Sie den Dateinamen als parameter. Dann klappt es so wie ich gedacht. Wenn ich versuche zu tun, die gleiche Sache hintereinander, ohne multiprocessing, es funktioniert auch normalerweise.

Warum es nicht funktionierte in der ersten version? Ich sehe nicht das problem.

Außerdem: kann ich garantieren, dass zwei Prozesse nicht versuchen, eine Datei zu schreiben gleichzeitig?


EDIT:

Dank. Ich hab es jetzt. Dies ist die funktionierende version:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()
  • Bitte konzentrieren. Ein Satz von code - nur. Bitte entfernen Sie veraltete oder irrelevante code. Bitte vermeiden Sie "Bearbeiten". Einfach nur die Frage zu sein, vollkommen klare, vollständige und konsistente, bitte.
Schreibe einen Kommentar