Wie führe ich Multiprozessing innerhalb von Anfragen mit dem Python Tornado Server am besten aus?
Ich bin mit dem I/O non-blocking python-server-Tornado. Ich habe eine Klasse GET
Anforderungen, die möglicherweise eine erhebliche Menge an Zeit in Anspruch (man denke im Bereich von 5-10 Sekunden). Das problem ist, dass Tornado-Blöcken, die auf diese Anforderungen, so dass nachfolgende Anfragen schnell gehalten werden, bis der slow-Anforderung abgeschlossen ist.
Ich sah Sie an: https://github.com/facebook/tornado/wiki/Threading-and-concurrency und kam zu dem Schluss, dass ich wollte eine Kombination von #3 (von anderen Prozessen) und #4 (andere threads). #4 auf Ihre eigenen Probleme hatten und ich war nicht in der Lage, zuverlässige Kontrolle wieder an den ioloop, wenn es war ein anderer thread tun, die "heavy_lifting". (Ich gehe davon aus, dass dies durch die GIL und die Tatsache, dass die heavy_lifting Aufgabe hat eine hohe CPU-Auslastung und hält ziehen zu Steuern Weg von der Haupt-ioloop, aber das ist eine Vermutung).
Also ich habe seit prototyping, wie Sie dies lösen, indem Sie tun, "schweres heben" Aufgaben innerhalb dieser langsam GET
fordert in einem separaten Prozess und dann einen callback zurück in den Tornado ioloop, wenn der Prozess abgeschlossen ist, beenden Sie die Anfrage. Dies erleichtert die ioloop mit anderen Anforderungen.
Habe ich eine einfache Beispiel zeigt eine mögliche Lösung, aber bin mal gespannt auf das feedback aus der community auf Sie.
Meine Frage ist eine doppelte: Wie kann der derzeitige Ansatz vereinfacht werden? Welche Fallstricke potenziell existieren?
Der Ansatz
- Nutzen Tornado-builtin
asynchronous
decorator wodurch ein Wunsch offen bleiben und für die ioloop, um fortzufahren. - Erzeugen Sie einen separaten Prozess für das "heavy lifting" - Aufgaben mit python -
multiprocessing
Modul. Ich versuchte zunächst, verwenden Sie diethreading
Modul, aber war nicht in der Lage zu bekommen, eine zuverlässige Verzicht von Kontrolle wieder an den ioloop. Es scheint auch, dassmutliprocessing
würde auch die Vorteile von multicores. - Starten 'watcher' - thread in den Haupt-ioloop Prozess, der die
threading
Modul, die Aufgabe ist es zu beobachten, wiemultiprocessing.Queue
für die Ergebnisse der "schwerarbeit" - Aufgabe, wenn es abgeschlossen ist. Dies war nötig, denn ich brauchte einen Weg, um zu wissen, dass die heavy_lifting Aufgabe abgeschlossen hatten, während in der Lage, noch informieren die ioloop, dass dieser Antrag wurde nun beendet. - Sicher sein, dass die 'watcher' thread gibt die Kontrolle an die main-ioloop Schleife oft mit
time.sleep(0)
Anrufe, so dass andere Anfragen weiterhin gut verarbeiten. - Wenn es eine Folge in der Warteschlange, dann fügen Sie ein Rückruf von der "watcher" - thread mit
tornado.ioloop.IOLoop.instance().add_callback()
dokumentiert zu sein, der einzige sichere Weg, um call ioloop Instanzen aus anderen threads. - Sicher sein, dann rufen Sie
finish()
im callback, um die Anforderung abzuschließen und mit der hand über eine Antwort.
Unten finden Sie einige Beispiel-code zeigt diesen Ansatz. multi_tornado.py
ist der server, die die Umsetzung der oben genannten Gliederung und call_multi.py
ist ein Beispielskript, ruft der server in zwei verschiedenen Möglichkeiten, den server zu testen. Beide tests rufen Sie den server mit 3 langsam GET
Anfragen, gefolgt von 20 schnell GET
Anfragen. Die Ergebnisse sind für beide laufen mit und ohne threading.
In die Falle läuft es mit "keine Gruppierung" die 3 langsame-Anfragen-block (jeder Einnahme ein wenig mehr als eine Sekunde). Ein paar von den 20 Anfragen schnell squeeze durch in zwischen einige der langsame Anfragen innerhalb der ioloop (nicht ganz sicher, wie das geschieht - könnte aber ein Artefakt sein, dass ich mich mit dem server und die client-test-Skript auf der selben Maschine). Der Punkt ist hier, dass alle Anfragen schnell gehalten werden, bis zu den verschiedenen Graden.
In die Falle läuft es mit threading aktiviert die 20 schnell Anfragen, die alle erste sofort und die drei slow-Anforderungen abgeschlossen werden etwa zur gleichen Zeit danach, wie Sie jeder schon parallel laufen. Dies ist das gewünschte Verhalten. Die drei langsame Anfragen von 2.5 Sekunden in parallel - in der Erwägung, dass in der non-threaded Fall die drei langsame Anfragen nehmen etwa 3,5 Sekunden insgesamt. So gibt es etwa 35% speed-up insgesamt (ich nehme an wegen der multicore-sharing). Aber noch wichtiger ist - die schnellen Anfragen wurden sofort bearbeitet leu von der langsameren.
Habe ich nicht viel Erfahrung mit Multithread-Programmierung - also während dies scheinbar funktioniert, hier bin ich neugierig zu erfahren:
Gibt es eine einfachere Möglichkeit, dies zu erreichen? Was monster lauern können innerhalb dieses Ansatzes?
(Hinweis: Eine Zukunft Kompromiss könnte sein, führen Sie einfach mehrere Instanzen von Tornado mit einem reverse-proxy wie nginx tun Lastenausgleich. Egal was ich ausführen mehrerer Instanzen mit load-balancer - aber ich bin besorgt über die nur das werfen von hardware bei diesem problem, da es scheint, dass die hardware so direkt gekoppelt ist, um das problem in Bezug auf die Sperrung.)
Beispielcode
multi_tornado.py
(Beispiel-server):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py
(client-tester):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)
Testergebnisse
Durch ausführen python call_multi.py slow
(das blockierende Verhalten):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
Durch ausführen python call_multi.py slow_threaded
(das gewünschte Verhalten):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
InformationsquelleAutor der Frage Rocketman | 2013-03-13
Du musst angemeldet sein, um einen Kommentar abzugeben.
Wenn Sie bereit sind, zu verwenden
gleichzeitige.futures.ProcessPoolExecutor
stattmultiprocessing
, dies ist eigentlich sehr einfach. Tornado ioloop unterstützt bereitsconcurrent.futures.Future
, so werden Sie spielen schön zusammen aus der box.concurrent.futures
enthalten ist in Python 3.2+, und wurde zurück portiert auf Python 2.x.Hier ein Beispiel:
Ausgabe:
ProcessPoolExecutor
hat eine eingeschränkte API alsmultiprocessing.Pool
, aber wenn Sie nicht brauchen, die erweiterten Funktionen dermultiprocessing.Pool
lohnt es sich mit, weil die integration viel einfacher.InformationsquelleAutor der Antwort dano
multiprocessing.Pool
integriert werden können in dietornado
I/O loop, aber es ist ein bisschen chaotisch. Eine wesentlich bessere integration getan werden kann, mitconcurrent.futures
(siehe meine andere Antwort für details), aber wenn Sie fest sind, auf Python 2.x und kann nicht installieren Sie dieconcurrent.futures
portieren, hier ist, wie Sie es tun können ausschließlichmultiprocessing
:Den
multiprocessing.Pool.apply_async
undmultiprocessing.Pool.map_async
Methoden, beide haben einen optionalencallback
parameter, was bedeutet, dass beide potentiell gesteckt in einetornado.gen.Task
. Also in den meisten Fällen das ausführen von code asynchron in einem sub-Prozess ist so einfach wie diese:Wie ich schon erwähnt habe, funktioniert dies auch in die meisten Fällen. Aber wenn
worker()
wirft eine Ausnahme, diecallback
nie aufgerufen wird, was bedeutet, dass diegen.Task
nie beendet hat, und Sie immer hängen. Nun, wenn Sie wissen, dass Ihre Arbeit nie eine exception werfen (weil Sie verpackt das ganze in einetry
/except
zum Beispiel), können Sie gerne diese Methode verwenden. Allerdings, wenn Sie möchten, lassen Sie Ausnahmen entfliehen Sie dem Arbeiter, die einzige Lösung, die ich fand, war Unterklasse einige multiprocessing-Komponenten, und machen Sie Sie zu nennencallback
auch wenn der Arbeiter sub-Prozess hob eine Ausnahme:Durch diese änderungen die Ausnahme-Objekt wird zurückgegeben von der
gen.Task
, anstatt diegen.Task
hängen auf unbestimmte Zeit. Ich habe auch meine aktualisiertenasync_run
Methode die exception erneut ausgelöst, wenn Ihr zurückgekehrt, und einige andere änderungen, um eine bessere tracebacks für Ausnahmen, die in der Arbeiter-sub-Prozesse. Hier ist der vollständige code:Hier ist, wie verhält es sich für den AUFTRAGGEBER:
, Und wenn ich senden zwei gleichzeitige curl Anfragen, wir können sehen, Sie sind asynchron verarbeitet, die auf der server-Seite:
Edit:
Beachten Sie, dass dieser code wird einfacher mit Python 3, denn es stellt ein
error_callback
Schlüsselwort-argument, um alle asynchronenmultiprocessing.Pool
Methoden. Dies macht es viel einfacher zu integrieren, mit Tornado:Alles, was wir tun müssen, in unserem überschrieben
apply_async
ist, rufen Sie die Eltern mit denerror_callback
keyword argument, zusätzlich zu dencallback
kwarg. Keine Notwendigkeit zum überschreibenApplyResult
.Wir können auch schicker durch die Verwendung einer Metaklasse in unserem
TornadoPool
zu ermöglichen, seine*_async
Methoden direkt aufgerufen werden, als wären Sie Coroutinen:InformationsquelleAutor der Antwort dano
Wenn Ihr die get-Anfragen sind unter dieser lang ist, dann tornado ist der falsche Rahmen.
Ich schlage vor, Sie verwenden nginx Weg die schnell zu tornado und die langsameren auf einen anderen server.
PeterBe hat einen interessanten Artikel, in dem er betreibt mehrere Tornado-Server und wird einer von Ihnen zu sein, 'der langsame' für den Umgang mit lang Laufenden Anfragen finden Sie unter: sich Gedanken-über-io-blockiert ich würde versuchen, diese Methode.
InformationsquelleAutor der Antwort andy boot