00001 /* 00002 * Copyright 2015 CERN 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 * 00016 */ 00017 00018 00019 00020 /** @file dmTaskExec.h 00021 * @brief A class that spawns commands that perform actions 00022 * @author Fabrizio Furano 00023 * @date Dec 2015 00024 */ 00025 00026 00027 #include <boost/thread.hpp> 00028 #include <signal.h> 00029 #include <vector> 00030 #include <string> 00031 #include <algorithm> 00032 #include <sstream> 00033 #include <iterator> 00034 #include <iostream> 00035 00036 namespace dmlite { 00037 class dmTaskExec; 00038 00039 class dmTask: public boost::mutex { 00040 00041 protected: 00042 /// Threads waiting for result about this task will wait and synchronize here 00043 /// using something like 00044 /// boost::lock_guard< boost::mutex > l(workmutex); 00045 /// 00046 boost::condition_variable condvar; 00047 public: 00048 dmTask(dmTaskExec *wheretolog); 00049 dmTask(const dmTask &o) { 00050 key = o.key; 00051 cmd = o.cmd; 00052 for(unsigned int i = 0; i < 64; i++) parms[i] = NULL; 00053 resultcode = o.resultcode; 00054 starttime = o.starttime; 00055 endtime = o.endtime; 00056 finished = o.finished; 00057 fd[0] = 0; fd[1] = 0; fd[2] = 0; 00058 this->stdout = o.stdout; 00059 this->loggerinst = o.loggerinst; 00060 } 00061 00062 ~dmTask(); 00063 int key; 00064 00065 std::string cmd; 00066 const char *parms[64]; 00067 00068 int resultcode; 00069 00070 time_t starttime, endtime; 00071 bool finished; 00072 00073 int fd[3]; 00074 pid_t pid; 00075 std::string stdout; 00076 00077 /// Split che command string into the single parms 00078 void splitCmd(); 00079 00080 /// Wait until the task has finished or the timeout is expired 00081 int waitFinished(int tmout=5); 00082 00083 void notifyAll() { 00084 condvar.notify_all(); 00085 } 00086 00087 dmTaskExec *loggerinst; 00088 }; 00089 00090 00091 /// Allows to spawn commands, useful for checksum calculations or file pulling 00092 /// The spawned commands are pollable, i.e. in a given moment it's possible to 00093 /// know the list of commands that are still running. 00094 /// Objects belonging to this class in general are created in the disk nodes, 00095 /// e.g. for running checksums or file copies and pulls 00096 class dmTaskExec: public boost::recursive_mutex { 00097 00098 public: 00099 dmTaskExec(); 00100 ~dmTaskExec(); 00101 std::string instance; 00102 /// Executes a command. Returns a positive integer as a key to reference 00103 /// the execution status and the result 00104 /// The mechanics is that a detached thread is started. This guy invokes popen3 00105 /// and blocks waiting for the process to end. Upon end it updates the corresponding 00106 /// instance of dmTask with the result and the stdout 00107 int submitCmd(std::string cmd); 00108 00109 00110 /// Executes a command. Returns a positive integer as a key to reference 00111 // the execution status and the result 00112 // The mechanics is that a detached thread is started. This guy invokes popen3 00113 // and blocks waiting for the process to end. Upon end it updates the corresponding 00114 // instance of dmTask with the result and the stdout 00115 // -1 is returned in case of error in the submission 00116 int submitCmd(std::vector<std::string> &args); 00117 00118 /// Actually starts the thread corresponding to a command that was just submitted 00119 /// Avoids race conditions 00120 void goCmd(int id); 00121 00122 /// Split che command string into the single parms 00123 void assignCmd(dmTask *task, std::vector<std::string> &args); 00124 00125 /// Get the results of a task. 00126 /// Wait at max tmout seconds until the task finishes 00127 /// Return 0 if the task has finished and there is a result 00128 /// Return nonzero if the task is still running 00129 int waitResult(int taskID, int tmout=5); 00130 00131 //kill a specific task given the id 00132 int killTask(int taskID); 00133 00134 //get a dmTask given the id ( mainly for testing) 00135 dmTask* getTask(int taskID); 00136 00137 //get the current stdout of a task which may be running 00138 int getTaskStdout(int taskID, std::string &stdout); 00139 00140 /// Loops over all the tasks and: 00141 /// - send a notification to the head node about all the processes that are running or that have finished 00142 /// - garbage collect the task list. 00143 /// - Task that are finished since long (e.g. 1 hour) 00144 /// - Tasks that are stuck (e.g. 1 day) 00145 void tick(); 00146 00147 /// Event invoked internally to log stuff 00148 virtual void onLoggingRequest(Logger::Level lvl, std::string const & msg) = 0; 00149 /// Event invoked internally to log stuff 00150 virtual void onErrLoggingRequest(std::string const & msg) = 0; 00151 00152 protected: 00153 00154 /// event for immediate notifications when a task finishes 00155 /// Subclasses can specialize this and apply app-dependent behavior to 00156 /// perform actions when something has finished running 00157 /// NOTE the signature. This passes copies of Task objects, not the originals 00158 virtual void onTaskCompleted(dmTask &task); 00159 00160 // event that notifies that a task is running 00161 // This event can be invoked multiple times during the life of a task 00162 /// NOTE the signature. This passes copies of Task objects, not the originals 00163 virtual void onTaskRunning(dmTask &task); 00164 00165 00166 private: 00167 00168 int popen3(int fd[3], pid_t *pid, const char ** argv ); 00169 00170 /// Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big number 00171 int taskcnt; 00172 /// This map works like a sparse array :-) 00173 std::map<int, dmTask*> tasks; 00174 00175 00176 /// Here we invoke popen3 00177 /// and block waiting for the process to end. Upon end it updates the corresponding 00178 /// instance of dmTask with the result and the stdout 00179 virtual void run(dmTask &task); 00180 00181 friend void taskfunc(dmTaskExec *, int); 00182 00183 //kill a specific task 00184 int killTask(dmTask *task); 00185 }; 00186 00187 00188 00189 }