Akka.net启动和停止时都没有活动
我正在尝试将Scala(2.4.11)中的typesafe Akka Actor发送消息到C#(1.0.4)中的Akka.net演员 我的.NET演员有一个奇怪的问题,它一直说开始了,然后停止了,但是我不知道引擎盖下发生了什么: akka.net日志: 2015-11-18 16:23:57.6168|DEBUG|Akka.Remote.Transport.ProtocolStateActor|Started (Akka.Remote.Transport.ProtocolStateActor) 2015-11-18 16:24:27.6578|DEBUG|Akka.Remote.Transport.ProtocolStateActor|Stopped 2015-11-18 16:24:42.6344|DEBUG|Akka.Remote.Transport.AkkaProtocolManager|now supervising akka://converter/syste
18 2024-04-25
编程技术问答社区
C#如何实施标记的枚举工作,Scala如何解决同一问题?
我试图了解标语在C#中的工作方式,以及如何在Scala中实现类似功能. 我感到惊讶的是,Enum是一种特殊的数据类型,它使变量能够成为一组预定义常数实际上可以是枚举的组合,并在C#中编译为C#: [Flags] enum Permissions { None = 0, Read = 1, Write = 2, Execute = 4 } // ... Permissions x = Permissions.Read | Permissions.Write; # How | Is Implemented??? 但是,当尝试在Scala中创建枚举或ADT时,我无法实现|操作员,因为我无法从中构建新的许可证和其他:权限. 目前,我知道诸如枚举和枚举以及ADT和Scala 3之类的库,但我还没有找到使用这些方法来创建标记的枚举的方法. 具体来说,我想知道|操作员如何与C#中的枚举一起工作,以及如何在Scala中实现. 有人可以解释C#中标记的
20 2024-04-25
编程技术问答社区
如何使用 Spark 结构化流逐块处理文件?
我正在处理大量文件,我想通过块来处理这些文件,假设在每批期间,我想分别处理每个50个文件. 我该如何使用火花结构化流? 我已经看到Jacek Laskowski( >)在一个类似的问题中说( spark可以从json文件中处理RDD块,并发布到Kafka主题),可以使用Spark结构化流媒体,但是我找不到任何示例. 非常感谢, 解决方案 如果使用文件源: maxfilespertrigger:每个触发器中要考虑的新文件数量的最大数量(默认:no max) spark .readStream .format("json") .path("/path/to/files") .option("maxFilesPerTrigger", 50) .load 如果使用kafka源,它将相似,但是带有maxOffsetsPerTrigger选项.
22 2024-04-23
编程技术问答社区
定期更新静态数据集的结构化流式传输
将流与静态数据集合并是结构化流的重要功能.但是在每批批次上,数据集将从数据源中刷新.由于这些来源并不总是那么动态,因此在指定的时间段内(或批处理数)缓存静态数据集将是性能增益. 在指定的期间/批处理数之后,从缓存中从源中重新加载了数据集. . 在Spark流中,我使用一个缓存的数据集对此进行了管理,并在指定数量的批量运行后取消了它,但是由于某种原因,这不再与结构化流有关. 有任何建议使用结构化流? 解决方案 我有一个开发的解决方案流静态加入:如何刷新(unpersist/persist)静态数据帧定期,这也可能有助于解决您的问题: 您可以通过使用结构化流提供的流程调度功能来做到这一点. 您可以通过创建人工"速率"流来定期刷新静态数据集的人工"速率"流来触发静态数据框架的刷新(毫无意义的 - > load-> persist).这个想法是: 最初加载staticdataframe并将其保留为var 定义一种刷新静态数据框的方法 使用以所需间隔触发
18 2024-04-23
编程技术问答社区
java.lang.ClassNotFoundException: java.time.temporal.TemporalField 当运行 Spark 代码时
这个问题与以下授予的代码提供了错误(见下文). settings = ssc.sparkContext.broadcast(Map( "metadataBrokerList_OutputQueue" -> metadataBrokerList_OutputQueue, "topicOutput" -> topicOutput)) val spec = StateSpec.function(Utils.updateState _).timeout(Minutes(2)) val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => { //trans
30 2024-04-23
编程技术问答社区
DStream 所有相同的键都应按顺序处理
i具有(键,值)类型的dstream. mapped2.foreachRDD(rdd => { rdd.foreachPartition(p => { p.foreach(x => { } )}) }) 我需要确保所有带有相同密钥的项目都在一个分区中处理,并且用一个核心处理. 如何做?我可以使用效率低下的GroupByKey吗? 解决方案 您可以使用PairDStreamFunctions.combineByKey: import org.apache.spark.HashPartitioner import org.apache.spark.streaming.dstream.DStream /** * Created by Yuval.Itzchakov on 29/11/2016. */ object GroupingDStream { def main(args: Array[String]): Unit = {
28 2024-04-23
编程技术问答社区
org.apache.spark.sql.AnalysisException: 在流数据帧/数据集上不支持非基于时间的窗口;;尽管有基于时间的窗口
我正在为火花结构化流的基于窗口的排序: val filterWindow: WindowSpec = Window .partitionBy("key") .orderBy($"time") controlDataFrame=controlDataFrame.withColumn("Make Coffee", $"value"). withColumn("datetime", date_trunc("second", current_timestamp())). withColumn("time", current_timestamp()). withColumn("temp_rank", rank().over(filterWindow)) .filter(col("temp_rank") === 1) .drop("temp_rank"). withColumn("digitalTwinId", lit(digitalTwinId)).
22 2024-04-23
编程技术问答社区
如何使用 mapWithState 提取超时的会话
我正在将代码更新到从updateStateByKey切换到mapWithState,以便基于2分钟的超时(仅用于测试目的)来获得用户的会话.每个会话都应在超时之前的会话中汇总所有流数据(JSON字符串). 这是我的旧代码: val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecord => { val parsed = Utils.parseJSON(eventRecord) val member_id = parsed.getOrElse("member_id", "") val timestamp = parsed.getOrElse("timestamp", "").toLong //The timestamp is returned twice because the first one will be used as the start time
16 2024-04-23
编程技术问答社区
Scala 库方法 Vector.sorted 使用的是什么算法?
我一直在寻找 scala文档但是到目前为止,我还没有对我的问题找到答案,即方法 使用了哪种排序算法 scala.collection.immutable.Vector.sorted 文档说这是一种稳定的形式,但不是使用的实际算法.这是合并吗? 解决方案 sorted方法是在SeqLike中实现的,似乎使用java.util.Arrays.sort进行排序.它似乎从向量构建一个数组,然后调用Arrays.sort,然后将其转换回.根据 java 6文档因此,它使用 QuickSort : 排序算法是一种调整后的QuickSort,改编自Jon L. Bentley和M. Douglas McIlroy的"工程式功能",软件实践和经验,第1卷. 23(11)P.1249-1265(1993年11月).该算法在许多数据集上提供了n*log(n)性能,这些数据集导致其他QuickSorts降低二次性能. 对于Java 7,该算法似乎有更改(同样,引用 docs ):
20 2024-04-23
编程技术问答社区
如何使用 Scala Slick 对来自不同数据库(数据源)的表执行 leftJoin?
我有2个数据库(database1和database2). database1具有带有字段ID的Table1 database2具有带有字段ID的Table2 现在如何使用Slick? SELECT tb1.`id` FROM `database1`.`table1` t1 LEFT JOIN `database1`.`table2` t2 ON t1.`id`=t2.`id` 解决方案 我在这里可能是错误的,但是大多数现有的关系数据库不允许您在单个操作中跨越多个数据库.但是,您可以使用schema可以很容易地实现您所显示的(我坚信这是您想真正实现的 - 粘贴的SQL判断). 让我们有一个例子.假设我们在Slick相关的代码中定义了两个表: // student case class Student(name: String, middleName: Option[String],
18 2024-04-22
编程技术问答社区
尝试制作 HCons List CaseClassShape
好吧,所以我想创建一个hlistcaseclassshape,让我创建克服22个Arity限制的案例类.因此,从Stefan Zeiger的代码开始 final class HListShape[Level
18 2024-04-22
编程技术问答社区
Scala Slick 3 高查询延迟
我正在使用Slick运行一个非常简单的查询.根据日志,实际的数据库调用仅需约500µs,但是我的DB呼叫与结果之间的时间更高(约200ms) 请在下面找到运行查询的代码段.它非常天真地打印在查询执行之前和之后的时间戳;)预测表是一个非常简单的4列表映射到案例类. def getPredictionById(predictionId: Int) = { println(new Date().getTime()) val r = Await.result(db.run(Tables.Predictions.filter(p => p.predictionId === predictionId).result.head), 1 second) println(new Date().getTime()) r } 调试模式下的日志很长,所以我把它们放在粘贴中. http://pastebin.com/upugqjkd 我希望总的延迟可能会
12 2024-04-22
编程技术问答社区
如何将具有非默认字段类型的案例类映射到 Slick 中的表格?
我可以将案例类映射到光滑数据库表: - case class SomeTimeStamp(id: Option[Long], timestamp: java.sql.Timestamp ) class TimeStampTable(tag: Tag) extends Table[SomeTimeStamp](tag, "TSTAMP_TABLE") { def id = column[Long]("ID", O.AutoInc, O.PrimaryKey) def time = column[java.sql.Timestamp]("TIME") def * = (id.?, time) (SomeTimeStamp.tupled, SomeTimeStamp.unapply) } 案例类的字段默认为Slick默认转换为数据库类型,因此一切都很好. ,但这不起作用,如果我的案例类中的字段不是默认数据库类型. Slick允许我使用MappedC
26 2024-04-22
编程技术问答社区
捕捉唯一密钥异常 Scala Slick
以下是我在插入数据库中使用的代码 override def create(groups: GroupEntity): Future[GroupEntity] = db.run{groupsTableQuery returning groupsTableQuery += groups} 解决方案 感谢您的更新,但是我找到了一种更简单的方法来解决该问题,这在我的路线中解决了.以下是解决方案 val saved : Future[GroupEntity] = groupRepositoryImpl.create(group) onComplete(saved){ case Success(value) => complete(saved.map(_.toJson)) case Failure(ex) => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))} 结果=>发生错误:
34 2024-04-22
编程技术问答社区
如何在普通 SQL 中按名称引用列?
我想在getResult中使用命名的参考而不是位置,因此不是这样: implicit val getCoffeeResult = GetResult(r => Coffee(r. Coffee(r.get("name"), r.get("supID"),r.get("price"))) 我可以命名结果? 解决方案 您可以通过结果集获取它: r.rs.getString("name")
22 2024-04-22
编程技术问答社区
悬挂在相位调节器中的鳞片
我遇到了2.10.3的问题,而Slick(Codegen)生成的代码.看起来与 非常相似 scalac悬挂在regexparserser regexparser regexparserser > scalac上 CodeGen生成的其他文件,但该文件仅在" Scalac:phase typer foo.scala" 中永远悬挂 我唯一可以看到的是表中的列数,导致大量val和大量的列表 def * = WordRootID :: WordID :: WordHeadID :: SynonymID :: PronunciationID :: Rank :: BNCFrequency :: CompassDifficulty :: DifficultyNormalized :: DifficultySourceCode :: COCARank :: PartOfSpeech :: AttributeNounProper :: AttributeNounGerund ::
30 2024-04-22
编程技术问答社区
在 slick 中对多个对象进行分组会生成无效 SQL
我正在编写一个查询,该查询可以计算出一个问题的分数,而执行查询时,我会得到一个psqlexception 有关模型的信息 一个Questionanswer可以有几个(至少一个)问题,因为有多种方法可以正确回答问题. 每个Questionanswerpibility都有几个问题,在下面的查询中,我们查询每个问题的分数. 有问题的查询 查询本身确实会生成SQL,但是无法执行SQL def queryMogelijkePuntenByVragenViaOpenVragen()(implicit session: Session) = { (for{ ovam
18 2024-04-22
编程技术问答社区
Scala Slick 3.0 创建表格然后插入行
我已经编写了此代码来创建一个表,然后插入几行,然后打印插入的行数. package com.example import tables._ import scala.concurrent.{Future, Await} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration import slick.backend.DatabasePublisher import slick.driver.H2Driver.api._ object Hello { def main(args: Array[String]): Unit = { val db = Database.forConfig("h2mem1") try { val people = TableQuery[Persons] val set
28 2024-04-22
编程技术问答社区
巧妙连接子查询
我构建以下sql查询 SELECT v.uuid, d.start_time, d.end_time FROM visits v INNER JOIN visit_dates d ON v.uuid = d.visit_uuid WHERE v.study_environment_site_uuid = (SELECT study_environment_site_uuid FROM visits WHERE uuid = 'e4663612-39f9-4c43-bd86-c4c5a9235b03') AND v.uuid != 'e4663612-39f9-4c43-bd86-c4c5a9235b03' AND d.start_time
28 2024-04-22
编程技术问答社区