安全一直是我关注的重点,为防止规则直接上线误杀正常请求,通过交换机把流量镜像到我的一台机器,对镜像流量进行分析,匹配我的规则,抓出需要加入白名单的url,然后联动nginx和前端防火墙,形成一套入侵防御系统。以下只针对七层做分析,稍后会对四层数据一起分析入库

在config下的whiteurl.py中添加需要过滤的白名单即可,当然也可以用txt,可根据自己的实际情况进行修改。核心代码就这么多,IP库自己下载一个就可以了

waf

代码如下:
waf.py

# !/usr/bin/env python
#-*- coding: utf-8 -*-
#=============================================================================
#     FileName: test.py
#         Desc:
#       Author: 苦咖啡
#        Email: voilet@qq.com
#     HomePage: http://blog.kukafei520.net
#      Version: 0.0.1
#   LastChange: 2014-09-01
#      History:
#=============================================================================
import re
import socket
import sys
import time

import dpkt
import pcap

#初始化ip库
from api.QQWry import *
from check_data import hack_filter, hackerinfo
#导入白名单
from config.whiteurl import *

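# Offset added to the capture timestamp before formatting. The original
# hard-codes UTC+8 (28800 s) instead of the host's local zone; kept as one
# named constant so the assumption is visible.
DISPLAY_TZ_OFFSET = 28800

pc = pcap.pcap("p2p1")    # capture interface; any NIC name such as eth0 works
pc.setfilter('tcp port 80')    # BPF filter: only plain-HTTP segments are inspected


# Per-segment loop. ts is the capture time (epoch seconds), buf the raw frame.
# Each iteration is best-effort: one bad segment must never stop the capture,
# but failures are reported instead of being swallowed by a bare except.
for ts, buf in pc:
    try:
        eth = dpkt.ethernet.Ethernet(buf)
    except dpkt.UnpackError:
        continue    # truncated or malformed frame

    ip = eth.data
    # The BPF filter also admits IPv6; its 16-byte source address would
    # break the dotted-quad formatting below, so only IPv4 is handled.
    if not isinstance(ip, dpkt.ip.IP):
        continue

    tcp = ip.data
    # Only client->server segments that carry a payload can start a request;
    # handshake/ACK segments and server responses are dropped up front.
    if not isinstance(tcp, dpkt.tcp.TCP) or tcp.dport != 80 or not tcp.data:
        continue

    try:
        http = dpkt.http.Request(tcp.data)
    except dpkt.UnpackError:
        # Not the first segment of a request (continuation data of a large
        # body, or not HTTP at all). Routine on a mirror port; not logged.
        continue

    src_ip = socket.inet_ntoa(ip.src)
    src_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(ts + DISPLAY_TZ_OFFSET))

    # Whitelist check uses the path only (query string dropped), so anything
    # sent to a whitelisted path is never reported, whatever its parameters.
    # url_list here is the whitelist from config.whiteurl.
    get_data_url = http.uri.split("?")[0]
    if get_data_url in url_list:
        continue

    try:
        result = hack_filter(http).run()
        if result["status"]:
            hack_status = hackerinfo(http, result["acl"], src_ip, src_time).run()
            print(hack_status)
    except Exception as err:
        # hack_filter/hackerinfo index headers such as user-agent and host
        # directly and decode GB2312 location data; a request that trips one
        # of those is skipped, but the reason is written to stderr rather
        # than lost.
        sys.stderr.write("%s %s %s: request skipped: %s: %s\n"
                         % (src_time, src_ip, get_data_url, type(err).__name__, err))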

源码下载地址:

https://github.com/voilet/waf

分析程序如下
check_data.py

# !/usr/bin/env python
#-*- coding: utf-8 -*-
#=============================================================================
#     FileName: check_data.py
#         Desc:
#       Author: 苦咖啡
#        Email: voilet@qq.com
#     HomePage: http://blog.kukafei520.net
#      Version: 0.0.1
#   LastChange: 2014-09-03
#      History:
#=============================================================================
import acl
import re

#初始化ip库
from api.QQWry import *

#导入白名单
from config.whiteurl import *
import acl

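# Path of the QQWry IP-geolocation database. The lookup object is built once
# at import time and shared by hackerinfo.run() below.
QQWRY_DAT_PATH = '/home/data/hacker/api/QQWry.Dat'
tt = IPSearch(QQWRY_DAT_PATH)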

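class hack_filter:
    """
    Behaviour analysis of one parsed HTTP request.

    The cookie, body, uri (``acl.args``), user-agent and uri
    (``acl.url_list``) are checked, in that order, against the regex lists
    in the ``acl`` module; the first rule that matches is reported.

    NOTE(review): rules are applied case-sensitively and to the request as
    captured (no percent-decoding), so the match is only as strong as the
    exact spelling in ``acl``.
    """

    def __init__(self, http_data):
        self.uri = http_data.uri
        # Scanners often send no User-Agent at all. Indexing the header
        # directly raised KeyError and the request was never analysed;
        # an absent header now matches as the empty string.
        self.user_agent = http_data.headers.get("user-agent", "")
        # False (not "") marks "nothing to scan", as in the original.
        self.cookie = http_data.headers.get("cookie") or False
        self.body = http_data.body or False

    @staticmethod
    def _first_match(rules, text):
        """Return the first rule in ``rules`` found in ``text``, else None."""
        # re.search is equivalent to the original findall()-truthiness test
        # (findall is non-empty exactly when search succeeds) and uses the
        # re module's pattern cache instead of recompiling per call.
        for rule in rules:
            if re.search(rule, text):
                return rule
        return None

    def run(self):
        """
        Run the rule sets against the request.

        :return: ``{"status": True, "acl": rule}`` for the first hit, else
                 ``{"status": False, "message": "no find acl"}``.
        """
        checks = []
        if self.cookie:
            checks.append((acl.cookie_acl, self.cookie))
        if self.body:
            checks.append((acl.post_acl, self.body))
        checks.extend([
            (acl.args, self.uri),
            (acl.useragent, self.user_agent),
            (acl.url_list, self.uri),
        ])
        for rules, text in checks:
            rule = self._first_match(rules, text)
            if rule is not None:
                return {"status": True, "acl": rule}
        return {"status": False, "message": "no find acl"}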

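class hackerinfo:
    """
    Build the human-readable report for a request that matched a rule.

    ``run()`` returns one multi-line text block: capture time, attacked URL,
    host, source IP and its QQWry location, User-Agent, method, the POST
    body when present, all headers and the rule that fired.
    """
    def __init__(self, http_data, acl, src_ip, src_time):
        self.uri = http_data.uri
        # Absent headers are reported as "" instead of raising KeyError
        # (HTTP/1.0 requests carry no Host; scanners often omit User-Agent).
        self.user_agent = http_data.headers.get("user-agent", "")
        self.src_ip = src_ip
        self.src_time = src_time
        self.host = http_data.headers.get("host", "")
        self.method = http_data.method
        self.acl = acl
        self.headers = http_data.headers
        self.cookie = http_data.headers.get("cookie") or ""
        self.body = http_data.body or ""

    @staticmethod
    def _gb2312_to_utf8(text):
        """Re-encode a GB2312 byte string from QQWry.Dat as UTF-8 (Python 2 API)."""
        return unicode(text, 'gb2312').encode('utf-8')

    def _location(self):
        """Look up src_ip in the QQWry database; return (city, address) as UTF-8."""
        city_data = tt.find(self.src_ip)
        return (self._gb2312_to_utf8(city_data[0]),
                self._gb2312_to_utf8(city_data[1]))

    def run(self):
        """
        Render the report.

        :return: the report text, terminated by a dashed separator line.
        """
        hacker_city, hacker_addr = self._location()
        domain = "http://%s%s" % (self.host, self.uri)

        lines = [
            "时间: %s" % self.src_time,
            "攻击URL: %s" % domain,
            # The original filled 域名 with src_ip and 攻击ip with host;
            # the two fields were swapped.
            "域名: %s" % self.host,
            "攻击ip: %s" % self.src_ip,
            "攻击所在地: %s%s" % (hacker_city, hacker_addr),
            "User-Agent: %s" % self.user_agent,
            # The original labelled every non-POST request as GET.
            "状态: %s" % self.method,
        ]
        if self.method == 'POST':
            lines.append("提交数据: %s" % self.body)
        lines.append("详细信息: %s" % self.headers)
        lines.append("匹配规则: %s" % self.acl)
        return "\n".join(lines) + "\n--------------------------------------\n"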

规则如下
acl.py

# !/usr/bin/env python
#-*- coding: utf-8 -*-
#=============================================================================
# FileName: acl.py
# Desc:
# Author: 苦咖啡
# Email: voilet@qq.com
# HomePage: http://blog.kukafei520.net
# Version: 0.0.1
# LastChange: 2014-09-01
# History:
#=============================================================================

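# Rule lists consumed by check_data.hack_filter (applied with re.search,
# case-sensitively, to the request as captured).
#
# All patterns are raw strings with plain ASCII quotes: the blog rendering
# turned the quotes into curly “ ” characters, which is a SyntaxError, and the
# original \\w in the last url_list entry meant a literal backslash, not \w.
# The public lists are composed from the shared groups below so each rule is
# maintained in one place; the original order within each list is kept.

# Directory traversal.
_TRAVERSAL = [
    r"\.\./",
]

# Shell / expression-language variable expansion.
_EXPANSION = [
    r"\:\$",
    r"\$\{",
]

# SQL injection, PHP/Java code execution and scheme-wrapper probes.
_INJECTION = [
    r"select.+(from|limit)",
    r"(?:(union(.*?)select))",
    r"having|rongjitest",
    r"sleep\((\s*)(\d*)(\s*)\)",
    r"benchmark\((.*)\,(.*)\)",
    r"base64_decode\(",
    r"(?:from\W+information_schema\W)",
    r"(?:(?:current_)user|database|schema|connection_id)\s*\(",
    r"(?:etc\/\W*passwd)",
    r"into(\s+)+(?:dump|out)file\s*",
    r"group\s+by.+\(",
    # Listed twice in the original, once with an unescaped dot; the escaped
    # form is the intended one.
    r"xwork\.MethodAccessor",
    r"(?:define|eval|file_get_contents|include|require|require_once|shell_exec|phpinfo|system|passthru|preg_\w+|execute|echo|print|print_r|var_dump|(fp)open|alert|showmodaldialog)\(",
    r"(gopher|doc|php|glob|file|phar|zlib|ftp|ldap|dict|ogg|data)\:\/",
    r"java\.lang",
    r"\$_(GET|post|cookie|files|session|env|phplib|GLOBALS|SERVER)\[",
]

# Reflected-markup / event-handler injection.
_XSS = [
    r"\<(iframe|script|body|img|layer|div|meta|style|base|object|input)",
    r"(onmouseover|onerror|onload)\=",
]

# Query string / URI.
args = _TRAVERSAL + _EXPANSION + _INJECTION + _XSS

# Cookie header (the original omitted the XSS rules here).
cookie_acl = _TRAVERSAL + _EXPANSION + _INJECTION

# Request body (the original omitted the expansion rules here).
post_acl = _TRAVERSAL + _INJECTION + _XSS

# Sensitive paths and file types; matched against the URI.
url_list = [
    r"\.(svn|htaccess|bash_history)",
    r"\.(bak|inc|old|mdb|sql|backup|java|class)$",
    # Dot before the extension was unescaped in the original.
    r"(vhost|bbs|host|wwwroot|www|site|root|hytop|flashfxp).*\.rar",
    r"(phpmyadmin|jmx-console|jmxinvokerservlet)",
    r"java\.lang",
    # Script files under directories that should only hold static content.
    r"/(attachments|upimg|images|css|uploadfiles|html|uploads|templets|static|template|data|inc|forumdata|upload|includes|cache|avatar)/(\w+)\.(php|jsp)",
]

# Known scanner / tool User-Agent fragments.
useragent = [
    r"(HTTrack|harvest|audit|dirbuster|pangolin|nmap|sqln|-scan|hydra|Parser|libwww|BBBike|sqlmap|w3af|owasp|Nikto|fimap|havij|PycURL|zmeu|BabyKrokodil|netsparker|httperf|bench)",
]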



九月 4th, 2014

Posted In: python
