注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。书中附录C的内容都在本文呈现。
Jetty简介
Jetty是一个开源的、以Java作为开发语言的servlet容器,它的API以一组JAR包的形式发布。
Jetty容器可以实例化成一个对象,因而能够迅速为独立运行的Java应用提供网络和Web服务。
要在Jetty中创建并注册servlet,需要用到ServletContextHandler的API。示例代码如下:
class HelloServlet extends HttpServlet { private static final long 
serialVersionUID = 1L; private String msg = "Hello World!"; protected void 
doGet(HttpServletRequest request, HttpServletResponse response) throws 
ServletException, IOException { response.setContentType("text/html"); 
response.setStatus(HttpServletResponse.SC_OK); 
response.getWriter().println("<h1>" + msg + "</h1>"); 
response.getWriter().println("session=" + request.getSession(true).getId()); } 
} public static void main(String[] args) throws Exception { Server server = new 
Server(8080); ServletContextHandler context = new ServletContextHandler(); 
context.setContextPath("/"); server.setHandler(context); // 
http://localhost:8080/hello context.addServlet(new ServletHolder(new 
HelloServlet()), "/hello"); server.start(); server.join(); }如果想更深入了解Jetty,请访问官网
http://www.eclipse.org/jetty/
JettyUtils
JettyUtils是Spark对于Jetty相关API的又一层封装,这里对其中一些主要类型和方法进行介绍。
ServerInfo
功能描述:保存Jetty服务器及其绑定端口等信息,并提供向Jetty服务器添加或移除ContextHandler、以及停止Jetty服务器的方法。
/**
 * Handle to a started Jetty server.
 *
 * @param server      the Jetty server
 * @param boundPort   the HTTP port the server is bound to
 * @param securePort  the HTTPS port, if an HTTPS connector was created
 * @param rootHandler collection that context handlers are added to / removed from
 */
private[spark] case class ServerInfo(
    server: Server,
    boundPort: Int,
    securePort: Option[Int],
    private val rootHandler: ContextHandlerCollection) {

  /**
   * Restricts `handler` to the Spark connector (through its virtual host),
   * registers it with the root collection and starts it if it is not running yet.
   */
  def addHandler(handler: ContextHandler): Unit = {
    handler.setVirtualHosts(Array("@" + JettyUtils.SPARK_CONNECTOR_NAME))
    rootHandler.addHandler(handler)
    if (!handler.isStarted) {
      handler.start()
    }
  }

  /** Unregisters `handler` from the root collection and stops it if it is running. */
  def removeHandler(handler: ContextHandler): Unit = {
    rootHandler.removeHandler(handler)
    if (handler.isStarted) {
      handler.stop()
    }
  }

  /** Stops the server, then its thread pool if the pool is a `LifeCycle`. */
  def stop(): Unit = {
    server.stop()
    // Stopping the Server won't stop the ThreadPool it uses, so stop it explicitly when
    // it supports that. A type pattern never matches null, so a missing pool is skipped.
    server.getThreadPool match {
      case lifeCycle: LifeCycle => lifeCycle.stop()
      case _ =>
    }
  }
}
createServlet
功能描述:创建javax.servlet.http.HttpServlet的匿名内部类实例。该实例实际通过servletParams的responder(类型为Responder)处理请求;Responder即用户传入的处理函数,该函数经隐式转换后被包装在ServletParams中。
 /**
  * Creates an `HttpServlet` whose GET handler delegates to `servletParams.responder`.
  *
  * If the remote user passes `securityMgr.checkUIViewPermissions`, the response gets
  * status 200, the configured content type with a UTF-8 charset, no-cache headers, an
  * `X-Frame-Options` header, and the body produced by `servletParams.extractFn`.
  * Otherwise a 401 error is sent. An `IllegalArgumentException` raised while handling
  * the request is answered with 400; any other exception is logged and rethrown.
  * TRACE requests are rejected with 405 (SPARK-5983).
  *
  * @param servletParams responder, content type and result-to-string function
  * @param securityMgr   checks whether the remote user may view the UI
  * @param conf          read for `spark.ui.allowFramingFrom`
  * @return the servlet
  */
 def createServlet[T <% AnyRef](
     servletParams: ServletParams[T],
     securityMgr: SecurityManager,
     conf: SparkConf): HttpServlet = {
   // Allow framing only from the configured origin; otherwise same-origin only.
   val allowFramingFrom = conf.getOption("spark.ui.allowFramingFrom")
   val xFrameOptionsValue =
     allowFramingFrom.map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")

   new HttpServlet {
     override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
       try {
         if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
           response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
           response.setStatus(HttpServletResponse.SC_OK)
           val result = servletParams.responder(request)
           response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
           response.setHeader("X-Frame-Options", xFrameOptionsValue)
           response.getWriter.print(servletParams.extractFn(result))
         } else {
           response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
           response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
           response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
             "User is not authorized to access this page.")
         }
       } catch {
         case e: IllegalArgumentException =>
           response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
         case e: Exception =>
           logWarning(s"GET ${request.getRequestURI} failed: $e", e)
           throw e
       }
     }

     // SPARK-5983 ensure TRACE is not supported
     protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
       res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
     }
   }
 }
