发布来自流的解码信息,同时在Rx中保持封装状态[英] Publishing decoded messages from a Stream while keeping encapsulation in Rx

本文是小编为大家收集整理的关于发布来自流的解码信息,同时在Rx中保持封装状态的处理/解决方法,可以参考本文帮助大家快速定位并解决问题,中文翻译不准确的可切换到English标签页查看源文。

问题描述

本文来自:IT宝库(https://www.itbaoku.cn)

我是不熟悉RX的新手,我一直在尝试重写我的MVC W/服务层( sasp !),以使用这种很棒的新范围RX.我有一个称为Remote的类,它封装了NetworkStream. Remote使用rx从NetworkStream中收听字节,一旦奏效了,它已收到一条完整的数据,它将数据解码为IMessage.

.

我可以从Stream中读取如何连续使用Remote内部的RX阅读,但是如何从Remote中从该流到外界发布解码IMessage?我应该在C#中使用经典事件样式,并让活动的消费者使用Observable.FromEvent?

我只问,因为我已经阅读了IObservable不再需要实现.

推荐答案

我应该在C#中使用经典的事件样式,并有 事件的消费者使用Observable.FromEvent?

如果您没有积极地强迫这样做, 不要使用C#样式事件制作API . IObservable<T>是一个强大的,通用的, 广泛支持的 接口,使我们能够将事件视为一流的公民,同时轻松管理订阅.即使您的消费者不使用RX,他们也能够比使用C#事件更容易理解和使用IObservable<T>.他们对这些事件的作用取决于他们,但是IObservable<T>抽象更清晰,更简单.

我已经读到,iObservable不再需要实现.

现实,我们的意思是,可能没有理由独自实施IObservable<T>,因为我们有工具为我们创建该类型的实例.

我们有Observable.Create(...),它允许我们从头开始创建可观察力.我们有不同类型的主题,例如Subject<T>,BehaviorSubject<T>,ReplaySubject<T>等,可以用作代理,并允许我们将值数量为多个消费者,并且我们有操作员可以使我们能够转换/构成任何<任何IObservable<T>进入另一种类型或类型的IObservable<T>.

但是如何从Remote?

发布从该流到外界的解码iMessage

您在类/接口上公开IObservable<T>

public interface IRemote
{
  public IObservable<IMessage> Messages { get; }
}

您可以以多种方式实施.首先,您可以做到这一点,以便每次订阅Messages获得您自己的基础逻辑的订阅...

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;

  public IObservable<IMessage> Message {
    get {
      return message;
    }
  }
}

,或者您可以确保只有一个基础逻辑订阅...

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;

  private IObservable<IMessage> _refCountedMessages
    = this._messages
        .Publish()
        .RefCount();

  public IObservable<IMessage> Message {
    get {
      return this._refCountedMessages;
    }
  }
}

,或者您可以使连接过程本质上非常明确.

public interface IRemote
{
  public IObservable<IMessage> Messages { get; }

  public IDisposable Connect();
}

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;

  private IObservable<IMessage> _connectableMessages
    = this._messages
        .Publish();

  public IObservable<IMessage> Message {
    get {
      return this._connectableMessages;
    }
  }

  public IDisposable Connect()
  {
    return this._connectableMessages.Connect();
  }
}

其他推荐答案

我假设您的问题与此问题相似如何使用rx ?

从serialport读取数据的数据线"

您不会将字符串推向您,而是将您更改为消息,而是会得到字节.没问题,您可以使用相同的窗口概念将字节顺序切成窗口,然后可以翻译/转换/映射/任何内容IMessage.

.

建立在克里斯托弗·哈里斯(Christopher Harris)的回答上.这是他提出的界面的实现.这里的目的是表明您可以揭示可观察到的序列,这些序列只是在基本可观察的序列顶部构建的查询.在这种情况下,消息序列只是网络序列的查询.通过分层,我们获得了消费者想要的抽象水平.

void Main()
{
    var networkStream = new NetworkStream();
    var remote = new Remote(networkStream);
    remote.GetMessages().Dump("remote.GetMessages()");
}

// Define other methods and classes here
public class NetworkStream
{
    //Fake getting bytes off the wire or disk
    public IObservable<byte> GetNetworkStream()
    {
        var text = @"Line 1.
Hello line 2.
3rd and final line!";

        return Observable.Zip(
            UTF8Encoding.UTF8.GetBytes(text).ToObservable(),
            Observable.Interval(TimeSpan.FromMilliseconds(100)),
            (character, time)=>character);
    }
}
public interface IMessage
{
    string Content {get;}
}
public class Message : IMessage
{
    public Message(string content)
    {
        Content = content;
    }
    public string Content {get; private set;}
}
public interface IRemote
{
    IObservable<IMessage> GetMessages();
}
public class Remote : IRemote
{
    private readonly NetworkStream _networkStream;
    private readonly byte[] _delimiter = UTF8Encoding.UTF8.GetBytes(Environment.NewLine);
    public Remote(NetworkStream networkStream)
    {
        _networkStream = networkStream;
    }
    public IObservable<IMessage> GetMessages()
    {
    return  _networkStream.GetNetworkStream()
                            .WindowByExclusive(b => _delimiter.Contains(b))
                            .SelectMany(window=>window.ToArray().Select(bytes=>UTF8Encoding.UTF8.GetString(bytes)))
                            .Select(content=>new Message(content));
    }
    //TODO Add IDispose and clean up your NetworkStream
}

public static class ObservableEx
{
    public static IObservable<IObservable<T>> WindowByExclusive<T>(this IObservable<T> input, Func<T, bool> isWindowBoundary)
    {
        return Observable.Create<IObservable<T>>(o=>
        {
            var source = input.Publish().RefCount();
            var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default);
            return left.GroupJoin(
                            source.Where(c=>!isWindowBoundary(c)),
                            x=>source.Where(isWindowBoundary),
                            x=>Observable.Empty<Unit>(),
                            (_,window)=>window)
                        .Subscribe(o);
        });
    }
}

