Anmeldung mit dem multiprocessing-Wahnsinn
Ich versuche, mithilfe von python-Standard-logging-Modul in einem multiprocessing-Szenario.
Ich habe gelesen:
und andere mehrere Beiträge über multiprocessing -, logging -, python-Klassen und so.
Nach all dem Lesen habe ich kam zu diesem Stück code kann ich nicht machen es richtig laufen, die verwendet python ' s logutils QueueHandler:
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time
from logutils.queue import QueueListener, QueueHandler
class Worker(Process):
def __init__(self, n, q):
super(Worker, self).__init__()
self.n = n
self.queue = q
self.qh = QueueHandler(self.queue)
self.root = logging.getLogger()
self.root.addHandler(self.qh)
self.root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
def run(self):
self.logger.info("Worker %i Starting"%self.n)
for i in xrange(10):
self.logger.log(INFO, "testing %i"%i)
self.logger.log(INFO, "Completed %i"%self.n)
def listener_process(queue):
while True:
try:
record = queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except (KeyboardInterrupt, SystemExit):
raise
except:
import sys, traceback
print >> sys.stderr, 'Whoops! Problem:'
traceback.print_exc(file=sys.stderr)
if __name__ == "__main__":
mpq = mpQueue(-1)
root = logging.getLogger()
h = logging.StreamHandler()
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
l = logging.getLogger("Test")
l.setLevel(logging.DEBUG)
listener = Process(target=listener_process,
args=(mpq,))
listener.start()
workers=[]
for i in xrange(1):
worker = Worker(i, mpq)
worker.daemon = True
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
mpq.put_nowait(None)
listener.join()
for i in xrange(10):
l.info("testing %i"%i)
print "Finish"
Wenn der code ausgeführt wird, die Ausgabe irgendwie wiederholt sich Zeilen wie:
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO testing 9
Finish
In anderen questios es ist vorgeschlagen, dass der handler Hinzugefügt wird, mehr als einmal, aber, wie Sie sehen, kann ich nur hinzufügen, die streamhanlder einmal in der main Methode.
Ich habe bereits getestet die Einbettung der main Methode in einer Klasse mit dem gleichen Ergebnis.
BEARBEITEN:
wie @max vorgeschlagen (oder, was ich glaube, er sagte), die ich geändert habe den code von der Arbeiter-Klasse als:
class Worker(Process):
root = logging.getLogger()
qh = None
def __init__(self, n, q):
super(Worker, self).__init__()
self.n = n
self.queue = q
if not self.qh:
Worker.qh = QueueHandler(self.queue)
Worker.root.addHandler(self.qh)
Worker.root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
print self.root.handlers
def run(self):
self.logger.info("Worker %i Starting"%self.n)
for i in xrange(10):
self.logger.log(INFO, "testing %i"%i)
self.logger.log(INFO, "Completed %i"%self.n)
Mit dem gleichen Ergebnis, Jetzt ist die queue-handler wird nicht erneut Hinzugefügt und wieder, aber es gibt immer noch doppelte Einträge im log, auch mit nur einem Arbeiter.
EDIT2:
Ich habe mich verändert, den code ein wenig. Ich habe die listener-Prozess und verwendet jetzt eine QueueListener (das ist, was ich beabsichtigte, in der Anfang jedenfalls), zog der Haupt-code zu einer Klasse.
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time
from logutils.queue import QueueListener, QueueHandler
root = logging.getLogger()
added_qh = False
class Worker(Process):
def __init__(self, logconf, n, qh):
super(Worker, self).__init__()
self.n = n
self.logconf = logconf
# global root
global added_qh
if not added_qh:
added_qh = True
root.addHandler(qh)
root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
#print root.handlers
def run(self):
self.logger.info("Worker %i Starting"%self.n)
for i in xrange(10):
self.logger.log(INFO, "testing %i"%i)
self.logger.log(INFO, "Completed %i"%self.n)
class Main(object):
def __init__(self):
pass
def start(self):
mpq = mpQueue(-1)
qh = QueueHandler(mpq)
h = logging.StreamHandler()
ql = QueueListener(mpq, h)
#h.setFormatter(f)
root.addHandler(qh)
l = logging.getLogger("Test")
l.setLevel(logging.DEBUG)
workers=[]
for i in xrange(15):
worker = Worker(logconf, i, qh)
worker.daemon = True
worker.start()
workers.append(worker)
for worker in workers:
print "joining worker: {}".format(worker)
worker.join()
mpq.put_nowait(None)
ql.start()
# listener.join()
for i in xrange(10):
l.info("testing %i"%i)
if __name__ == "__main__":
x = Main()
x.start()
time.sleep(10)
print "Finish"
Nun meist funktioniert, bis ich eine bestimmte Anzahl von Arbeitern (~15) wenn aus irgendeinem Grund die Main-Klasse blockiert in de beitreten und der rest der Arbeiter nichts zu tun.
- Sind Sie auf windows oder *nix?
- Ich bin mit diesem code in Linux mit python 2.7.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ich bin zu spät kommt, so dass Sie wahrscheinlich nicht brauchen, die mehr zu beantworten. Das problem kommt von der Tatsache, dass Sie haben bereits eine Prozedur, die in Ihr Haupt-Prozess, und Ihre Arbeiter, die Sie hinzufügen eine andere. Dies bedeutet, dass Sie in Ihrem Arbeitsprozess -, zwei-Handler sind in der Tat verwalten Sie Ihre Daten, die man bei drücken der log-Warteschlange, und der man das schreiben in den stream.
Können Sie dieses Problem beheben, indem Sie einfach eine zusätzliche Zeile
self.root.handlers = []
zu Ihrem code. Von den ursprünglichen code, die__init__
- Methode des worker würde dann so Aussehen:Die Ausgabe sieht nun wie folgt aus:
Dachte ich, eine ziemlich einfache Lösung mit monkeypatching. Es ist wahrscheinlich nicht robust, und ich bin nicht ein Experte mit der logging-Modul, aber es schien die beste Lösung für meine situation. Nach dem Versuch einigen code-änderungen (zu ermöglichen, übergehend in eine vorhandene logger, von
multiprocess.get_logger()
) ich wusste nicht, wie viel der code wurde verändert und kam mit einem schnellen (gut es gewesen wäre, hätte ich dies getan, in den ersten Platz) leicht zu Lesen, hack/workaround:(Beispiel arbeiten, komplett mit multiprocessing-pool)
Natürlich ist dies wahrscheinlich nicht decken die ganze Bandbreite des
logging
Nutzung, aber ich denke, das Konzept ist einfach genug hier, um die Arbeit schnell und relativ schmerzlos. Und es sollte leicht sein, Sie zu ändern (zum Beispiel die lambda-func verwirft einen möglichen Präfix übergeben werden kann, ingetLogger
).Alle Ihre
Worker
s-Aktie die gleichen root-logger-Objekt (die inWorker.__init__
-- diegetLogger
nennen immer wieder die gleichen logger). Jedoch, jedes mal, wenn Sie erstellen einWorker
fügen Sie einen Ereignishandler (QueueHandler
) zu, dass der logger.Also, wenn Sie erstellen 10 Arbeitnehmer, Sie haben 10 (identisch) - Handler auf dem root-logger, was bedeutet, dass Ausgang wird 10 mal wiederholt.
Stattdessen sollten Sie den logger ein Modul-Attribut anstelle einer Instanz-Attribut aus, und konfigurieren Sie es einmal auf der Modul-Ebene-nicht auf Klassen-Niveau.
(eigentlich Logger konfiguriert werden soll einmal an der Programm Ebene)
listener_process
Funktion ist wahrscheinlich schuldig, einige wiederholen auch. Ich weiß nicht, was Ihr Programm tun soll, aber ich habe das Gefühl, dass Sie versuchen, Schuhlöffel zu viel Zeug in, vor allem soweit die Protokollierung betroffen ist... Was passiert, wenn Sie ersetzen Sie alle logging-Aufruf mit einem print? Können Sie verstehen, warum?added_qh
in der modifizierten Frage) sind nicht in verschiedenen Prozessen genutzt. So dass jeder Ihrer Mitarbeiter wird durch denif not added_qh
block. Wieder, den Sie erstellen möchten, und konfigurieren Sie Ihre logger auf das Modul/Programm-Ebene, nicht individuell in Ihren worker-threads.