einfache dask map_partitions Beispiel
Lese ich Folgendes SO thead und nun versuche, es zu verstehen. Hier ist mein Beispiel:
import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random
df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) })
def test_f(col_1, col_2):
return col_1*col_2
ddf = dd.from_pandas(df, npartitions=8)
ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
Generiert die folgende Fehlermeldung unten. Was mache ich falsch? Auch ich bin nicht klar, wie die übergabe weiterer Parameter an Funktion in map_partitions
?
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
136 try:
--> 137 yield
138 except Exception as e:
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
3130 with raise_on_meta_error(funcname(func)):
-> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3132
TypeError: test_f() got an unexpected keyword argument 'columns'
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-9-913789c7326c> in <module>()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
469 >>> ddf.map_partitions(func).clear_divisions() # doctest: +SKIP
470 """
--> 471 return map_partitions(func, self, *args, **kwargs)
472
473 @insert_meta_param_description(pad=12)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
3163
3164 if meta is no_default:
-> 3165 meta = _emulate(func, *args, **kwargs)
3166
3167 if all(isinstance(arg, Scalar) for arg in args):
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
3129 """
3130 with raise_on_meta_error(funcname(func)):
-> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3132
3133
~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
75 value = type()
76 try:
---> 77 self.gen.throw(type, value, traceback)
78 except StopIteration as exc:
79 # Suppress StopIteration *unless* it's the same exception that
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
148 ).format(" in `{0}`".format(funcname) if funcname else "",
149 repr(e), tb)
--> 150 raise ValueError(msg)
151
152
ValueError: Metadata inference failed in `test_f`.
Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)
Traceback:
---------
File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error
yield
File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
Du musst angemeldet sein, um einen Kommentar abzugeben.
Es gibt ein Beispiel in
map_partitions
docs zu erreichen, genau das, was sind Sie versuchen zu tun:Beim Aufruf
map_partitions
(genau wie beim Aufruf.apply()
aufpandas.DataFrame
), die Funktion, die Sie versuchen zumap
(oderapply
) gegeben werden dataframe als erstes argument.Im Falle von
dask.dataframe.map_partitions
dieses erste argument wird eine partition und im Falle vonpandas.DataFrame.apply
- eine ganze dataframe.Was bedeutet, dass Ihre Funktion hat, zu akzeptieren dataframe(partition) als erstes argument und und in deinem Fall könnte wie folgt Aussehen:
Beachten Sie, dass die Zuweisung einer neuen Spalte in diesem Fall passiert (d.h. es wird geplant geschehen), BEVOR Sie anrufen
.compute()
.In deinem Beispiel ordnen Sie die Spalte, NACH der Sie rufen
.compute()
, welche Art von Niederlagen der Zweck der Verwendung dask. I. e. nach Aufruf.compute()
die Ergebnisse, dass die operation in den Speicher geladen werden wenn genug Platz vorhanden ist, für diese Ergebnisse (wenn nicht bekommst du nurMemoryError
).So für Sie Beispiel Sie arbeiten könnte:
1) Funktion (mit den Spaltennamen als Argumente):
2) Verwenden
lambda
(mit Spaltennamen hardcoded in die Funktion):Update:
Anwenden-Funktion auf einer Zeile-für-Zeile-basis, hier ist ein Zitat aus dem Beitrag, den Sie verlinkt:
I. e. für die Beispiel-Funktion in Frage, es könnte so Aussehen:
Da werden Sie Ihre Anwendung auf einer Zeile-für-Zeile-basis die Funktion das erste argument wird sein, eine Reihe (d.h. in jeder Zeile des dataframe ist eine Serie).
Anwendung dieser Funktion dann könnten Sie es so nennen:
Dieser gibt eine Serie namens
'result'
.Ich denke, man könnte auch anrufen
.apply
auf jeder partition eine Funktion, aber es schaut nicht mehr effizient rufen dann.apply
auf dataframe direkt. Kann aber sein, dass Ihre tests nicht das Gegenteil beweisen.dask
werde ich brauchen, um erstellen Sie zwei Funktionen: man wird annehmen Partitionen und die zweite wirdapply
mein Ziel-Funktion auf jede Zeile (der partition) in regelmäßigenpandas
Sinn.apply
ausdask
berechnet parallel?Ihre
test_f
nimmt zwei Argumente:col_1
undcol_2
. Sie übergeben ein einzelnes argument,ddf
.Versuchen Sie so etwas wie
ValueError: Metadata inference failed in
test_f.
ValueError: Metadata inference failed in..
durch hinzufügen einermeta='dtype'
argument für die obige Aussage. Wo dtype ist Ihre erwartete Datentyp.