Coverage for src/hods/node.py: 100.00%

Shortcuts on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

225 statements  

1"""hods - home directory synchronization. 

2 

3Copyright (C) 2016-2020 Mathias Stelzer <knoppo@rolln.de> 

4 

5hods is free software: you can redistribute it and/or modify 

6it under the terms of the GNU General Public License as published by 

7the Free Software Foundation, either version 3 of the License, or 

8(at your option) any later version. 

9 

10hods is distributed in the hope that it will be useful, 

11but WITHOUT ANY WARRANTY; without even the implied warranty of 

12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13GNU General Public License for more details. 

14 

15You should have received a copy of the GNU General Public License 

16along with this program. If not, see <http://www.gnu.org/licenses/>. 

17""" 

18import datetime 

19import logging 

20import os 

21 

22from hods.path import Directory, File, Path 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27class NodeChildExists(Exception): 

28 """Exception raised when trying to add a child that already exists.""" 

29 

30 def __init__(self, message, existing_node, new_node=None): 

31 """Initialize exception. 

32 

33 :param message: `str` - Error message 

34 :param existing_node: `PathNode` - The existing path node instance 

35 :param new_node: `PathNode` - The new path node instance 

36 """ 

37 self.existing_node = existing_node 

38 self.new_node = new_node 

39 

40 message = '{} A child with the basename "{}" already exists in the node "{}".'.format( 

41 message, existing_node.basename, repr(existing_node.parent)) 

42 super().__init__(message) 

43 

44 

45class NoRootNode(Exception): 

46 """Exception raised when trying to add a node to a parent without root node.""" 

47 

48 pass 

49 

50 

51class Node: 

52 """Mixin class for all nodes.""" 

53 

54 @classmethod 

55 def from_data(cls, *args, **data): 

56 """Create and return an object from the given stored data.""" 

57 return cls(*args, **data) 

58 

59 def as_dict(self): 

60 """Return a config `dict`.""" 

61 return {'name': self.basename} 

62 

63 @property 

64 def name(self): 

65 """An alias for `basename`.""" 

66 return self.basename 

67 

68 @name.setter 

69 def name(self, value): 

70 self.basename = value 

71 

72 def __str__(self): 

73 """The basename as string representation.""" 

74 return str(self.name) 

75 

76 

77class ChildNodeMixin: 

78 """Mixin class for all nodes except the root node.""" 

79 

80 def __init__(self, parent, basename, attach=True): 

81 """Initialize root path node. 

82 

83 Create all required parent nodes if any. 

84 

85 :param path: `str` - Absolute or relative path 

86 :param parent: `PathNode` - Parent directory node. Pass ``None`` to make a root node. 

87 :param attach: `bool` - attach the file to the parent (Default: ``True``) 

88 """ 

89 if os.sep in basename: 

90 raise AttributeError('{}: basename argument must NOT be a path!'.format(self.__class__.__name__)) 

91 path = parent.join(basename) 

92 super().__init__(path) 

93 

94 self.parent = parent 

95 self.parents = list(self._collect_parents()) 

96 

97 if not self.parents: 

98 raise NoRootNode('{} received parent {} which has no root node!'.format(self.__class__.__name__, parent)) 

99 

100 self.root = self.parents[-1] 

101 

102 if attach: 

103 self.attach() 

104 

105 def _collect_parents(self): 

106 """Return a list of all parent nodes.""" 

107 parent = self.parent 

108 while not parent.is_root: 

109 yield parent 

110 parent = parent.parent 

111 yield parent 

112 

113 def delete(self): 

114 """Detach the node from the tree and delete it.""" 

115 super().delete() 

116 self.detach() 

117 

118 def attach(self): 

119 """Append this node to its parent.""" 

120 existing = self.parent.get_child(self.basename) 

121 if existing is not None: 

122 raise NodeChildExists('Failed to attach {}.'.format(repr(self)), existing) 

123 self.parent.children.append(self) 

124 

125 def attach_branch(self): 

126 """Append this node and its parents to the tree.""" 

127 if not self.parent.is_root: 

128 try: 

129 self.parent.attach_branch() 

130 except NodeChildExists: 

131 pass 

132 self.attach() 

133 

134 def detach(self): 

135 """Remove this node from the tree.""" 

136 if self in self.parent.children: 

137 self.parent.children.remove(self) 

138 

139 def is_attached(self): 

140 """Return whether this node is attached to its parent.""" 

141 return self in self.parent.children 

