Coverage for tcprocd/runner.py: 98.25%

Shortcuts on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1"""tcprocd runner.""" 

2from __future__ import unicode_literals, print_function, absolute_import 

3import grp 

4import logging 

5import multiprocessing 

6import os 

7import pwd 

8import select 

9import shlex 

10import subprocess 

11import sys 

12import threading 

13 

14 

15if sys.version_info[0] == 2: 15 ↛ 16line 15 didn't jump to line 16, because the condition on line 15 was never true

16 FileNotFoundError = OSError 

17 

18 

19logger = logging.getLogger(__name__) 

20 

21 

22class Runner(threading.Thread): 

23 """ 

24 A class to run a process in a thread. 

25 

26 :param name: name of the process to access it 

27 :param command: command to run the process 

28 :param owner: the user who starts this process 

29 :param on_exit: a function that will be called when the process exits 

30 :param path: Path to execeute the command in 

31 :param user: System user to switch to before starting the process 

32 :param group: System group to switch to before starting the process 

33 """ 

34 

35 #: Make the thread a daemon 

36 daemon = True 

37 

38 def __init__(self, name, command, owner, on_exit, path=None, as_user=None, as_group=None): 

39 """Initialize runner.""" 

40 self.owner = owner 

41 self.process_name = name 

42 self.command = command 

43 self._on_exit = on_exit 

44 self.path = path 

45 

46 self.proc = None 

47 self.history = [] 

48 self.stdin_queue = multiprocessing.Queue() 

49 self.attached_handlers = [] 

50 

51 self.env = os.environ.copy() 

52 self.uid = None 

53 self.gid = None 

54 

55 if as_user: 

56 pw_record = pwd.getpwnam(as_user) 

57 self.uid = pw_record.pw_uid 

58 self.gid = pw_record.pw_gid 

59 self.env['HOME'] = pw_record.pw_dir 

60 self.env['LOGNAME'] = pw_record.pw_name 

61 self.env['USER'] = pw_record.pw_name 

62 

63 if as_group: 

64 self.gid = grp.getgrnam(as_group).gr_gid 

65 

66 threading.Thread.__init__(self, name=name) 

67 

68 def on_line(self, line): 

69 """Send the given line to all attached handlers.""" 

70 self.history.append(line) 

71 for handler in self.attached_handlers: 

72 handler.protocol.sendline(line) 

73 

74 def on_exit(self): 

75 """Cleanup and close handlers.""" 

76 self._on_exit(self.name) 

77 for handler in self.attached_handlers: 

78 handler.protocol.sendline('exit') 

79 

80 def write_line(self, line): 

81 """Write ``message`` to process's stdin.""" 

82 self.stdin_queue.put(line) 

83 

84 def demote(self): 

85 """Switch user and group.""" 

86 if self.gid is not None: 

87 os.setgid(self.gid) 

88 if self.uid is not None: 

89 os.setuid(self.uid) 

90 

91 def run(self): 

92 """ 

93 Start the runner. 

94 

95 Execute ``command`` and call ``on_line`` for each line of output until 

96 it terminates. Change the directory to ``path`` and switch user and 

97 group (if they're given) before running the process. 

98 Finally call ``on_exit``. 

99 """ 

100 info = 'starting runner: {}'.format(self.command) 

101 if self.path: 

102 os.chdir(self.path) 

103 self.env['PWD'] = self.path 

104 info += ' in {}'.format(self.path) 

105 

106 logger.info(info) 

107 

108 try: 

109 self.proc = subprocess.Popen(shlex.split(self.command), 

110 stdout=subprocess.PIPE, 

111 stdin=subprocess.PIPE, 

112 stderr=subprocess.STDOUT, 

113 preexec_fn=self.demote, 

114 env=self.env, 

115 cwd=self.path, 

116 close_fds=True) 

117 except FileNotFoundError: 

118 logger.info('{}: Executable not found!'.format(self.name)) 

119 self.on_exit() 

120 return 

121 

122 try: 

123 while self.proc.returncode is None: 

124 r, _, _ = select.select([self.proc.stdout.fileno(), self.stdin_queue._reader], [], []) 

125 

126 for s in r: 

127 

128 if s == self.proc.stdout.fileno(): 

129 line = self.proc.stdout.readline().decode() 

130 self.on_line(line) 

131 

132 else: 

133 line = self.stdin_queue.get() 

134 logger.debug('writing: "{}"'.format(line)) 

135 if not line.endswith('\n'): 

136 line += '\n' 

137 self.proc.stdin.write(line.encode()) 

138 self.proc.stdin.flush() 

139 

140 finally: 

141 self.proc.kill() 

142 self.on_exit() 

143 

144 def kill(self): 

145 """Kill the process if it exists.""" 

146 if self.proc is None: 

147 logger.ERROR('{}: no proc'.format(self.name)) 

148 return 

149 

150 self.proc.kill() 

151 logger.info('{}: killed'.format(self.name))