Coverage for src/hods/node.py: 100.00%
Shortcuts on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""hods - home directory synchronization.
3Copyright (C) 2016-2020 Mathias Stelzer <knoppo@rolln.de>
5hods is free software: you can redistribute it and/or modify
6it under the terms of the GNU General Public License as published by
7the Free Software Foundation, either version 3 of the License, or
8(at your option) any later version.
10hods is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
15You should have received a copy of the GNU General Public License
16along with this program. If not, see <http://www.gnu.org/licenses/>.
17"""
18import datetime
19import logging
20import os
22from hods.path import Directory, File, Path
24logger = logging.getLogger(__name__)
27class NodeChildExists(Exception):
28 """Exception raised when trying to add a child that already exists."""
30 def __init__(self, message, existing_node, new_node=None):
31 """Initialize exception.
33 :param message: `str` - Error message
34 :param existing_node: `PathNode` - The existing path node instance
35 :param new_node: `PathNode` - The new path node instance
36 """
37 self.existing_node = existing_node
38 self.new_node = new_node
40 message = '{} A child with the basename "{}" already exists in the node "{}".'.format(
41 message, existing_node.basename, repr(existing_node.parent))
42 super().__init__(message)
45class NoRootNode(Exception):
46 """Exception raised when trying to add a node to a parent without root node."""
48 pass
51class Node:
52 """Mixin class for all nodes."""
54 @classmethod
55 def from_data(cls, *args, **data):
56 """Create and return an object from the given stored data."""
57 return cls(*args, **data)
59 def as_dict(self):
60 """Return a config `dict`."""
61 return {'name': self.basename}
63 @property
64 def name(self):
65 """An alias for `basename`."""
66 return self.basename
68 @name.setter
69 def name(self, value):
70 self.basename = value
72 def __str__(self):
73 """The basename as string representation."""
74 return str(self.name)
77class ChildNodeMixin:
78 """Mixin class for all nodes except the root node."""
80 def __init__(self, parent, basename, attach=True):
81 """Initialize root path node.
83 Create all required parent nodes if any.
85 :param path: `str` - Absolute or relative path
86 :param parent: `PathNode` - Parent directory node. Pass ``None`` to make a root node.
87 :param attach: `bool` - attach the file to the parent (Default: ``True``)
88 """
89 if os.sep in basename:
90 raise AttributeError('{}: basename argument must NOT be a path!'.format(self.__class__.__name__))
91 path = parent.join(basename)
92 super().__init__(path)
94 self.parent = parent
95 self.parents = list(self._collect_parents())
97 if not self.parents:
98 raise NoRootNode('{} received parent {} which has no root node!'.format(self.__class__.__name__, parent))
100 self.root = self.parents[-1]
102 if attach:
103 self.attach()
105 def _collect_parents(self):
106 """Return a list of all parent nodes."""
107 parent = self.parent
108 while not parent.is_root:
109 yield parent
110 parent = parent.parent
111 yield parent
113 def delete(self):
114 """Detach the node from the tree and delete it."""
115 super().delete()
116 self.detach()
118 def attach(self):
119 """Append this node to its parent."""
120 existing = self.parent.get_child(self.basename)
121 if existing is not None:
122 raise NodeChildExists('Failed to attach {}.'.format(repr(self)), existing)
123 self.parent.children.append(self)
125 def attach_branch(self):
126 """Append this node and its parents to the tree."""
127 if not self.parent.is_root:
128 try:
129 self.parent.attach_branch()
130 except NodeChildExists:
131 pass
132 self.attach()
134 def detach(self):
135 """Remove this node from the tree."""
136 if self in self.parent.children:
137 self.parent.children.remove(self)
139 def is_attached(self):
140 """Return whether this node is attached to its parent."""
141 return self in self.parent.children
143 def is_branch_attached(self):
144 """Return whether this node is attached to the root node."""
145 if not self.parent.is_root and not self.parent.is_branch_attached():
146 return False
147 return self.is_attached()
150class FileNode(ChildNodeMixin, Node, File):
151 """A file node in a tree."""
153 pass
156class NotInTree(Exception):
157 """Raised when a child-/sub-path is expected but not given."""
159 pass
162def validate_child_path(path, parent_path):
163 """Check given path is a child of given parent and make it relative."""
164 path = Path._get_path(path)
165 if not os.path.isabs(path):
166 return path
167 if path.startswith(parent_path):
168 return os.path.relpath(path, parent_path)
169 raise NotInTree('"{}" is not a child of {}'.format(path, parent_path))
172_UNSET = object()
175class BaseDirectoryNode(Node, Directory):
176 """Base class for directory nodes and the root node."""
178 #: class to build directory children
179 child_directory_class = _UNSET
181 #: class to build file children
182 child_file_class = _UNSET
184 #: sort children alphabetically
185 sort_children = True
187 #: as_dict key for children
188 children_key = 'children'
190 #: detach children that dont exist on disk
191 #: set `False` for representation classes (objects may not exist disk)
192 clear_on_scan = True
194 def __init__(self, path):
195 """Initialize the path node.
197 :param path: `str` - Absolute path for the node.
198 """
199 super().__init__(path)
200 self.children = []
201 self.scanned = None
203 def scan_aged(self, delta=None):
204 """Check if the last scan has aged the given delta, 1 minute by default."""
205 if self.scanned is None:
206 return True
207 if delta is None:
208 delta = datetime.timedelta(minutes=1)
209 return self.scanned + delta < datetime.datetime.now()
211 def _filter_children(self, cls=None):
212 for c in self.children:
213 if cls is None or isinstance(c, cls):
214 yield c
216 @property
217 def files(self):
218 """List all file children."""
219 return self._filter_children(self.get_child_file_class())
221 @property
222 def directories(self):
223 """List all directory children."""
224 return self._filter_children(self.get_child_directory_class())
226 @property
227 def is_root(self):
228 """Overwrite and return a boolean whether this is a root node."""
229 return False
231 def _sort_key(self, node):
232 """Key function to sort children."""
233 cls = self.get_child_file_class()
234 if cls is None:
235 return node.basename
236 return not isinstance(node, cls), node.basename
238 def sort(self):
239 """Order the children by type and basename."""
240 self.children.sort(key=self._sort_key)
242 def collect_children(self, exclude_dirs=False):
243 """Recursively collect and yield all children.
245 Yields: next child or sibling in tree
246 """
247 for child in self.children:
248 if isinstance(child, FileNode):
249 yield child
250 else:
251 if not exclude_dirs:
252 yield child
253 yield from child.collect_children(exclude_dirs=exclude_dirs)
255 def get_child_directory_class(self):
256 """Return the child directory class."""
257 if self.child_directory_class is _UNSET:
258 return DirectoryNode
259 return self.child_directory_class
261 def get_child_file_class(self):
262 """Return the child file class."""
263 if self.child_file_class is _UNSET:
264 return FileNode
265 return self.child_file_class
267 def load_child_class(self, data):
268 """Return the child class for the given stored dict."""
269 dir_cls = self.get_child_directory_class()
270 if dir_cls.children_key in data:
271 return dir_cls
272 return self.get_child_file_class()
274 @classmethod
275 def from_data(cls, *args, **data):
276 """Create and return an object from the given stored data."""
277 children_data = data.pop(cls.children_key, [])
279 obj = cls(*args, **data)
280 obj.load_children(children_data)
282 return obj
284 def load_children(self, children_data):
285 """Parse the given child data dict."""
286 for child_data in children_data:
287 child_class = self.load_child_class(child_data)
288 child_class.from_data(self, **child_data)
290 def get_children_dicts(self):
291 """Return a list of children as dictionaries."""
292 return [s.as_dict() for s in self.children]
294 def as_dict(self, include_children=True):
295 """Return a config `dict`."""
296 data = super().as_dict()
297 if include_children:
298 children = list(self.get_children_dicts())
299 else:
300 children = []
301 data[self.children_key] = children
302 return data
304 #
305 # build tree
306 #
308 def get_child(self, basename):
309 """Find and return the child with the given basename."""
310 for child in self.children:
311 if child.basename == basename:
312 return child
314 def _scan_basename_for_child_class(self, basename):
315 """Return the class to build the child with given basename."""
316 path = self.join(basename)
317 if not os.path.islink(path) and not os.path.exists(path):
318 msg = '{} not found. _scan_basename_for_child_class requires given basename to exist!'
319 raise FileNotFoundError(msg.format(path))
320 if os.path.isdir(path):
321 return self.get_child_directory_class()
322 return self.get_child_file_class()
324 def get_child_args(self, basename):
325 """Return args to create a child instance for the given basename."""
326 return self, basename
328 def scan(self, clear=False, recursive=False):
329 """Scan for children on disk and synchronize them.
331 Args:
332 clear: detach children that dont exist on disk
333 recursive: call this method on all child directories afterwards
335 """
336 basenames = self.listdir(dirs_only=self.get_child_file_class() is None)
338 if self.clear_on_scan or clear:
339 for child in self.children:
340 if child.basename not in basenames:
341 child.detach()
343 for basename in basenames:
344 child = self.get_child(basename)
345 if child is None:
346 cls = self._scan_basename_for_child_class(basename)
347 args = self.get_child_args(basename)
348 cls(*args) # The child appends itself to its parents children
350 if self.sort_children:
351 self.sort()
353 if recursive:
354 for child in self.directories:
355 child.scan(clear=clear, recursive=True)
357 now = datetime.datetime.now()
358 self.scanned = now
360 def _create_child(self, basename, is_dir, node_cls, cls):
361 if is_dir: # create parent node
362 child_class = node_cls
363 if child_class is None:
364 child_class = self.get_child_directory_class()
365 else: # create leaf node
366 child_class = cls
367 if child_class is None:
368 child_class = self.get_child_file_class()
369 elif child_class == 'dir':
370 child_class = self.get_child_directory_class()
372 args = self.get_child_args(basename)
373 return child_class(*args)
375 def validate_child_path(self, path):
376 """Check given path is a child of this node and makeit relative."""
377 return validate_child_path(path, self.path)
379 def find(self, path, create=False, node_cls=None, cls=None):
380 """Find a file or directory node for the given path.
382 :param path: Path inside this tree to find a node for.
383 :param create: Create the node and its parents if necessary.
384 :param node_cls: Class to create parent nodes.
385 :param cls: Class to create the leaf node. Use the special value 'dir' if you don't care
386 about the actual class but want to create a directory node instead of a file node.
387 :return: The found child node attached to this tree.
388 """
389 path = self.validate_child_path(path)
391 # if the final path is this node return it
392 if os.path.abspath(self.join(path)) == self.path:
393 return self
395 parts = path.split(os.sep)
396 basename = parts.pop(0)
398 child = self.get_child(basename)
399 if child is None:
400 if not create:
401 return
402 child = self._create_child(basename, parts, node_cls, cls)
404 if not parts:
405 return child
407 path = os.path.join(*parts)
408 return child.find(path, create=create, node_cls=node_cls, cls=cls)
411class RootDirectoryNode(BaseDirectoryNode):
412 """The root node for a file tree."""
414 @property
415 def is_root(self):
416 """Return True to mark this as root node."""
417 return True
420class DirectoryNode(ChildNodeMixin, BaseDirectoryNode):
421 """A directory node in a file tree."""
423 pass