如何使用Spark'的MLLib对Tweets进行矢量化?
我想将推文变成用于机器学习的向量,因此我可以使用Spark的K-均值聚类根据内容对它们进行分类. ex,所有与亚马逊有关的推文都被放入一个类别. 我尝试将推文分为单词,并使用HashingTF创建向量,这不是很成功. 还有其他方法可以矢量化推文吗? 解决方案 您可以尝试以下管道: 首先,将输入推文(位于列text)中.基本上,它创建了一个新的列rawWords作为从原始文本中获取的单词列表.要获取这些单词,它将输入文本通过字母数字单词(.setPattern("\\w+").setGaps(false)) 划分 val tokenizer = new RegexTokenizer() .setInputCol("text") .setOutputCol("rawWords") .setPattern("\\w+") .setGaps(false) 其次,您可以考虑删除停止单词以删除文本中不太重要的单词,例如 a , ,,,等. val st
2 2023-11-17
编程技术问答社区
火花管线向量装配器放弃其他列
spark VectorAssembler id | hour | mobile | userFeatures | clicked | features ----|------|--------|------------------|---------|----------------------------- 0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5] 您可以看到最后一列包含所有以前的功能.如果除去其他列,是否更好/更具性能.仅保留标签/ID和功能,或者这是不必要的开销,并且仅将标签/ID和功能馈入估算器就足够了吗? 当管道中使用VectorAssembler时会发生什么?如果未手动删除原始列,则只能使用最后一个功能,或者它会引入共线性(重复列)? 解决方案 请仔细阅读文档.每个分类器均通过特征列(featuresCol)进行参数.它不考虑任何其他列或列的顺序.
2 2023-11-17
编程技术问答社区
VectorUDT的使用
我必须获取数据类型并进行案例匹配并将其转换为某些必需的格式.但是org.apache.spark.ml.linalg.VectorUDT的用法显示VectorUDT是private.我也需要特别需要使用org.apache.spark.ml.linalg.VectorUDT而不是org.apache.spark.mllib.linalg.VectorUDT.有人可以建议如何解决这个问题吗? 解决方案 对于org.apache.spark.ml.linalg类型您应该使用org.apache.spark.ml.linalg.SQLDataTypes ,它提供了私人UDT typess singleton实例: " apache.spark.sql.types.datatype" rel =" nofollow noreferrer"> MatrixType for矩阵(org.apache.spark.ml.linalg.Matrix). scala> org.apac
0 2023-11-17
编程技术问答社区
在Spark中使用梯度提升树的输出来预测类的概率
众所周知,迄今为止,Spark中的GBT S为您提供了预测的标签. 我正在考虑试图计算一个类的预测概率(例如,所有落在某个叶子下的实例) 构建GBT的代码 import org.apache.spark.SparkContext import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.tree.GradientBoostedTrees import org.apache.spark.mllib.tree.configuration.BoostingStrategy import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel import org.apache.spark.mllib.util.MLU
0 2023-10-26
编程技术问答社区
如何使用Scala运行具有分类特征集的Spark决策树?
我的功能集具有相应的分类featuresInfo:map [int,int].但是,在我的一生中,我无法弄清楚我应该如何让决策室班级工作.它不会接受任何东西,而是标记为数据.但是,标记点需要(double,向量),其中矢量需要双倍. val LP = featureSet.map(x => LabeledPoint(classMap(x(0)),Vectors.dense(x.tail))) // Run training algorithm to build the model val maxDepth: Int = 3 val isMulticlassWithCategoricalFeatures: Boolean = true val numClassesForClassification: Int = countPossibilities(labelCol) val model = DecisionTree.train(LP, Classification, Gini,
2 2023-10-26
编程技术问答社区
java.lang.NoSuchMethodException。<类>.<init>(java.lang.String)在复制自定义Transformer时出现异常。
目前使用Spark 2.0.1和2.2.1. 在编写自定义ML变压器时,为了将其添加到管道中,我注意到复制方法的覆盖物存在问题. 在我的情况下,通过trainvalidationsplit的拟合方法调用复制方法. 我遇到的错误: java.lang.NoSuchMethodException: Custom.(java.lang.String) at java.lang.Class.getConstructor0(Class.java:3082) at java.lang.Class.getConstructor(Class.java:1825) at org.apache.spark.ml.param.Params$class.defaultCopy(params.scala:718) at org.apache.spark.ml.PipelineStage.defaultCopy(Pipeline.scala:42) at Cust
28 2023-10-25
编程技术问答社区
如何使用 spark Naive Bayes 分类器进行 IDF 的文本分类?
我想使用tf-idf将文本文档转换为特征向量,然后训练幼稚的贝叶斯算法对其进行分类. 我可以轻松地加载文本文件,而无需标签,并使用hashingtf()将其转换为向量,然后使用IDF()根据其重要性来加重单词.但是,如果我这样做,我就摆脱了标签,即使订单相同,也似乎不可能将标签重组. 另一方面,我可以在每个单独的文档上致电Hashingtf()并保留标签,但是随后我无法在其上调用IDF(),因为它需要整个文档(并且标签都会进入)方式). 幼稚贝叶斯的火花文档只有一个示例,其中这些点已经被标记和矢量化,因此没有太大帮助. 我还看过本指南:/spark/train_a_machine_learning_model 但是在这里,他只在没有IDF的每个文档上应用哈希功能. 所以我的问题是,是否有一种方法不仅可以矢量化,而且还可以使用IDF来为幼稚的贝叶斯分类器加重单词吗?主要问题似乎是Sparks坚持仅接受标签的RDD作为NaiveBayes的输入. def
10 2023-10-19
编程技术问答社区
如何保存输入到Spark HashingTF()函数的键或索引?
基于1.4(documents = sc.textFile("...").map(lambda line: line.split(" ")) hashingTF = HashingTF() tf = hashingTF.transform(documents) 我想做这样的事情: documents = sc.textFile("...").map(lambda line: (UNIQUE_LINE_KEY, line.split(" "))) hashingTF = HashingTF() tf = hashingTF.transform(documents) 并具有结果tf变量在某处包含UNIQUE_LINE_KEY值.我只是错过了明显的东西吗?从示例中看来,没有一个好方法将document rdd与tf rdd链接. 解决方案 如果您使用Commit 85b96372cf0fd055f89fc639f45c1f2cb02a378f之后的SPARK版本(这包
0 2023-10-19
编程技术问答社区
如何使用Spark创建一个用于文本分类的TF-IDF?
我有一个具有以下格式的CSV文件: product_id1,product_title1 product_id2,product_title2 product_id3,product_title3 product_id4,product_title4 product_id5,product_title5 [...] product_idx是一个整数,product_titlex是字符串,示例: 453478692, Apple iPhone 4 8Go 我正在尝试从文件中创建TF-IDF,以便可以将其用于MLLIB的天真贝叶斯分类器. 到目前为止,我正在使用Spark为Scala使用Spark,并使用我在官方页面上找到的教程和Berkley Ampcamp > 3 和 4 . 所以我正在阅读文件: val file = sc.textFile("offers.csv") 然后我将其映射在元组RDD[Array[String]] val tu
8 2023-10-19
编程技术问答社区
用于LogisticRegression的Spark MLLib TFIDF实现
我尝试使用Spark 1.1.0提供的新的TFIDF算法.我正在Java中为MLLIB编写我的工作,但我不知道如何使TFIDF实现工作.由于某些原因>仅接受a 注意:文档行以格式[label;文字] 到目前为止,我的代码: // 1.) Load the documents JavaRDD data = sc.textFile("/home/johnny/data.data.new"); // 2.) Hash all documents HashingTF tf = new HashingTF(); JavaRDD> tupleData = data.map(new Function>() { @Override publ
10 2023-10-19
编程技术问答社区
如何在Spark ML Lib中从TF Vector RDD中获得单词的详细信息?
我在火花中使用HashingTF创建了术语频率.我对每个单词的tf.transform都使用了术语. 但结果以这种格式显示. [, ...] ,[termFrequencyofWord1, termFrequencyOfWord2 ....] eg: (1048576,[105,3116],[1.0,2.0]) 我可以使用tf.indexOf("word"). 但是,如何使用索引来获取这个词? 解决方案 好吧,你不能.由于哈希是非注射的,因此没有逆函数.换句话说,无限的令牌可以映射到一个水桶,因此无法分辨实际上是哪一个. 如果您使用的是大哈希,并且唯一令牌数量相对较低,则可以尝试从数据集中创建一个从存储桶到可能的令牌的查找表.这是一对一的映射,但是如果要满足上述条件,则需要相对较低的冲突. 如果您需要可逆转换,则可以使用c
2 2023-10-19
编程技术问答社区
在Scala Spark中使用数据框架的Naive-Bayes多指标文本分类器
我正在尝试构建一个NaiveBayes分类器,将数据库中的数据加载为包含(标签,文本)的DataFrame. 这是数据示例(多项式标签): label| feature| +-----+--------------------+ | 1|combusting prepar...| | 1|adhesives for ind...| | 1| | | 1| salt for preserving| | 1|auxiliary fluids ...| 我已使用以下转换进行令牌化,停止字,n-gram和hashtf: val selectedData = df.select("label", "feature") // Tokenize RDD val tokenizer = new Tokenizer().setInputCol("feature").setOutputCol("wor
4 2023-10-19
编程技术问答社区
为什么我的Spark SVM总是预测同一个标签?
我很难让我的SVM预测0和1的预测.看来,在我训练它并提供更多数据后,它总是想预测1或0,但是它将预测所有1或全部0,而从不组合两者.我想知道你们中的一个人是否可以告诉我我在做错了什么. 我已经搜索了" SVM总是预测相同的价值"和相似的问题,而且对于我们刚开始的机器学习的人来说,这很常见.恐怕我不明白我遇到的答案. 所以我从此开始,它或多或少有效: from pyspark.mllib.regression import LabeledPoint cooked_rdd = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [1])]) from pyspark.mllib.classification import SVMWithSGD model = SVMWithSGD.train(cooked_rdd) 我说"或多或少",因为 model.predict([0]) Out[47]: 0 是我
0 2023-10-06
编程技术问答社区
Spark MLLib SVM输出的分数是什么意思?
我不了解Spark MLLIB算法的SVM分类器的输出.我想将分数转换为概率,以便我获得属于某个类别的数据点的概率(在该类别上训练了SVM,又称多级问题)(另请参见此线程).目前尚不清楚分数的含义.它是到达超平面的距离吗?我如何从中获得概率? 解决方案 该值是边缘 - 分离超平面的距离.这不是概率,SVM通常不会给您带来概率.但是,如@CFH注释的评论,您可以尝试根据此保证金来学习概率.但这与SVM分开. 其他解决方案 import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD} import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.util.MLUtils // Load training data in LIBSVM format. val data
4 2023-10-06
编程技术问答社区
在Spark Stream中创建一个DataFrame
我已经将Kafka流连接到了火花.除了我训练Apache Spark MLIB模型以基于流文本的预测.我的问题是,得到一个预测,我需要通过数据帧. //kafka stream val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) //load mlib model val model = PipelineModel.load(modelPath) stream.foreachRDD { rdd => rdd.foreach { record => //to get a prediction need to pass DF val toPredict
如何用Java在Spark中结合或合并两个稀疏向量?
我使用了Java的API,即Apache-Spark 1.2.0,并创建了两个解析向量,如下所示. Vector v1 = Vectors.sparse(3, new int[]{0, 2}, new double[]{1.0, 3.0}); Vector v2 = Vectors.sparse(2, new int[]{0, 1}, new double[]{4,5}); 如何获得通过组合v1和v2形成的新向量v3,因此结果应为:(5, [0,2,3,4],[1.0, 3.0, 4.0, 5.0]) 解决方案 我发现问题已经一年了,仍在待定.在这里,我自己编写了辅助功能,如下所示. public static SparseVector combineSparseVectors(SparseVector... svs) { int size = 0; int nonzeros = 0; for (SparseVector sv : svs)
34 2023-09-08
编程技术问答社区
有流媒体来源的查询必须用writeStream.start();来执行。
我正在尝试使用Spark结构化流媒体读取KAFKA的数据,并预测形式的传入数据.我正在使用使用Spark ML训练的模型. val spark = SparkSession .builder() .appName("Spark SQL basic example") .master("local") .getOrCreate() import spark.implicits._ val toString = udf((payload: Array[Byte]) => new String(payload)) val sentenceDataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe", "topicname1") .load().selectExpr("CAST(value AS STR
为什么foreachRDD不能使用StreamingContext.textFileStream用新内容填充DataFrame?
我的问题是,当我将代码更改为流媒体模式并将数据框放在foreach循环中时,数据框架显示空表!我没有填补!我也不能将其放入汇编器()中.错误是: Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U]. Unspecified value parameter mapFunc. val dataFrame = Train_DStream.map() 我的train.csv文件如下: import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.mllib.linalg.Vectors import org
4 2023-09-07
编程技术问答社区
pyspark : ml + streaming
根据与comminity spark流 + mllib 结合起来在Spark中的输入流进行预测. 给定示例的问题(在我的群集上起作用)是testdata是正确格式的给定权利. 我正在尝试基于数据字符串来设置客户端服务器TCP交换. 我不知道如何以正确格式转换字符串. 当它起作用时: sep = ";" str_recue = '0.0;0.1;0.2;0.3;0.4;0.5' rdd = sc.parallelize([str_recue]) chemin = "hdfs://xx.xx.xx.xx:8020/cart_model_for_cycliste_v2" model = DecisionTreeClassificationModel.load(chemin) # travail sur la string rdd2 = rdd.map( lambda data : data.split(sep)) rdd3
4 2023-09-07
编程技术问答社区
Spark Streaming-基于过滤器Param分割输入流的最佳方式
我目前尝试创建某种监视解决方案 - 一些数据写给Kafka,我使用Spark流并处理此数据. 为了预处理机器学习和异常检测数据,我想根据某些过滤器参数将流分开.到目前为止,我了解到Dstreams本身不能分为几个流. 我主要面临的问题是,许多算法(如Kmeans)仅采用继续数据,而不是像E.G. URL或其他一些字符串. 我的要求理想是: 阅读来自Kafka的数据,并根据我阅读的内容生成字符串列表 基于该字符串列表生成多个流 - (拆分流,过滤器流或任何最佳实践) 使用这些流来训练每个流的不同模型,以获得基线,然后将所有内容与以后的所有内容进行比较 我很乐意得到任何建议如何解决我的问题.我无法想象这种情况还没有被火花覆盖 - 但是直到现在我还没有发现工作解决方案. 解决方案 我认为,使用过滤器和映射从原始创建派生的dstream应该足够: val numericFeaturesDStream = originalDStream.filter