Wie führe ich Multiprozessing innerhalb von Anfragen mit dem Python Tornado Server am besten aus?

Ich bin mit dem I/O non-blocking python-server-Tornado. Ich habe eine Klasse GET Anforderungen, die möglicherweise eine erhebliche Menge an Zeit in Anspruch (man denke im Bereich von 5-10 Sekunden). Das problem ist, dass Tornado-Blöcken, die auf diese Anforderungen, so dass nachfolgende Anfragen schnell gehalten werden, bis der slow-Anforderung abgeschlossen ist.

Ich sah Sie an: https://github.com/facebook/tornado/wiki/Threading-and-concurrency und kam zu dem Schluss, dass ich wollte eine Kombination von #3 (von anderen Prozessen) und #4 (andere threads). #4 auf Ihre eigenen Probleme hatten und ich war nicht in der Lage, zuverlässige Kontrolle wieder an den ioloop, wenn es war ein anderer thread tun, die "heavy_lifting". (Ich gehe davon aus, dass dies durch die GIL und die Tatsache, dass die heavy_lifting Aufgabe hat eine hohe CPU-Auslastung und hält ziehen zu Steuern Weg von der Haupt-ioloop, aber das ist eine Vermutung).

Also ich habe seit prototyping, wie Sie dies lösen, indem Sie tun, "schweres heben" Aufgaben innerhalb dieser langsam GET fordert in einem separaten Prozess und dann einen callback zurück in den Tornado ioloop, wenn der Prozess abgeschlossen ist, beenden Sie die Anfrage. Dies erleichtert die ioloop mit anderen Anforderungen.

Habe ich eine einfache Beispiel zeigt eine mögliche Lösung, aber bin mal gespannt auf das feedback aus der community auf Sie.

Meine Frage ist eine doppelte: Wie kann der derzeitige Ansatz vereinfacht werden? Welche Fallstricke potenziell existieren?

Der Ansatz

  1. Nutzen Tornado-builtin asynchronous decorator wodurch ein Wunsch offen bleiben und für die ioloop, um fortzufahren.
  2. Erzeugen Sie einen separaten Prozess für das "heavy lifting" - Aufgaben mit python - multiprocessing Modul. Ich versuchte zunächst, verwenden Sie die threading Modul, aber war nicht in der Lage zu bekommen, eine zuverlässige Verzicht von Kontrolle wieder an den ioloop. Es scheint auch, dass mutliprocessing würde auch die Vorteile von multicores.
  3. Starten 'watcher' - thread in den Haupt-ioloop Prozess, der die threading Modul, die Aufgabe ist es zu beobachten, wie multiprocessing.Queue für die Ergebnisse der "schwerarbeit" - Aufgabe, wenn es abgeschlossen ist. Dies war nötig, denn ich brauchte einen Weg, um zu wissen, dass die heavy_lifting Aufgabe abgeschlossen hatten, während in der Lage, noch informieren die ioloop, dass dieser Antrag wurde nun beendet.
  4. Sicher sein, dass die 'watcher' thread gibt die Kontrolle an die main-ioloop Schleife oft mit time.sleep(0) Anrufe, so dass andere Anfragen weiterhin gut verarbeiten.
  5. Wenn es eine Folge in der Warteschlange, dann fügen Sie ein Rückruf von der "watcher" - thread mit tornado.ioloop.IOLoop.instance().add_callback() dokumentiert zu sein, der einzige sichere Weg, um call ioloop Instanzen aus anderen threads.
  6. Sicher sein, dann rufen Sie finish() im callback, um die Anforderung abzuschließen und mit der hand über eine Antwort.

Unten finden Sie einige Beispiel-code zeigt diesen Ansatz. multi_tornado.py ist der server, die die Umsetzung der oben genannten Gliederung und call_multi.py ist ein Beispielskript, ruft der server in zwei verschiedenen Möglichkeiten, den server zu testen. Beide tests rufen Sie den server mit 3 langsam GET Anfragen, gefolgt von 20 schnell GET Anfragen. Die Ergebnisse sind für beide laufen mit und ohne threading.

In die Falle läuft es mit "keine Gruppierung" die 3 langsame-Anfragen-block (jeder Einnahme ein wenig mehr als eine Sekunde). Ein paar von den 20 Anfragen schnell squeeze durch in zwischen einige der langsame Anfragen innerhalb der ioloop (nicht ganz sicher, wie das geschieht - könnte aber ein Artefakt sein, dass ich mich mit dem server und die client-test-Skript auf der selben Maschine). Der Punkt ist hier, dass alle Anfragen schnell gehalten werden, bis zu den verschiedenen Graden.

In die Falle läuft es mit threading aktiviert die 20 schnell Anfragen, die alle erste sofort und die drei slow-Anforderungen abgeschlossen werden etwa zur gleichen Zeit danach, wie Sie jeder schon parallel laufen. Dies ist das gewünschte Verhalten. Die drei langsame Anfragen von 2.5 Sekunden in parallel - in der Erwägung, dass in der non-threaded Fall die drei langsame Anfragen nehmen etwa 3,5 Sekunden insgesamt. So gibt es etwa 35% speed-up insgesamt (ich nehme an wegen der multicore-sharing). Aber noch wichtiger ist - die schnellen Anfragen wurden sofort bearbeitet leu von der langsameren.

Habe ich nicht viel Erfahrung mit Multithread-Programmierung - also während dies scheinbar funktioniert, hier bin ich neugierig zu erfahren:

Gibt es eine einfachere Möglichkeit, dies zu erreichen? Was monster lauern können innerhalb dieses Ansatzes?

(Hinweis: Eine Zukunft Kompromiss könnte sein, führen Sie einfach mehrere Instanzen von Tornado mit einem reverse-proxy wie nginx tun Lastenausgleich. Egal was ich ausführen mehrerer Instanzen mit load-balancer - aber ich bin besorgt über die nur das werfen von hardware bei diesem problem, da es scheint, dass die hardware so direkt gekoppelt ist, um das problem in Bezug auf die Sperrung.)

Beispielcode

multi_tornado.py (Beispiel-server):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note:  This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a thread to watch for
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()   # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()

call_multi.py (client-tester):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)

Testergebnisse

Durch ausführen python call_multi.py slow (das blockierende Verhalten):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

Durch ausführen python call_multi.py slow_threaded (das gewünschte Verhalten):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
Kommentar zu dem Problem
Empfehlung - watch out für die Wand des Textes. Kommentarautor: xxmbabanexx
OK. Vorschläge? Nicht klar ist mir, zu vermitteln, alle Details, was Los ist viel mehr bündig. Kommentarautor: Rocketman
Normalerweise ist es am besten zu Fragen, lange Fragen, wie diese in mehrere kleinere. Aber ich könnte falsch sein. Also... ist die einzige Frage, wie das zu vereinfachen? Ich würde, dass an der Oberseite - mehr interessant. Kommentarautor: xxmbabanexx
Ich bin auf der Suche nach Vereinfachungen oder alternativen Ansatz. Bearbeitete ich die Frage etwas um ein wenig mehr von dem, was ich bin auf der Suche nach vorne. Kommentarautor: Rocketman
Es fällt mir auf, dass Sie vereinfachen könnten, indem Sie dieses mit einer Warteschlange, der füttert, ein Prozess-pool, z.B. wie das multiprocessing Modul. Siehe docs.python.org/2/library/... für die info. Kommentarautor: Rod Hyde

InformationsquelleAutor der Frage Rocketman | 2013-03-13

Schreibe einen Kommentar