broadcast - 如何在SparkSQL的UDF中使用外部全局变量?
问 题 测试代码如下: object GuangBoTest { var y: Broadcast[String] = null def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder().appName("x").getOrCreate() y = sparkSession.sparkContext.broadcast("y") sparkSession.sqlContext.udf.register("test", test _) //行号14 val testSQL = "select test('x') as result" sparkSession.sql(testSQL).show sparkSession.stop } def test(s: String): String = { s +
836 2022-07-19
编程技术问答社区
scala - spark core 有没有类似连表查询的操作
问 题 问题是这样的 val list= List((1,2),(3,5),(6,9)) val list2= List((3,4),(5,9),(9,12)) 这两个list我写入rdd,然后想让list每一个元素的value查找list2的key进行合并,像这样 (3,5)(5,9)=> (5,9) 请问如何可以做到 解决方法: val list= List((1,2),(3,5),(6,9)) val list2= List((5,4),(5,9),(9,12)) val spark = new SparkContext(conf) val rdd1 = spark.parallelize(list).map(x=>(x._2,x._1)) val rdd2 = spark.parallelize(list2) val union3=rdd1.join(rdd2).values
160 2022-07-19
编程技术问答社区
使用本地R(sparkr)接口在远程spark(EC2)上执行命令挂起
你好, 我正在尝试使用 SparkR(来自本地 R-GUI)运行一些 spark 命令.为了在 EC2 上设置 spark 集群,我使用了来自 (https://edgarsdatalab.com/2016/08/25/setup-a-spark-2-0-cluster-r-on-aws/) 的大部分命令稍加修改即可安装最新版本.我所要做的就是使用 SparkR 包从我的本地 R-GUI 与远程 spark(在 EC2-Ubuntu 上)进行交互. **这是我的设置(一步一步):** 1. 我的 PC 上装有 Windows 8.1,带有 R3.3.3 和 SparkR 包. 2. 我创建了一个 AWS-EC2 实例(免费套餐帐户)并使用来自 Amazon 的现有 Ubuntu 映像. 3. 在我的本地 PC 上安装 PuTTy.使用 PuTTy 终端连接到 Ubuntu-16(在 EC2 上)并将其用于下面的步骤 4 到 10. 4.在EC2上安装Java然后spa
140 2022-07-19
编程技术问答社区
Spark scala从文件中计数偶数
大家好, 我是大数据世界的新手.需要我们的帮助才能使它成为现实.这是我的问题 我正在从 txt 文件中读取数据(1,2,3,4,4,4,4) var file=sc.textFile("file:///home/cloudera/MyData/Lab1/numbers.txt") var number=file.flatMap(line=>line.split(",")) var intNumbers=number.map(num=>num.toInt)//错误 intNumbers.collect() java.lang.NumberFormatException:对于输入字符串:“" 由于数组的最后一个元素无法转换为 Int ,因此出现错误 请帮忙 提前致谢. 问候, 学习火花 我尝试过的: 在文件中添加数据,如 (1,2,3,4,4,4,4,) 但仍然显示相同的错误 Array[String] = Array(1,2,3,4
440 2022-07-19
编程技术问答社区
Spark里一行scala代码看不懂?
问 题 就是KMeans.scala里的 val sums = Array.fill(runs, k)(Vectors.zeros(dims)) 这句, 前面fill就是生成一个长度为runs,每个值是k的数组,后面传入一个Vector是什么意思? 解决方案 函数的currying,你可以当成三个参数
214 2022-07-19
编程技术问答社区
macos - Spark目录下没有lib目录,-bash: sbt/sbt: Permission denied
1、想要解决的问题: Spark目录下没有lib目录。 2、一句话描述: Spark目录下没有lib目录, 于是我使用“sbt/sbt assembly”命令打包,但是mac终端提示-bash: sbt/sbt: Permission denied 3、详细情况: 在用Intellij创建Scala项目时,我参考的《Spark开发环境配置及流程》提到: “ 选择菜单中的“File”→“project structure”→“Libraries”,然后点击“+”导入spark-assembly-1.2.0-hadoop2.4.0.jar。 这个jar包包含Spark的所有依赖包和Spark源码。一开始我们下载的Spark版本是预编译版本的(见《Spark on yarn搭建过程》 ),所以这个包可以在解压的Spark目录下lib目录里找到,假如说下载的没有编译的,需要通过sbt/sbt assembly命令打包。 ” 于是我在spark源代码目录
804 2022-07-19
编程技术问答社区
scala - Spark1.6用Maven编译,总是在MQTT这里就失败
问 题 [INFO] Spark Project External Flume ....................... SUCCESS [ 8.670 s] [INFO] Spark Project External Flume Assembly .............. SUCCESS [ 3.832 s] [INFO] Spark Project External MQTT ........................ FAILURE [ 11.449 s] [INFO] Spark Project External MQTT Assembly ............... SKIPPED [INFO] Spark Project External ZeroMQ ...................... SKIPPED [ERROR] Failed to execute goal on project spark-streaming-mqtt_2.10
326 2022-07-19
编程技术问答社区
scala - 如何把Spark RDD中的内容按行打印出来?
问 题 请问我想把最后wordcounts里的内容按行打印出来要怎样编写代码?,向下面这样: means 1 under 2 this 3 ... Hadoop 流行的一个通用的数据流模式是 MapReduce。Spark 能很容易地实现 MapReduce: scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 这里,我们结合 flatMap, map 和 reduceByKey 来计算文件里每个单词出现的数量,它的结果是包含一组(String, Int) 键值对 的 RDD。我们可以使用 [collect] 操作在我们的 sh
1016 2022-07-19
编程技术问答社区
scala - Spark命令行中正常执行的代码,用Maven编译报错
Hi,all 我在idea中编写Spark程序,使用Maven编译会报错,但是将这段报错的代码贴到Spark-shell中能正常执行。这个问题很奇怪,求各位大神帮忙看下! spark-shell中执行效果 使用Maven编译的报错结果信息 代码如下: case class ontime_table_schema(ratio: Double, score: Int) //准时率配置表 val ontime_table_arr = Array( Array(0.00, 0), Array(0.90, 80), Array(0.97, 120) ) //生成准时率DataFrame val ontime_table_rdd = sc.parallelize(ontime_table_arr) val ontime_table_df = ontime_table_rdd.
508 2022-07-19
编程技术问答社区
spark-shell - I have some question on spark.
问 题 I'm running an example using this command: ./bin/run-example org.apache.spart.examples.SparkPi local[3] Spark Command: /usr/lib/jvm/java-7-openjdk-amd64/bin/java -cp /usr/local/spark/spark-1.6.2-bin-hadoop2.6/conf/:/usr/local/spark/spark-1.6.2-bin-hadoop2.6/lib/spark-assembly-1.6.2-hadoop2.6.0.jar:/usr/local/spark/spark-1.6.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/spark-1.6.2-bin-hadoop2.6/lib/datanucleus-rdbm
160 2022-07-19
编程技术问答社区
如果我通过spark访问数据,我可以用impala在列级别控制数据库表访问吗
有人可以帮我解决这个问题吗:). 我们使用 Impala 查询数据,使用 Sentry 限制列级别的数据访问. 我们使用 Spark 编写代码来查询存储在文件中的数据.我的理解是,与 Spark 一起使用时,哨兵角色无法控制列级别的访问.但是,有人建议有一种方法可以将 Spark 与 Impala 一起使用,以允许编写代码以通过 Spark 访问数据,但仍应用 Sentry 角色来控制列级别的访问.这是正确的吗,因为我在任何地方都找不到这方面的任何信息. 我尝试过的: 这是一个理论上的问题,我已经尝试搜索信息但找不到任何东西. 解决方案 Impala 和 Spark 是两个单独的 SQL 引擎,用于与Hadoop...一个不能使用另一个的特性!!! 所以,不,如果你使用 Impala,就没有 Spark,如果你使用 Spark,就没有 Impala...
564 2022-07-19
编程技术问答社区
scala - 如何将多个dataframe合并
问 题 我在计算数据的feature,id 是每个item的唯一标志,在数据库里是主键. 我每次计算feature的时候是一个一个计算的。也就是我得到的 dataframe1 = (id,feature1) dataframe2 = (id,feature2) .... 我希望把数据最后整合成(id,feature1,feature2,feature3,....) 除了手动join,有没有快捷的办法?(feature有很多大概快100个,我觉得这样太没效率了) 解决方案 map成pairRdd,union起来,reduce合并json,然后通过map把id添加到json里头变成rdd再通过sqlContext.read().json转回df。 当然你也可以做sql拼装……
1742 2022-07-19
编程技术问答社区
scala - Spark: 如果我想使用美元号来表示数据列,应该引入哪个包?
问 题 我看到文档里写 Spark 中可以用 $"name" 的形式表示 name 这一列。 我的代码类似这样的: initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_") 但是编译错误,提示我: value $ is not a member of StringContext 我应该引入哪个包才能使用这种语法? PS:我用的 spark 版本是 1.5,scala 版本是 2.10.4 解决方案 spark2.0下面:参考 import spark.implicits._ 老版本(未亲测) import sqlContext.implicits._
194 2022-07-19
编程技术问答社区
scala - Spark Distinct操作的DAG问题
问题 我创建了一个分成两个节点的List val list = sc.parallelize(List(1,1,1,1,2,2,2,3,3,4),2) 现在对其进行distinct操作 list.distinct.collect 执行时的DAG图如下所示 请问,这个Stage0中的distinct与Stage1中的distinct的区别。 我是这么理解的,首先distinct在各个节点做一次transformation,然后再shuffle做一次transformation。不知道是否正确。
228 2022-07-19
编程技术问答社区
spark-streaming - spark streaming 集成 kafka,使用window时出现错误
问 题 当使用 spark streaming 2.0.0 集成 kafka 0.10.0时出现 KafkaConsumer 多线程争用的问题。 部分代码如下: val ssc = new StreamingContext(sc, Seconds(5)) val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)) val data = stream.map(_.value()) val word = data.map(word => (word,1)) val result = data.reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y },
820 2022-07-17
编程技术问答社区
spark-shell - spark sql和hive的问题?
问 题 看到spark sql兼容hive,并有一个hive on spark的项目 那完全使用spark sql自己的解析,和hive on spark 让hive借助spark运行有什么不同? 哪个更高效? hive是否能提供比spark sql原生更好的sql支持,是否需要学习hive语法? 解决方案 总体上差不太多,hive on spark只是相当于给hive提供了一个新的计算引擎,将hive的sql解析成spark的DAG去计算。spark sql也差不多。不过,hive在sql上的优势是可以直接利用 hive cli 来进行交互操作,比起spark sql要方便不少。
254 2022-07-17
编程技术问答社区