graffyworkenv/home/bin/cssh

526 lines
22 KiB
Python
Executable File

#!/usr/bin/env python3
import logging
from typing import Optional, List, Dict
import argparse
import re
import subprocess
import abc
import atexit
from pathlib import Path
import os
class RgbColor:
    '''
    a color described by its red, green and blue components

    the components are stored exactly as they are given: no conversion and no range check is performed
    '''
    red: int
    green: int
    blue: int

    def __init__(self, red: int, green: int, blue: int):
        self.red, self.green, self.blue = red, green, blue

    def __str__(self) -> str:
        # eg '[35, 43, 51]'
        return f'[{self.red}, {self.green}, {self.blue}]'
# plain string aliases: they only document which kind of value a string is expected to hold
HostId = str  # short or fully qualified host name, eg 'alambix50' or 'alambix50.ipr.univ-rennes.fr'
UserId = str  # login name, eg 'root'
TermProfileUuid = str  # identifier of a terminal profile, eg '2e0dec33-5ec2-4355-b671-7922010cee9f'
TermProfileName = str  # visible name of a terminal profile, eg 'alambix50.ipr.univ-rennes.fr'
class SshTarget():
    '''
    an ssh destination of the form [<user_id>@]<host_id>, eg 'root@alambix50.ipr.univ-rennes.fr'

    user_id is None when the destination doesn't specify a user (eg 'alambix50')
    '''
    # the annotations are quoted so that they are never evaluated at class creation time
    user_id: 'Optional[UserId]'
    host_id: 'HostId'

    def __init__(self, ssh_target_as_str: str):
        '''
        ssh_target_as_str: eg 'root@alambix50.ipr.univ-rennes.fr' or 'alambix50'

        raises ValueError if ssh_target_as_str is not of the form [<user_id>@]<host_id>
        '''
        parts = ssh_target_as_str.split('@')
        # explicit exceptions are used instead of assert, as this validates user input (asserts are stripped by python -O)
        if len(parts) > 2:
            raise ValueError(f'malformed ssh target : {ssh_target_as_str}. It is expected to be of the form [<user_id>@]<host_id>')
        self.host_id = parts[-1]  # eg 'alambix50.ipr.univ-rennes.fr'
        if not self.host_id:
            raise ValueError(f'malformed ssh target : {ssh_target_as_str}. It is expected to be of the form [<user_id>@]<host_id>')
        self.user_id = None
        if len(parts) > 1:
            self.user_id = parts[0]
            # letters, digits, '_', '.' and '-' are accepted, but a leading '.' or '-' is refused
            # (a target starting with '-' would be interpreted as an option by ssh)
            if re.match(r'^[A-Za-z0-9_][A-Za-z0-9_.-]*$', self.user_id) is None or self.user_id.endswith('\n'):
                raise ValueError(f'malformed user id : {self.user_id}')

    def __str__(self) -> str:
        # eg 'root@alambix50.ipr.univ-rennes.fr', or 'alambix50.ipr.univ-rennes.fr' when there's no user
        target_as_str = f'{self.host_id}'
        if self.user_id:
            target_as_str = f'{self.user_id}@{target_as_str}'
        return target_as_str
# this has to be an assignment: a bare annotation ('TerminalLabel: str') doesn't bind the name,
# which makes every signature annotated with TerminalLabel fail with a NameError when it is evaluated
TerminalLabel = str  # label used as the title and the color profile of the terminal
class ITerminalLauncher(abc.ABC):
    '''
    interface of the objects that are able to run a command in a colored terminal
    '''

    @abc.abstractmethod
    def launch_terminal(self, terminal_label: TerminalLabel, bg_color: RgbColor, command: List[str]) -> None:
        '''
        runs the given command in a terminal

        terminal_label: eg 'alambix50.ipr.univ-rennes.fr'
        bg_color: the background color requested for the terminal
        command: the command to execute in the terminal eg ['/usr/bin/ssh', 'root@alambix50.ipr.univ-rennes.fr']
        '''
