Scala读取连续的http流[英] Scala read continuous http stream

本文是小编为大家收集整理的关于Scala读取连续的http流的处理/解决方法,可以参考本文帮助大家快速定位并解决问题,中文翻译不准确的可切换到English标签页查看源文。

问题描述

如何在Scala中连接并读取连续的(块)HTTP流?例如,如果我有这个简单的服务,用python/瓶写:

from gevent import monkey; monkey.patch_all()

import gevent
from bottle import route, run

@route('/stream')
def stream():
    while True:
        yield 'blah\n'
        gevent.sleep(1)

run(host='0.0.0.0', port=8100, server='gevent')

我打算使用akka-stream处理数据,我只需要一种检索数据.

推荐答案

这应该起作用.基本上,您向产生构成响应的URI提出了一个请求.响应实体包含一个数据库流.如果响应分数,这将是块的流.如果没有封锁的响应(httpentity.strict),这将是一个只有一个块的流.

显然,您也可以在实体上明确匹配,以查看它是否是httpentity.换成,但通常您还需要保留处理非锁定响应的能力.

在现实世界应用程序中,您不会使用runforeach执行副作用,而是对数据库流进行一些处理.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{Uri, HttpRequest}
import akka.stream.ActorMaterializer

object ChunkTestClient extends App {

  implicit val system = ActorSystem("test")
  import system.dispatcher

  implicit val materializer = ActorMaterializer()
  val source = Uri("https://jigsaw.w3.org/HTTP/ChunkedScript")
  val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
      println(chunk.utf8String)
    }
  }
}

本文地址:https://www.itbaoku.cn/post/2328060.html

问题描述

How can I connect to and read a continuous (chunked) http stream in scala? For example, if I have this simple service written in python/bottle:

from gevent import monkey; monkey.patch_all()

import gevent
from bottle import route, run

@route('/stream')
def stream():
    while True:
        yield 'blah\n'
        gevent.sleep(1)

run(host='0.0.0.0', port=8100, server='gevent')

I'm planning to use akka-stream to process the data, I just need a way to retrieve it.

推荐答案

This should work. Basically, you do a single request to an uri that produces a chunked response. The response entity contains a dataBytes stream. In case of a chunked response, this will be the stream of chunks. In case of a non-chunked response (HttpEntity.Strict), this will be a stream with just a single chunk.

Obviously you can also explicitly match on the entity to see if it is HttpEntity.Chunked, but usually you want to retain the ability to handle non-chunked responses as well.

In a real world application you would not use runForeach to execute a side effect, but do some processing with the dataBytes stream.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{Uri, HttpRequest}
import akka.stream.ActorMaterializer

object ChunkTestClient extends App {

  implicit val system = ActorSystem("test")
  import system.dispatcher

  implicit val materializer = ActorMaterializer()
  val source = Uri("https://jigsaw.w3.org/HTTP/ChunkedScript")
  val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
      println(chunk.utf8String)
    }
  }
}