Write Spark DataFrame to Postgres database


Problem description

The Spark cluster is configured as follows:

from pyspark import SparkConf

conf = {}
# Note: spark.executor.extraClassPath only affects the executors; the driver,
# where the JDBC connection factory is created, does not get this jar.
conf['SparkConfiguration'] = SparkConf() \
    .setMaster('yarn-client') \
    .setAppName("test") \
    .set("spark.executor.memory", "20g") \
    .set("spark.driver.maxResultSize", "20g") \
    .set("spark.executor.instances", "20") \
    .set("spark.executor.cores", "3") \
    .set("spark.memory.fraction", "0.2") \
    .set("user", "test_user") \
    .set("spark.executor.extraClassPath", "/usr/share/java/postgresql-jdbc3.jar")
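The configuration above puts the jar only on the executors' classpath, while the "No suitable driver" error is raised on the driver (in DriverManager.getDriver). A minimal sketch, assuming the same jar path is valid on every machine, of key/value pairs that cover both JVMs; jdbc_classpath_settings is a hypothetical helper whose result can be passed to SparkConf().setAll(...). Spark's configuration docs warn that in client mode spark.driver.extraClassPath should not be set through SparkConf if the driver JVM is already running; in that case use --driver-class-path or spark-defaults.conf instead.

```python
# Sketch: classpath settings that put one JDBC jar on both the driver and executors.
# JAR_PATH is the path from the question; adjust it for your cluster (assumption).
JAR_PATH = "/usr/share/java/postgresql-jdbc3.jar"


def jdbc_classpath_settings(jar_path):
    """Return (key, value) pairs suitable for SparkConf().setAll()."""
    return [
        # Classpath of the driver JVM, where DriverManager.getDriver() runs.
        ("spark.driver.extraClassPath", jar_path),
        # Classpath of each executor JVM, which opens its own JDBC connections.
        ("spark.executor.extraClassPath", jar_path),
    ]
```

For example, SparkConf().setAll(jdbc_classpath_settings(JAR_PATH)) returns the updated SparkConf, so it can be chained with the other .set(...) calls.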

When I try to write the DataFrame to the Postgres DB with the following code:

from pyspark.sql import DataFrameWriter
my_writer = DataFrameWriter(df)

url_connect = "jdbc:postgresql://198.123.43.24:1234"
table = "test_result"
mode = "overwrite"
properties = {"user":"postgres", "password":"password"}

my_writer.jdbc(url_connect, table, mode, properties)

I get the following error:

Py4JJavaError: An error occurred while calling o1120.jdbc.
: java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:278)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

Can anyone offer some advice on this? Thanks!

Recommended answer

Try write.jdbc, passing parameters that are created outside the write.jdbc() call. Also check which port Postgres is listening on: mine is 5432 for Postgres 9.6 and 5433 for Postgres 8.4.

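# Connection parameters defined outside the write.jdbc() call.
mode = "overwrite"
# Use the port Postgres actually listens on, and include the database name.
url = "jdbc:postgresql://198.123.43.24:5432/kockpit"
# With an explicit driver class, Spark registers it itself instead of
# asking DriverManager to find a driver for the URL.
properties = {"user": "postgres", "password": "password", "driver": "org.postgresql.Driver"}
df.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)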
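Compared with the question, this answer adds a database name to the URL, uses a different port and names the driver class in the properties. A minimal sketch that builds those values in plain Python and probes which candidate port accepts TCP connections; postgres_jdbc_url, jdbc_properties and first_open_port are hypothetical helpers, and the host, ports and database name are the placeholders from the answer.

```python
import socket


def postgres_jdbc_url(host, port, database):
    """Build a PostgreSQL JDBC URL: jdbc:postgresql://host:port/database."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def jdbc_properties(user, password):
    """Connection properties that name the PostgreSQL driver class explicitly."""
    return {"user": user, "password": password, "driver": "org.postgresql.Driver"}


def first_open_port(host, ports, timeout=2.0):
    """Return the first port in `ports` that accepts a TCP connection, else None."""
    for port in ports:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return port
        except OSError:
            continue  # refused, unreachable or timed out: try the next port
    return None
```

A TCP connection only shows that something is listening on the port, not that it is the right Postgres version. With a port found, the answer's call becomes df.write.jdbc(url=postgres_jdbc_url("198.123.43.24", port, "kockpit"), table="test_result", mode="overwrite", properties=jdbc_properties("postgres", "password")).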

Another recommended answer

Have you downloaded the PostgreSQL JDBC driver? You can download it here: https://jdbc.postgresql.org/download.html

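For the pyspark shell, use the SPARK_CLASSPATH environment variable (deprecated since Spark 1.0; pyspark --driver-class-path /path/to/downloaded/jar is the newer equivalent):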

$ export SPARK_CLASSPATH=/path/to/downloaded/jar
$ pyspark
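When PySpark is started from a plain Python process (a script that builds its own SparkConf, as in the question, or a notebook) rather than the pyspark shell, one common approach is to pass the same options through the PYSPARK_SUBMIT_ARGS environment variable before the first SparkContext is created; PySpark splits this value with shlex and expects it to end with pyspark-shell. A minimal sketch; pyspark_submit_args is a hypothetical helper and the jar path is a placeholder.

```python
import os
import shlex


def pyspark_submit_args(jar_path):
    """Build a PYSPARK_SUBMIT_ARGS value that adds a JDBC jar for PySpark."""
    quoted = shlex.quote(jar_path)  # PySpark splits this value with shlex
    # --driver-class-path: the driver JVM, where "No suitable driver" is raised.
    # --jars: also distribute the jar to the executors.
    # The trailing "pyspark-shell" is required when launching PySpark this way.
    return f"--driver-class-path {quoted} --jars {quoted} pyspark-shell"


# Must run before the first SparkContext/SparkSession is created in this process.
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args("/path/to/downloaded/jar")
```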

When submitting a script with spark-submit, use the --driver-class-path flag:

$ spark-submit --driver-class-path /path/to/downloaded/jar script.py
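If the job is launched from a Python wrapper, the same command can be built as an argument list so that no shell quoting is involved. A minimal sketch; spark_submit_command and submit are hypothetical helpers, and adding --jars so the executors also receive the jar is an assumption about your deployment, not part of the original answer.

```python
import subprocess


def spark_submit_command(script, jar_path):
    """Return a spark-submit argument list with the JDBC jar on the classpaths."""
    return [
        "spark-submit",
        "--driver-class-path", jar_path,  # classpath of the driver JVM
        "--jars", jar_path,               # distribute the jar to the executors
        script,
    ]


def submit(script, jar_path):
    # check=True raises CalledProcessError if spark-submit exits non-zero.
    return subprocess.run(spark_submit_command(script, jar_path), check=True)
```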

Another recommended answer

Maybe you can try passing the JDBC driver class explicitly (note that you may need to put the driver jar on the classpath of every Spark node):

df.write.option('driver', 'org.postgresql.Driver').jdbc(url_connect, table, mode, properties)
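Naming the driver class only helps if the jar on each node really contains it. A quick sanity check, using only the standard library, is to open the jar (a zip archive) and look for the class file; jar_has_class is a hypothetical helper, and the default class name is the one passed in the driver option above.

```python
import os
import zipfile


def jar_has_class(jar_path, class_name="org.postgresql.Driver"):
    """Return True if the jar exists and contains the compiled class."""
    entry = class_name.replace(".", "/") + ".class"  # e.g. org/postgresql/Driver.class
    if not os.path.isfile(jar_path):
        return False
    try:
        with zipfile.ZipFile(jar_path) as jar:
            return entry in jar.namelist()
    except zipfile.BadZipFile:
        return False  # the file is not a valid jar/zip archive
```

Running jar_has_class("/usr/share/java/postgresql-jdbc3.jar") on the driver host and on a few executor hosts quickly shows whether the path from the question points at a usable driver jar everywhere.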

Source: https://www.itbaoku.cn/post/1763880.html