diff --git a/common/Common.hpp b/common/Common.hpp index 7a02dfaaed..82f8485797 100644 --- a/common/Common.hpp +++ b/common/Common.hpp @@ -36,6 +36,7 @@ constexpr const char JAILED_DOCUMENT_ROOT[] = "/user/docs/"; constexpr const char CHILD_URI[] = "/loolws/child?"; constexpr const char NEW_CHILD_URI[] = "/loolws/newchild"; constexpr const char LO_JAIL_SUBPATH[] = "lo"; +constexpr const char FORKIT_URI[] = "/loolws/forkit"; constexpr const char CAPABILITIES_END_POINT[] = "/hosting/capabilities"; diff --git a/kit/ForKit.cpp b/kit/ForKit.cpp index 4ebd579070..8b18ef34e5 100644 --- a/kit/ForKit.cpp +++ b/kit/ForKit.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -64,75 +65,88 @@ int ClientPortNumber = DEFAULT_CLIENT_PORT_NUMBER; std::string MasterLocation; #endif -/// Dispatcher class to demultiplex requests from -/// WSD and handles them. -class CommandDispatcher : public IoUtil::PipeReader +class ServerWSHandler; + +// We have a single thread and a single connection so we won't bother with +// access synchronization +std::shared_ptr WSHandler; + +class ServerWSHandler final : public WebSocketHandler { + std::string _socketName; + public: - CommandDispatcher(const int pipe) : - PipeReader("wsd_pipe_rd", pipe) + ServerWSHandler(const std::string& socketName) : + WebSocketHandler(/* isClient = */ true, /* isMasking */ false), + _socketName(socketName) { } - /// Polls WSD commands and handles them. - bool pollAndDispatch() +protected: + void handleMessage(const std::vector& data) override { - std::string message; - const int ready = readLine(message, [](){ return SigUtil::getTerminationFlag(); }); - if (ready <= 0) - { - // Termination is done via SIGTERM, which breaks the wait. - if (ready < 0) - { - if (SigUtil::getTerminationFlag()) - { - LOG_INF("Poll interrupted in " << getName() << " and TerminationFlag is set."); - } + std::string message(data.data(), data.size()); - // Break. - return false; +#if !MOBILEAPP + if (UnitKit::get().filterKitMessage(this, message)) + return; +#endif + StringVector tokens = LOOLProtocol::tokenize(message); + Log::StreamLogger logger = Log::debug(); + if (logger.enabled()) + { + logger << _socketName << ": recv ["; + for (const auto& token : tokens) + { + logger << tokens.getParam(token) << ' '; } - // Timeout. - return true; + LOG_END(logger, true); } - LOG_INF("ForKit command: [" << message << "]."); - try + // Note: Syntax or parsing errors here are unexpected and fatal. + if (SigUtil::getTerminationFlag()) { - StringVector tokens = LOOLProtocol::tokenize(message); - if (tokens.size() == 2 && tokens.equals(0, "spawn")) + LOG_DBG("Termination flag set: skip message processing"); + } + else if (tokens.size() == 2 && tokens.equals(0, "spawn")) + { + const int count = std::stoi(tokens[1]); + if (count > 0) { - const int count = std::stoi(tokens[1]); - if (count > 0) - { - LOG_INF("Setting to spawn " << tokens[1] << " child" << (count == 1 ? "" : "ren") << " per request."); - ForkCounter = count; - } - else - { - LOG_WRN("Cannot spawn " << tokens[1] << " children as requested."); - } - } - else if (tokens.size() == 3 && tokens.equals(0, "setconfig")) - { - // Currently only rlimit entries are supported. - if (!Rlimit::handleSetrlimitCommand(tokens)) - { - LOG_ERR("Unknown setconfig command: " << message); - } + LOG_INF("Setting to spawn " << tokens[1] << " child" << (count == 1 ? "" : "ren") << " per request."); + ForkCounter = count; } else { - LOG_ERR("Unknown command: " << message); + LOG_WRN("Cannot spawn " << tokens[1] << " children as requested."); } } - catch (const std::exception& exc) + else if (tokens.size() == 3 && tokens.equals(0, "setconfig")) { - LOG_ERR("Error while processing forkit request [" << message << "]: " << exc.what()); + // Currently only rlimit entries are supported. + if (!Rlimit::handleSetrlimitCommand(tokens)) + { + LOG_ERR("Unknown setconfig command: " << message); + } } + else if (tokens.equals(0, "exit")) + { + LOG_INF("Setting TerminationFlag due to 'exit' command from parent."); + SigUtil::setTerminationFlag(); + } + else + { + LOG_ERR("Bad or unknown token [" << tokens[0] << "]"); + } + } - return true; + void onDisconnect() override + { +#if !MOBILEAPP + LOG_WRN("ForKit connection lost without exit arriving from wsd. Setting TerminationFlag"); + SigUtil::setTerminationFlag(); +#endif } }; @@ -234,7 +248,7 @@ static void cleanupChildren() LOG_ERR("Unknown child " << exitedChildPid << " has exited"); } } - + // Now delete the jails. for (const auto& path : jails) { @@ -545,18 +559,22 @@ int main(int argc, char** argv) Log::logger().setLevel(LogLevel); } - CommandDispatcher commandDispatcher(0); + SocketPoll mainPoll(Util::getThreadName()); + mainPoll.runOnClientThread(); // We will do the polling on this thread. + + WSHandler = std::make_shared("forkit_ws"); + +#if !MOBILEAPP + mainPoll.insertNewUnixSocket(MasterLocation, FORKIT_URI, WSHandler); +#endif + LOG_INF("ForKit process is ready."); while (!SigUtil::getTerminationFlag()) { UnitKit::get().invokeForKitTest(); - if (!commandDispatcher.pollAndDispatch()) - { - LOG_INF("Child dispatcher flagged for termination."); - break; - } + mainPoll.poll(POLL_TIMEOUT_MS); #if ENABLE_DEBUG if (!SingleKit) diff --git a/net/Socket.hpp b/net/Socket.hpp index d09c39334d..fd0a5278d2 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -543,6 +543,11 @@ public: } } + const std::thread::id &getThreadOwner() + { + return _owner; + } + /// Are we running in either shutdown, or the polling thread. /// Asserts in the debug builds, otherwise just logs. void assertCorrectThread() const diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp index 93fe768277..ca72ecd610 100644 --- a/wsd/Admin.cpp +++ b/wsd/Admin.cpp @@ -24,7 +24,6 @@ #include #include "FileServer.hpp" #include -#include "LOOLWSD.hpp" #include #include #include "Storage.hpp" @@ -352,7 +351,6 @@ Admin::Admin() : SocketPoll("admin"), _model(AdminModel()), _forKitPid(-1), - _forKitWritePipe(-1), _lastTotalMemory(0), _lastJiffies(0), _lastSentCount(0), @@ -594,10 +592,7 @@ void Admin::notifyForkit() << "setconfig limit_file_size_mb " << _defDocProcSettings.getLimitFileSizeMb() << '\n' << "setconfig limit_num_open_files " << _defDocProcSettings.getLimitNumberOpenFiles() << '\n'; - if (_forKitWritePipe != -1) - IoUtil::writeToPipe(_forKitWritePipe, oss.str()); - else - LOG_INF("Forkit write pipe not set (yet)."); + LOOLWSD::sendMessageToForKit(oss.str()); } void Admin::triggerMemoryCleanup(const size_t totalMem) diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp index 5d3cd4b051..e39d89fe78 100644 --- a/wsd/Admin.hpp +++ b/wsd/Admin.hpp @@ -16,6 +16,7 @@ #include "Log.hpp" #include "net/WebSocketHandler.hpp" +#include "LOOLWSD.hpp" class Admin; @@ -91,7 +92,6 @@ public: void rmDoc(const std::string& docKey); void setForKitPid(const int forKitPid) { _forKitPid = forKitPid; _model.setForKitPid(forKitPid);} - void setForKitWritePipe(const int forKitWritePipe) { _forKitWritePipe = forKitWritePipe; } /// Callers must ensure that modelMutex is acquired AdminModel& getModel(); @@ -156,7 +156,6 @@ private: /// the Admin Poll thread. AdminModel _model; int _forKitPid; - int _forKitWritePipe; size_t _lastTotalMemory; size_t _lastJiffies; uint64_t _lastSentCount; diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp index e304cd60c1..251e97bba2 100644 --- a/wsd/DocumentBroker.hpp +++ b/wsd/DocumentBroker.hpp @@ -58,9 +58,11 @@ public: } }; +#include "LOOLWSD.hpp" + /// Represents a new LOK child that is read /// to host a document. -class ChildProcess +class ChildProcess : public WSProcess { public: /// @param pid is the process ID of the child. @@ -70,12 +72,9 @@ public: const std::shared_ptr& socket, const Poco::Net::HTTPRequest &request) : - _pid(pid), - _jailId(jailId), - _ws(std::make_shared(socket, request)), - _socket(socket) + WSProcess("ChildProcess", pid, socket, std::make_shared(socket, request)), + _jailId(jailId) { - LOG_INF("ChildProcess ctor [" << _pid << "]."); } @@ -83,125 +82,12 @@ public: const ChildProcess& operator=(ChildProcess&& other) = delete; - ~ChildProcess() - { - LOG_DBG("~ChildProcess dtor [" << _pid << "]."); - - if (_pid <= 0) - return; - - terminate(); - - // No need for the socket anymore. - _ws.reset(); - _socket.reset(); - } - void setDocumentBroker(const std::shared_ptr& docBroker); std::shared_ptr getDocumentBroker() const { return _docBroker.lock(); } - - /// Let the child close a nice way. - void close() - { - if (_pid < 0) - return; - - try - { - LOG_DBG("Closing ChildProcess [" << _pid << "]."); - - // Request the child to exit - if (isAlive()) - { - LOG_DBG("Stopping ChildProcess [" << _pid << "] by sending 'exit' command."); - sendTextFrame("exit"); - } - - // Shutdown the socket. - if (_ws) - _ws->shutdown(); - } - catch (const std::exception& ex) - { - LOG_ERR("Error while closing child process: " << ex.what()); - } - - _pid = -1; // Detach from child. - } - - /// Kill or abandon the child. - void terminate() - { - if (_pid < 0) - return; - -#if !MOBILEAPP - if (::kill(_pid, 0) == 0) - { - LOG_INF("Killing child [" << _pid << "]."); - if (!SigUtil::killChild(_pid)) - { - LOG_ERR("Cannot terminate lokit [" << _pid << "]. Abandoning."); - } - } -#else - // What to do? Throw some unique exception that the outermost call in the thread catches and - // exits from the thread? -#endif - _pid = -1; - } - - Poco::Process::PID getPid() const { return _pid; } const std::string& getJailId() const { return _jailId; } - /// Send a text payload to the child-process WS. - bool sendTextFrame(const std::string& data) - { - try - { - if (_ws) - { - LOG_TRC("Send DocBroker to Child message: [" << LOOLProtocol::getAbbreviatedMessage(data) << "]."); - _ws->sendMessage(data); - return true; - } - } - catch (const std::exception& exc) - { - LOG_ERR("Failed to send child [" << _pid << "] data [" << - LOOLProtocol::getAbbreviatedMessage(data) << "] due to: " << exc.what()); - throw; - } - - LOG_WRN("No socket between DocBroker and child to send [" << LOOLProtocol::getAbbreviatedMessage(data) << "]"); - return false; - } - - /// Check whether this child is alive and socket not in error. - /// Note: zombies will show as alive, and sockets have waiting - /// time after the other end-point closes. So this isn't accurate. - bool isAlive() const - { -#if !MOBILEAPP - try - { - return _pid > 1 && _ws && ::kill(_pid, 0) == 0; - } - catch (const std::exception&) - { - } - - return false; -#else - return _pid > 1; -#endif - } - private: - Poco::Process::PID _pid; const std::string _jailId; - std::shared_ptr _ws; - std::shared_ptr _socket; std::weak_ptr _docBroker; }; diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 54ba8e2b9c..e9e17b86fb 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -91,7 +91,6 @@ using Poco::Net::PartHandler; #include #include #include -#include #include #include #include @@ -387,16 +386,11 @@ static int forkChildren(const int number) #else const std::string aMessage = "spawn " + std::to_string(number) + "\n"; LOG_DBG("MasterToForKit: " << aMessage.substr(0, aMessage.length() - 1)); - if (write(LOOLWSD::ForKitWritePipe, aMessage.c_str(), aMessage.length()) > 0) + LOOLWSD::sendMessageToForKit(aMessage); #endif - { - OutstandingForks += number; - LastForkRequestTime = std::chrono::steady_clock::now(); - return number; - } - - LOG_ERR("No forkit pipe while rebalancing children."); - return -1; // Fail. + OutstandingForks += number; + LastForkRequestTime = std::chrono::steady_clock::now(); + return number; } return 0; @@ -704,8 +698,8 @@ inline std::string getServiceURI(const std::string &sub, bool asAdmin = false) std::atomic LOOLWSD::NextConnectionId(1); #ifndef KIT_IN_PROCESS -std::atomic LOOLWSD::ForKitWritePipe(-1); std::atomic LOOLWSD::ForKitProcId(-1); +std::shared_ptr LOOLWSD::ForKitProc; #endif #if !MOBILEAPP bool LOOLWSD::NoCapsForKit = false; @@ -780,6 +774,42 @@ public: /// Check prisoners are still alive and balanced. void wakeupHook() override; + + // Resets the forkit porcess object + void setForKitProcess(const std::weak_ptr& forKitProc) + { + assertCorrectThread(); + _forKitProc = forKitProc; + } + + void sendMessageToForKit(const std::string& msg) + { + if (std::this_thread::get_id() == getThreadOwner()) + { + // Speed up sending the message if the request comes from owner thread + std::shared_ptr forKitProc = _forKitProc.lock(); + if (forKitProc) + { + forKitProc->sendTextFrame(msg); + } + } + else + { + // Put the message in the owner's thread queue to be send later + // because WebSocketHandler is not thread safe and otherwise we + // should synchronize inside WebSocketHandler. + addCallback([=]{ + std::shared_ptr forKitProc = _forKitProc.lock(); + if (forKitProc) + { + forKitProc->sendTextFrame(msg); + } + }); + } + } + +private: + std::weak_ptr _forKitProc; }; /// This thread listens for and accepts prisoner kit processes. @@ -799,6 +829,17 @@ public: } }; +void ForKitProcWSHandler::handleMessage(const std::vector &data) +{ + LOG_TRC("ForKitProcWSHandler: handling incoming [" << LOOLProtocol::getAbbreviatedMessage(&data[0], data.size()) << "]."); + const std::string firstLine = LOOLProtocol::getFirstLine(&data[0], data.size()); + const StringVector tokens = LOOLProtocol::tokenize(firstLine.data(), firstLine.size()); + + // Just add here the processing of specific received messages + + LOG_ERR("ForKitProcWSHandler: unknown command: " << tokens[0]); +} + LOOLWSD::LOOLWSD() { } @@ -1708,13 +1749,10 @@ bool LOOLWSD::createForKit() Admin::instance().setForKitPid(ForKitProcId); } - if (ForKitWritePipe != -1) - { - close(ForKitWritePipe); - ForKitWritePipe = -1; - Admin::instance().setForKitWritePipe(ForKitWritePipe); - } - + // Below line will be executed by PrisonerPoll thread. + ForKitProc = nullptr; + PrisonerPoll.setForKitProcess(ForKitProc); + // ForKit always spawns one. ++OutstandingForks; @@ -1722,17 +1760,13 @@ bool LOOLWSD::createForKit() args.cat(std::string(" "), 0)); LastForkRequestTime = std::chrono::steady_clock::now(); - int childStdin = -1; - int child = Util::spawnProcess(forKitPath, args, nullptr, &childStdin); - - ForKitWritePipe = childStdin; + int child = Util::spawnProcess(forKitPath, args); ForKitProcId = child; LOG_INF("Forkit process launched: " << ForKitProcId); // Init the Admin manager Admin::instance().setForKitPid(ForKitProcId); - Admin::instance().setForKitWritePipe(ForKitWritePipe); const int balance = LOOLWSD::NumPreSpawnedChildren - OutstandingForks; if (balance > 0) @@ -1742,6 +1776,11 @@ bool LOOLWSD::createForKit() #endif } +void LOOLWSD::sendMessageToForKit(const std::string& message) +{ + PrisonerPoll.sendMessageToForKit(message); +} + #endif // !MOBILEAPP #ifdef FUZZER @@ -1903,6 +1942,20 @@ private: LOG_TRC("Child connection with URI [" << LOOLWSD::anonymizeUrl(request.getURI()) << "]."); Poco::URI requestURI(request.getURI()); +#ifndef KIT_IN_PROCESS + if (requestURI.getPath() == FORKIT_URI) + { + if (socket->getPid() != LOOLWSD::ForKitProcId) + { + LOG_WRN("Connection request received on " << FORKIT_URI << " endpoint from unexpected ForKit process. Skipped."); + return; + } + LOOLWSD::ForKitProc = std::make_shared(LOOLWSD::ForKitProcId, socket, request); + socket->getInBuffer().clear(); + PrisonerPoll.setForKitProcess(LOOLWSD::ForKitProc); + return; + } +#endif if (requestURI.getPath() != NEW_CHILD_URI) { LOG_ERR("Invalid incoming URI."); @@ -3621,7 +3674,8 @@ int LOOLWSD::innerMain() LOG_INF("Waiting for forkit process to exit"); int status = 0; waitpid(ForKitProcId, &status, WUNTRACED); - close(ForKitWritePipe); + ForKitProcId = -1; + ForKitProc.reset(); #endif // In case forkit didn't cleanup properly, don't leave jails behind. diff --git a/wsd/LOOLWSD.hpp b/wsd/LOOLWSD.hpp index 469d26bec1..33056538fa 100644 --- a/wsd/LOOLWSD.hpp +++ b/wsd/LOOLWSD.hpp @@ -17,6 +17,8 @@ #include #include +#include + #include #include #include @@ -25,6 +27,7 @@ #include "Util.hpp" #include "FileUtil.hpp" +#include "WebSocketHandler.hpp" class ChildProcess; class TraceFileWriter; @@ -36,6 +39,170 @@ std::shared_ptr getNewChild_Blocks( const std::string& uri #endif ); +// This is common code used to setup as socket to both +// forkit and child document processes via a websocket. +// In general, a WSProcess instance represents a child +// process with which we can communicate through websocket. +class WSProcess +{ +public: + /// @param pid is the process ID. + /// @param socket is the underlying Sockeet to the process. + WSProcess(const std::string& name, + const Poco::Process::PID pid, + const std::shared_ptr& socket, + std::shared_ptr handler) : + + _name(name), + _pid(pid), + _ws(handler), + _socket(socket) + { + LOG_INF(_name << " ctor [" << _pid << "]."); + } + + + WSProcess(WSProcess&& other) = delete; + + const WSProcess& operator=(WSProcess&& other) = delete; + + virtual ~WSProcess() + { + LOG_DBG("~" << _name << " dtor [" << _pid << "]."); + + if (_pid <= 0) + return; + + terminate(); + + // No need for the socket anymore. + _ws.reset(); + _socket.reset(); + } + + /// Let the child close a nice way. + void close() + { + if (_pid < 0) + return; + + try + { + LOG_DBG("Closing ChildProcess [" << _pid << "]."); + + // Request the child to exit + if (isAlive()) + { + LOG_DBG("Stopping ChildProcess [" << _pid << "] by sending 'exit' command."); + sendTextFrame("exit"); + } + + // Shutdown the socket. + if (_ws) + _ws->shutdown(); + } + catch (const std::exception& ex) + { + LOG_ERR("Error while closing child process: " << ex.what()); + } + + _pid = -1; // Detach from child. + } + + /// Kill or abandon the child. + void terminate() + { + if (_pid < 0) + return; + +#if !MOBILEAPP + if (::kill(_pid, 0) == 0) + { + LOG_INF("Killing child [" << _pid << "]."); + if (!SigUtil::killChild(_pid)) + { + LOG_ERR("Cannot terminate lokit [" << _pid << "]. Abandoning."); + } + } +#else + // What to do? Throw some unique exception that the outermost call in the thread catches and + // exits from the thread? +#endif + _pid = -1; + } + + Poco::Process::PID getPid() const { return _pid; } + + /// Send a text payload to the child-process WS. + virtual bool sendTextFrame(const std::string& data) + { + try + { + if (_ws) + { + LOG_TRC("Send to " << _name << " message: [" << LOOLProtocol::getAbbreviatedMessage(data) << "]."); + _ws->sendMessage(data); + return true; + } + } + catch (const std::exception& exc) + { + LOG_ERR("Failed to send " << _name << " [" << _pid << "] data [" << + LOOLProtocol::getAbbreviatedMessage(data) << "] due to: " << exc.what()); + throw; + } + + LOG_WRN("No socket to " << _name << " to send [" << LOOLProtocol::getAbbreviatedMessage(data) << "]"); + return false; + } + + /// Check whether this child is alive and socket not in error. + /// Note: zombies will show as alive, and sockets have waiting + /// time after the other end-point closes. So this isn't accurate. + virtual bool isAlive() const + { +#if !MOBILEAPP + try + { + return _pid > 1 && _ws && ::kill(_pid, 0) == 0; + } + catch (const std::exception&) + { + } + + return false; +#else + return _pid > 1; +#endif + } + + std::string _name; + Poco::Process::PID _pid; + std::shared_ptr _ws; + std::shared_ptr _socket; +}; + +class ForKitProcWSHandler: public WebSocketHandler, public std::enable_shared_from_this +{ +public: + + ForKitProcWSHandler(const std::weak_ptr& socket, const Poco::Net::HTTPRequest& request) + : WebSocketHandler(socket, request) + { + } + + virtual void handleMessage(const std::vector &data) override; +}; + +class ForKitProcess : public WSProcess +{ +public: + ForKitProcess(int pid, std::shared_ptr& socket, const Poco::Net::HTTPRequest &request) + : WSProcess("ForKit", pid, socket, std::make_shared(socket, request)) + { + socket->setHandler(_ws); + } +}; /// The Server class which is responsible for all /// external interactions. @@ -57,7 +224,7 @@ public: static bool SingleKit; #endif #endif - static std::atomic ForKitWritePipe; + static std::shared_ptr ForKitProc; static std::atomic ForKitProcId; static bool DummyLOK; static std::string FuzzFileName; @@ -188,6 +355,9 @@ public: /// Return true when successfull. static bool createForKit(); + /// Sends a message to ForKit through PrisonerPoll. + static void sendMessageToForKit(const std::string& message); + /// Checks forkit (and respawns), rebalances /// child kit processes and cleans up DocBrokers. static void doHousekeeping();