OsxWindowId = str  # id of a Terminal.app window, as printed by osascript


class DarwinTerminalLauncher(ITerminalLauncher):
    '''
    macos terminal launcher

    instead of opening a new window, it is written to recolor the current Terminal.app window,
    run the command in it and restore the default background color when cssh exits
    '''
    # id of the window being recolored; set by launch_terminal and read by on_exit
    THIS_OSX_TERM_WINDOW_ID: Optional[OsxWindowId] = None

    # the color components are expressed as <applescript value> / 256; color_as_osx_str multiplies them back by 256
    SOLARIZED_BACKGROUND = RgbColor(0 / 256, 8256 / 256, 10368 / 256)  # '{0, 8256, 10368}' #002b36 (solarized dark base03)
    SAILOR_BLUE = RgbColor(8832 / 256, 14208 / 256, 18816 / 256)  # '{8832, 14208, 18816}' # 2e4a62
    PRUNE = RgbColor(15360 / 256, 10944 / 256, 14592 / 256)  # '{15360, 10944, 14592}' #50394c
    TAUPE = RgbColor(19968 / 256, 18816 / 256, 16512 / 256)  # '{19968, 18816, 16512}' #686256
    DARK_ORANGE = RgbColor(27456 / 256, 12864 / 256, 0 / 256)  # '{27456, 12864, 0}' #8f4300
    DARK_OLIVE = RgbColor(8832 / 256, 11904 / 256, 5568 / 256)  # '{8832, 11904, 5568}' #2e3e1d

    @staticmethod
    def get_current_terminal_window_id() -> OsxWindowId:
        '''
        returns the id of the window 1 of the Terminal application, as reported by osascript
        '''
        osacommand = 'tell application "Terminal" to get id of window 1'
        command = ['osascript', '-e', osacommand]
        proc = subprocess.run(command, check=True, stdout=subprocess.PIPE)
        lines = proc.stdout.decode('utf8').split('\n')
        # exactly one line terminated by a newline is expected
        assert len(lines) == 2
        window_id = lines[0]
        return window_id

    @staticmethod
    def color_as_osx_str(color: RgbColor) -> str:
        '''
        returns the applescript representation of the given color, eg '{0, 8256, 10368}' for SOLARIZED_BACKGROUND
        '''
        return '{%d, %d, %d}' % (color.red * 256, color.green * 256, color.blue * 256)

    @staticmethod
    def set_bg(window_id: OsxWindowId, color: RgbColor):
        '''
        sets the background color of the Terminal window that has the given id

        window_id: as returned by get_current_terminal_window_id
        color: the background color to apply
        '''
        # fix: the color has to be passed to color_as_osx_str (it used to be called without any argument, which raised a TypeError)
        osacommand = f'tell application "Terminal" to set background color of (every window whose id is {window_id}) to {DarwinTerminalLauncher.color_as_osx_str(color)}'
        command = ['osascript', '-e', osacommand]
        subprocess.run(command, check=True)

    @staticmethod
    def on_exit():
        '''
        atexit handler: restores the default background color of the window that has been recolored
        '''
        DarwinTerminalLauncher.set_bg(DarwinTerminalLauncher.THIS_OSX_TERM_WINDOW_ID, DarwinTerminalLauncher.SOLARIZED_BACKGROUND)

    def launch_terminal(self, terminal_label: TerminalLabel, bg_color: RgbColor, command: List[str]):
        '''
        command: the command to execute in the terminal eg ['/usr/bin/ssh', f'{str(ssh_target)}']

        raises NotImplementedError: the macos implementation is currently disabled
        '''
        # NOTE(review): the macos code path is deliberately disabled by this raise. The statements below are
        # therefore unreachable; they are kept (with their latent NameError fixed) as the implementation to
        # validate on a mac before removing the raise.
        raise NotImplementedError()
        DarwinTerminalLauncher.THIS_OSX_TERM_WINDOW_ID = DarwinTerminalLauncher.get_current_terminal_window_id()
        # fix: THIS_OSX_TERM_WINDOW_ID is a class attribute, it can't be accessed as a bare name
        DarwinTerminalLauncher.set_bg(DarwinTerminalLauncher.THIS_OSX_TERM_WINDOW_ID, bg_color)
        # make cssh restore the default terminal color when exiting
        atexit.register(DarwinTerminalLauncher.on_exit)
        # the command runs in the current terminal
        subprocess.run(command, check=True)