142 

143 def is_branch_attached(self): 

144 """Return whether this node is attached to the root node.""" 

145 if not self.parent.is_root and not self.parent.is_branch_attached(): 

146 return False 

147 return self.is_attached() 

148 

149 

150class FileNode(ChildNodeMixin, Node, File): 

151 """A file node in a tree.""" 

152 

153 pass 

154 

155 

156class NotInTree(Exception): 

157 """Raised when a child-/sub-path is expected but not given.""" 

158 

159 pass 

160 

161 

162def validate_child_path(path, parent_path): 

163 """Check given path is a child of given parent and make it relative.""" 

164 path = Path._get_path(path) 

165 if not os.path.isabs(path): 

166 return path 

167 if path.startswith(parent_path): 

168 return os.path.relpath(path, parent_path) 

169 raise NotInTree('"{}" is not a child of {}'.format(path, parent_path)) 

170 

171 

172_UNSET = object() 

173 

174 

175class BaseDirectoryNode(Node, Directory): 

176 """Base class for directory nodes and the root node.""" 

177 

178 #: class to build directory children 

179 child_directory_class = _UNSET 

180 

181 #: class to build file children 

182 child_file_class = _UNSET 

183 

184 #: sort children alphabetically 

185 sort_children = True 

186 

187 #: as_dict key for children 

188 children_key = 'children' 

189 

190 #: detach children that dont exist on disk 

191 #: set `False` for representation classes (objects may not exist disk) 

192 clear_on_scan = True 

193 

194 def __init__(self, path): 

195 """Initialize the path node. 

196 

197 :param path: `str` - Absolute path for the node. 

198 """ 

199 super().__init__(path) 

200 self.children = [] 

201 self.scanned = None 

202 

203 def scan_aged(self, delta=None): 

204 """Check if the last scan has aged the given delta, 1 minute by default.""" 

205 if self.scanned is None: 

206 return True 

207 if delta is None: 

208 delta = datetime.timedelta(minutes=1) 

209 return self.scanned + delta < datetime.datetime.now() 

210 

211 def _filter_children(self, cls=None): 

212 for c in self.children: 

213 if cls is None or isinstance(c, cls): 

214 yield c 

215 

216 @property 

217 def files(self): 

218 """List all file children.""" 

219 return self._filter_children(self.get_child_file_class()) 

220 

221 @property 

222 def directories(self): 

223 """List all directory children.""" 

224 return self._filter_children(self.get_child_directory_class()) 

225 

226 @property 

227 def is_root(self): 

228 """Overwrite and return a boolean whether this is a root node.""" 

229 return False 

230 

231 def _sort_key(self, node): 

232 """Key function to sort children.""" 

233 cls = self.get_child_file_class() 

234 if cls is None: 

235 return node.basename 

236 return not isinstance(node, cls), node.basename 

237 

238 def sort(self): 

239 """Order the children by type and basename.""" 

240 self.children.sort(key=self._sort_key) 

241 

242 def collect_children(self, exclude_dirs=False): 

243 """Recursively collect and yield all children. 

244 

245 Yields: next child or sibling in tree 

246 """ 

247 for child in self.children: 

248 if isinstance(child, FileNode): 

249 yield child 

250 else: 

251 if not exclude_dirs: 

252 yield child 

253 yield from child.collect_children(exclude_dirs=exclude_dirs) 

254 

255 def get_child_directory_class(self): 

256 """Return the child directory class.""" 

257 if self.child_directory_class is _UNSET: 

258 return DirectoryNode 

259 return self.child_directory_class 

260 

261 def get_child_file_class(self): 

262 """Return the child file class.""" 

263 if self.child_file_class is _UNSET: 

264 return FileNode 

265 return self.child_file_class 

266 

267 def load_child_class(self, data): 

268 """Return the child class for the given stored dict.""" 

269 dir_cls = self.get_child_directory_class() 

270 if dir_cls.children_key in data: 

271 return dir_cls 

272 return self.get_child_file_class() 

273 

274 @classmethod 

275 def from_data(cls, *args, **data): 

276 """Create and return an object from the given stored data.""" 

277 children_data = data.pop(cls.children_key, []) 

278 

279 obj = cls(*args, **data) 

280 obj.load_children(children_data) 

281 

282 return obj 

283 

284 def load_children(self, children_data): 

285 """Parse the given child data dict.""" 

286 for child_data in children_data: 

287 child_class = self.load_child_class(child_data) 

