问题背景:
解决方案:
优化点:
整体逻辑如下:
#!/usr/bin/env python3
import paramiko
from paramiko import SSHException, AuthenticationException
import logging
import time
import re
import hmac
import hashlib
import base64
import urllib.parse
import datetime
import requests
import os
# 设置日志记录配置
logging.basicConfig(filename="ssh_log.log", level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
def create_ssh_client(host, port, username, password, timeout=10):
"""创建SSH客户端并连接到远程服务器"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
logging.info(f"Connecting to {host}:{port} as {username}")
ssh.connect(host, port, username, password, timeout=timeout)
logging.info(f"Successfully connected to {host}")
return ssh
except AuthenticationException as auth_error:
logging.error(f"Authentication failed: {auth_error}")
print(f"Authentication failed: {auth_error}")
except SSHException as ssh_error:
logging.error(f"SSH connection failed: {ssh_error}")
print(f"SSH connection failed: {ssh_error}")
except Exception as e:
logging.error(f"An error occurred: {e}")
print(f"An error occurred: {e}")
return None
def close_ssh_connection(ssh):
"""关闭SSH连接"""
if ssh:
ssh.close()
logging.info("SSH connection closed")
def execute_ssh_command(ssh, command):
"""在远程服务器上执行SSH命令"""
try:
start_time = time.time()
logging.info(f"Executing command: {command}")
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode()
error = stderr.read().decode()
if error:
logging.error(f"Command error: {error}")
print("Command Error:\n", error)
return None
end_time = time.time()
logging.info(f"Command executed in {end_time - start_time:.2f} seconds")
return output
except Exception as e:
logging.error(f"An error occurred while executing the command: {e}")
print(f"An error occurred while executing the command: {e}")
return None
def extract_ipv6(output, interface):
"""提取指定接口的IPv6地址"""
ipv6_pattern = re.compile(rf'{interface}.*?\n\s*inet6\s+([\da-fA-F:]+)/\d+\s+(scope global|scope link)', re.S)
matches = ipv6_pattern.findall(output)
if matches:
ipv6_addresses = [match[0] for match in matches]
logging.info(f"IPv6 addresses for {interface}: {ipv6_addresses}")
return ipv6_addresses
else:
logging.warning(f"No IPv6 address found for interface {interface}")
return None
def process_ipv6_addresses(ssh, command, interface_name):
"""执行SSH命令并处理指定接口的IPv6地址"""
output = execute_ssh_command(ssh, command)
if output:
ipv6_addresses = extract_ipv6(output, interface_name)
if ipv6_addresses:
return ipv6_addresses
else:
return None
else:
return None
def generate_dingding_url(secret, base_url):
"""生成带有签名和时间戳的钉钉机器人URL"""
timestamp = str(round(time.time() * 1000))
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return f"{base_url}×tamp={timestamp}&sign={sign}"
def send_dingding_message(url, message):
"""发送钉钉消息"""
payload = {'msgtype': 'text', 'text': {'content': message}}
response = requests.post(url, json=payload)
if response.status_code == 200:
print("消息已成功发送到钉钉机器人")
else:
print("发送消息失败,状态码:", response.status_code)
print("响应内容:", response.text)
def main():
# 主机信息列表
hosts = [
{"host": "192.168.1.1", "note": "server1", "port": 22, "username": os.getenv("SSH_USERNAME"),
"password": os.getenv("SSH_PASSWORD"), "interface": "br-lan"},
{"host": "192.168.1.2", "note": "server2", "port": 22, "username": os.getenv("SSH_USERNAME"),
"password": os.getenv("SSH_PASSWORD"), "interface": "ens192"},
{"host": "192.168.1.3", "note": "server3", "port": 22, "username": os.getenv("SSH_USERNAME"),
"password": os.getenv("SSH_PASSWORD"), "interface": "ens192"},
{"host": "192.168.1.4", "note": "server4", "port": 22, "username": os.getenv("SSH_USERNAME"),
"password": os.getenv("SSH_PASSWORD"), "interface": "ens34"}
]
# 钉钉机器人的API URL和密钥(从环境变量中获取)
dingding_base_url = os.getenv("DINGDING_BASE_URL")
secret = os.getenv("DINGDING_SECRET")
messages = []
for host_info in hosts:
ssh = None
try:
ssh = create_ssh_client(
host_info["host"],
host_info["port"],
host_info["username"],
host_info["password"]
)
if ssh:
ipv6_addresses = process_ipv6_addresses(ssh, "ip addr", host_info["interface"])
# 生成消息内容
new_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ipv6_message = '\n'.join(ipv6_addresses) if ipv6_addresses else '获取失败'
message = (f"时间:{new_time}\n"
f"Host: {host_info['host']}\n"
f"备注: {host_info['note']}\n"
f"IPv4 address: {host_info['host']}\n"
f"IPv6 address: {ipv6_message}")
messages.append(message)
else:
messages.append(f"Host: {host_info['host']}\nIPv4 address: {host_info['host']}\nIPv6 address: 获取失败")
except Exception as e:
logging.error(f"An error occurred while processing host {host_info['host']}: {e}")
messages.append(f"Host: {host_info['host']}\nIPv4 address: {host_info['host']}\nIPv6 address: 获取失败")
finally:
close_ssh_connection(ssh)
# 发送所有钉钉消息
dingding_news = '\n\n'.join(messages)
dingding_url = generate_dingding_url(secret, dingding_base_url)
send_dingding_message(dingding_url, dingding_news)
if __name__ == "__main__":
main()