您的当前位置:首页正文

钉钉推送多台服务器IPV6地址

2024-11-10 来源:个人技术集锦

问题背景:

解决方案:

优化点:

  • 稳定性提升: 确保脚本在获取IPv6地址失败时提供明确的错误提示。
  • 信息格式化: 优化钉钉推送的消息格式,使其更加清晰易读,包括当前时间、IPv4地址和IPv6地址等信息。
  • 自动化处理: 添加异常处理机制,避免脚本因为连接问题而挂起。

        整体逻辑如下:

#!/usr/bin/env python3

import paramiko
from paramiko import SSHException, AuthenticationException
import logging
import time
import re
import hmac
import hashlib
import base64
import urllib.parse
import datetime
import requests
import os

# 设置日志记录配置
logging.basicConfig(filename="ssh_log.log", level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')


def create_ssh_client(host, port, username, password, timeout=10):
    """创建SSH客户端并连接到远程服务器"""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        logging.info(f"Connecting to {host}:{port} as {username}")
        ssh.connect(host, port, username, password, timeout=timeout)
        logging.info(f"Successfully connected to {host}")
        return ssh
    except AuthenticationException as auth_error:
        logging.error(f"Authentication failed: {auth_error}")
        print(f"Authentication failed: {auth_error}")
    except SSHException as ssh_error:
        logging.error(f"SSH connection failed: {ssh_error}")
        print(f"SSH connection failed: {ssh_error}")
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        print(f"An error occurred: {e}")
    return None


def close_ssh_connection(ssh):
    """关闭SSH连接"""
    if ssh:
        ssh.close()
        logging.info("SSH connection closed")


def execute_ssh_command(ssh, command):
    """在远程服务器上执行SSH命令"""
    try:
        start_time = time.time()
        logging.info(f"Executing command: {command}")
        stdin, stdout, stderr = ssh.exec_command(command)
        output = stdout.read().decode()
        error = stderr.read().decode()

        if error:
            logging.error(f"Command error: {error}")
            print("Command Error:\n", error)
            return None

        end_time = time.time()
        logging.info(f"Command executed in {end_time - start_time:.2f} seconds")
        return output
    except Exception as e:
        logging.error(f"An error occurred while executing the command: {e}")
        print(f"An error occurred while executing the command: {e}")
        return None


def extract_ipv6(output, interface):
    """提取指定接口的IPv6地址"""
    ipv6_pattern = re.compile(rf'{interface}.*?\n\s*inet6\s+([\da-fA-F:]+)/\d+\s+(scope global|scope link)', re.S)
    matches = ipv6_pattern.findall(output)
    if matches:
        ipv6_addresses = [match[0] for match in matches]
        logging.info(f"IPv6 addresses for {interface}: {ipv6_addresses}")
        return ipv6_addresses
    else:
        logging.warning(f"No IPv6 address found for interface {interface}")
        return None


def process_ipv6_addresses(ssh, command, interface_name):
    """执行SSH命令并处理指定接口的IPv6地址"""
    output = execute_ssh_command(ssh, command)
    if output:
        ipv6_addresses = extract_ipv6(output, interface_name)
        if ipv6_addresses:
            return ipv6_addresses
        else:
            return None
    else:
        return None


def generate_dingding_url(secret, base_url):
    """生成带有签名和时间戳的钉钉机器人URL"""
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return f"{base_url}&timestamp={timestamp}&sign={sign}"


def send_dingding_message(url, message):
    """发送钉钉消息"""
    payload = {'msgtype': 'text', 'text': {'content': message}}
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        print("消息已成功发送到钉钉机器人")
    else:
        print("发送消息失败,状态码:", response.status_code)
        print("响应内容:", response.text)


def main():
    # 主机信息列表
    hosts = [
        {"host": "192.168.1.1", "note": "server1", "port": 22, "username": os.getenv("SSH_USERNAME"), 
         "password": os.getenv("SSH_PASSWORD"), "interface": "br-lan"},
        {"host": "192.168.1.2", "note": "server2", "port": 22, "username": os.getenv("SSH_USERNAME"), 
         "password": os.getenv("SSH_PASSWORD"), "interface": "ens192"},
        {"host": "192.168.1.3", "note": "server3", "port": 22, "username": os.getenv("SSH_USERNAME"), 
         "password": os.getenv("SSH_PASSWORD"), "interface": "ens192"},
        {"host": "192.168.1.4", "note": "server4", "port": 22, "username": os.getenv("SSH_USERNAME"), 
         "password": os.getenv("SSH_PASSWORD"), "interface": "ens34"}
    ]

    # 钉钉机器人的API URL和密钥(从环境变量中获取)
    dingding_base_url = os.getenv("DINGDING_BASE_URL")
    secret = os.getenv("DINGDING_SECRET")

    messages = []

    for host_info in hosts:
        ssh = None
        try:
            ssh = create_ssh_client(
                host_info["host"],
                host_info["port"],
                host_info["username"],
                host_info["password"]
            )

            if ssh:
                ipv6_addresses = process_ipv6_addresses(ssh, "ip addr", host_info["interface"])
                # 生成消息内容
                new_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                ipv6_message = '\n'.join(ipv6_addresses) if ipv6_addresses else '获取失败'
                message = (f"时间:{new_time}\n"
                           f"Host: {host_info['host']}\n"
                           f"备注: {host_info['note']}\n"
                           f"IPv4 address: {host_info['host']}\n"
                           f"IPv6 address: {ipv6_message}")
                messages.append(message)
            else:
                messages.append(f"Host: {host_info['host']}\nIPv4 address: {host_info['host']}\nIPv6 address: 获取失败")

        except Exception as e:
            logging.error(f"An error occurred while processing host {host_info['host']}: {e}")
            messages.append(f"Host: {host_info['host']}\nIPv4 address: {host_info['host']}\nIPv6 address: 获取失败")

        finally:
            close_ssh_connection(ssh)

    # 发送所有钉钉消息
    dingding_news = '\n\n'.join(messages)
    dingding_url = generate_dingding_url(secret, dingding_base_url)
    send_dingding_message(dingding_url, dingding_news)


if __name__ == "__main__":
    main()

Top