垂直。x被动卡夫卡客户端:写作时链接不起作用?
我正在使用io.vertx.reactivex.kafka.client.producer.KafkaProducer客户端.客户有一个 rxWrite返回Single的函数.但是,在写操作期间,我需要记录错误.显然没有被执行. 我写了以下工作示例. test():函数测试链接和记录 fun test(): Single { val data = Single.just(ArrayList().apply { add("Hello") add("World") }) data.flattenAsObservable { list -> list } .flatMap { advertiser -> //does not work with writeKafka writeErro
2 2023-11-18
编程技术问答社区
如何模拟EntityBus.rxSend()?
io.vertx.reactivex.core.eventbus.EventBus.rxSend()方法具有以下签名: public Single> rxSend(String address, Object message, DeliveryOptions options) 模拟此方法的正确方法是什么,以便它返回包含真实对象的Single?问题在于,Message类除了一个构造函数外,它具有另一个Message对象的构造函数. 因此以下将编译: Mockito.when(eventBus.rxSend(Mockito.isA(String.class), Mockito.isA(JsonObject.class), Mockito.isA(DeliveryOptions.clas
2 2023-11-18
编程技术问答社区
节流传出的HTTP请求
我将Vert.x用作服务器来接收搜索请求,然后在服务器上处理到多个外部外部服务的HTTP请求. 对于异步行为,我选择使用vert.x本机HTTP客户端使用RXJAVA. 是,我使用的服务将我限制为每秒最多1个呼叫. thttle nio http传出请求的最佳/简单方法是什么? 在单个服务器环境中. 在聚类的服务器环境中. 解决方案 在单个服务器中,解决您的问题的幼稚方法是: 将请求加上结果处理程序推到队列 有一个定期处理程序,每秒都会从队列中弹出一个元素并执行请求 将结果处理程序调用结果或错误 以群集的方式,您可以扩展先前的概念并将其包装在事件总线地址上,以便以下方式: 用请求将消息发送到地址并设置回复处理程序 该服务将按照上一个说明工作 而不是致电处理程序,服务将依靠消息回复处理程序通过结果.
2 2023-11-18
编程技术问答社区
如何创建Single.just(Void)?
我正在为我的应用程序编写一些单位测试用例.我想模拟MongoClient update方法,但是更新返回Single. when(mongoClient.rxUpdate(anyString(), any(JsonObject.class), any(JsonObject.class))) .thenReturn(Single.just(Void)) 现在Single.just(Void)不起作用,正确的方法是什么? - 更新 - 所以我正在为updateUserProfile方法编写单元测试,为此我嘲笑了service.但是service.updateAccount方法返回是我无法模拟的. //Controller class public void updateUserProfile(RoutingContext routingContext){ // some code service.updateAccount(qu
2 2023-11-18
编程技术问答社区
RxAndroid 3主线
我试图在rx3 中找到订阅的主线程 Single.just(getHeavyData()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Data d) throws Throwable { setAdapters(d);
0 2023-11-11
编程技术问答社区
在UseCase对象中的UntTest-Mock RxJava对象
我想测试用户酶对象,在这种特定情况下,有一个loginusecase,看起来像这样: public class LoginUseCase implements RxUseCase { ApiManager mApiManager; public LoginUseCase(ApiManager apiManager) { mApiManager =apiManager; } @Override public Observable execute(final AuthCredentials authCredentials) { return Observable.just(1) .delay(750, TimeUnit.MILLISECONDS) .flat
2 2023-11-07
编程技术问答社区
在安卓的MVP演示器中对RxJava进行单元测试
我是TDD的新手.也是MVP和RXJAVA的新手.我只是深入研究,这是值得的.但是我坚持测试部分.我了解单位测试的基础.一开始我对我来说有点困难.但是我坚持在这里,所以如何测试演示者? 这是主持人类 - public class NewsPresenter { private final RxjavaService service; private final MainView view; private CompositeSubscription subscriptions; public NewsPresenter(RxjavaService service, MainView view) { this.service = service; this.view = view; subscriptions = new CompositeSubscription(); } public void getNewsList(String urlQ){
2 2023-11-07
编程技术问答社区
如何对RxJava服务器轮询中的 "repeatWhen "进行单元测试
这是我的代码 return Observable.defer(() -> mApi.verifyReceipt(username, password)) .subscribeOn(Schedulers.io()) .doOnNext(this::errorCheck) .repeatWhen(observable -> observable.delay(1, TimeUnit.SECONDS)) .takeUntil(Response::body) .filter(Response::body) .map(Response::body); 它不断进行投票,并收到"虚假"布尔响应,并在收到" true"时停止投票.我为Onnext()案例做了一个测试用例,例如: @Test public void testVerifyReceipt
4 2023-11-07
编程技术问答社区
RxJava android mvp单元测试 NullPointerException
我是MVP中单位测试的新手,我想向主持人进行基本测试,该测试负责登录,我只想断言 view.onLoginSuccess(); 这是PresenterCode: public LoginPresenter(LoginViewContract loginView, LoginModelContract loginModel, CurrentUserLoginModelContract currentUserLoginModel, CompositeDisposable subscriptions) { this.subscriptions = subscriptions; this.loginView = loginView; this.loginModel = loginModel; this.curr
2 2023-11-07
编程技术问答社区
无法验证来自RxJava订阅者的模拟方法调用
我正在尝试在我的Android应用程序中测试主持人.我正在尝试测试这样的方法: @Override public boolean loadNextPage() { if (!mIsLoading) { mIsLoading = true; if (mViewReference.get() != null) { mViewReference.get().showProgress(); } mService.search(mSearchQuery, ++mCurrentPage) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(itemsPage -> {
0 2023-11-07
编程技术问答社区
单元测试有延迟的Rxjava观察变量
我希望能够单位测试Observable,Observable的排放效果延迟,但实际上没有等待延迟时间.有没有办法做到这一点? 我目前正在使用倒计时来延迟断言,这效果很好,但是增加了测试运行时间. 示例: val myObservable: PublishSubject = PublishSubject.create() fun myObservable(): Observable = myObservable.delay(3, TimeUnit.SECONDS) @Test fun testMyObservable() { val testObservable = myObservable().test() myObservable.onNext(true) // val lock = CountDownLatch(1) // lock.await(3100, TimeUnit.M
4 2023-11-07
编程技术问答社区
如何在Retrofit 2的单元测试中创建 retrofit.Response对象
在使用RXJAVA和RETROFIT 2时,我正在尝试创建单元测试以覆盖我的应用程序时. 我遇到的问题是,对于Raturofit 2,我看不到创建改造的好方法.响应对象而不使用反射. @Test public void testLogin_throwsLoginBadRequestExceptionWhen403Error() { Request.Builder requestBuilder = new Request.Builder(); requestBuilder.get(); requestBuilder.url("http://localhost"); Response.Builder responseBuilder = new Response.Builder(); responseBuilder.code(403); responseBuilder.protocol(Protocol.HTTP_1_1);
0 2023-11-07
编程技术问答社区
Android RxJava 2 JUnit test-getMainLooper in android.os.Looper not mocked RuntimeException
尝试使用observeOn(AndroidSchedulers.mainThread())的主持人进行JUNIT测试时,我正在遇到RuntimeException. 由于它们是纯Junit测试而不是Android仪器测试,因此它们无法访问Android依赖性,导致我在执行测试时遇到以下错误: java.lang.ExceptionInInitializerError at io.reactivex.android.schedulers.AndroidSchedulers$1.call(AndroidSchedulers.java:35) at io.reactivex.android.schedulers.AndroidSchedulers$1.call(AndroidSchedulers.java:33) at io.reactivex.android.plugins.RxAndroidPlugins.callRequireNonNull(RxAnd
4 2023-11-07
编程技术问答社区
rxjava结合了可观察到的另一个可选的可观察到超时的
Asume我们有两个可观察到的A和B. A肯定会发布结果,而B的结果可能根本不发布(超时). 问题是如何映射A和B的结果,如果B在时间范围内返回,否则只需从A中返回结果. Observable A = getDatabaseElement(); Observable B = restApi.getElement(); 地图示例: map((databaseObject, networkObject) => { databaseObject.setData(networkObject); return databaseObject; }) 解决方案 为了超时B可观察使用take操作员与时间参数: B.take(10, TimeUnit.SECONDS) 为了接收A或B(如果B在超时内准备就绪)使用concatWith: A.concatWith(B.take(10, TimeU
8 2023-10-21
编程技术问答社区
RxJava中的超时
我是RXJAVA的新手,我需要以异步方式使用可观察的功能. 我还需要使用超时:在我的范围内,我希望每个过程在1秒或更少的时间内结束. 这是我现在所做的: public static void hello(String name) throws IOException { Observable obs2 = Observable.just(name).timeout(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()); obs2.subscribe(new Action1() { @Override public void call(String s) { if("CCCCC".equals(s)){ try { Thread.sleep(32
10 2023-10-21
编程技术问答社区
SerializedSubject对于RxJava中的线程安全是必需的吗
我在rxjava中创建了一个Subject实例,并从多个线程调用其onNext(): PublishSubject subject = PublishSubject.create(); //... subject.onNext("A"); //thread A subject.onNext("B"); //thread B rxjava文档说: 注意不要从多个线程中调用其onNext( )方法(或其他方法),因为这可能导致非序列化呼叫,这违反了可观察到的合同,并在结果Subject中产生了歧义. 我是否必须在这样的Subject上打电话toSerialized(),假设我不在乎"A"是否在"B"之前或之后?序列化将如何帮助? 无论如何是Subject线程安全,还是我会在没有toSerialized()的情况下打破rxjava? 文档提到的" 可观察合同"是什么? 解决方案 我是否必须在此主题上调用Toserialized(),假设
4 2023-10-20
编程技术问答社区
RxJava调度器的用例
在rxjava中有 5个不同的调度程序 立即():创建并返回一个调度程序,该调度程序立即在当前线程上执行工作. 蹦床():创建并返回一个调度程序,该调度程序在当前工作完成后要执行的当前线程中排队. newthread():创建并返回一个调度程序,为每个工作单位创建一个新线程. Computation():创建并返回用于计算工作的调度程序.这可用于事件环,处理回调和其他计算工作.请勿在此调度程序上执行IO-BOND工作.使用调度程序. io(). io():创建并返回用于IO-BOND工作的调度程序. 该实施是由将根据需要增长的执行者线程池的支持.这可用于异步执行阻塞IO.请勿在此调度程序上执行计算工作.使用调度程序. Computation()代替. 问题: 前3个调度程序是非常自我解释的.但是,我对计算和 io . 有点困惑. " IO-BOND BOND WORK"到底是什么?它是否用于处理流(java.io)和文件(java.nio
4 2023-10-20
编程技术问答社区
关于PublishSubject和TestScheduler的问题,项目没有被发射出来。
我一直在面临主题和测试人员的问题.如果我使用蹦床调度程序,我的测试通过,但由于某种原因,如果我使用Testscheduler,它们会失败. 这是我的样本测试和相关类. @RunWith(MockitoJUnitRunner::class) class DemoViewModelTest { //Error Mocks private val actionsStream: PublishSubject = PublishSubject.create() private lateinit var viewModel: DemoViewModel private val handler = mock(DemoContract.Handler::class.java) @Before fun setup() { viewModel = DemoViewModel
4 2023-10-18
编程技术问答社区
Robolectric + rxJava + retrofit Second call throws java.io.InterruptedIOException
我正在开发和Android应用程序.我正在使用Raturofit(带有OKCLIENT)进行API请求,并使用Robolectric进行测试.我的API看起来像这样: @GET("/v1/book/{bookId}") Observable getBook(@Path("bookId") int bookId); 仅用于robolectric,我强迫API呼叫同步. Restadapter构建器看起来像这样: RestAdapter.Builder builder = new RestAdapter.Builder().setEndpoint(environment.getServerEndpoint()) .setClient(new OkClient(client)) .setExecutors(new ImmediateExecutor(), null)
6 2023-10-17
编程技术问答社区
术语: 什么是功能性反应编程/RX 中的 "故障"?
在功能反应性编程的背景下,"小故障"的定义是什么? 我知道,在某些FRP框架中可能会发生"故障",而在其他FRP框架中则可能发生.例如,Reactfx是无故障的,例如RX是无故障的. clarity-in-a-si"> 1 ]. 有人可以给出一个非常简单的示例,说明使用Rx时如何以及何时出现故障并在同一示例上显示相应的ReactFX解决方案的方式以及为什么无毛孔. 感谢您的阅读. 解决方案 定义 我的(自己的)最喜欢的定义: a 小故障是可观察状态的暂时不一致. 定义来自 scala.rx.rx : 在FRP的背景下,小故障是数据流图中的暂时不一致.由于更新不会立即发生,而是花时间计算,因此在更新过程中,FRP系统中的值可能会瞬时不同步.此外,根据FRP系统的性质,可以在传播中多次更新节点. 示例 考虑整数变量a,b.定义sum和prod使得 sum := a + b, prod := a * b. 让我们将此示例重
4 2023-10-17
编程技术问答社区