Coverage for src/hods/tui/__main__.py: 100.00%
Shortcuts on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""hods - home directory synchronization.
3Copyright (C) 2016-2020 Mathias Stelzer <knoppo@rolln.de>
5hods is free software: you can redistribute it and/or modify
6it under the terms of the GNU General Public License as published by
7the Free Software Foundation, either version 3 of the License, or
8(at your option) any later version.
10hods is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
15You should have received a copy of the GNU General Public License
16along with this program. If not, see <http://www.gnu.org/licenses/>.
17"""
18import logging
19import os
20import subprocess
21import sys
22import tempfile
23from logging.handlers import RotatingFileHandler
25import urwid
27from hods.config.base import Config, Settings
28from hods.config.sources import NoChanges
29from hods.config.template import (
30 TemplateEngineNotInstalled,
31 TemplateRenderError,
32 UnknownTemplateEngine,
33)
34from hods.delta import SameDestinationError
35from hods.tui.base.app import FocusKeyFrame, ViewApp
36from hods.tui.base.edit import ChoiceWindow, EditWindow, ListBoxWindow
37from hods.tui.base.menu import MenuColumns, MenuItem, PullDownMenu
38from hods.tui.base.view import ErrorWindow, ProcessErrorWindow, SuccessWindow, WarningWindow
39from hods.tui.base.widgets import FocusMap
40from hods.tui.config.columns import ConfigColumnsView
41from hods.tui.config.edit import FeatureAddWindow, FeatureInstallWindow, RenderErrorWindow, SourceFileEditWindow
42from hods.tui.config.settings import SettingsWindow
43from hods.tui.delta import DeltaWindow
44from hods.tui.help import AboutWindow, ControlsWindow
45from hods.tui.palette import PALETTE
46from hods.utils import ProcessError, SSHAgent, pw_home, run, which
48logger = logging.getLogger(__name__)
class WizardWindow(ListBoxWindow):
    """First-run wizard window.

    Offers a handful of buttons that lead the user to a working hods setup:
    pull an existing configuration, start a new one, or abort.
    """

    def __init__(self, app, **kwargs):
        """Create the wizard window.

        :param app: `hods.app.App` instance
        :param kwargs: Remaining keyword arguments are forwarded to the parent class `ListBoxWindow`
        """
        super().__init__(app, title='Wizard', **kwargs)

    def get_items(self):
        """Yield the wizard widgets in display order."""
        def button(label, handler, attr):
            # every action is a focusable button wrapped in its color attribute
            return FocusMap(urwid.Button(label, on_press=handler), attr)

        yield urwid.Text('No ~/.hods/ directory found!')
        yield urwid.Divider()

        yield urwid.Text('Already have a server?')
        yield button('Pull existing configuration', self.on_press_pull_existing, 'button')
        yield urwid.Divider()

        yield urwid.Text('New to hods? Start by adding a feature and a source.')
        yield urwid.Text('Then add some files by pressing "a" on any directory.')
        yield button('Start new configuration', self.on_press_start_new, 'feature')
        yield urwid.Divider()
        yield urwid.Divider()

        yield button('Abort', self.on_press_abort, 'error')

    def on_press_abort(self, btn, user_data=None):
        """Handle the 'Abort' button: close the wizard and initialize ssh-agent."""
        self.close()
        self.app.init_ssh_agent()

    def on_press_start_new(self, btn, user_data=None):
        """Handle 'Start new configuration': open the settings, then the add-feature window."""
        def open_feature_window(data):
            FeatureAddWindow(self.app).show()

        settings_window = SettingsWindow(self.app, on_save=open_feature_window)
        settings_window.show()

    def on_press_pull_existing(self, btn=None, user_data=None):
        """Step 1: open the settings window before pulling an existing configuration.

        Also used as error/close callback of the later steps, in which case no
        button is passed.
        """
        if btn:
            # first call comes from the button -> ssh-agent is not initialized yet
            next_step = self.init_ssh_agent
        else:
            # subsequent calls come from a pull error -> ssh-agent was
            # already initialized, go straight to pulling
            next_step = self.pull
        SettingsWindow(self.app, on_save=next_step).show()

    def init_ssh_agent(self, user_data=None):
        """Step 2: initialize ssh-agent, then continue with `pull`."""
        self.app.init_ssh_agent(callback=self.pull)

    def pull(self, data=None):
        """Step 3: pull the configuration from the server.

        A master instance cannot pull; an error window is shown instead and
        the wizard returns to step 1 when it is closed.
        """
        if not self.app.settings.is_master:
            self.app.pull(on_error=self.on_press_pull_existing)
            return
        ErrorWindow(self.app, 'Server is required to pull configuration!',
                    on_close=self.on_press_pull_existing).show()
class NewVariablesWindow(EditWindow):
    """Window to assign local values to newly found configuration variables."""

    def __init__(self, app, **kwargs):
        """Create one edit widget per entry of ``app.config.new_variables``.

        :param app: Application instance providing ``config`` and ``settings``
        :param kwargs: Forwarded to the parent class `EditWindow`
        """
        super().__init__(app, title='New Variables', **kwargs)
        new_variables = app.config.new_variables
        # the name column is as wide as the longest variable name
        self.key_width = max(len(name) for name in new_variables.keys())
        self.defaults = new_variables
        self.edits = {}
        for name in new_variables.keys():
            self.edits[name] = urwid.Edit(wrap=urwid.CLIP)

    def get_items(self):
        """Yield the intro text, a header row and one row per variable."""
        def row(name_widget, default_widget, value_widget):
            return urwid.Columns([
                (self.key_width, name_widget),
                default_widget,
                value_widget,
            ], dividechars=1)

        yield urwid.Text(
            'New variables were found in your configuration! '
            'Adjust the values for this instance below or in the settings before you update.')
        yield row(urwid.Text('Name:'), urwid.Text('Default:'), urwid.Text('Value:'))
        for name in self.edits:
            yield row(
                urwid.Text(name),
                urwid.Text(self.defaults[name]),
                FocusMap(self.edits[name], 'edit'),
            )

    def clean(self):
        """Yield ``(name, value)`` pairs with the stripped text of every edit widget."""
        for name in self.edits:
            yield name, self.edits[name].edit_text.strip()

    def save(self, data):
        """Merge the given values into the local settings variables."""
        self.app.settings.variables.update(data)
class App(ViewApp):
    """Text-based User Interface for hods."""

    def __init__(self, action=None, enable_unicode=None):
        """Initialize interface.

        :param action: ``'pull'``, ``'push'`` or ``None`` - Action to run right after
                       initialization; the application ends when it finishes. (Default: ``None``)
        :param enable_unicode: `bool` or ``None`` - Enable fancy utf8 characters. Pass ``None``
                               to autodetect. (Default: ``None``)
        """
        self.action = action
        self.settings = Settings(load=False)
        logger.debug('hods init TUI %s', self.settings.version)

        self.config = Config(self.settings)
        self.agent = None

        # validated in `init`, where an error window can be displayed
        defer_multiplier = float(os.getenv('HODS_DEFER_MULTIPLIER', 1))

        super().__init__(defer_multiplier=defer_multiplier, enable_unicode=enable_unicode)

        self.config_columns = ConfigColumnsView(self)
        self.menu = MenuColumns(self, [
            MenuItem('File', [
                MenuItem('Settings', self.show_settings),
                MenuItem('Wizard', self.show_wizard),
                MenuItem('Reload', lambda: self.refresh(load_config=True), key='r'),
                MenuItem('Exit', self.end, key='q'),
            ], key='F2'),
            MenuItem('Actions', [
                MenuItem('Add files', self.config_columns.show_add_files, key='a'),
                MenuItem('Pull', self.pull, key='F4'),
                MenuItem('Update', self.update, key='F5'),
                MenuItem('Push', self.push, key='F9'),
            ]),
            MenuItem('Help', [
                MenuItem('Controls', self.show_controls, key='?'),
                MenuItem('About', self.show_about),
            ]),
        ])
        self.frame = FocusKeyFrame(
            body=self.placeholder,
            header=urwid.Padding(urwid.Columns([
                self.menu,
                urwid.Text('hods ' + self.settings.version, align=urwid.RIGHT),
            ]), left=1, right=1),
            header_focus_keys=('f2',),
        )

    def get_widget(self):
        """Return the main widget to display."""
        return urwid.AttrMap(self.frame, 'frame')

    def get_palette(self):
        """Return the main color attribute palette."""
        return PALETTE

    def refresh(self, load_settings=False, load_config=False):
        """Refresh the displayed widgets.

        :param load_settings: `bool` - Read the local configuration
                              file before refreshing. (Default: False)
        :param load_config: `bool` - Read the global configuration
                            file before refreshing. (Default: False)
        """
        if load_settings:
            self.settings.load()
        if load_config:
            self.config.load()
        # NOTE(review): indentation was lost in the reviewed copy; the scan is
        # assumed to run on every refresh, not only after loading - confirm.
        self.config.tree.scan(recursive=True)
        return super().refresh()

    def run(self):
        """Initialize and run the application.

        A ssh-agent started by this application is killed when the loop ends.
        """
        try:
            return super().run()
        finally:
            if self.agent is not None:
                self.agent.kill()

    def init(self):
        """Read configuration files and initialize application."""
        super().init()

        if self.defer_multiplier < 1:
            ErrorWindow(self, 'Environment variable "HODS_DEFER_MULTIPLIER" must be greater than or equal 1',
                        title='Invalid value', on_close=self.end, defer_close=False).show()
            return

        self.config_columns.show()

        if which('rsync') is None:
            ErrorWindow(self, 'rsync is required!', title='Missing Dependency',
                        on_close=self.end).show()
            return

        self.refresh(load_settings=True, load_config=True)

        settings_exist = self.settings.exists()

        if settings_exist:
            self.init_ssh_agent()

        # a requested action takes precedence over the first-run wizard
        if self.action == 'pull':
            self.pull(on_success=self.end, on_error=self.end_error)
        elif self.action == 'push':
            self.push(on_success=self.end, on_error=self.end_error)
        elif not settings_exist:
            self.show_wizard()

    def show_wizard(self):
        """Open the wizard window."""
        WizardWindow(self).show()

    def init_ssh_agent(self, callback=None):
        """Initialize ssh-agent according to the ``ssh_agent_mode`` setting.

        Modes: ``ignore`` does nothing, ``warn`` shows a warning, ``ask`` asks
        whether to start the agent and ``start`` starts it right away. Nothing
        is started or displayed if an agent is already running and has a key.

        :param callback: Optional callable invoked without arguments once the
                         initialization is done (or the displayed window is closed).
        """
        mode = self.settings.ssh_agent_mode
        warning = 'ssh-agent is not running or missing a key!'

        if mode not in ('warn', 'ask', 'start', 'ignore'):
            logger.warning('Invalid ssh_agent_mode "%s" has been reset to "ignore"!', mode)
            self.settings.ssh_agent_mode = mode = 'ignore'

        def done():
            if callable(callback):
                callback()

        if mode == 'ignore':
            return done()

        agent = SSHAgent()

        if agent.is_running() and agent.has():
            return done()

        if mode == 'warn':
            WarningWindow(self, warning + '\nYou may be asked to enter your password a few times!',
                          on_close=done).show()
            return

        if mode == 'ask':
            choices = [
                ('yes', 'Yes', 'success'),
                ('no', 'No', 'warning'),
            ]

            def on_select(answer):
                if answer == 'yes':
                    self.agent = self._add_ssh_key(agent)
            ChoiceWindow(
                self,
                choices,
                title='SSH Agent',
                text=warning + ' Start it and add your key now?',
                on_select=on_select,
                on_close=done,
            ).show()
            return

        # mode == 'start'
        self.agent = self._add_ssh_key(agent)
        done()

    def _add_ssh_key(self, agent):
        """Start the given agent and add the configured key while the loop is paused.

        :return: The given agent
        """
        with self.pause_loop():
            print('Starting ssh-agent ...')
            agent.start()
            agent.add(key=self.settings.ssh_agent_key)
        return agent

    def open_editor(self, path, parent=None):
        """Open given path in external editor.

        :param path: Path of the file to edit
        :param parent: Optional parent passed to the error window
        :return: Result of the editor process call or ``None`` if ``$EDITOR`` is
                 unset or empty (an error window is shown in that case).
        """
        editor = os.getenv('EDITOR', None)
        if not editor:
            msg = 'Environment variable "$EDITOR" is unset or empty!'
            ErrorWindow(self, msg, title='No Editor', parent=parent).show()
            return

        with self.pause_loop():
            return run(editor, path, capture_output=False)

    def open_in_editor(self, text, suffix=None, ignore_error=False, parent=None):
        """Write given text to temporary file and open it in external editor.

        :param text: `str` - Initial content of the temporary file
        :param suffix: Optional file name suffix of the temporary file
        :param ignore_error: `bool` - Do not abort if the editor process fails. (Default: False)
        :param parent: Optional parent passed to displayed error windows
        :return: The stripped file content after editing or ``None`` if aborted.
        """
        file = tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='UTF-8', suffix=suffix)
        try:
            with file:
                file.write(text)

            try:
                if self.open_editor(file.name, parent=parent) is None:
                    return
            except subprocess.CalledProcessError as e:
                # NOTE(review): indentation was lost in the reviewed copy; the
                # abort is assumed to apply only when errors are not ignored - confirm.
                if not ignore_error:
                    ProcessErrorWindow(self, e, 'No changes were made!', name='editor', parent=parent).show()
                    return

            with open(file.name, encoding='UTF-8') as f2:
                return f2.read().strip()
        finally:
            # delete=False above: the file must outlive its handle for the editor
            os.unlink(file.name)

    def save(self):
        """Write all configuration files."""
        self.config.save()
        self.settings.save()

    def unhandled_input(self, key):
        """If no widget has an action for the given key, this is called.

        :param key: `str` - The pressed key
        """
        if key in ('f1', '?'):
            self.show_controls()
            return
        if key in ('r', 'R'):
            self.refresh(load_settings=True, load_config=True)
            return
        if key == 'f4':
            self.pull()
            return
        if key == 'f5':
            self.update()
            return
        if key == 'f9':
            self.push()
            return

        # close menu pulldowns on click
        if isinstance(key, tuple) and key[0] == 'mouse press':
            if isinstance(self.view, PullDownMenu):
                self.view.close(focus_body=True)
            # NOTE(review): indentation was lost in the reviewed copy; mouse
            # presses are assumed to be consumed here in any case - confirm.
            return

        return super().unhandled_input(key)

    def show_controls(self):
        """Show the controls (help) window."""
        ControlsWindow(self).show()

    def show_about(self):
        """Show the about window."""
        AboutWindow(self).show()

    def show_settings(self, **kwargs):
        """Show the settings window.

        :param kwargs: Forwarded to `SettingsWindow`
        """
        SettingsWindow(self, **kwargs).show()

    def update(self):
        """Show the home delta window to display changes.

        Template and destination errors raised while opening the window are
        turned into windows that explain the problem or let the user resolve it.
        """
        try:
            DeltaWindow(self).show()
        except UnknownTemplateEngine as e:
            msg = ('Failed to render {}\nUnknown template engine: "{}"!\n'
                   'A third-party package is required to support other template engines!')
            ErrorWindow(self, msg.format(e.sourcefile.relative_config_path, e.engine_id)).show()
        except TemplateEngineNotInstalled as e:
            msg = 'Failed to render {}\nTemplate engine "{}" is not installed!'.format(
                e.sourcefile.relative_config_path, e.engine_class.label)
            ErrorWindow(self, msg).show()
        except TemplateRenderError as e:
            # show the error first, then let the user edit the file and retry
            edit = SourceFileEditWindow(self, e.sourcefile, on_close=self.update)
            RenderErrorWindow(self, e, on_close=edit.show).show()
        except SameDestinationError as e:
            ex = e  # pull into <locals> for usage in on_select
            choices = (
                ('first', 'Link "{}" and ignore "{}"'.format(
                    ex.first.relative_config_path,
                    ex.second.relative_config_path,
                )),
                ('second', 'Link "{}" and ignore "{}"'.format(
                    ex.second.relative_config_path,
                    ex.first.relative_config_path,
                )),
                ('ignore', 'Ignore both'),
            )

            def on_select(choice):
                if choice in ('first', 'ignore'):
                    ex.second.mode = 'ignore'
                if choice in ('second', 'ignore'):
                    ex.first.mode = 'ignore'
                self.defer(self.update)

            ChoiceWindow(self, choices,
                         vertical=True,
                         text=str(ex),
                         on_select=on_select,
                         on_close=self.update,
                         title='Warning',
                         height=10,
                         attr='warning window').show()

    def cascade_pull(self, on_error=None):
        """SSH to server and run pull on it.

        Only done if the server is a proxy and cascading is enabled.

        :param on_error: Optional callable passed as ``on_close`` to the error window
        :return: ``False`` if the remote pull failed, otherwise ``True``
        """
        if self.settings.server_is_proxy and self.settings.cascade:
            try:
                with self.pause_loop():
                    self.settings.remote_pull()
            except subprocess.CalledProcessError as e:
                ProcessErrorWindow(self, e, 'Failed to pull on the server', on_close=on_error).show()
                return False
        return True

    def cascade_push(self, on_error=None):
        """SSH to server and run push on it.

        Only done if the server is a proxy and cascading is enabled.

        :param on_error: Optional callable passed as ``on_close`` to the error window
        :return: ``False`` if the remote push failed, otherwise ``True``
        """
        if self.settings.server_is_proxy and self.settings.cascade:
            try:
                with self.pause_loop():
                    self.settings.remote_push()
            except subprocess.CalledProcessError as e:
                ProcessErrorWindow(self, e, 'Failed to push on the server', on_close=on_error).show()
                return False
        return True

    def pull(self, on_success=None, on_error=None):
        """Pull configuration file and sources.

        A master instance has no server to pull the configuration from and
        pulls the sources only.

        :param on_success: Optional callable invoked after everything was pulled
        :param on_error: Optional callable invoked after a failure was displayed
        """
        logger.info('pull')

        def _pull_sources():
            self.pull_sources(on_success=on_success, on_error=on_error)

        if self.settings.is_master:
            _pull_sources()
            return

        self.pull_config(on_success=_pull_sources, on_error=on_error)

    def pull_config(self, on_success=None, on_error=None):
        """Pull the configuration file in a terminal window.

        If the pulled configuration contains new features or variables, the
        matching windows are shown before ``on_success`` is invoked.

        :param on_success: Optional callable invoked after the configuration was pulled
        :param on_error: Optional callable invoked after a failure was displayed
        """
        logger.info('pull config')

        if not self.cascade_pull(on_error=on_error):
            return

        try:
            with self.pause_loop():
                print('Pull configuration from', self.settings.server)
                self.config.pull()
        except subprocess.CalledProcessError as e:
            # presumably rsync's exit code 23 (partial transfer due to error),
            # reported here as a missing remote configuration file
            if e.returncode == 23:
                ErrorWindow(self, 'No configuration file at "{}"!'.format(self.config.remote_path),
                            on_close=on_error).show()
                return
            message = 'Failed to pull configuration from {}'.format(self.settings.server)
            ProcessErrorWindow(self, e, message, 'rsync', parent=self.config_columns,
                               on_close=on_error).show()
            return

        # chain the windows: new variables first, then new features, then on_success
        if list(self.config.new_features):
            on_success = FeatureInstallWindow(self, on_save=on_success, on_abort=on_error).show
        if self.config.new_variables:
            on_success = NewVariablesWindow(self, on_save=on_success, on_abort=on_error).show
        # on_success is optional and still ``None`` if nothing new was found
        if callable(on_success):
            on_success()

    def _collect_pull_features(self):
        """Yield ``(feature, sources)`` for every feature that has sources to pull."""
        for feature in self.config.tree.collect_features_to_sync():
            feature_sources = list(feature.collect_sources_to_pull())
            if feature_sources:
                yield feature, feature_sources

    def pull_sources(self, on_success=None, on_error=None):
        """Pull sources of installed features in a terminal window.

        Stops at the first failing source and displays the error.

        :param on_success: Optional callable passed as ``on_close`` to the success window
        :param on_error: Optional callable passed as ``on_close`` to error windows
        """
        logger.info('pull sources')

        features = list(self._collect_pull_features())
        if not features:
            ErrorWindow(self, 'Nothing to pull', on_close=on_error).show()
            return

        for feature, sources in features:
            logger.debug('pull feature %s', feature)

            for source in sources:
                name = '{}/{}'.format(source.feature.name, source.name)
                try:
                    with self.pause_loop():
                        print('Pull', name)
                        source.pull()
                except ProcessError as e:
                    ErrorWindow(self, 'Failed to pull {}\n{}'.format(name, e), on_close=on_error).show()
                    return
                except subprocess.CalledProcessError as e:
                    ProcessErrorWindow(self, e, 'Failed to pull {}'.format(name), on_close=on_error).show()
                    return

        m = 'Successfully pulled the configuration and sources. Run update (F5) to synchronize your home.'
        SuccessWindow(self, m, on_close=on_success).show()

    def push(self, on_success=None, on_error=None):
        """Push the sources and configuration files in a terminal window.

        A master instance has no server to push the configuration to and
        pushes the sources only.

        :param on_success: Optional callable passed as ``on_close`` to the success window
        :param on_error: Optional callable passed as ``on_close`` to error windows
        """
        logger.info('push')

        if not self.settings.is_master:
            try:
                with self.pause_loop():
                    print('Push configuration to', self.settings.server)
                    self.config.push()
            except subprocess.CalledProcessError as e:
                message = 'Failed to push configuration to {}'.format(self.settings.server)
                ProcessErrorWindow(self, e, message, on_close=on_error).show()
                return

        if not self.push_sources(on_error=on_error):
            return

        if not self.cascade_push(on_error=on_error):
            return

        SuccessWindow(self, 'Successfully pushed the configuration and sources.', on_close=on_success).show()

    def _collect_push_features(self):
        """Yield ``(feature, sources)`` for every feature that has sources to push."""
        for feature in self.config.tree.collect_features_to_sync():
            feature_sources = list(feature.collect_sources_to_push())
            if feature_sources:
                yield feature, feature_sources

    def push_sources(self, on_error=None):
        """Iterate and push sources.

        The push pre- and post-hooks of every feature are run around its sources.

        :param on_error: Optional callable passed as ``on_close`` to error windows
        :return: ``True`` if everything was pushed, ``None`` if there was nothing
                 to push or a failure was displayed.
        """
        logger.info('push sources')

        features = list(self._collect_push_features())
        if not features:
            ErrorWindow(self, 'Nothing to push', on_close=on_error).show()
            return

        for feature, sources in features:
            logger.debug('push feature %s', feature)

            try:
                with self.pause_loop():
                    feature.hooks.push.pre.run()
            except subprocess.CalledProcessError as e:
                ProcessErrorWindow(self, e, '{} pre-hook failed!'.format(feature.name),
                                   parent=self.config_columns, on_close=on_error).show()
                return

            for source in sources:
                pushed = self._push_source(source, on_error=on_error)
                if pushed is None:
                    # no changes, nothing was pushed
                    continue
                if pushed is False:
                    # the error has already been displayed
                    return

            # NOTE(review): indentation was lost in the reviewed copy; the
            # post-hook is assumed to run once per feature - confirm.
            try:
                with self.pause_loop():
                    feature.hooks.push.post.run()
            except subprocess.CalledProcessError as e:
                ProcessErrorWindow(self, e, '{} post-hook failed!'.format(feature.name), on_close=on_error).show()
                return

        return True

    def _push_source(self, source, on_error=None):
        """Push a single source and display failures.

        :param source: The source to push
        :param on_error: Optional callable passed as ``on_close`` to error windows
        :return: ``True`` if pushed, ``False`` on failure, ``None`` if there were no changes
        """
        name = '{}/{}'.format(source.feature.name, source.name)
        try:
            with self.pause_loop():
                print('Push', name)
                source.push()
        except ProcessError as e:
            ErrorWindow(self, 'Failed to push {}\n{}'.format(name, e), on_close=on_error).show()
            return False
        except subprocess.CalledProcessError as e:
            ProcessErrorWindow(self, e, 'Failed to push {}'.format(name), on_close=on_error).show()
            return False
        except NoChanges:
            logger.info('%s: no changes', name)
            return
        return True
def get_parser():
    """Build and return the argument parser.

    :return: `argparse.ArgumentParser` with the options ``--unicode`` /
             ``--no-unicode`` (default ``None``: autodetect), ``--log-level``
             (default ``'warning'``) and ``--pull`` / ``--push`` (stored as
             ``action``, default ``None``).
    """
    from hods.version import __version__
    from argparse import ArgumentParser

    # The banner must be passed as ``description``: ``add_help`` is a boolean
    # constructor flag, assigning a string to it afterwards has no effect.
    parser = ArgumentParser(
        description='hods {} - Home directory synchronization'.format(__version__),
    )

    parser.add_argument('--unicode', dest='unicode', action='store_true', help='Enable unicode')
    parser.add_argument('--no-unicode', dest='unicode', action='store_false', help='Disable unicode')
    parser.set_defaults(unicode=None)

    # accept the level names in lower and upper case
    log_level_choices = ['debug', 'info', 'warning', 'error']
    log_level_choices.extend([level.upper() for level in log_level_choices])
    parser.add_argument('--log-level', choices=log_level_choices, default='warning', help='logging level')

    parser.add_argument('--pull', dest='action', action='store_const', const='pull', help='Pull and exit')
    parser.add_argument('--push', dest='action', action='store_const', const='push', help='Push and exit')
    parser.set_defaults(action=None)

    return parser
def get_logging_handler():
    """Get a rotating file handler or fallback to a streaming handler.

    The log file is ``.hods/log`` below the directory returned by `pw_home`.

    :return: `logging.Handler` instance
    """
    log_path = os.path.join(pw_home(), '.hods', 'log')
    try:
        # backupCount must be non-zero: RotatingFileHandler never rolls over
        # if it is 0, so maxBytes alone would let the file grow unbounded.
        return RotatingFileHandler(log_path, maxBytes=4096, backupCount=1)
    except FileNotFoundError:
        # log to stderr until config directory exists
        return logging.StreamHandler()
def init_logging(log_level):
    """Attach the hods log handler to the root logger.

    :param log_level: `str` - Level name for the root logger, upper-cased before use
    """
    log_handler = get_logging_handler()
    # the handler lets everything through, filtering is done by the root logger
    log_handler.setLevel('DEBUG')
    log_handler.setFormatter(logging.Formatter('%(asctime)s|%(levelname)-8s| %(message)s'))

    root_logger = logging.getLogger()
    root_logger.addHandler(log_handler)
    root_logger.setLevel(log_level.upper())
def init_args(argv=None):
    """Parse the command line arguments.

    :param argv: `list` of `str` or ``None`` - Arguments to parse, ``None``
                 uses ``sys.argv[1:]``. (Default: ``None``)
    :return: The namespace returned by the parser
    """
    arguments = sys.argv[1:] if argv is None else argv
    return get_parser().parse_args(arguments)
def main(argv=None):
    """Run the Text-based/Terminal User Interface.

    Exits the process with the code returned by the application if it is
    truthy, or with 1 on a keyboard interrupt.

    :param argv: `list` of `str` or ``None`` - Command line arguments, passed to `init_args`
    """
    options = init_args(argv)
    init_logging(options.log_level)
    application = App(action=options.action, enable_unicode=options.unicode)

    try:
        exit_code = application.run()
    except KeyboardInterrupt:
        exit_code = 1

    if not exit_code:
        return
    sys.exit(exit_code)


if __name__ == '__main__':
    main()