def is_valid_uuid(uuid: str) -> bool:
    '''
    tells if the given string is a lowercase hexadecimal uuid of the form 8-4-4-4-12

    uuid: eg 'b1dcc9dd-5262-4d8d-a863-c897e6d979b9'
    '''
    # fullmatch is used instead of match(r'^...$') because '$' also matches just before a trailing newline,
    # which made 'b1dcc9dd-5262-4d8d-a863-c897e6d979b9\n' wrongly pass for a valid uuid
    return re.fullmatch(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', uuid) is not None
GSettingsKeyPath = str  # eg org.gnome.Terminal.ProfilesList


class GnomeSettings():
    '''
    reads and writes the 'list' key of gsettings schemas using the gsettings command line tool
    '''

    def get_list(self, key_id: GSettingsKeyPath) -> List[str]:
        '''
        returns the strings stored in the 'list' key of the given schema

        key_id: eg 'org.gnome.Terminal.ProfilesList'
        '''
        # $ gsettings get org.gnome.Terminal.ProfilesList list
        # ['b1dcc9dd-5262-4d8d-a863-c897e6d979b9', '2e0dec33-5ec2-4355-b671-7922010cee9f']
        proc = subprocess.run(['gsettings', 'get', key_id, 'list'], check=True, stdout=subprocess.PIPE)
        output = proc.stdout.decode('utf8').strip()
        # an empty array of strings is printed with a type prefix ('@as []'), hence the optional prefix
        m = re.fullmatch(r'(?:@as\s+)?\[(?P<series_of_strings>[^]]*)\]', output)
        assert m is not None, f'unexpected output for the list of {key_id}: "{output}"'
        series_of_strings = m['series_of_strings']
        logging.debug('get_list: series_of_strings = %s', str(series_of_strings))
        # findall returns an empty list for an empty series (splitting on ', ' used to fail on empty lists)
        values = re.findall(r"'([^']*)'", series_of_strings)
        logging.debug('get_list: values = %s', str(values))
        return values

    def set_list(self, key_id: GSettingsKeyPath, value: List[str]):
        '''
        replaces the contents of the 'list' key of the given schema with the given strings
        '''
        value_as_str = ','.join([f"'{unq_str}'" for unq_str in value])
        logging.debug('value_as_str = "%s"', value_as_str)
        subprocess.run(['gsettings', 'set', key_id, 'list', f"[{value_as_str}]"], check=True)

    def register_profile_uuid(self, profile_uuid: TermProfileUuid):
        '''
        appends the given profile to the list of gnome-terminal profiles

        profile_uuid: eg '2e0dec33-5ec2-4355-b671-7922010cee9f'. It is expected not to be registered yet
        '''
        assert is_valid_uuid(profile_uuid), f'invalid uuid: "{profile_uuid}"'
        profiles_key_id = 'org.gnome.Terminal.ProfilesList'
        profile_uuids = self.get_list(profiles_key_id)
        assert profile_uuid not in profile_uuids
        profile_uuids.append(profile_uuid)
        self.set_list(profiles_key_id, profile_uuids)  # side effect: this adds the dconf key "/org/gnome/terminal/legacy/profiles:/list"
class GnomeTerminal():
    '''
    management of the gnome-terminal profiles, which are stored in dconf under DCONF_PROFILES_PATH

    each profile is a dconf directory named ':<uuid>/', eg
    $ dconf list '/org/gnome/terminal/legacy/profiles:/'
    :2a05665f-baa7-484e-b179-4e94f3814238/
    :2e0dec33-5ec2-4355-b671-7922010cee9f/
    '''
    DCONF_PROFILES_PATH = '/org/gnome/terminal/legacy/profiles:'

    @staticmethod
    def create_new_profile(profile_name: TermProfileName) -> TermProfileUuid:
        '''
        creates a terminal profile with the given visible name, registers it and returns its uuid

        adapted from https://askubuntu.com/questions/270469/how-can-i-create-a-new-profile-for-gnome-terminal-via-command-line
        '''
        proc = subprocess.run(['uuidgen'], check=True, stdout=subprocess.PIPE)
        # the uuid is lowercased because the profiles are listed with a lowercase-only pattern
        profile_uuid = proc.stdout.decode('utf8').strip().lower()
        # fix: the output of uuidgen used to be matched against a pattern without the result being checked
        assert is_valid_uuid(profile_uuid), f'unexpected output of uuidgen: "{profile_uuid}"'
        logging.info("creating profile profile_uuid=%s with profile_name=%s", profile_uuid, profile_name)
        # eg dconf write /org/gnome/terminal/legacy/profiles:/:dc932a58-6a38-4511-876a-d453b5e33a26/visible-name "'toto'"
        subprocess.run(['dconf', 'write', f"{GnomeTerminal.DCONF_PROFILES_PATH}/:{profile_uuid}/visible-name", f"'{profile_name}'"], check=True)

        # check that the profile was successfully created
        assert GnomeTerminal.get_term_profile_name(profile_uuid) == profile_name
        existing_profile_uuid = GnomeTerminal.get_terminal_profile_uuid(profile_name)
        assert existing_profile_uuid is not None, f'failed to find the profile for name {profile_name}, it is expected to be {profile_uuid}'
        assert existing_profile_uuid == profile_uuid

        gsettings = GnomeSettings()
        gsettings.register_profile_uuid(profile_uuid)
        return profile_uuid

    @staticmethod
    def get_term_profiles() -> List[TermProfileUuid]:
        '''
        returns the uuids of the profiles found in dconf
        '''
        proc = subprocess.run(['dconf', 'list', f"{GnomeTerminal.DCONF_PROFILES_PATH}/"], check=True, stdout=subprocess.PIPE)
        profile_uuids = []
        for line in proc.stdout.decode('utf8').split('\n'):
            # eg ':b1dcc9dd-5262-4d8d-a863-c897e6d979b9/'
            m = re.match(r'^:(?P<uuid>[0-9a-f\-]+)/$', line)
            if m is None:
                # besides the profiles, only the 'list' key and the empty line that ends the output are expected
                assert line in ['list', ''], f'unexpected format for line: {line}'
            else:
                profile_uuids.append(m['uuid'])
        return profile_uuids

    @staticmethod
    def get_term_profile_name(profile_uuid: TermProfileUuid) -> Optional[TermProfileName]:
        '''
        returns the visible name of the given profile, or None if dconf prints nothing for it
        '''
        proc = subprocess.run(['dconf', 'read', f"{GnomeTerminal.DCONF_PROFILES_PATH}/:{profile_uuid}/visible-name"], check=True, stdout=subprocess.PIPE)
        lines = proc.stdout.decode('utf8').split('\n')
        if len(lines) < 2:
            # dconf printed nothing (the default profile has no name)
            logging.debug('name lines for profile %s : %s', profile_uuid, str(lines))
            return None
        assert len(lines) == 2, lines
        assert lines[1] == ''
        # the name is printed between single quotes
        m = re.match(r"^\'(?P<profile_name>[^']+)'$", lines[0])
        # fix: the message used to format an undefined variable, which turned the diagnostic into a NameError
        assert m, f'unexpected line : {lines[0]}'
        return m['profile_name']

    @staticmethod
    def get_terminal_profile_uuid(profile_name: TermProfileName) -> TermProfileUuid:
        '''
        returns the id of the profile which has the given name.
        if the named profile doesn't exist, it creates it first

        profile_name: eg 'physix.ipr.univ-rennes1.fr'
        '''
        for profile_uuid in GnomeTerminal.get_term_profiles():
            logging.debug('processing profile_uuid=%s', profile_uuid)
            prof_name = GnomeTerminal.get_term_profile_name(profile_uuid)
            logging.debug('profile_uuid = %s, prof_name = %s', profile_uuid, prof_name)
            if profile_name == prof_name:
                return profile_uuid
        logging.info('the profile named %s doesnt exist... create it then', profile_name)
        GnomeTerminal.check_validity()
        profile_uuid = GnomeTerminal.create_new_profile(profile_name)
        GnomeTerminal.check_validity()
        return profile_uuid

    @staticmethod
    def set_terminal_profile_bg_color(profile_name: TermProfileName, bg_color: RgbColor):
        '''
        sets the background color of the named profile (which is created if needed); the foreground color is set to white

        profile_name: eg 'physix.ipr.univ-rennes1.fr'
        bg_color: eg RgbColor(17, 25, 24)
        '''
        logging.debug('setting color for profile %s : %s', profile_name, str(bg_color))
        profile_uuid = GnomeTerminal.get_terminal_profile_uuid(profile_name)
        logging.debug("profile_name=%s profile_uuid=%s", profile_name, profile_uuid)
        fg_color = RgbColor(255, 255, 255)
        profile_path = f"{GnomeTerminal.DCONF_PROFILES_PATH}/:{profile_uuid}"
        subprocess.run(['dconf', 'write', f"{profile_path}/use-theme-colors", 'false'], check=True, stdout=subprocess.PIPE)
        subprocess.run(['dconf', 'write', f"{profile_path}/background-color", f"'rgb({bg_color.red}, {bg_color.green}, {bg_color.blue})'"], check=True, stdout=subprocess.PIPE)
        subprocess.run(['dconf', 'write', f"{profile_path}/foreground-color", f"'rgb({fg_color.red}, {fg_color.green}, {fg_color.blue})'"], check=True, stdout=subprocess.PIPE)

    @staticmethod
    def delete_term_profile(profile_uuid: TermProfileUuid):
        '''
        removes the dconf directory of the given profile
        '''
        logging.warning('deleting terminal profile %s', profile_uuid)
        subprocess.run(['dconf', 'reset', '-f', f"{GnomeTerminal.DCONF_PROFILES_PATH}/:{profile_uuid}/"], check=True)

    @staticmethod
    def delete_all_term_profiles():
        '''
        removes every profile found in dconf, then resets the 'list' key
        '''
        for term_profile_uuid in GnomeTerminal.get_term_profiles():
            GnomeTerminal.delete_term_profile(term_profile_uuid)
        subprocess.run(['dconf', 'reset', '-f', f"{GnomeTerminal.DCONF_PROFILES_PATH}/list"], check=True)

    @staticmethod
    def check_validity():
        '''
        asserts that the dconf profiles directory only contains well formed profile uuids and the 'list' key
        '''
        proc = subprocess.run(['dconf', 'list', f"{GnomeTerminal.DCONF_PROFILES_PATH}/"], check=True, stdout=subprocess.PIPE)
        for line in proc.stdout.decode('utf8').split('\n'):
            m = re.match(r'^:(?P<uuid>[0-9a-f\-]+)/$', line)
            if m:
                assert is_valid_uuid(m['uuid'])
            else:
                assert line in ['list', ''], f'unexpected key: {line}'
class GnomeTerminalLauncher(ITerminalLauncher):
    '''
    terminal launcher based on gnome-terminal: the command runs in a new window that uses a profile named after the terminal label
    '''

    def launch_terminal(self, terminal_label: TerminalLabel, bg_color: RgbColor, command: List[str]):
        '''
        command: the command to execute in the terminal eg ['/usr/bin/ssh', f'{str(ssh_target)}']
        '''
        # the existing profiles are deliberately not deleted here (not even the broken ones or the duplicates):
        # deleting the term profiles affects the already open terminals, that then switch to default colors
        GnomeTerminal.set_terminal_profile_bg_color(terminal_label, bg_color)
        profile_uuid = GnomeTerminal.get_terminal_profile_uuid(terminal_label)
        logging.info('using terminal profile %s (uuid=%s)', terminal_label, profile_uuid)

        # everything that follows '--' is the command that gnome-terminal has to run
        gnome_terminal_args = [
            'gnome-terminal',
            f'--window-with-profile={profile_uuid}',
            f'--title={terminal_label}',
            '--',
        ]
        launch_term_command = gnome_terminal_args + command
        logging.debug('launch_term_command = %s', launch_term_command)
        subprocess.run(launch_term_command, check=True)
def is_valid_host_fqdn(host_fqdn: str) -> bool:
    '''
    tells if the given string is only made of lowercase letters, digits, dots and dashes

    host_fqdn: eg 'alambix-frontal.ipr.univ-rennes.fr'
    '''
    return re.match(r'^[a-z0-9.\-]+$', host_fqdn) is not None
def get_host_real_fqdn(host_id: HostId) -> HostId:
    '''
    returns the real name of the given host (aliases are resolved), using the 'host' command

    the real name is the first word of the last line printed by 'host', eg
    $ host alambix.ipr.univ-rennes.fr
    alambix.ipr.univ-rennes.fr is an alias for alambix-frontal.ipr.univ-rennes.fr.
    alambix-frontal.ipr.univ-rennes.fr has address 129.20.27.230

    raises ValueError if host_id is empty or looks like a command line option
    raises subprocess.CalledProcessError if the 'host' command fails
    '''
    # host_id comes from the command line: refuse what 'host' would interpret as an option
    if not host_id or host_id.startswith('-'):
        raise ValueError(f'invalid host id: "{host_id}"')
    # security fix: 'host' is run directly instead of through a shell pipeline built from host_id,
    # so that a host_id such as 'foo; some-command' can no longer execute arbitrary commands.
    # This also makes the failures of 'host' visible (the pipeline only reported the status of awk)
    proc = subprocess.run(['host', host_id], check=True, stdout=subprocess.PIPE)
    lines = [line for line in proc.stdout.decode('utf8').splitlines() if line.strip()]
    assert lines, f'the host command printed nothing for {host_id}'
    # equivalent of the former "| tail -1 | awk '{print $1}'"
    host_real_fqdn = lines[-1].split()[0]
    assert is_valid_host_fqdn(host_real_fqdn), f'invalid host name: {host_real_fqdn}'
    return host_real_fqdn
# fix: these aliases were used in the annotations of SshHostConfig without being defined
SshHostConfigId = str  # one of the names that follow the 'Host' keyword, eg 'git'
SshHostConfigAttr = str  # name of an option of a Host section, eg 'hostname'


class SshHostConfig():
    '''
    one 'Host' section of an ssh config file: the names declared on the 'Host' line and the options that follow it

    example of ssh config file:

    Host flique-macpro6.tunnel
    hostname flique-macpro6.ipr.univ-rennes1.fr
    user francois
    ProxyCommand ssh graffy@tunnel1.ipr.univ-rennes1.fr nc %h %p 2> /dev/null
    Host physix-frontal.ipr.univ-rennes1.fr
    hostname physix-frontal.ipr.univ-rennes1.fr
    user graffy
    IdentityFile ~/.ssh/homekey
    Host gpgpu
    hostname physix58.ipr.univ-rennes1.fr
    ProxyCommand ssh -i ~/.ssh/homekey graffy@physix.ipr.univ-rennes1.fr nc %h %p 2> /dev/null
    IdentityFile ~/.ssh/homekey
    Host gpu-p100
    hostname physix90.ipr.univ-rennes1.fr
    ProxyCommand ssh -i ~/.ssh/homekey graffy@physix.ipr.univ-rennes1.fr nc %h %p 2> /dev/null
    IdentityFile ~/.ssh/homekey
    Host cloud.tunnel
    hostname cloud.ipr.univ-rennes.fr
    ProxyCommand ssh graffy@tunnel1.ipr.univ-rennes.fr nc %h %p 2> /dev/null
    Host adminpod-1.tunnel
    hostname adminpod-1.ipr.univ-rennes.fr
    ProxyCommand ssh graffy@tunnel1.ipr.univ-rennes1.fr nc %h %p 2> /dev/null
    Host mazinger.tunnel
    hostname mazinger.ipr.univ-rennes.fr
    ProxyCommand ssh graffy@tunnel1.ipr.univ-rennes1.fr nc %h %p 2> /dev/null
    # git.ipr.univ-rennes.fr
    Host vmipr-p1 git
    hostname vmipr-p1.univ-rennes.fr
    User graffy
    IdentityFile ~/.ssh/id_ed25519
    '''
    names: List[SshHostConfigId]  # eg ['vmipr-p1', 'git']
    # the option names are stored in lowercase by parse_ssh_config
    attrs: Dict[SshHostConfigAttr, str]  # eg {'hostname': 'vmipr-p1.univ-rennes.fr', 'user': 'graffy', 'identityfile': '~/.ssh/id_ed25519'}

    def __init__(self):
        # an empty section: it is meant to be filled by the parser
        self.names = []
        self.attrs = {}
class SshConfig():
    '''
    the 'Host' sections of an ssh config file, in the order in which they appear in the file
    '''
    # fix: this is an annotation (it used to be an assignment, which stored the typing object itself as a class attribute).
    # It is quoted so that it is never evaluated at class creation time
    host_configs: 'List[SshHostConfig]'

    def __init__(self):
        self.host_configs = []

    def get_host_name(self, host_config_id) -> Optional[str]:
        '''
        returns the 'hostname' option of the first section that is named host_config_id and that defines this option

        host_config_id: one of the names declared on a 'Host' line, eg 'git'
        returns None if there's no such section
        '''
        for host_conf in self.host_configs:
            if host_config_id not in host_conf.names:
                continue
            # fix: a 'Host' section doesn't necessarily have a hostname option (indexing attrs used to raise a KeyError)
            host_name = host_conf.attrs.get('hostname')
            if host_name is not None:
                return host_name
        return None
def parse_ssh_config(ssh_config_file_path) -> SshConfig:
    '''
    parses the 'Host' sections of the given ssh config file

    ssh_config_file_path: eg ~/.ssh/config
    returns one SshHostConfig per 'Host' line, with the options that follow it (their names are lowercased)

    limitations: the options that appear before the first 'Host' line and the ones of the 'Match' sections
    are ignored; the 'name=value' syntax and the 'Include' directives are not interpreted
    '''
    ssh_config = SshConfig()
    # the section that receives the options being parsed; None outside of the 'Host' sections
    current_host_config = None
    with open(ssh_config_file_path, 'rt', encoding='utf8') as f:
        for line in f:
            logging.debug('line = %s', line)
            # skip comments
            if re.match(r'^\s*#', line):
                continue
            match = re.match(r'^\s*(?P<attr_name>[^\s]+)\s+(?P<attr_value>.*)$', line)
            if not match:
                continue
            # eg:
            # Host vmipr-p1 git
            # hostname vmipr-p1.univ-rennes.fr
            # User graffy
            # IdentityFile ~/.ssh/id_ed25519
            attr_name = match['attr_name'].lower()
            attr_value = match['attr_value'].strip()
            logging.debug('attr_name = %s', attr_name)
            if attr_name == 'host':
                current_host_config = SshHostConfig()
                # split() without argument copes with the names separated by tabs or by multiple spaces
                current_host_config.names = attr_value.split()
                ssh_config.host_configs.append(current_host_config)
            elif attr_name == 'match':
                # the options of a Match section don't belong to the previous Host section
                current_host_config = None
            elif current_host_config is not None:
                current_host_config.attrs[attr_name] = attr_value
            else:
                # fix: an option that doesn't belong to a Host section (eg a global option at the top of the file)
                # used to raise an IndexError
                logging.debug('ignoring option %s as it is outside of the Host sections', attr_name)
    return ssh_config
def main():
    '''
    parses the command line, chooses a background color from the real name of the target host, then runs ssh
    in a terminal that uses this color
    '''
    logging.basicConfig(level=logging.INFO)
    # fix: the text is passed as description (the first positional argument of ArgumentParser is the program name)
    arg_parser = argparse.ArgumentParser(description='cssh (colored ssh) opens a terminal window with a background color that depends on the fully qualified domain name of the target machine')
    arg_parser.add_argument('ssh_target', help='eg root@alambix50.ipr.univ-rennes.fr')
    args = arg_parser.parse_args()
    logging.debug(args)

    ssh_target = SshTarget(args.ssh_target)  # eg 'root@alambix50.ipr.univ-rennes.fr'

    # resolve the real name of the target host: the ssh config has priority over the dns
    # fix: Path.home() also works when the HOME environment variable is not set
    ssh_config_file_path: Path = Path.home() / '.ssh' / 'config'
    host_real_name = None
    if ssh_config_file_path.exists():
        ssh_config = parse_ssh_config(ssh_config_file_path)
        host_real_name = ssh_config.get_host_name(ssh_target.host_id)
    if not host_real_name:
        host_real_name = get_host_real_fqdn(ssh_target.host_id)
    logging.debug('host_real_name = %s', host_real_name)
    logging.debug('ssh_target.host_id = %s', ssh_target.host_id)

    # the terminal launcher depends on the operating system (the system name is the one printed by the uname command)
    os_name = os.uname().sysname
    terminal_launcher: Optional[ITerminalLauncher] = None
    if os_name == 'Darwin':
        terminal_launcher = DarwinTerminalLauncher()
    elif os_name == 'Linux':
        terminal_launcher = GnomeTerminalLauncher()
    else:
        raise NotImplementedError(f'unhandled os : "{os_name}"')

    # the background color is computed from the host name by make_color.py
    # (this replaces the former hard-coded palette, in which the color depended on the host name prefix and on the root user)
    color_saturation = 0.3
    color_value = 0.2
    proc = subprocess.run(['make_color.py', host_real_name, str(color_value), str(color_saturation), 'linux'], check=True, stdout=subprocess.PIPE)
    bg_color_as_str = proc.stdout.decode('utf8').strip()  # eg '(35, 43, 51)'
    match = re.match(r'\((?P<red>[0-9]+), (?P<green>[0-9]+), (?P<blue>[0-9]+)\)', bg_color_as_str)
    assert match, f'unexpected color as string: {bg_color_as_str}'
    # fix: the components are converted to int (RgbColor used to receive the matched strings)
    bg_color = RgbColor(int(match['red']), int(match['green']), int(match['blue']))
    logging.debug('bg_color = %s', str(bg_color))

    command = ['/usr/bin/ssh', f'{str(ssh_target)}']
    terminal_launcher.launch_terminal(terminal_label=ssh_target.host_id, bg_color=bg_color, command=command)


# the guard prevents cssh from running when this file is imported as a module
if __name__ == '__main__':
    main()