createServletHandler
功能描述:创建用于处理以给定路径为前缀的请求的ServletContextHandler。处理步骤如下:
1) 调用createServlet,生成javax.servlet.http.HttpServlet的匿名内部类实例。该实例实际通过servletParams的responder(类型为Responder)处理请求;Responder即用户传入的处理函数,该函数经隐式转换后被包装在ServletParams中。
2) 调用重载的createServletHandler方法,用该servlet生成org.eclipse.jetty.servlet.ServletHolder,并最终生成ServletContextHandler。
createServletHandler的实现如下。
 /**
  * Builds a servlet from `servletParams` with `createServlet` and mounts it through the
  * `(path, servlet, basePath)` overload.
  */
 def createServletHandler[T <% AnyRef](
     path: String,
     servletParams: ServletParams[T],
     securityMgr: SecurityManager,
     conf: SparkConf,
     basePath: String = ""): ServletContextHandler = {
   val servlet = createServlet(servletParams, securityMgr, conf)
   createServletHandler(path, servlet, basePath)
 }

 /**
  * Creates a context handler that serves `servlet` for every request under the context
  * path `basePath + path` (trailing "/" removed). The one exception is an empty
  * `basePath` with `path == "/"`, which keeps the root context path "/".
  */
 def createServletHandler(
     path: String,
     servlet: HttpServlet,
     basePath: String): ServletContextHandler = {
   val isBareRoot = basePath == "" && path == "/"
   val contextPath = if (isBareRoot) path else (basePath + path).stripSuffix("/")
   val handler = new ServletContextHandler
   handler.setContextPath(contextPath)
   handler.addServlet(new ServletHolder(servlet), "/")
   handler
 }
createStaticHandler
功能描述:创建对静态目录提供文件服务的ServletContextHandler。
 /**
  * Creates a handler that serves static files from `resourceBase`, looked up with
  * Spark's class loader, under the context path `path`. The
  * `org.eclipse.jetty.servlet.Default.gzip` init parameter is set to "false".
  *
  * @throws Exception if `resourceBase` cannot be resolved by the class loader
  */
 def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
   val resourceUrl = Option(Utils.getSparkClassLoader.getResource(resourceBase)).getOrElse {
     throw new Exception("Could not find resource path for Web UI: " + resourceBase)
   }

   val holder = new ServletHolder(new DefaultServlet)
   holder.setInitParameter("resourceBase", resourceUrl.toString)

   val handler = new ServletContextHandler
   handler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
   handler.setContextPath(path)
   handler.addServlet(holder, "/")
   handler
 }
createRedirectHandler
功能描述:创建将用户对源路径的请求总是重定向到目标路径的ServletContextHandler。
 /**
  * Creates a handler, mounted at `srcPath` under `basePath` via `createServletHandler`,
  * that redirects requests to `basePath + destPath` resolved against the request URL.
  *
  * GET and POST are redirected only if listed in `httpMethods`; otherwise they get
  * 405. TRACE always gets 405 (SPARK-5983). `beforeRedirect` is invoked with the
  * request right before each redirect.
  */
 def createRedirectHandler(
     srcPath: String,
     destPath: String,
     beforeRedirect: HttpServletRequest => Unit = x => (),
     basePath: String = "",
     httpMethods: Set[String] = Set("GET")): ServletContextHandler = {
   val prefixedDestPath = basePath + destPath

   val servlet = new HttpServlet {
     override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit =
       redirectIfAllowed("GET", request, response)

     override def doPost(request: HttpServletRequest, response: HttpServletResponse): Unit =
       redirectIfAllowed("POST", request, response)

     /** Redirects when `method` is enabled in `httpMethods`; answers 405 otherwise. */
     private def redirectIfAllowed(
         method: String,
         request: HttpServletRequest,
         response: HttpServletResponse): Unit = {
       if (!httpMethods.contains(method)) {
         response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
       } else {
         beforeRedirect(request)
         // Resolve the destination against the incoming URL to get an absolute URL.
         val requestUrl = new URL(request.getRequestURL.toString)
         response.sendRedirect(new URL(requestUrl, prefixedDestPath).toString)
       }
     }

     // SPARK-5983 ensure TRACE is not supported
     protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
       res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
     }
   }

   createServletHandler(srcPath, servlet, basePath)
 }
