Wie eine Kette Sellerie Aufgabe, die gibt eine Liste in einer Gruppe?
Ich eine Gruppe erstellen möchten, aus einer Liste zurückgegeben, die von einem Sellerie Aufgabe, so dass für jedes Element im task-Ergebnis-set, ein task wird zu der Gruppe Hinzugefügt.
Hier ein einfaches code-Beispiel, um zu erklären, die use-case. Die ???
sollte das Ergebnis aus der vorherigen Aufgabe.
@celery.task
def get_list(amount):
# In reality, fetch a list of items from a db
return [i for i in range(amount)]
@celery.task
def process_item(item):
#do stuff
pass
process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Ich bin wohl nicht sich das richtig, aber ich bin mir ziemlich sicher, dass es nicht sicher ist zu nennen Aufgaben, die von innen Aufgaben:
@celery.task
def process_list():
for i in get_list.delay().get():
process_item.delay(i)
Ich brauche nicht das Ergebnis von Sekunden die Aufgabe.
- In der Tat, nicht nennen Sie eine Aufgabe aus, eine Aufgabe. Dadurch werden deadlocks vermieden werden. Sagen Sie einem Arbeiter. Sie nennen Ihre Aufgabe, die Bande, bis worker 1, ruft dann eine zweite Aufgabe. Es gibt keine Arbeitnehmer-Prozess, die Aufgabe und alles, was hängen bleiben wird. Diese Bösartigkeit wird leicht besser, wenn Sie Arbeitnehmer, aber man wird immer binden Sie mehrere Mitarbeiter mit einer Aufgabe (und verlieren Parallelität).
Du musst angemeldet sein, um einen Kommentar abzugeben.
Können Sie diese Art von Verhalten mithilfe einer intermediate Aufgabe. Hier ist eine demonstration der Erstellung einer "Landkarte", wie die Methode, die funktioniert wie du vorgeschlagen hast.
Kredit zu Fragen, Solem, dass Sie mir diesen Vorschlag, wenn ich ihn fragte, um die Hilfe für ein ähnliches Problem.
callback = subtask(callback)
in die for-Schleife erstellen die Funktionen und löschen Sie dieclone
..clone()
. Gruppen und Akkorde können sehr Komplex (eine Gruppe von Gruppen usw.). In diesem Fall können Sie nicht einfach verwenden.clone
, denn das erzeugt nur eine flache Kopie Ihrer Rückruf-Signatur. Dies bedeutet, dass die Argumente nicht übergeben werden, korrekt. Um sicherzustellen, dass alles wie erwartet funktioniert, müssen Sie verwendendeepcopy
, wie bereits in meinem ursprünglichen Kommentar – das macht es klarer? wenn nicht, werde ich versuchen Sie es erneut.dmap
Funktion ausführen wird die Gruppe synchron, also zwei Aufgaben für die durch den Makler, in der Erwägung, dass normalerweise diegroup()
würde bewirken, dass die einzelnenprocess_item
Funktionen aufgerufen werden, die in parallel. Wenn das stimmt, gibt es keinen Unterschied zureturn [process_item(i) for i in it]
?[i for i in range(n)]
Umgerechnet nurrange(n)
? Und sollte die Liste[arg,]
werden Tupel(arg,)
?