SPARK SQL - update-MySql-Tabelle mithilfe von DataFrames und JDBC
Ich versuche, einfügen und aktualisieren von Daten auf MySql mit Spark SQL DataFrames und JDBC-Verbindung.
Ich habe erfolgreich das einfügen neuer Daten mit dem SaveMode.Append. Gibt es eine Möglichkeit, die Daten zu aktualisieren, die bereits in der MySql-Tabelle von Spark SQL?
Mein code zum einfügen ist:
myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)
Wenn ich auf SaveMode.Überschreiben löscht die gesamte Tabelle und erstellt eine neue, ich bin auf der Suche nach so etwas wie die "ON DUPLICATE KEY UPDATE" verfügbar in MySql
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ist es nicht möglich. So, jetzt (Funke 1.6.0 /2.2.0-SNAPSHOT) Funken
DataFrameWriter
unterstützt nur vier schreiben Modi:Können Sie manuell zum Beispiel mit
mapPartitions
(da willst du ein UPSERT-Vorgang idempotent sein muss und als solche einfach zu implementieren), schreiben in temporäre Tabelle und führen Sie upsert manuell oder mit Triggern.Im Allgemeinen erreichen upsert Verhalten für batch-Vorgänge und halten ordentliche Leistung ist keineswegs trivial. Sie haben sich daran zu erinnern, dass im Allgemeinen Fall gibt es mehrere Transaktionen gleichzeitig statt (eines pro-partition), so dass Sie haben, um sicherzustellen, dass es keine schreib-Konflikte (in der Regel durch die Verwendung von Anwendungs-spezifische Partitionierung) oder geben Sie die entsprechende recovery-Verfahren. In der Praxis kann es besser sein, zu führen und batch schreibt Sie in eine temporäre Tabelle und beheben upsert Teil direkt in der Datenbank.
zero323 s Antwort ist richtig, ich wollte nur hinzufügen, dass Sie verwenden könnten JayDeBeApi Paket zu umgehen:
https://pypi.python.org/pypi/JayDeBeApi/
zum aktualisieren von Daten in Ihrer mysql-Tabelle. Es könnte eine niedrig hängende Frucht, da Sie bereits mysql-jdbc-Treiber installiert.
Wir verwenden Anaconda distribution von Python und JayDeBeApi python-Paket serienmäßig.
Siehe Beispiele im link oben.
Schade, dass es keine
SaveMode.Upsert
- Modus in der Funke für diese durchaus häufigen Fällen wie upserting.zero322 ist Recht allgemein, aber ich denke, es sollte möglich sein (mit Abstrichen in der Leistung) anbieten zu können, ersetzen " - Funktion.
Wollte ich auch bieten einige java-code für diesen Fall.
Natürlich ist es nicht so performant wie die built-in eine von Funken - aber es sollte eine gute basis sein für Ihre Anforderungen. Nur ändern Sie es an Ihre Bedürfnisse anpassen:
überschreiben
org.apache.spark.sql.execution.datasources.jdbc
JdbcUtils.scala
insert into
zureplace into
Verwendung:
ps: achten Sie auf den Stillstand, nicht die Daten zu aktualisieren Häufig, einfach nur im re-run im Notfall, ich denke, das ist, warum die Funke nicht unterstützt, dies offiziell.
In PYSPARK ich war nicht in der Lage, das zu tun, also beschloss ich, die odbc verwenden.