Coverage for tcprocd/server.py: 97.50%

Shortcuts on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

68 statements  

1"""tcprocd server.""" 

2from __future__ import unicode_literals, print_function, absolute_import 

3from tcprocd.user import User 

4from tcprocd.version import __version__, __release__ 

5import json 

6import os 

7import logging 

8import threading 

9try: 

10 import socketserver 

11except ImportError: 

12 import SocketServer as socketserver 

13 

14 

15logger = logging.getLogger(__name__) 

16 

17 

class ThreadedServerDaemonMixin(socketserver.ThreadingMixIn):
    """
    tcprocd server.

    Mixin with the state and behaviour shared by the TCP and the unix domain
    server: version information, the runner registry, and the users loaded
    from the JSON configuration file.

    :param server_address: Address to bind to; passed to ``TCPServer.__init__``.
    :param RequestHandlerClass: Handler class; passed to ``TCPServer.__init__``.
    :param config_file: Path of the JSON configuration file.
    :param bind_and_activate: Passed to ``TCPServer.__init__``.
    """

    #: when true, ``client_address`` is used as-is for the request thread name
    is_unix_domain = False

    #: daemonize threads
    daemon_threads = True

    def __init__(self, server_address, RequestHandlerClass, config_file, bind_and_activate=True):
        """Initialize tcp server."""
        self.version = __version__
        self.release = __release__

        self.handlers = []
        #: runner instances keyed by name (see :meth:`on_runner_exit`)
        self.runners = {}

        # config
        self.config_file = config_file

        self.user = None
        self.group = None
        self.users = []
        self.read_config()

        # Explicit base call instead of ``super()``: on Python 2 the
        # ``SocketServer`` classes are old-style, so ``super()`` is unusable.
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)

    def process_request(self, request, client_address):
        """Start a new thread to process the request with an appropriate name."""
        if self.is_unix_domain:
            name = client_address
        else:
            name = '{}:{}'.format(*client_address)
        t = threading.Thread(target=self.process_request_thread,
                             args=(request, client_address),
                             name=name)
        t.daemon = self.daemon_threads
        t.start()

    def on_runner_exit(self, name):
        """Remove runner instance for ``name``."""
        try:
            del self.runners[name]
        except KeyError:
            # The runner was never registered or has already been removed.
            logger.warning('exited silently')
        else:
            logger.info('exit')

    def get_user(self, username):
        """
        Find and return the user object for the given username.

        Return ``None`` if no user with the given username exists.

        :param username: :class:`str` - The username to find and return the user for.
        :return: :class:`User` | ``None`` - The user object, if found.
        """
        for user in self.users:
            if user.username == username:
                return user
        return None

    def read_config(self):
        """
        Read and parse the configuration file.

        Does nothing when the configuration file does not exist. Otherwise
        ``user`` and ``group`` are taken from the file (``None`` when absent)
        and ``users`` is rebuilt from the ``"users"`` entries.

        :raise ValueError: if the file is not valid JSON (from ``json.load``).

        NOTE(review): this docstring used to advertise ``ConfigurationError``,
        but nothing in this method raises it directly - confirm whether
        ``User`` does.
        """
        if not os.path.exists(self.config_file):
            return

        with open(self.config_file) as fp:
            config = json.load(fp)

        self.user = config.get('user', None)
        self.group = config.get('group', None)

        try:
            users = config['users']
        except KeyError:
            self.users = []
        else:
            self.users[:] = [User(**data) for data in users]

    def write_config(self):
        """
        Write configuration file.

        Writes the ``users`` list and, when set, the ``user`` and ``group``
        values, so that a later :meth:`read_config` restores them. Previously
        only ``users`` was written and ``user``/``group`` were silently
        dropped from the file.
        """
        config = {
            "users": [u.as_dict() for u in self.users]
        }
        # Only emit the optional keys when they carry a value; this keeps the
        # output unchanged for configurations that never defined them.
        if self.user is not None:
            config["user"] = self.user
        if self.group is not None:
            config["group"] = self.group

        with open(self.config_file, 'w') as fp:
            json.dump(config, fp, indent=4)

115 

116 

class ConfigurationError(Exception):
    """Error signalling that the configuration could not be parsed."""

121 

122 

class TCPServer(ThreadedServerDaemonMixin, socketserver.TCPServer):
    """Threaded tcprocd server that listens on a TCP socket."""

    #: lets ``socketserver`` set ``SO_REUSEADDR`` when binding the socket
    allow_reuse_address = True

127 

128 

class UnixDomainServer(ThreadedServerDaemonMixin, socketserver.UnixStreamServer):
    """Threaded tcprocd server that listens on a unix domain socket."""

    #: client addresses are used unformatted as request thread names
    is_unix_domain = True

    def server_bind(self):
        """Bind the socket, then make its file accessible to every user."""
        socketserver.UnixStreamServer.server_bind(self)
        # The chmod must follow the bind: it targets the address the server
        # is bound to.
        socket_path = self.server_address
        mode = 0o777
        os.chmod(socket_path, mode)