startJettyServer
功能描述:在指定的主机和端口上启动Jetty服务器,由其处理发往给定handler的请求。处理步骤如下:
1) 将传入的全部handler(包装为GzipHandler后)加入ContextHandlerCollection。
2) 如果通过配置项spark.ui.filters指定了filter,则给所有handler增加filter。
3) 调用Utils的startServiceOnPort方法,该方法最终回调函数connect,在选定的端口上创建并启动Jetty服务器。
startJettyServer的实现如下。
 /**
  * Starts a Jetty server on `hostName` that serves `handlers`. The port is chosen by
  * `Utils.startServiceOnPort`, starting from `port`, which calls `connect` below.
  *
  * Filters are added to every handler with `addFilters`. Each handler is then bound to
  * the Spark connector (virtual host) and wrapped in a `GzipHandler`. If `sslOptions`
  * yields an SSL context factory, an HTTPS connector becomes the Spark connector, and
  * the plain HTTP connector only serves a redirect to HTTPS.
  *
  * @return the started server with its bound HTTP port, optional HTTPS port and root
  *         `ContextHandlerCollection`
  */
 def startJettyServer(
     hostName: String,
     port: Int,
     sslOptions: SSLOptions,
     handlers: Seq[ServletContextHandler],
     conf: SparkConf,
     serverName: String = ""): ServerInfo = {

   addFilters(handlers, conf)

   val gzipHandlers = handlers.map { h =>
     h.setVirtualHosts(Array("@" + SPARK_CONNECTOR_NAME))
     val gzipHandler = new GzipHandler
     gzipHandler.setHandler(h)
     gzipHandler
   }

   /** Runs `stop`; any exception it throws is recorded as suppressed on `cause`. */
   def stopQuietly(cause: Exception)(stop: => Unit): Unit = {
     try stop catch {
       case t: Exception => if (t ne cause) cause.addSuppressed(t)
     }
   }

   // Bind to the given port, or throw a java.net.BindException if the port is occupied
   def connect(currentPort: Int): ((Server, Option[Int]), Int) = {
     val pool = new QueuedThreadPool
     if (serverName.nonEmpty) {
       pool.setName(serverName)
     }
     pool.setDaemon(true)

     val server = new Server(pool)
     val connectors = new ArrayBuffer[ServerConnector]()
     val collection = new ContextHandlerCollection

     // Create a connector on port currentPort to listen for HTTP requests
     val httpConnector = new ServerConnector(
       server,
       null,
       // Call this full constructor to set this, which forces daemon threads:
       new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true),
       null,
       -1,
       -1,
       new HttpConnectionFactory())
     httpConnector.setPort(currentPort)
     connectors += httpConnector

     val httpsConnector = sslOptions.createJettySslContextFactory() match {
       case Some(factory) =>
         // If the new port wraps around, do not try a privileged port.
         val securePort = if (currentPort != 0) {
           (currentPort + 400 - 1024) % (65536 - 1024) + 1024
         } else {
           0
         }
         val scheme = "https"
         // Create a connector on port securePort to listen for HTTPS requests
         val connector = new ServerConnector(server, factory)
         connector.setPort(securePort)
         connector.setName(SPARK_CONNECTOR_NAME)
         connectors += connector

         // redirect the HTTP requests to HTTPS port
         httpConnector.setName(REDIRECT_CONNECTOR_NAME)
         collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
         Some(connector)

       case None =>
         // No SSL, so the HTTP connector becomes the official one where all contexts bind.
         httpConnector.setName(SPARK_CONNECTOR_NAME)
         None
     }

     connectors.foreach { connector =>
       // NOTE(review): the original comment says this limits the number of *acceptors* to
       // 8, but setAcceptQueueSize sets the accept queue (listen backlog) size. Behavior is
       // kept unchanged; confirm the intent.
       connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
       connector.setHost(hostName)
     }

     // As each acceptor and each selector will use one thread, the number of threads should
     // at least be the number of acceptors and selectors plus 1 (SPARK-13776). The number
     // of selectors always equals the number of acceptors.
     val minThreads = 1 + connectors.map(_.getAcceptors * 2).sum
     pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads))

     val errorHandler = new ErrorHandler()
     errorHandler.setShowStacks(true)
     errorHandler.setServer(server)
     server.addBean(errorHandler)

     gzipHandlers.foreach(collection.addHandler)
     server.setHandler(collection)
     server.setConnectors(connectors.toArray)
     try {
       server.start()
       ((server, httpsConnector.map(_.getLocalPort())), httpConnector.getLocalPort)
     } catch {
       case e: Exception =>
         // Clean up on a best-effort basis. A failure while stopping must neither skip
         // pool.stop() nor replace `e`: `e` is what Utils.startServiceOnPort receives
         // (presumably it checks for a BindException to try another port).
         stopQuietly(e)(server.stop())
         stopQuietly(e)(pool.stop())
         throw e
     }
   }

   val ((server, securePort), boundPort) =
     Utils.startServiceOnPort(port, connect, conf, serverName)
   ServerInfo(server, boundPort, securePort,
     server.getHandler().asInstanceOf[ContextHandlerCollection])
 }
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下:
京东:https://item.jd.com/12302500.html
电子版售卖链接如下:
京东:https://e.jd.com/30389208.html
热门工具 换一换