Rx根据条件将流划分为段(列表)
我有一个RX生产商,它创建了类似的字符串流(真实流的简化版本): A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6.... 流是无穷无尽的,但订购了.因此,在以A开始的字符串耗尽后,B开始.当B用完时,C开始... Z用完时,我们移至AA1等.每封信10-30个实例. 我正在寻找一种将此流分为所有A的块的方法:A1 A2 A3,所有B的:B1 B2,所有C:C1 C2 C3 C4 C5 C6等.每个块都可以观察到(我'将变成列表)或简单的列表. 我使用RXJAVA尝试了几种不同的方法,所有方法都失败了.不起作用的事情是: 组成的组:由于流是无止境的,因此每个字母可观察到无法完成.因此,当A耗尽并且B的开始时,A的可观察到无法完成.因此,观察到的数量越来越多. 窗口/缓冲区具有独特的untilchanged - 我在原始流上使用"独特的untilchanged"来输出每个组的第一项(第一个A,第一个B等).然后,我将该流作为window的
16 2024-04-19
编程技术问答社区
如何使用Observables实现轮询?
我有一个参数化的rest调用,应每五秒钟执行不同的参数: Observable restCall = api.method1(param1); 我需要创建一个Observable,该>将每5秒对RESTCALL进行一次对param1值的不同值.如果API调用失败,我需要遇到错误并在5秒内进行下一个呼叫.只有在RESTCALL完成(成功/错误)时才能测量呼叫之间的间隔. 我目前正在使用rxjava,但是一个.net示例也很好. 解决方案 简介 首先,我是一个.net的家伙,我知道这种方法使用了一些在Java中没有直接等效的成语.但是,我愿意接受您的话,并以.net家伙会喜欢的一个很好的问题,希望这将使您走上RX-Java的正确道路,这是我从未看过的.这是一个很长的答案,但这主要是解释 - 解决方案代码本身很短! 使用 我们将首先对一些工具进行分类以帮助解决此解决方案.首先是使用Either
12 2024-04-13
编程技术问答社区
使用transformClassesWithDesugarForDebug的错误
我很难编译APK(调试或发布). Android Studio 3.0 Beta 5 这是我的 build.gradle (app) buildscript { repositories { maven { url 'https://maven.fabric.io/public' } } dependencies { classpath 'io.fabric.tools:gradle:1.+' } } repositories { maven { url 'https://maven.fabric.io/public' } } apply plugin: 'com.android.application' apply plugin: 'io.fabric' android { compileSdkVersion 26 buildToolsVersion "26.0.1"
14 2024-04-04
编程技术问答社区
RxAndroidBle保持一个持久的连接+写/通知处理
我正在构建一个关于蓝牙低能的特定要求的Android应用程序. 我需要写入仅写入特征并在单独的通知特征上接收响应,我需要在许多许多活动中进行.是否有RX在第一个特征上发送请求的方法,请在第二个特征上等待答案,然后继续进行另一个请求? 另外,要分享我的rxandroidble实例,我想到了做某种blemanager singleton,我可以在其中揭露观察力,这样我就可以轻松地在主持人中订阅它们.我只想避免复制每个活动的连接逻辑,并具有(理想情况下)持久连接.这样,我只能公开Connection Observable并订阅它,因此我可以轻松地发送书写请求并获取通知,但是我确定有更好的方法可以做到. 这就是我现在拥有的: @Singleton public class BleManager { private PublishSubject disconnectTriggerSubject = PublishSubject.create(); pr
10 2024-04-01
编程技术问答社区
如何处理auth0 403错误而不需要到处添加特定的代码(Retrofit/okhttp/RxAndroid)。
我正在使用auth0,这给了我一个JWT(JSON Web令牌)和一个刷新.我在HTTP标题中使用此JWT与后端进行通信. 可能会发生服务器给我一个403,当它决定JWT已过期时.在这种情况下,我可以使用Refreshtoken要求Auth0向我发出新的JWT.这意味着我将Auth0 Backend称为"刷新",它给了我一个新的JWT,然后可以在我的请求中使用. 我的问题是,如何在我的所有网络代码中有效地编写此行为?我将有几个终点可以交谈,他们都可能返回403. 我认为我应该首先制作一个将JWT添加到所有请求中的拦截器. 然后应该有一个行为来检测403,悄悄地对Auth0进行网络电视,检索新的JWT.然后,应将原始请求再次尝试,并在其标题中使用新的JWT. 所以我希望将这个403处理在我的其他代码上看不见的地方,绝对不必在任何地方重写. 关于如何实现这一目标的任何指示. - 要清楚,我基本上是在寻找有关如何使用Rxandroid可观察物来实
18 2024-03-19
编程技术问答社区
如何用RxJava Observer替换Asynctask?
我有一个带有Room数据库的测试项目.使用Asynctask,我可以成功地将带有一些测试数据的对象插入数据库.我正在尝试学习RxJava,然后用RxJava's observer替换Asynctask,但它行不通.我已经阅读了很多文档并观看了教程,但我认为我不明白.这是相关代码: 在这里,我设置了我的Room对象,其中List的数据: for(ObjectForArray item: listToDatabase) { myRoomEntity.setName( item.getName() ); Log.d( "TAG", myRoomEntity.getName() ); } 然后,我尝试使用RxJava Observable将数据插入数据库.最初是使用Asynctask成功完成的: Observable myRX = Observable .just(myRoomEnt
22 2024-03-18
编程技术问答社区
使用RxJava/Jersey2的异步RestAPIs。线程问题?
我们正在使用反应性编程来制作REST API. 如图所示,我们将3层与以前的同步API设计相同. http://oi59.tinypic.com/339hhhki.jpg 使用jersey2实施的API层,该层将处理请求/应对json并移交到服务层. 使用反应性编程(rxjava)实现业务遗传的服务层 dao层,用于按服务层进行持续操作.由于我们使用couchbase,这将使用couchbase rxclient. 我理解流量如下: a) http请求是出现的,汉TER将在"容器线程池"中处理请求/parse json/delelialize请求模型. b)在jersey2 async支持的情况下,RequestThread将返回到容器线程池,并且服务层将在Schedulers.computation()Scheduler中执行. @Path("/resource") public class AsyncUserResource { @GET
12 2024-03-18
编程技术问答社区
合并同一类型的观察者(RxJava)。
我有一个代码流,它可以生成相同类型的可观察到的值.然后,我全部浏览它们,将它们结合在一起,并将结果作为可观察的.目前,我将Zip与Funcn一起使用,这似乎很恐怖,我想我错过了某个地方的重点.这是一个使用地图的示例,显然是胡说八道,但您明白了. final ImmutableList.Builder> observables = ImmutableList.builder(); for (String key: keys) { if (someTest(key)) { observables.add(generateObservableMap(key)); } } return Observable.zip(observables.build(), data -> { final Map result = Maps.newHashMap();
26 2024-03-18
编程技术问答社区
如何在出现特定条件时停止 Kotlin 流程
我想取消kotlin流程,如果代码中发生某些条件. 假设我有一个如下的方法 fun test(): Flow = flow { val lst = listOf("A", "B", "C") while(true) { lst.forEach { emit(it) } //If some condition occurs, need to return from here, else continue //How to stop flow here } } 并将其称为 test().collect { println(it)} 问题是,如何停止流动以在某些条件下(来自流量构建器或外部)产生任何东西? 解决方案 fun test(): Flow = flow { val lst = listOf("A", "B", "C") while(true)
12 2024-03-18
编程技术问答社区
如何用RxJava发出多个API请求并将其合并?
我必须进行N REST API调用并将所有结果的结果组合起来,或者如果至少一个呼叫失败(返回错误或超时),则失败. 我想使用rxjava,并且有一些要求: 在某些情况下,能够配置每个API调用的重试.我的意思是,如果我有一个重试= 2,并且我提出3个请求,每个请求最多必须2次,总共有6个请求. 失败快!如果一个API调用失败n次(n是重试的配置),则如果剩余请求尚未结束,则不会产生元信息,我想返回错误. 如果我想用一个线程提出所有请求,我需要一个异步http客户端,不会? 谢谢. 解决方案 您可以使用Zip操作员在结束后一起邮递所有请求,并检查所有这些都成功 是否所有请求 private Scheduler scheduler; private Scheduler scheduler1; private Scheduler scheduler2; /** * Since every observable into the zip is creat
14 2024-03-18
编程技术问答社区
在RxJava中等待多个异步调用完成
我的场景很简单,但我似乎无法在任何地方找到它. 我有一组我想要迭代的元素,每个人都称为异步函数,然后等待所有元素完成(这再次以异步方式发生,以函数的逻辑实现) .我对RXJAVA的新手相对较新,并且过去通过将回调传到功能并在末尾等待来轻松地在nodejs中执行此操作. 这是我需要的伪代码(元素的迭代器不需要同步或订购): for(line in lines){ callAsyncFunction(line); } WAIT FOR ALL OF THEM TO FINISH 您的帮助非常感谢! 解决方案 使用RX: Observable .from(lines) .flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io()) .ignoreElements(); 此时,根据您想做的事情,您可以使用.switchIfEmpty(...)订
12 2024-03-18
编程技术问答社区
RxJava中的flatZip
我将多个可观察到的曲线串在一起,然后以一种可观察到的方式转换它们: final Observable> result = Observable.zip(obs1, obs2, transformFunc); 我想做的是: final Observable result = Observable.flatZip(obs1, obs2, transformFunc); 鉴于Flatzip不存在(也许我应该提交),最干净的方法是什么.目前,我必须在本身上贴上结果. 解决方案 public class RxHelper { public static Observable flatZip(Observable o1, Observable o2, final Func2
8 2024-03-18
编程技术问答社区
CompletableFuture、Future和RxJava'的Observable之间的区别
我想知道 CompletableFuture,Future和Observable RxJava. 我所知道的都是异步,但 Future.get()阻止线程 CompletableFuture给出回调方法 RxJava Observable ---与CompletableFuture相似,还有其他好处(不确定) 例如:如果客户端需要进行多个服务调用,并且当我们使用Futures(Java)Future.get()时,将被顺序执行...想知道它在rxjava中如何更好. 和文档 http://reactivex.io/intro.html 说 很难使用期货来最佳组成有条件的异步执行流(或不可能,因为每个请求的潜伏期在运行时有所不同).当然,这可以做到这一点,但是它很快变得复杂(因此容易出错)或过早地阻止了future.get(),这消除了异步执行的好处. 真的很想知道RxJava如何解决这个问题.我发现从文档中很难理解. 解决方案 期货
24 2024-03-18
编程技术问答社区
如何通过使用RxJava-Android来等待多个嵌套的异步调用?
我是RXJAVA的新手,这是我的情况, 发送请求a并将获得List返回 对于每个a,发送请求AA并将恢复AA,绑定A和AA然后 有类似逻辑的B&BB 只有在所有请求完成后才做一些事情 示例: request(url1, callback(List listA) { for (A a : listA) { request(url2, callback(AA aa) { a.set(aa); } } } a和b是独立的 如何构建代码?我还将改造用作网络客户端. 解决方案 好吧,我认为这应该解决您问题的第一部分: 请注意,对flatMap的第二个调用给出了2个参数 - flatMap的版本不仅为每个输入项目可观察到一个可观察的,而且还采用第二个功能,又会从中从可观察到相应的输入项目的可观察. 查看此标题下的第三个图形以获得直观的理解: -Abserva
14 2024-03-18
编程技术问答社区
rxjava 合并不同类型的观察变量
我是RXJAVA的新手.我需要结合两个发出不同类型对象的观察值.类似Observable和Observable并获得Observable.我找不到任何操作员.这样做的RX方式是什么?请注意,Milk和Cereals是异步. 解决方案 很难不知道您需要什么,但可能 bombineLatest(). zip将同时服用Observable和Observable,让您通过提供的功能将它们组合到CerealsWithMilk中.每次获得 a Milk 和 a Cereals. combineLatest与zip相似,除非它会发出新的CerealsWithMilk,即使只是 新的Milk Milk 或只是一个新的Cereals发出. 其他解决方案 如果要合并不同类型的可观察结果,则需要使用Observable o1 = Observable.just("a", "b"
10 2024-03-18
编程技术问答社区
RxJava并行获取可观察变量
我需要在RXJAVA中实现平行异步调用时需要一些帮助.我已经选择了一个简单的用例,其中第一个调用获取(而不是搜索)要显示的产品(图块)列表.随后的电话输出并获取(a)评论和(b)产品图像 几次尝试后,我到达了这个地方. 1 Observable searchTile = searchServiceClient.getSearchResults(searchTerm); 2 List allTiles = new ArrayList(); 3 ClientResponse response = new ClientResponse(); 4 searchTile.parallel(oTile -> { 5 return oTile.flatMap(t -> { 6 Observable reviews = reviewsServiceClient.getSelle
14 2024-03-18
编程技术问答社区
如何处理微服务架构中的网络调用
我们使用的是Micro Service Architecture,其中使用顶级服务来公开REST API的最终用户和后端服务进行查询数据库的工作. 当我们获得 1用户请求时,我们提出了〜30k的后端服务请求.我们正在使用RXJAVA作为顶级服务,因此所有30K请求并行执行. 我们正在使用Haproxy在后端服务之间分配负载. 但是,当我们获得3-5个用户请求时,我们正在获得网络连接异常,没有托管异常路线,套接字连接异常. 这种用例的最佳实践是什么? 解决方案 您最终得到了经典的微服务混乱.这完全无关紧要您采用的技术 - 问题在您应用微服务概念的方式中存在! 在此架构中,服务是自然的,服务互相呼叫(最好是异步发生!!).由于我只对您的服务API了解一无所知,因此我必须对您的后端出了什么问题做出一些假设: 我假设用户向一项服务提出请求.该服务现在(显然是同步)查询另一个服务,并接收您描述的30k记录.由于您可能必须了解更多有关这些记录的信息,因此您现在必须每个
12 2024-03-03
编程技术问答社区
RxJava -2个观察者,随时接受更多的观察者?
我目前正在使用RX-Java 2,并且具有一个用例,其中需要单个骆驼路由订户使用多个可观察到的东西. 使用此解决方案作为参考,我有一个部分工作解决方案. rxjava-合并的可观察到可以接受更多观察到的时间吗? 我打算使用PublishProcessor将订阅一个骆驼反应流订阅者,然后维护ConcurrentHashSet>,我可以动态添加新的可观察到. 目前,我遇到了如何与PublishProcessor添加/管理Flowable实例? 我真的是RX Java的新手,因此任何帮助都将受到赞赏!这就是我到目前为止的: PublishProcessor publishProcessor = PublishProcessor.create(); CamelReactiveStreamsService camelReactiveStreamsService = CamelReactiveStreams.get(camelContext)
16 2024-02-28
编程技术问答社区
为反应式管道编写方面的内容
我正在编写回报承诺方法的方面.考虑以下方法: public Mono publishToKafka(Stream s) { //publishToKafka is asynchronous return Mono.just(s).flatMap(worker::publishToKafka); } 我想缓存发布是否成功.由于这是一个横切问题,因此一个方面看起来像是最好的设计.这是我的方面. @Around("@annotation....") public Object cache() { //get the data to cache from the annotation Object result = pjp.proceed(); cache.cache("key","data"); return result; } 现在,由于publishToKafka是异步的,因此目标方法在螺纹开关发生并调用c
12 2024-02-27
编程技术问答社区