本文地址:https://www.itbaoku.cn/post/2352361.html

问题描述

I'm new to using Rx and I've been trying to re-write my MVC w/ Service Layer (NOT ASP!) to use this awesome new-fangled Rx. I have a class called Remote which encapsulates a NetworkStream. The Remote uses Rx to listen to bytes from the NetworkStream and once it works out it's received a full message worth of data, it decodes that data into an IMessage.

I get how I can read from the Stream continuously using Rx from inside the Remote, but how do I publish decoded IMessage from that stream to the outside world from the Remote? Am I supposed to use the classic evented style in C# and have the consumers of the events use Observable.FromEvent?

I only ask because I've read around that IObservable is not meant to be implemented anymore.

推荐答案

Am I supposed to use the classic evented style in C# and have the consumers of the events use Observable.FromEvent?

If you're not positively forced to do so, do not make an API using C# style events. The IObservable<T> is a powerful, generic, widely supported interface which allows us to treat events as first class citizens while easily managing subscriptions. Even if your consumer isn't using Rx, they'll be able to understand and use IObservable<T> more easily than using C# events. What they do with those events is up to them, but the IObservable<T> abstraction is clearer and simpler.

I've read around that IObservable is not meant to be implemented anymore.

It reality, what we mean is that there's probably not reason to implement IObservable<T> on your own, because we have tools to create instances of that type for us.

We have Observable.Create(...) which allows us to create observables from scratch. We have different types of Subjects like Subject<T>, BehaviorSubject<T>, ReplaySubject<T>, etc, which can be used as proxies and allow us to multicast values to multiple consumers, and we have operators which allow us to transform/compose any IObservable<T> into another type or kind of IObservable<T>.

but how do I publish decoded IMessage from that stream to the outside world from the Remote?

You expose an IObservable<T> on your class / interface.

public interface IRemote
{
  public IObservable<IMessage> Messages { get; }
}

You could implement this in any number of ways. First, you could make it so each subscription to Messages gets it's own subscription to your underlying logic...

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;

  public IObservable<IMessage> Message {
    get {
      return message;
    }
  }
}

Or you could make sure that there's only ever one subscription to the underlying logic...

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;

  private IObservable<IMessage> _refCountedMessages
    = this._messages
        .Publish()
        .RefCount();

  public IObservable<IMessage> Message {
    get {
      return this._refCountedMessages;
    }
  }
}

Or you could make the connection process extremely explicit in nature.

public interface IRemote
{
  public IObservable<IMessage> Messages { get; }

  public IDisposable Connect();
}

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;

  private IObservable<IMessage> _connectableMessages
    = this._messages
        .Publish();

  public IObservable<IMessage> Message {
    get {
      return this._connectableMessages;
    }
  }

  public IDisposable Connect()
  {
    return this._connectableMessages.Connect();
  }
}

其他推荐答案

I assume you problem is similar to this question How to "reconstruct lines" of data read from SerialPort using Rx?

Instead of getting strings pushed at you that you then change into messages, you will get bytes. No problem, you can use the same WindowBy concept to slice up your sequence of bytes into windows that can then be translated/converted/mapped/whatever into your IMessage.

Building on Christopher Harris' answer. Here is an implementation of his proposed interfaces. The point here is to show that you can expose observable sequences that are just queries built on top of an underlying observable sequence. In this case the Messages sequence is just a query over the network sequence. With layering we get the level of abstraction that the consumer wants.

void Main()
{
    var networkStream = new NetworkStream();
    var remote = new Remote(networkStream);
    remote.GetMessages().Dump("remote.GetMessages()");
}

// Define other methods and classes here
public class NetworkStream
{
    //Fake getting bytes off the wire or disk
    public IObservable<byte> GetNetworkStream()
    {
        var text = @"Line 1.
Hello line 2.
3rd and final line!";

        return Observable.Zip(
            UTF8Encoding.UTF8.GetBytes(text).ToObservable(),
            Observable.Interval(TimeSpan.FromMilliseconds(100)),
            (character, time)=>character);
    }
}
public interface IMessage
{
    string Content {get;}
}
public class Message : IMessage
{
    public Message(string content)
    {
        Content = content;
    }
    public string Content {get; private set;}
}
public interface IRemote
{
    IObservable<IMessage> GetMessages();
}
public class Remote : IRemote
{
    private readonly NetworkStream _networkStream;
    private readonly byte[] _delimiter = UTF8Encoding.UTF8.GetBytes(Environment.NewLine);
    public Remote(NetworkStream networkStream)
    {
        _networkStream = networkStream;
    }
    public IObservable<IMessage> GetMessages()
    {
    return  _networkStream.GetNetworkStream()
                            .WindowByExclusive(b => _delimiter.Contains(b))
                            .SelectMany(window=>window.ToArray().Select(bytes=>UTF8Encoding.UTF8.GetString(bytes)))
                            .Select(content=>new Message(content));
    }
    //TODO Add IDispose and clean up your NetworkStream
}

public static class ObservableEx
{
    public static IObservable<IObservable<T>> WindowByExclusive<T>(this IObservable<T> input, Func<T, bool> isWindowBoundary)
    {
        return Observable.Create<IObservable<T>>(o=>
        {
            var source = input.Publish().RefCount();
            var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default);
            return left.GroupJoin(
                            source.Where(c=>!isWindowBoundary(c)),
                            x=>source.Where(isWindowBoundary),
                            x=>Observable.Empty<Unit>(),
                            (_,window)=>window)
                        .Subscribe(o);
        });
    }
}