From cb2450cf16a80438898c78cacb59f2f1e3aa94be Mon Sep 17 00:00:00 2001
From: Rob Hoes <rob.hoes@citrix.com>
Date: Mon, 11 Jul 2022 16:28:21 +0000
Subject: [PATCH 3/6] Limit concurrent connections with semaphore

To add some protection against overloading the server, and running out
of file descriptors and other resources, the number of concurrent
connections is now limited: by default to 800 on the TCP and client-cert
sockets, and to 1024 on the main unix socket. Connections coming in when
the limit has been reached are put on hold, and not accepted until
another connection finishes.

There are three separate buckets for connections to the main unix
socket, the client-cert (unix) socket and the TCP socket.

This also adds some more debug logging when connections are accepted and
disconnected. This is disabled by default along with other "http" logs,
but can be enabled in the conf file.

Signed-off-by: Rob Hoes <rob.hoes@citrix.com>
---
 ocaml/database/database_server_main.ml |  2 +-
 ocaml/libs/http-svr/http_svr.ml        | 26 ++++++++++++++++++++++----
 ocaml/libs/http-svr/http_svr.mli       |  2 +-
 ocaml/libs/http-svr/server_io.ml       | 17 +++++++++++------
 ocaml/libs/http-svr/server_io.mli      |  1 +
 ocaml/libs/http-svr/test_server.ml     |  2 +-
 ocaml/xapi/xapi.ml                     |  6 +++++-
 ocaml/xapi/xapi_globs.ml               |  9 +++++++++
 ocaml/xapi/xapi_mgmt_iface.ml          |  7 +++++--
 ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml    |  2 +-
 10 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/ocaml/database/database_server_main.ml b/ocaml/database/database_server_main.ml
index 9778e4a39..4809bc7fd 100644
--- a/ocaml/database/database_server_main.ml
+++ b/ocaml/database/database_server_main.ml
@@ -81,7 +81,7 @@ let _ =
           (Http_svr.BufIO remote_database_access_handler_v1) ;
         Http_svr.Server.add_handler server Http.Post "/post_remote_db_access_v2"
           (Http_svr.BufIO remote_database_access_handler_v2) ;
-        Http_svr.start server socket ;
+        Http_svr.start ~conn_limit:1024 server socket ;
         Printf.printf "server listening\n%!" ;
         if !self_test then (
           Printf.printf "Running unit-tests\n%!" ;
diff --git a/ocaml/libs/http-svr/http_svr.ml b/ocaml/libs/http-svr/http_svr.ml
index 4c56f5be0..9017f94bf 100644
--- a/ocaml/libs/http-svr/http_svr.ml
+++ b/ocaml/libs/http-svr/http_svr.ml
@@ -105,6 +105,7 @@ let response_fct req ?(hdrs = []) s (response_length : int64)
       Http.Response.content_length= Some response_length
     }
   in
+  D.debug "Response %s" (Http.Response.to_string res) ;
   Unixext.really_write_string s (Http.Response.to_wire_string res) ;
   write_response_to_fd_fn s
 
@@ -122,6 +123,7 @@ let response_missing ?(hdrs = []) s body =
       ~headers:(connection :: cache :: hdrs)
       ~body "404" "Not Found"
   in
+  D.debug "Response %s" (Http.Response.to_string res) ;
   Unixext.really_write_string s (Http.Response.to_wire_string res)
 
 let response_error_html ?(version = "1.1") s code message hdrs body =
@@ -133,6 +135,7 @@ let response_error_html ?(version = "1.1") s code message hdrs body =
       ~headers:(content_type :: connection :: cache :: hdrs)
       ~body code message
   in
+  D.debug "Response %s" (Http.Response.to_string res) ;
   Unixext.really_write_string s (Http.Response.to_wire_string res)
 
 let response_unauthorised ?req label s =
@@ -484,7 +487,15 @@ let handle_one (x : 'a Server.t) ss context req =
     ) ;
     !finished
 
