Observable subscription not getting called

Problem description

I have an Observable that I'm using to convert a promise into a subscription. This results in a collection that I need to iterate through to call an HTTP Service on each element. I'm using forkJoin to wait for all those calls to finish so that I can do something else, but unfortunately, my subscription is not being called. Do you see what I'm missing here?

Observable.fromPromise(this.users.getElements()).subscribe(results => {
  Observable.forkJoin(
    results.map(
      aUser => this.HttpService.submitUser(aUser).subscribe(
        results => {
          this.progress += 1;
        },
        err => {
          this.progress += 1;
          this.handleError(<any>err);
        })
    ).subscribe(
      //it never gets to either of these calls after all service calls complete
      data => {
        debugger;
        console.log(data);
        this.reset();
      },
      err => {
        debugger;
        console.log(err);
        this.reset();
      }
    ));
});

Recommended answer

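The main problem is that you shouldn't subscribe to each Observable you pass to forkJoin(); the operator has to do that itself. Calling .subscribe() inside map() returns Subscription objects rather than Observables, so forkJoin() never receives anything it can wait on.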
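To make the point above concrete, here is a minimal, library-free sketch. `ToyObservable`, `ToySubscription` and `toyForkJoin` are hypothetical stand-ins written for this illustration; they are not the RxJS classes and only mimic the one behaviour that matters here: subscribing returns a subscription handle, and the join function subscribes to its sources on its own.

```typescript
// Toy stand-ins for illustration only; these are NOT the RxJS classes.
type Observer<T> = { next: (value: T) => void; complete: () => void };

class ToySubscription {
  unsubscribe(): void {}
}

class ToyObservable<T> {
  private producer: (observer: Observer<T>) => void;

  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }

  // Subscribing starts the work and returns a subscription handle,
  // not another observable.
  subscribe(next: (value: T) => void, complete: () => void = () => {}): ToySubscription {
    this.producer({ next, complete });
    return new ToySubscription();
  }

  static of<T>(value: T): ToyObservable<T> {
    return new ToyObservable<T>(observer => {
      observer.next(value);
      observer.complete();
    });
  }
}

// Simplified join: subscribes to every source itself and emits the last
// values as one array once all sources have completed.
function toyForkJoin<T>(sources: ToyObservable<T>[]): ToyObservable<T[]> {
  return new ToyObservable<T[]>(observer => {
    const last: T[] = new Array(sources.length);
    let remaining = sources.length;
    if (remaining === 0) {
      observer.complete();
      return;
    }
    sources.forEach((source, i) => {
      source.subscribe(
        value => { last[i] = value; },
        () => {
          remaining -= 1;
          if (remaining === 0) {
            observer.next(last);
            observer.complete();
          }
        }
      );
    });
  });
}

const users = ['ann', 'bob'];

// Correct: map to observables and let the join function subscribe.
const toyObservables = users.map(u => ToyObservable.of(u.toUpperCase()));
let joined: string[] = [];
toyForkJoin(toyObservables).subscribe(values => { joined = values; });

// Mistake from the question: map to subscriptions instead of observables.
const subscriptions = users.map(u => ToyObservable.of(u).subscribe(() => {}));
const allAreObservables = subscriptions.every(s => s instanceof ToyObservable);
```

In this sketch `joined` ends up holding both values, while `allAreObservables` is false: the second array contains nothing a join operator could subscribe to, which mirrors what happens in the question's code.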

If you want to be notified when each Observable completes you can use .do(undefined, undefined, () => {...}).

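// RxJS 5 style: importing from 'rxjs/Rx' patches all operators onto Observable
import { Observable } from 'rxjs/Rx';

let observables = [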
  Observable.of(42).do(undefined, undefined, () => console.log('done')),
  Observable.of('a').delay(100).do(undefined, undefined, () => console.log('done')),
  Observable.of(true).do(undefined, undefined, () => console.log('done')),
];

Observable.forkJoin(observables)
  .subscribe(results => console.log(results));

This prints to console:

done
done
done
[ 42, 'a', true ]

There's also the .finally() operator. However, it's not the same as using .do(): its callback also runs when the Observable errors or is unsubscribed, not only when it completes.

EDIT:

When any of the source Observables fails, the forkJoin() operator re-emits the error (which means it fails as well).
This means you need to catch errors in each source Observable separately (with the catch() operator, for example).

let observables = [
  Observable.throw(new Error())
    .catch(() => Observable.of('caught error 1'))
    .do(undefined, undefined, () => console.log('done 1')),

  Observable.of('a')
    .delay(100).catch(() => Observable.of('caught error 2'))
    .do(undefined, undefined, () => console.log('done 2')),

  Observable.of(true)
    .catch(() => Observable.of('caught error 3'))
    .do(undefined, undefined, () => console.log('done 3')),
];

Observable.forkJoin(observables)
  .subscribe(results => console.log(results));

Which prints:

done 1
done 3
done 2
[ 'caught error 1', 'a', true ]

Other recommended answer

I don't think you need to subscribe in the map.

Observable.fromPromise(this.users.getElements()).subscribe(results => {
  Observable.forkJoin(
    // pass the Observables themselves; forkJoin subscribes to them
    results.map(aUser => this.HttpService.submitUser(aUser))
  ).subscribe(
    // runs once, after all service calls complete
    data => {
      debugger;
      console.log(data);
      this.reset();
    },
    err => {
      debugger;
      console.log(err);
      this.reset();
    }
  );
});

Note that in the rxjs example here:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

they don't subscribe to the individual observables: forkJoin gets them all going, then waits for all of them to return (in your subscribe).

EDIT:

The forkJoin source is here:

https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/forkjoin.js

and it doesn't look like it has hooks to find out when each one finishes. I think the best way to drive a UI progress bar would be to subscribe to each of the mapped observables individually, have every subscription call a function that increments the progress counter, and add some test for "completeness" that tells you when the data is ready to use.
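The counter-plus-completeness idea can be sketched without any library. `ProgressTracker` and its method names are hypothetical, invented for this illustration; in the question's code the calls to `succeed` and `fail` would go into the success and error handlers of each `submitUser(aUser).subscribe(...)`.

```typescript
// Hypothetical helper: counts finished items and reports once all are done.
class ProgressTracker<T> {
  private results: (T | undefined)[];
  private finished = 0;
  private readonly total: number;
  private readonly onAllDone: (results: (T | undefined)[]) => void;

  constructor(total: number, onAllDone: (results: (T | undefined)[]) => void) {
    this.total = total;
    this.onAllDone = onAllDone;
    this.results = new Array<T | undefined>(total).fill(undefined);
  }

  // Value a progress bar could bind to.
  get progress(): number {
    return this.finished;
  }

  // Call from the success handler of item `index`.
  succeed(index: number, value: T): void {
    this.results[index] = value;
    this.tick();
  }

  // Call from the error handler of item `index`; its slot stays undefined.
  fail(index: number): void {
    this.results[index] = undefined;
    this.tick();
  }

  // The "completeness" test: fire the callback when every item has reported.
  private tick(): void {
    this.finished += 1;
    if (this.finished === this.total) {
      this.onAllDone(this.results);
    }
  }
}

// Simulated run with three items; the second one fails.
const completions: (string | undefined)[][] = [];
const tracker = new ProgressTracker<string>(3, results => { completions.push(results); });

tracker.succeed(0, 'saved ann');
tracker.fail(1);
const completionsBeforeLast = completions.length; // still waiting on item 2
tracker.succeed(2, 'saved cat');
```

Because failures are counted too, the completion callback still fires when some calls error out, which is the behaviour the question's `this.progress += 1` in both handlers was aiming for.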