288 child_class.from_data(self, **child_data) 

289 

290 def get_children_dicts(self): 

291 """Return a list of children as dictionaries.""" 

292 return [s.as_dict() for s in self.children] 

293 

294 def as_dict(self, include_children=True): 

295 """Return a config `dict`.""" 

296 data = super().as_dict() 

297 if include_children: 

298 children = list(self.get_children_dicts()) 

299 else: 

300 children = [] 

301 data[self.children_key] = children 

302 return data 

303 

304 # 

305 # build tree 

306 # 

307 

308 def get_child(self, basename): 

309 """Find and return the child with the given basename.""" 

310 for child in self.children: 

311 if child.basename == basename: 

312 return child 

313 

314 def _scan_basename_for_child_class(self, basename): 

315 """Return the class to build the child with given basename.""" 

316 path = self.join(basename) 

317 if not os.path.islink(path) and not os.path.exists(path): 

318 msg = '{} not found. _scan_basename_for_child_class requires given basename to exist!' 

319 raise FileNotFoundError(msg.format(path)) 

320 if os.path.isdir(path): 

321 return self.get_child_directory_class() 

322 return self.get_child_file_class() 

323 

324 def get_child_args(self, basename): 

325 """Return args to create a child instance for the given basename.""" 

326 return self, basename 

327 

328 def scan(self, clear=False, recursive=False): 

329 """Scan for children on disk and synchronize them. 

330 

331 Args: 

332 clear: detach children that dont exist on disk 

333 recursive: call this method on all child directories afterwards 

334 

335 """ 

336 basenames = self.listdir(dirs_only=self.get_child_file_class() is None) 

337 

338 if self.clear_on_scan or clear: 

339 for child in self.children: 

340 if child.basename not in basenames: 

341 child.detach() 

342 

343 for basename in basenames: 

344 child = self.get_child(basename) 

345 if child is None: 

346 cls = self._scan_basename_for_child_class(basename) 

347 args = self.get_child_args(basename) 

348 cls(*args) # The child appends itself to its parents children 

349 

350 if self.sort_children: 

351 self.sort() 

352 

353 if recursive: 

354 for child in self.directories: 

355 child.scan(clear=clear, recursive=True) 

356 

357 now = datetime.datetime.now() 

358 self.scanned = now 

359 

360 def _create_child(self, basename, is_dir, node_cls, cls): 

361 if is_dir: # create parent node 

362 child_class = node_cls 

363 if child_class is None: 

364 child_class = self.get_child_directory_class() 

365 else: # create leaf node 

366 child_class = cls 

367 if child_class is None: 

368 child_class = self.get_child_file_class() 

369 elif child_class == 'dir': 

370 child_class = self.get_child_directory_class() 

371 

372 args = self.get_child_args(basename) 

373 return child_class(*args) 

374 

375 def validate_child_path(self, path): 

376 """Check given path is a child of this node and makeit relative.""" 

377 return validate_child_path(path, self.path) 

378 

379 def find(self, path, create=False, node_cls=None, cls=None): 

380 """Find a file or directory node for the given path. 

381 

382 :param path: Path inside this tree to find a node for. 

383 :param create: Create the node and its parents if necessary. 

384 :param node_cls: Class to create parent nodes. 

385 :param cls: Class to create the leaf node. Use the special value 'dir' if you don't care 

386 about the actual class but want to create a directory node instead of a file node. 

387 :return: The found child node attached to this tree. 

388 """ 

389 path = self.validate_child_path(path) 

390 

391 # if the final path is this node return it 

392 if os.path.abspath(self.join(path)) == self.path: 

393 return self 

394 

395 parts = path.split(os.sep) 

396 basename = parts.pop(0) 

397 

398 child = self.get_child(basename) 

399 if child is None: 

400 if not create: 

401 return 

402 child = self._create_child(basename, parts, node_cls, cls) 

403 

404 if not parts: 

405 return child 

406 

407 path = os.path.join(*parts) 

408 return child.find(path, create=create, node_cls=node_cls, cls=cls) 

409 

410 

411class RootDirectoryNode(BaseDirectoryNode): 

412 """The root node for a file tree.""" 

413 

414 @property 

415 def is_root(self): 

416 """Return True to mark this as root node.""" 

417 return True 

418 

419 

420class DirectoryNode(ChildNodeMixin, BaseDirectoryNode): 

421 """A directory node in a file tree.""" 

422 

423 pass