-let handle_connection (x : 'a Server.t) _ ss =
+let handle_connection (x : 'a Server.t) caller ss =
+  ( match caller with
+  | Unix.ADDR_UNIX _ ->
+      debug "Accepted unix connection"
+  | Unix.ADDR_INET (addr, port) ->
+      debug "Accepted inet connection from %s:%d"
+        (Unix.string_of_inet_addr addr)
+        port
+  ) ;
   let ic = Buf_io.of_fd ss in
   (* For HTTPS requests, a PROXY header is sent by stunnel right at the beginning of
      of its connection to the server, before HTTP requests are transferred, and
@@ -504,7 +515,7 @@ let handle_connection (x : 'a Server.t) _ ss =
     if not finished then
       loop proxy
   in
-  loop None ; Unix.close ss
+  loop None ; debug "Closing connection" ; Unix.close ss
 
 let bind ?(listen_backlog = 128) sockaddr name =
   let domain =
@@ -570,8 +581,15 @@ let socket_table = Hashtbl.create 10
 type socket = Unix.file_descr * string
 
 (* Start an HTTP server on a new socket *)
-let start (x : 'a Server.t) (socket, name) =
-  let handler = {Server_io.name; body= handle_connection x} in
+let start ~conn_limit (x : 'a Server.t) (socket, name) =
+  (* [lock] caps concurrent connections on this socket at [conn_limit] *)
+  let handler =
+    {
+      Server_io.name
+    ; body= handle_connection x
+    ; lock= Xapi_stdext_threads.Semaphore.create conn_limit
+    }
+  in
   let server = Server_io.server handler socket in
   Hashtbl.add socket_table socket server
 
diff --git a/ocaml/libs/http-svr/http_svr.mli b/ocaml/libs/http-svr/http_svr.mli
index 3781c7eee..d0c79e4d4 100644
--- a/ocaml/libs/http-svr/http_svr.mli
+++ b/ocaml/libs/http-svr/http_svr.mli
@@ -59,7 +59,7 @@ val bind : ?listen_backlog:int -> Unix.sockaddr -> string -> socket
 (* [bind_retry]: like [bind] but will catch (possibly transient exceptions) and retry *)
 val bind_retry : ?listen_backlog:int -> Unix.sockaddr -> socket
 
-val start : 'a Server.t -> socket -> unit
+val start : conn_limit:int -> 'a Server.t -> socket -> unit
 
 val handle_one : 'a Server.t -> Unix.file_descr -> 'a -> Http.Request.t -> bool
 
diff --git a/ocaml/libs/http-svr/server_io.ml b/ocaml/libs/http-svr/server_io.ml
index 28fd584d1..9b2d33a0e 100644
--- a/ocaml/libs/http-svr/server_io.ml
+++ b/ocaml/libs/http-svr/server_io.ml
@@ -23,12 +23,17 @@ type handler = {
     name: string
   ; (* body should close the provided fd *)
     body: Unix.sockaddr -> Unix.file_descr -> unit
+  ; lock: Xapi_stdext_threads.Semaphore.t
 }
 
 let handler_by_thread (h : handler) (s : Unix.file_descr)
     (caller : Unix.sockaddr) =
   Thread.create
-    (fun () -> Debug.with_thread_named h.name (fun () -> h.body caller s) ())
+    (fun () ->
+      Fun.protect (* release the slot taken in establish_server, even on raise *)
+        ~finally:(fun () -> Xapi_stdext_threads.Semaphore.release h.lock 1)
+        (Debug.with_thread_named h.name (fun () -> h.body caller s))
+    )
     ()
 
 (** Function with the main accept loop *)
@@ -37,16 +42,17 @@ exception PleaseClose
 
 let set_intersect a b = List.filter (fun x -> List.mem x b) a
 
-let establish_server ?(signal_fds = []) forker sock =
+let establish_server ?(signal_fds = []) forker handler sock =
   while true do
     try
       let r, _, _ = Unix.select ([sock] @ signal_fds) [] [] (-1.) in
       (* If any of the signal_fd is active then bail out *)
       if set_intersect r signal_fds <> [] then raise PleaseClose ;
+      Xapi_stdext_threads.Semaphore.acquire handler.lock 1 ;
       let s, caller = Unix.accept sock in
       try
         Unix.set_close_on_exec s ;
-        ignore (forker s caller)
+        ignore (forker handler s caller)
       with exc ->
         (* NB provided 'forker' is configured to make a background thread then the
            	     only way we can get here is if set_close_on_exec or Thread.create fails.
@@ -89,9 +95,8 @@ let server handler sock =
         Debug.with_thread_named handler.name
           (fun () ->
             try
-              establish_server ~signal_fds:[status_out]
-                (handler_by_thread handler)
-                sock
+              establish_server ~signal_fds:[status_out] handler_by_thread
+                handler sock
             with PleaseClose -> debug "Server thread exiting"
           )
           ()
diff --git a/ocaml/libs/http-svr/server_io.mli b/ocaml/libs/http-svr/server_io.mli
index b48952f89..3aca02347 100644
--- a/ocaml/libs/http-svr/server_io.mli
+++ b/ocaml/libs/http-svr/server_io.mli
@@ -16,6 +16,7 @@ type handler = {
     name: string  (** used for naming the thread *)
   ; body: Unix.sockaddr -> Unix.file_descr -> unit
         (** function called in a thread for each connection*)
+  ; lock: Xapi_stdext_threads.Semaphore.t
 }
 
 type server = {
diff --git a/ocaml/libs/http-svr/test_server.ml b/ocaml/libs/http-svr/test_server.ml
index 2b398cfa7..51e4f559e 100644
--- a/ocaml/libs/http-svr/test_server.ml
+++ b/ocaml/libs/http-svr/test_server.ml
@@ -68,7 +68,7 @@ let _ =
   let inet_addr = Unix.inet_addr_of_string ip in
   let addr = Unix.ADDR_INET (inet_addr, !port) in
   let socket = Http_svr.bind ~listen_backlog:5 addr "server" in
-  start server socket ;
+  start ~conn_limit:1024 server socket ;
   Printf.printf "Server started on %s:%d\n" ip !port ;
   with_lock finished_m (fun () ->
       while not !finished do
diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml
index d0b71fb0f..3d83d6e39 100644
--- a/ocaml/xapi/xapi.ml
+++ b/ocaml/xapi/xapi.ml
@@ -844,7 +844,11 @@ let listen_unix_socket sock_path =
   Unixext.mkdir_safe (Filename.dirname sock_path) 0o700 ;
   Unixext.unlink_safe sock_path ;
   let domain_sock = Xapi_http.bind (Unix.ADDR_UNIX sock_path) in
-  ignore (Http_svr.start Xapi_http.server domain_sock)
+  ignore
+    (Http_svr.start
+       ~conn_limit:!Xapi_globs.conn_limit_unix
+       Xapi_http.server domain_sock
+    )
 
 let set_stunnel_timeout () =
   try
diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml
index d9c0c2b7b..8dfd98538 100644
--- a/ocaml/xapi/xapi_globs.ml
+++ b/ocaml/xapi/xapi_globs.ml
@@ -957,6 +957,12 @@ let sqlite3 = ref "/usr/bin/sqlite3"
 
 let samba_dir = "/var/lib/samba"
 
+let conn_limit_tcp = ref 800 (* limit for the TCP management socket *)
+
+let conn_limit_unix = ref 1024 (* limit for the main unix socket *)
+
+let conn_limit_clientcert = ref 800 (* limit for the client-cert unix socket *)
+
 let xapi_globs_spec =
   [
     ( "master_connection_reset_timeout"
@@ -1030,6 +1036,9 @@ let xapi_globs_spec =
   ; ( "winbind_update_closest_kdc_interval"
     , Float winbind_update_closest_kdc_interval
     )
+  ; ("conn_limit_tcp", Int conn_limit_tcp)
+  ; ("conn_limit_unix", Int conn_limit_unix)
+  ; ("conn_limit_clientcert", Int conn_limit_clientcert)
   ]
 
 let options_of_xapi_globs_spec =
diff --git a/ocaml/xapi/xapi_mgmt_iface.ml b/ocaml/xapi/xapi_mgmt_iface.ml
index be93ae258..381617f47 100644
--- a/ocaml/xapi/xapi_mgmt_iface.ml
+++ b/ocaml/xapi/xapi_mgmt_iface.ml
@@ -81,7 +81,8 @@ end = struct
           ipv6_enabled := Unix.domain_of_sockaddr sockaddr = Unix.PF_INET6 ;
           Xapi_http.bind sockaddr
     in
-    Http_svr.start Xapi_http.server socket ;
+    Http_svr.start ~conn_limit:!Xapi_globs.conn_limit_tcp Xapi_http.server
+      socket ;
     management_servers := socket :: !management_servers ;
     if Pool_role.is_master () && addr = None then
       (* NB if we synchronously bring up the management interface on a master with a blank
@@ -139,7 +140,9 @@ module Client_certificate_auth_server = struct
       Unixext.mkdir_safe (Filename.dirname sock_path) 0o700 ;
       Unixext.unlink_safe sock_path ;
       let domain_sock = Xapi_http.bind (Unix.ADDR_UNIX sock_path) in
-      Http_svr.start Xapi_http.server domain_sock ;
+      Http_svr.start
+        ~conn_limit:!Xapi_globs.conn_limit_clientcert
+        Xapi_http.server domain_sock ;
       management_server := Some domain_sock
     )
 
diff --git a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
index 49dc74131..8d017d481 100644
--- a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
+++ b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
@@ -93,7 +93,7 @@ let start (xmlrpc_path, http_fwd_path) process =
   Xapi_stdext_unix.Unixext.mkdir_safe (Filename.dirname xmlrpc_path) 0o700 ;
   Xapi_stdext_unix.Unixext.unlink_safe xmlrpc_path ;
   let xmlrpc_socket = Http_svr.bind (Unix.ADDR_UNIX xmlrpc_path) "unix_rpc" in
-  Http_svr.start server xmlrpc_socket ;
+  Http_svr.start ~conn_limit:1024 server xmlrpc_socket ;
   Xapi_stdext_unix.Unixext.unlink_safe http_fwd_path ;
   let http_fwd_socket = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
   Unix.bind http_fwd_socket (Unix.ADDR_UNIX http_fwd_path) ;
-- 
2.31.1

