>

服务端首荐消息,身份验证

- 编辑:www.bifa688.com -

服务端首荐消息,身份验证

  import akka.http.scaladsl.model.headers._  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")    .addHeader(RawHeader("action","insert:county"))    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

每当客户端收到SSE后即运营downloadFiles函数。downloadFiles函数定义:

下边是本次钻探的示范代码:

服务端的SSE发布是以Source[ServerSentEvent,NotUsed]来促成的。ServerSent伊芙nt类型定义如下:

             optionalHeaderValueByName("action") {                case Some =>                  entity(asSourceOf[County]) { source =>                    val futofNames: Future[List[String]] =                      source.runFold(List[String]("")) => acc    List                    complete(s"Received rows for $action")                  }                case None => complete ("No action specified!")              }
    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._    import system.dispatcher        Http()      .singleRequest(Get("http://localhost:8011/events"))      .flatMap(Unmarshal.to[Source[ServerSentEvent, NotUsed]])      .foreach(_.runForeach(se => downloadFiles)

服务端:

那几个类别的参数代表事件音信的数据结构。顾客能够依照实际需求丰盛利用那一个数据结构来传递音讯。服务端是由此complete以SeverSentEvent类为因素的Source来开展SSE的,如下:

当大家把Akka-http作为数据库数据交流工具时,数据是以Source[ROW,_]方式存放在Entity里的。非常多时候除数量之外大家也许需要张开一些叠合的音讯传送如对数据的现实性管理情势等。咱们可以通过Akka-http的raw-header来达成附加自定义务消防队息的传递,那项成效能够经过Akka-http提供的raw-header筛选成效来兑现。在客商端大家把附加音讯放在HttpRequest的raw header里,如下:

运算结果:

在此间客商端注脚上传数据应插入county表。服务端能够像上边那样获取那项音信:

/** * Representation of a server-sent event. According to the specification, an empty data field designates an event * which is to be ignored which is useful for heartbeats. * * @param data data, may span multiple lines * @param eventType optional type, must not contain n or r * @param id optional id, must not contain n or r * @param retry optional reconnection delay in milliseconds */final case class ServerSentEvent(  data:      String,  eventType: Option[String] = None,  id:        Option[String] = None,  retry:     Option[Int]    = None) {...}

必发88手机版,服务端对顾客端的身份验证管理办法如下:

  private def processToServerSentEvent: ServerSentEvent = {    Thread.sleep(3000)   //processing delay    ServerSentEvent(SyncFiles.fileToSync)  }

客户端:

import akka.NotUsedimport akka.actor.ActorSystemimport akka.http.scaladsl.Httpimport akka.http.scaladsl.server.Directivesimport akka.stream.ActorMaterializerimport akka.stream.scaladsl.Sourceimport scala.concurrent.duration.DurationIntimport akka.http.scaladsl.model.sse.ServerSentEventobject SSEServer {  def main(args: Array[String]): Unit = {    implicit val system = ActorSystem()    implicit val mat    = ActorMaterializer()    Http().bindAndHandle(route, "localhost", 8011)    scala.io.StdIn.readLine()    system.terminate()  }  object SyncFiles {    var fileToSync: String = ""  }  private def route = {    import Directives._    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._    def syncRequests =      pathPrefix("sync") {        pathSingleSlash {        post {            parameter("file") { filename =>              complete {                SyncFiles.fileToSync = filename                s"set download file to : $filename"              }            }          }        }      }    def events =      path("events") {        get {          complete {            Source              .tick(2.seconds, 2.seconds, NotUsed)              .map( _ => processToServerSentEvent)              .keepAlive(1.second, () => ServerSentEvent.heartbeat)          }        }      }    syncRequests ~ events  }  private def processToServerSentEvent: ServerSentEvent = {    Thread.sleep(3000)   //processing delay    ServerSentEvent(SyncFiles.fileToSync)  }}
         authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>            authorizeAsync(_ => hasPermissions {              withoutSizeLimit {                handleExceptions(postExceptionHandler) {                  optionalHeaderValueByName("action") {                    case Some =>                      entity(asSourceOf[County]) { source =>                        val futofNames: Future[List[String]] =                          source.runFold(List[String]("")) => acc    List                        complete(s"Received rows for $action sent from $user")                      }                    case None => complete(s"$user did not specify action for uploaded rows!")                  }                }              }            }          }

服务端:

  import akka.http.scaladsl.model.headers._  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")                   .addHeader(RawHeader("action","insert:county"))

客商端订阅SSE的主意如下:

  import akka.http.scaladsl.server.directives.Credentials  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    credentials match {      case p @ Credentials.Provided =>        Future {          // potentially          if (p.verify("p4ssw0rd")) Some          else None        }      case _ => Future.successful    }  }  case class User(name: String)  val validUsers = Set("john","peter","tiger","susan")  def hasAdminPermissions(user: User): Future[Boolean] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    Future.successful(validUsers.contains(user.name))  }
do some thing ...HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))Try to download OrdersTry to download Ordersdo some other things ...HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))Try to download OrdersTry to download OrdersTry to download ItemsTry to download ItemsTry to download ItemsProcess finished with exit code 0

Akka-http通过Credential类的Directive提供了authentication和authorization。在顾客端能够用上面包车型地铁艺术提供自个儿的客商身份消息:

客户端:

import akka.actor._import akka.stream._import akka.stream.scaladsl._import akka.http.scaladsl.Httpimport akka._import akka.http.scaladsl.common._import spray.json.DefaultJsonProtocolimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupportimport scala.concurrent._import akka.http.scaladsl.server._import akka.http.scaladsl.server.Directives._import akka.http.scaladsl.model._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocolobject Converters extends MyFormats {  case class County(id: Int, name: String)  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }  implicit val countyFormat = jsonFormat2}object HttpServerDemo extends App {  import Converters._  implicit val httpSys = ActorSystem("httpSystem")  implicit val httpMat = ActorMaterializer()  implicit val httpEC = httpSys.dispatcher  implicit val jsonStreamingSupport = EntityStreamingSupport.json()    .withParallelMarshalling(parallelism = 8, unordered = false)  def postExceptionHandler: ExceptionHandler =    ExceptionHandler {      case _: RuntimeException =>        extractRequest { req =>          req.discardEntityBytes()          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))        }    }  import akka.http.scaladsl.server.directives.Credentials  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    credentials match {      case p @ Credentials.Provided =>        Future {          // potentially          if (p.verify("p4ssw0rd")) Some          else None        }      case _ => Future.successful    }  }  case class User(name: String)  val validUsers = Set("john","peter","tiger","susan")  def hasPermissions(user: User): Future[Boolean] = {    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")    Future.successful(validUsers.contains(user.name))  }  val route =    path("rows") {      get {        complete {          source        }      } ~        post {          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>            authorizeAsync(_ => hasPermissions {              withoutSizeLimit {                handleExceptions(postExceptionHandler) {                  optionalHeaderValueByName("action") {                    case Some =>                      entity(asSourceOf[County]) { source =>                        val futofNames: Future[List[String]] =                          source.runFold(List[String]("")) => acc    List                        complete(s"Received rows for $action sent from $user")                      }                    case None => complete(s"$user did not specify action for uploaded rows!")                  }                }              }            }          }        }    }  val (port, host) = (8011,"localhost")  val bindingFuture = Http().bindAndHandle(route,host,port)  println(s"Server running at $host $port. Press any key to exit ...")  scala.io.StdIn.readLine()  bindingFuture.flatMap(_.unbind    .onComplete(_ => httpSys.terminate}
  def downloadFiles(file: String) = {    Thread.sleep(3000)   //process delay    if (file != "")      println(s"Try to download $file")  }

上边是Credential-Directive的采取办法:

  object SyncFiles {    var fileToSync: String = ""  }  private def route = {    import Directives._    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._    def syncRequests =      pathPrefix("sync") {        pathSingleSlash {        post {            parameter("file") { filename =>              complete {                SyncFiles.fileToSync = filename                s"set download file to : $filename"              }            }          }        }      }
import akka.actor._import akka.stream._import akka.stream.scaladsl._import akka.http.scaladsl.Httpimport scala.util._import akka._import akka.http.scaladsl.common._import spray.json.DefaultJsonProtocolimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupportimport akka.http.scaladsl.common.EntityStreamingSupportimport akka.http.scaladsl.model._import spray.json._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocolobject Converters extends MyFormats {  case class County(id: Int, name: String)  implicit val countyFormat = jsonFormat2}object HttpClientDemo extends App {  import Converters._  implicit val sys = ActorSystem("ClientSys")  implicit val mat = ActorMaterializer()  implicit val ec = sys.dispatcher  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()  import akka.util.ByteString  import akka.http.scaladsl.model.HttpEntity.limitableByteSource  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}  def countyToByteString(c: County) = {    ByteString(c.toJson.toString)  }  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)  val rowBytes = limitableByteSource(source via flowCountyToByteString)  import akka.http.scaladsl.model.headers._  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")    .addHeader(RawHeader("action","insert:county"))    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))  val data = HttpEntity(    ContentTypes.`application/json`,    rowBytes  )  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {    val futResp = Http.singleRequest(      request.copy(entity = dataEntity)    )    futResp      .andThen {        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>          entity.dataBytes.map(_.utf8String).runForeach        case Success(r@HttpResponse(code, _, _, _)) =>          println(s"Upload request failed, response code: $code")          r.discardEntityBytes()        case Success => println("Unable to Upload file!")        case Failure => println(s"Upload failed: ${err.getMessage}")      }  }  uploadRows(request,data)  scala.io.StdIn.readLine()  sys.terminate()}
import akka.NotUsedimport akka.actor.ActorSystemimport akka.http.scaladsl.Httpimport akka.http.scaladsl.client.RequestBuilding.Getimport akka.http.scaladsl.model.HttpMethodsimport akka.http.scaladsl.unmarshalling.Unmarshalimport akka.stream.ActorMaterializerimport akka.stream.scaladsl.Sourceimport akka.http.scaladsl.model.sse.ServerSentEventimport akka.http.scaladsl.model._object SSEClient {  def downloadFiles(file: String) = {    Thread.sleep(3000)   //process delay    if (file != "")      println(s"Try to download $file")  }  def main(args: Array[String]): Unit = {    implicit val system = ActorSystem()    implicit val mat    = ActorMaterializer()    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._    import system.dispatcher    Http()      .singleRequest(Get("http://localhost:8011/events"))      .flatMap(Unmarshal.to[Source[ServerSentEvent, NotUsed]])      .foreach(_.runForeach(se => downloadFiles)    scala.io.StdIn.readLine()    println("do some thing ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")    ).onSuccess {      case msg => println    }    scala.io.StdIn.readLine()    println("do some other things ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")    ).onSuccess {      case msg => println    }    scala.io.StdIn.readLine()    system.terminate()  }}
    scala.io.StdIn.readLine()    println("do some thing ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")    ).onSuccess {      case msg => println    }    scala.io.StdIn.readLine()    println("do some other things ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")    ).onSuccess {      case msg => println    }

自个儿的博客将要联合至Tencent云 社区。邀我们一起入驻

因为本身打听Akka-http的首要指标不是为了有关Web-Server的编制程序,而是想完成一套系统融合为一的api,所以也需求思索由服务端主动向顾客端发送指令的行使场景。比方二个零售店管理平台的服务端在成就了少数数据更新后须求公告各零售门市客商端下载最新数据。即使Akka-http也提供对websocket协商的匡助,但websocket的互联网连接是双向永久的,适合频仍的问答交互式服务端与客商端的沟通,消息结构也比较零碎。而大家面前碰着的或是是批次型的豁达数据库数据调换,只须要轻巧的劳动端单向音讯就行了,所以websocket不太适宜,而Akka-http的SSE应该比较相符我们的须要。SSE方式的基本原理是服务端统一聚集公布消息,各客商端长久订阅服务端公布的新闻并从音讯的剧情中筛选出属于本人应有实行的一声令下,然后开展对应的拍卖。顾客端接收SSE是在四个独自的线程里不停开展的,不会潜移暗化客商端当前的运算流程。当接到有效的音讯后就能调用叁个事务职能函数作为后台异步运算职务。

上面是这次讨论的示范源代码:

本条函数模拟发表事件数量是某种业务运算结果,在此地代表客商端供给下载文件名称。大家用客商端request来效仿设定那些文件名称:

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._         complete {            Source              .tick(2.seconds, 2.seconds, NotUsed)              .map( _ => processToServerSentEvent)              .keepAlive(1.second, () => ServerSentEvent.heartbeat)          }

如上代码代表服务端定期运算processToServerSent伊芙nt再次来到ServerSentEvent类型结果后发表给持有订阅的客商端。大家用三个函数processToServerSentEvent模拟重复运算的事务职能:

下边是顾客端程序的测量检验运算步骤:

本文由必发88手机版发布,转载请注明来源:服务端首荐消息,身份验证