来自 APP下载 2019-09-18 03:20 的文章
当前位置: 新萄京娱乐手机版 > APP下载 > 正文

服务端主推消息

   因为本人打听Akka-http的要紧目标不是为了有关Web-Server的编制程序,而是想达成一套系统融合为一的api,所以也亟需思虑由服务端主动向顾客端发送指令的选取场景。比方二个零售店处理平台的服务端在做到了一些数据更新后须要文告各零售门市顾客端下载最新数据。尽管Akka-http也提供对websocket公约的支撑,但websocket的互联网连接是双向长久的,适合频仍的问答交互式服务端与客户端的沟通,消息结构也正如零碎。而小编辈面对的大概是批次型的大方数据库数据调换,只必要轻易的劳务端单向消息就行了,所以websocket不太适宜,而Akka-http的SSE应该相比吻合我们的渴求。SSE情势的基本原理是服务端统一聚焦发布音信,各客商端长久订阅服务端公布的消息并从音讯的剧情中筛选出属于本身应该施行的命令,然后开展对应的处理。客商端接收SSE是在二个单独的线程里不停开展的,不会默化潜移客商端当前的演算流程。当接受有效的新闻后就能够调用一个事务职能函数作为后台异步运算职责。

服务端的SSE发表是以Source[ServerSentEvent,NotUsed]来达成的。ServerSentEvent类型定义如下:

/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain n or r
 * @param id optional id, must not contain n or r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}

这几个类其余参数代表事件信息的数据结构。顾客能够依照实际须求足够利用这些数据结构来传递消息。服务端是由此complete以SeverSent伊芙nt类为因素的Source来张开SSE的,如下:

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }

上述代码代表服务端定期运算processToServerSent伊芙nt重返ServerSent伊夫nt类型结果后揭橥给具有订阅的客商端。我们用八个函数processToServerSent伊芙nt模拟重复运算的事情职能:

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }

本条函数模拟发表事件数量是某种业务运算结果,在这里代表顾客端需求下载文件名称。大家用客商端request来效仿设定那个文件名称:

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

客商端订阅SSE的诀要如下:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

每当顾客端收到SSE后即运转downloadFiles(filename)函数。downloadFiles函数定义:

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

上面是客商端程序的测量检验运算步骤:

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }

运算结果:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0

上边是此番研商的示范源代码:

服务端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}

客户端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}

 

 小编的博客将在联合至Tencent云+社区。邀我们一齐入驻

 

 

 

 

 

 

 

 

 

 

本文由新萄京娱乐手机版发布于APP下载,转载请注明出处:服务端主推消息

关键词: