在spark结构化流中反序列化kafka avro主题时无效的int编码
我正在尝试使用Spark结构化流(版本2.3.1)从KAFKA处理流avro数据,因此我尝试使用 this de-serialize的示例. 它仅在主题value部分包含StringType的情况下起作用,但是在我的情况下,模式包含long and integers如下: public static final String USER_SCHEMA = "{" + "\"type\":\"record\"," + "\"name\":\"variables\"," + "\"fields\":[" + " { \"name\":\"time\", \"type\":\"long\" }," + " { \"name\":\"thnigId\", \"type\":\"string\" }," + " { \"name\":\"controller\", \"type\":\"int
6 2024-04-05
编程技术问答社区
Spark SQL 1.5构建失败
我已经在Ubuntu 14.04 LTS上安装了 Spark 1.5 . 使用命令build/mvn -Dscala-2.11 -DskipTests clean package运行构建时,我会在Project Spark SQL期间获得以下构建错误: [error] missing or invalid dependency detected while loading class file 'WebUI.class'. [error] Could not access term eclipse in package org, [error] because it (or its dependencies) are missing. Check your build definition for [error] missing or conflicting dependencies. (Re-run with `-Ylog-
0 2024-04-04
编程技术问答社区
为spark sql数据框架序列化/反序列化现有类
使用Spark 1.6.0 说我有这样的课 case class MyClass(date: java.util.Date, oid: org.bson.types.ObjectId) 如果我有 //rdd: RDD[MyClass] rdd.toDF("date", "oid") 我得到java.lang.UnsupportedOperationException: Schema for type java.util.Date/org.bson.types.ObjectId is not supported 现在我知道我可以使它成为java.sql.Date,但假设MyClass在其他地方依赖于其他地方,无法解决问题. 我也知道UserDefinedType选项.但是,似乎只有在您还创建一个新类以与之合作的情况下(再次,MyClass的签名需要保持相同) 是否没有办法仅注册java.util.Date和org.bson.types.ObjectId
0 2024-04-04
编程技术问答社区
spark sql中sc.broadcast和broadcast函数的区别
我已经使用sc.broadcast查找文件来提高性能. 我也知道在Spark SQL功能中有一个称为broadcast的函数. 两个之间有什么区别? 我应该使用哪一个来广播参考/查找表? 解决方案 如果要在Spark SQL中获得广播Join,则应使用broadcast函数(与所需的spark.sql.autoBroadcastJoinThreshold配置结合).它将: 标记给定广播的关系. 调整SQL执行计划. 评估产出关系时,将需要收集数据,广播以及应用正确的加入机制. SparkContext.broadcast用于处理本地对象,可用于Spark DataFrames. 其他解决方案 一个单词答案: 1)org.apache.spark.sql.functions.broadcast()函数是用户提供的,给定SQL Join的明确提示 2)sc.broadcast用于广播可读取的共享变量. 有关broadcast
2 2024-04-03
编程技术问答社区
在Spark中,一个广播对象的最大尺寸是多少?
使用dataframe 广播函数,可以将最大对象大小用于所有执行者? 解决方案 默认值为10MB,但我们已经使用了直到300 MB,由 spark.sql.autobroadcastjointhreshold . afaik,这完全取决于可用的内存.因此,对此没有明确的答案.我要说的是,它应该比大数据框架要少,您可以估计大小的数据框架大小如下... import org.apache.spark.util.SizeEstimator logInfo(SizeEstimator.estimate(yourlargeorsmalldataframehere)) 基于此,您可以通过broadcast提示到框架. 也可以看一下 Scala Doc来自 说.... 广播:如果联接的一侧具有估计的物理尺寸,则比用户可配置小 [[sqlconf.auto_broadcastjoin_threshold]]阈值 Side具有明确的广播提示(例如,用户应用了 [[o
2 2024-04-03
编程技术问答社区
读取FASTQ文件到Spark数据框中
我正在尝试将FASTQ文件读取到Spark DataFrames中.我有一些困难,因为FastQ是一种多行格式. 示例: @seq1 AGTCAGTCGAC + ?@@FFBFFDDH @seq2 CCAGCGTCTCG + ?88ADA?BDF8 是否有一种方法可以在 之类的火花数据框架中获取这些数据 +-------------+-------------+------------+ | identifier | sequence | quality | +-------------+-------------+------------+ |seq1 |AGTCAGTCGAC |?@@FFBFFDDH | |seq2 |CCAGCGTCTCG |?88ADA?BDF8 | +-------------+-------------+------------+ 感谢您的时间 解决方案 我会滑动 imp
2 2024-03-31
编程技术问答社区
在Apache Spark中根据数组中的单词过滤数据帧
我试图通过仅获取数组中包含单词的行来过滤数据集. 我使用的是包含方法,它适用于字符串,但不适用于数组.以下是代码 val dataSet = spark.read.option("header","true").option("inferschema","true").json(path).na.drop.cache() val threats_path = spark.read.textFile("src/main/resources/cyber_threats").collect() val newData = dataSet.select("*").filter(col("_source.raw_text").contains(threats_path)).show() 它不起作用,因为威胁是字符串的数组,并且包含用于字符串的工作.任何帮助将不胜感激. 解决方案 您可以在列上使用isin udf 它会像 一样 val threats_path = s
0 2024-03-30
编程技术问答社区
如何使用Spark Scala连接3个RDD's
我想使用spark rdd加入3 tables.我使用Spark SQL实现了目标,但是当我尝试使用RDD加入时,我没有得到所需的结果.以下是我使用spark SQL和output的查询: scala> actorDF.as("df1").join(movieCastDF.as("df2"),$"df1.act_id"===$"df2.act_id").join(movieDF.as("df3"),$"df2.mov_id"===$"df3.mov_id"). filter(col("df3.mov_title")==="Annie Hall").select($"df1.act_fname",$"df1.act_lname",$"df2.role").show(false) +---------+---------+-----------+ |act_fname|act_lname|ro
0 2024-03-30
编程技术问答社区
在一个分区/分组的窗口上对时间窗口进行汇总
我是新手的火花和学习. 我有这个火花数据框架.我想按日期订购,并获取由'id1','id2'和'record_type'的最新记录. 我的输入就像 data = [ ("ACC.PXP", "7246", "2018-10-18T16:20:00", "Hospital", None, "IN"), ("ACC.PXP", "7246", "2018-10-18T16:20:00", None, "Foundation", "IN"), ("ACC.PXP", "7246", "2018-11-10T00:00:00", "Hospital", "Foundation", "IN"), ("ACC.PXP", "7246", "2018-11-11T00:00:00", None, "Washington", "OUT"), ("ACC.PXP", "7246", "2018-11-12T00:00:00", "Hospital"
2 2024-03-30
编程技术问答社区
如何将{键,值}对RDD的键追加到值,如何将其转换为RDD?
假设我在file1中有2个文件,数据集目录中的file2: val file = sc.wholeTextFiles("file:///root/data/dataset").map((x,y) => y + "," + x) 在上面的代码中,我试图获得一个具有值的rdd: - >值,键为单个值 假设文件名是file1并说2个记录: file1: 1,30,ssr 2,43,svr 和 file2: 1,30,psr 2,43,pvr 所需的RDD输出为: (1,30,ssr,file1),(2,43,svr,file1),(1,30,psr,file2),(2,43,pvr,file2) 我们可以实现这一目标吗? 如果可能的话,请帮助我! 解决方案 var files = sc.wholeTextFiles("file:///root/data/dataset") var yourNeededRdd = files
0 2024-03-30
编程技术问答社区
Spark 将一个 Scala 对象的所有方法注册为 UDF
例如,我有一个对象A和B object ObjectA{ def funcA1(a:String):String = "#" + a + "#" def funcA2(a:String, b: Int): String = a * b } object ObjectB{ def funcB1(a:String):String = "&" + a + "&" def funcB2(a:String, b: Int): String = a.sum + b } 我想在其他地方定义一种方法,函数如下: def registeredAllMethod(className:String):Unit = { // How to implement ? // How to implement ? } 我希望注册Allmethod的功能通过类名来传递,然后将本类中的所有方法注册到Spark的UDF中.使用如下: // If I use
2 2024-03-30
编程技术问答社区
如何在解析过程中获得无效数据的计数
我们正在使用Spark解析一个大型CSV文件,该文件可能包含无效的数据. 我们想将有效的数据保存到数据存储中,还返回我们导入了多少个有效数据以及多少无效数据. 我想知道我们如何在火花中做到这一点,读取数据时的标准方法是什么? 我当前的方法使用Accumulator,但是由于Accumulator在Spark中的工作方式. 是不准确的. // we define case class CSVInputData: all fields are defined as string val csvInput = spark.read.option("header", "true").csv(csvFile).as[CSVInputData] val newDS = csvInput .flatMap { row => Try { val data = new DomainData() data.setScore(row.score.tri
2 2024-03-30
编程技术问答社区
如何在spark sql中把json数组<String>转换为csv?
我尝试了此查询以从LinkedIn数据获得所需的经验. Dataset filteredData = spark .sql("select full_name ,experience from (select *, explode(experience['title']) exp from tempTable )" + " a where lower(exp) like '%developer%'"); 但是我有一个错误: 最后我尝试了,但我有更多的行. Dataset filteredData = spark .sql("select full_name ,explode(experience) from (select *, explode(experience['title']) exp from te
0 2024-03-30
编程技术问答社区
如何优化下面的火花代码(scala)?
我有一些巨大的文件(19GB,40GB等).我需要在这些文件上执行遵循算法: 阅读文件 根据1列对其进行排序 获取数据的第170%: a)列出列的子集的所有不同的记录 b)将其写入火车文件 获取最后30%的数据: a)列出列的子集的所有不同的记录 b)将其写入测试文件 我尝试在SPARK中运行以下代码(使用Scala). import scala.collection.mutable.ListBuffer import java.io.FileWriter import org.apache.spark.sql.functions.year val offers = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") // Use first line of all files as header
0 2024-03-30
编程技术问答社区
spark unix_timestamp数据类型不匹配
有人可以帮助我以哪种数据类型或格式从_unixtime提交spark frof_unixtime()函数才能工作? 当我尝试以下操作时,它可以正常工作,但对current_timestamp却没有响应. from_unixtime(current_timestamp()) 响应如下: fromunixtime(currenttimestamp(),yyyy-MM-dd HH:mm:ss) 当我尝试输入 时 from_unixtime(1392394861,"yyyy-MM-dd HH:mm:ss.SSSS") 上面的类型不匹配只是失败: 错误:键入不匹配; 发现:INT(1392394861) 必需:org.apache.spark.sql.column from_unixTime(1392394861," yyyy-mm-dd HH:MM:SS.SSSS") 我想念什么?我已经尝试了许多不同的事情,并尝试阅读有关在Spark中使用日期/时间
在spark.sql中用group by选择多个元素
是否有任何方法可以在SQL Spark中逐个表分组,以选择多个元素 我正在使用的代码: val df = spark.read.json("//path") df.createOrReplaceTempView("GETBYID") 现在像以下方式进行小组: val sqlDF = spark.sql( "SELECT count(customerId) FROM GETBYID group by customerId"); 但是当我尝试时: val sqlDF = spark.sql( "SELECT count(customerId),customerId,userId FROM GETBYID group by customerId"); Spark给出了一个错误: org.apache.spark.sql.sql.analysisexception:expression'getByid.userId' 既不存在于组中,也不是聚集函数.
0 2024-03-30
编程技术问答社区
有没有人知道我如何在R中处理大数据?
在rstudio中分析推文: 我的CSV文件包含4,000,000条带有五列的推文:screet_name,text,create_at,faly_count和retweet_count. 我正在尝试使用以下代码识别主题标签的频率,但是它的运行速度太慢了几天,有时rstudio崩溃了. mydata %>% unnest_tokens(word, text, token ="tweets") %>% anti_join(stop_words, by= "word") 我已经使用其他方法来处理R中的大数据,例如: 在Spark中,我进行以下操作,但是Rstudio无法将我的数据集复制以火花.我看到我的rstudio在我的rstudio中呆了一天,而没有复制我的数据集以火花. 连接到您的火花集群: spark_conn
0 2024-03-30
编程技术问答社区
如何在Spark Scala中把NAN或Infinite的空值替换为默认值?
我正在在CSV中阅读Spark,并且将模式设置为所有DecimalType(10,0)列.当我查询数据时,我会收到以下错误: NumberFormatException: Infinite or NaN 如果我的数据框架中有NAN/NAN/NULL/INFINITE值,我想将它们设置为0.我该怎么做?这就是我试图加载数据的方式: var cases = spark.read.option("header",false). option("nanValue","0"). option("nullValue","0"). option("positiveInf","0"). option("negativeInf","0"). schema(schema). csv(... 任何帮助将不胜感激. 解决方案 如果您的NaN在多列中具有NaN值,则可以使用na.fill()填充默认值 示例: val spark = SparkSession.b
0 2024-03-30
编程技术问答社区
SparkSQL是RDBMS还是NOSQL?
最近,当我们遇到这个问题时,我正在与朋友讨论SparkSQL的功能.它们是酸性交易吗? SparkSQL是否遵循CAP定理? 我对这个领域有些新,请帮助我.预先感谢. 解决方案 SparkSQL是一种查询语言,而不是Hive或MySQL之类的存储.尽管可以注册其他人可以使用的表,但它唯一的临时性. SparkSQL支持基础数据库支持的内容. 其他解决方案 SparkSQL遵循关系数据库模型. 它不支持Hive交易("酸"). 以下是几个有用的文章: http://db-engines.com/en/system/oracle+nosql%3BSAP+iq%3Bspark+sql
0 2024-03-30
编程技术问答社区
用特殊格式压缩的火花阅读
我有一个文件.gz我需要读取此文件并将时间和文件名添加到此文件中,我有一些问题,需要您的帮助来为此点推荐一种方法. 因为文件被压缩了,第一行是用我认为由于编码问题而不是正确的格式读取的,所以我尝试了以下代码,但不起作用 implicit val codec = Codec("UTF-8") codec.onMalformedInput(CodingErrorAction.REPLACE) codec.onUnmappableCharacter(CodingErrorAction.REPLACE) 文件具有特殊的格式,我需要使用REGEX读取DataFame ==>我发现的唯一方法是使用RDD读取并将其映射到Regex,是否有任何方法可以将其直接读取到DF并通过正则态度? val Test_special_format_RawData = sc.textFile("file://"+filename.toString()) .map(line ⇒ line.rep
0 2024-03-30
编程技术问答社区