Coverage for tcprocd/runner.py: 98.25%
Shortcuts on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""tcprocd runner."""
2from __future__ import unicode_literals, print_function, absolute_import
3import grp
4import logging
5import multiprocessing
6import os
7import pwd
8import select
9import shlex
10import subprocess
11import sys
12import threading
# Python 2 has no FileNotFoundError builtin; alias it to OSError so the
# ``except FileNotFoundError`` clause in Runner.run also works there.
if sys.version_info[0] == 2:
    FileNotFoundError = OSError


#: Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Runner(threading.Thread):
    """
    A class to run a process in a thread.

    :param name: name of the process to access it (also used as thread name)
    :param command: command line to run; split with ``shlex.split``
    :param owner: the user who starts this process
    :param on_exit: a function that will be called with the thread name when
                    the process exits or could not be started
    :param path: path to execute the command in
    :param as_user: system user to switch to before starting the process
    :param as_group: system group to switch to before starting the process;
                     overrides the primary group of ``as_user``
    :raises KeyError: if ``as_user`` or ``as_group`` is unknown to the system
    """

    #: Make the thread a daemon
    daemon = True

    def __init__(self, name, command, owner, on_exit, path=None, as_user=None, as_group=None):
        """Initialize runner."""
        threading.Thread.__init__(self, name=name)

        self.owner = owner
        self.process_name = name
        self.command = command
        self._on_exit = on_exit
        self.path = path

        # ``proc`` stays None until ``run`` has successfully spawned the child.
        self.proc = None
        # Every output line seen so far, in order.
        self.history = []
        # Lines waiting to be written to the child's stdin; ``run`` selects on
        # the queue's read end together with the child's stdout.
        self.stdin_queue = multiprocessing.Queue()
        self.attached_handlers = []

        self.env = os.environ.copy()
        self.uid = None
        self.gid = None

        if as_user:
            pw_record = pwd.getpwnam(as_user)
            self.uid = pw_record.pw_uid
            self.gid = pw_record.pw_gid
            self.env['HOME'] = pw_record.pw_dir
            self.env['LOGNAME'] = pw_record.pw_name
            self.env['USER'] = pw_record.pw_name

        if as_group:
            # An explicit group wins over the user's primary group.
            self.gid = grp.getgrnam(as_group).gr_gid

    def on_line(self, line):
        """Record ``line`` in the history and send it to all attached handlers."""
        self.history.append(line)
        for handler in self.attached_handlers:
            handler.protocol.sendline(line)

    def on_exit(self):
        """Call the exit callback with the thread name and send ``exit`` to all handlers."""
        self._on_exit(self.name)
        for handler in self.attached_handlers:
            handler.protocol.sendline('exit')

    def write_line(self, line):
        """Queue ``line`` to be written to the process's stdin by ``run``."""
        self.stdin_queue.put(line)

    def demote(self):
        """
        Switch group and user.

        Used as ``preexec_fn``, i.e. executed in the child before the command
        is started.
        """
        # Group first: once the uid has been dropped the process may no longer
        # be permitted to change its gid.
        # NOTE(review): supplementary groups are not reset here, so the child
        # keeps those of the parent - confirm whether that is intended.
        if self.gid is not None:
            os.setgid(self.gid)
        if self.uid is not None:
            os.setuid(self.uid)

    def run(self):
        """
        Start the runner.

        Execute ``command`` and call ``on_line`` for each line of output until
        the process closes its output or its return code becomes known. The
        command is started in ``path`` and with the configured user and group
        (if they're given). Lines queued with ``write_line`` are written to the
        process's stdin. Finally the process is killed and reaped and
        ``on_exit`` is called.
        """
        info = 'starting runner: {}'.format(self.command)
        if self.path:
            # The working directory is applied by Popen(cwd=...) in the child
            # only. Calling os.chdir() here would change the directory of the
            # whole process (racing with other runner threads) and would apply
            # a relative ``path`` twice.
            self.env['PWD'] = self.path
            info += ' in {}'.format(self.path)

        logger.info(info)

        try:
            self.proc = subprocess.Popen(shlex.split(self.command),
                                         stdout=subprocess.PIPE,
                                         stdin=subprocess.PIPE,
                                         stderr=subprocess.STDOUT,
                                         preexec_fn=self.demote,
                                         env=self.env,
                                         cwd=self.path,
                                         close_fds=True)
        except FileNotFoundError:
            logger.info('{}: Executable not found!'.format(self.name))
            self.on_exit()
            return

        stdout_fd = self.proc.stdout.fileno()

        try:
            while self.proc.returncode is None:
                readable, _, _ = select.select([stdout_fd, self.stdin_queue._reader], [], [])

                for source in readable:
                    if source == stdout_fd:
                        raw = self.proc.stdout.readline()
                        if not raw:
                            # EOF: the child closed its output, which normally
                            # means it has exited. Without this check select()
                            # would report stdout as readable forever and the
                            # loop would spin, emitting empty lines.
                            return
                        self.on_line(raw.decode())
                    else:
                        line = self.stdin_queue.get()
                        logger.debug('writing: "{}"'.format(line))
                        if not line.endswith('\n'):
                            line += '\n'
                        self.proc.stdin.write(line.encode())
                        self.proc.stdin.flush()
        finally:
            try:
                self.proc.kill()
                # Reap the child so it does not linger as a zombie and so that
                # ``returncode`` is populated.
                self.proc.wait()
            finally:
                self.on_exit()

    def kill(self):
        """Kill the process if it exists; log an error if it was never started."""
        if self.proc is None:
            logger.error('{}: no proc'.format(self.name))
            return

        self.proc.kill()
        logger.info('{}: killed'.format(self.name))