python dask DataFrame, Unterstützung für (trivial parallelisierbare) Zeile anwenden?
Vor kurzem fand ich dask Modul zielt darauf ab, dass ein einfach zu bedienendes python-parallel-processing-Modul. Großer Pluspunkt für mich ist, dass es funktioniert mit pandas.
Nach der Lektüre ein wenig auf seiner manual-Seite, ich kann nicht einen Weg finden, dies zu tun trivial parallelisierbare Aufgabe:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
Im moment, dies zu erreichen in dask, AFAIK,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
die hässliche syntax und ist tatsächlich langsamer als geradezu
df.apply(func, axis = 1) # for pandas DF row apply
Jede Anregung?
Edit: Danke @MRocklin für die map-Funktion. Es scheint, langsamer zu sein als nur pandas gelten. Ist das mit den pandas GIL-releasing-Problem oder mache ich es falsch?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
- Ich bin nicht vertraut mit
dask
Modul. Für mulit-processing, python-Modulmultiprocessing
funktioniert gut für mich, wenn ich zum verarbeiten einer großen dataframe Zeile-für-Zeile. Die Idee ist auch sehr einfach: verwenden Sienp.array_split
um split große dataframe in 8 Stücke schneiden und verarbeiten Sie gleichzeitig mitmultiprocessing
; Sobald es fertig ist, verwenden Siepd.concat
zu concat Sie zurück zu der ursprünglichen Länge. Für einen entsprechenden post mit vollständige Codebeispiel finden Sie unter stackoverflow.com/questions/30904354/... - Danke, sehr nett. Das problem der multiprocessing-Modul ist, dass Sie brauchen, um eine benannte Funktion (nicht lambda) und legte es der name=="main" block. Das macht die Forschung-codes schlecht strukturiert ist.
- Wenn Sie nur wollen, um eine bessere Nutzung multiprocessing kann man sich unter Multiprozess von @mike-mckerns . Sie könnten auch versuchen, aus dask core statt dask.dataframe und erstellen Sie Wörterbücher oder verwenden Sie so etwas wie github.com/ContinuumIO/dask/pull/408
Du musst angemeldet sein, um einen Kommentar abzugeben.
map_partitions
Können Sie Ihre Funktion, um alle Partitionen Ihres dataframe mit den
map_partitions
Funktion.Beachten Sie, dass func wird nur ein Teil des dataset an eine Zeit, nicht den gesamten Datenbestand wie mit
pandas apply
(die vermutlich Sie würde nicht wollen, wenn Sie wollen, um die Parallelität.)map
/apply
Können Sie anzeigen eine Funktion zeilenweise über eine Reihe mit
map
Können Sie anzeigen eine Funktion zeilenweise über ein dataframe mit
apply
Threads vs. Prozesse
Ab version 0.6.0
dask.dataframes
parallelizes mit threads. Benutzerdefinierte Python-Funktionen werden nicht erhalten viel nutzen von thread-basierte Parallelisierung. Sie könnten versuchen, Prozesse stattAber vermeiden
apply
Allerdings sollte man wirklich vermeiden
apply
mit benutzerdefinierten Python-Funktionen, sowohl in der Pandas und in Dask. Dies ist oft eine Quelle der schlechten Leistung. Könnte es sein, dass, wenn Sie einen Weg finden, um Ihren Betrieb in eine vektorisierte Art und Weise, dann könnte es sein, dass Ihr Pandas code 100x schneller und Sie müssen nicht dask.dataframe an alle.Betrachten
numba
Für Ihr spezielles problem könnten Sie in Erwägung ziehen
numba
. Dies verbessert deutlich die Leistung.Disclaimer, ich arbeite für die Firma, die macht beide
numba
unddask
und beschäftigt viele derpandas
Entwickler.Als der v
dask.dataframe
.gelten Delegierten Verantwortung zumap_partitions
: