Beeswarm是一个主动蜜罐系统,通过部署一些模拟真实用户的节点与蜜罐系统通信,从而引诱窃听了这些会话的攻击者攻击蜜罐系统,以捕获发现攻击。
一、介绍
蜜罐系统一般不会主动产生流量,而是被动的等待攻击流量。Beeswarm则是一款主动诱骗攻击者的蜜罐,可以模拟客户端与服务器的通信(诱饵通信),诱骗黑客攻击蜜罐,以对付企图通过网络监听获取敏感信息的攻击者。诱饵通信中,包括大量攻击者可能非常感兴趣的信息,如用户名口令、管理后台等。如果有攻击者在网络中进行窃听,获取了诱饵通信的内容,并使用这些敏感信息(如使用诱饵登录凭证)登录系统,Beeswarm就能发现网络攻击。此外,Beeswarm也做了很好的细节处理,如对于交互式的协议(如ssh和telnet),诱饵会话的流量模式将会匹配人类的打字速度,目的是使诱饵会话流量看起来合法而且能够吸引黑客。
图 1 Beeswarm欺骗框架示例
二、安装方法
1 实验环境
Beeswarm包括Beeswarm Server、Beeswarm Drone Client、Beeswarm Drone Honeypot。Beeswarm DroneClient在网络中和Beeswarm Drone Honeypot通信并故意泄漏凭证等信息,目的是检测网络中是否有攻击者(图1 中的Adversary)窃听了网络流量并尝试使用窃取的凭证登录蜜罐系统,以发现攻击。BeeswarmServer对Beeswarm Drone Client、Beeswarm Drone Honeypot进行管理并收集Beeswarm DroneClient、Beeswarm Drone Honeypot上报的信息,通过BeeswarmServer提供的可视化接口可以查看Beeswarm Drone Client、Beeswarm Drone Honeypot、诱饵会话以及捕获到的攻击的信息。在本次实验中使用了三台安装Ubuntu Server 16.04 LTS系统的虚拟机,分别充当BeeswarmServer,Beeswarm Drone Client,Beeswarm Drone Honeypot。
2 Beeswarm Server安装
首先安装依赖类库:
sudo apt-get install libffi-dev build-essentialpython-dev python-pip libssl-dev libxml2-dev libxslt1-dev
sudo pip install pydes –allow-external pydes –allow-unverified pydes
运行第二条命令的时候有一些告警信息,不影响安装。然后安装Beeswarm:
sudo pip install beeswarm
安装成功后使用如下命令启动Beeswarm服务:
mkdir server_workdir
cd server_workdir
beeswarm –server
在启动的过程中需要输入服务器的IP或者域名,我们的输入为"192.168.71.130",安装完成后系统会给出管理员密码,这个密码是登录系统进行配置时的管理员密码,用户名是admin。
使用浏览器通过HTTPS协议访问服务器,端口号默认是5000(https://服务器IP:5000),登录界面如下图所示:
图 2 管理员登录界面
用户名为admin,密码为安装阶段给出的密码,然后登录(图3),此时还没有配置蜜罐和Drone客户端,因此没有数据。
图 3 登录后的界面
3 BeeswarmDrone Client、BeeswarmDrone Honeypot安装
在Beeswarm中Drone Client和Drone Honeypot都是Drone。在第二台虚拟机上重复上述安装步骤,直到Beeswarm安装成功,然后执行如下指令:
mkdir drone_workdir
cd drone_workdir
使用浏览器访问服务器(https:// 192.168.71.130:5000),输入用户名和密码,选择+Drone菜单项,可以得到启动Drones时需要告诉Drones的Beeswarm服务器的URL,用于让服务器识别Drones:
图 4 选择+Drone
之后执行如下命令(如果系统想要配置为Drone Honeypot,那么要用管理员权限运行,因为蜜罐端要占用一些熟知端口,如果仅仅是部署Drone客户端则不用),将上一步 得到的URL作为参数输入:
sudo beeswarm –config https://192.168.71.130:5000/ws/drone/add/82a93121-22dc-4150-a017-976c3ea73f50
运行命令后Drone与服务器连接等待配置:
图 5 Drone等待配置
当Drones和服务器连接后,通过浏览器访问服务器进行配置,配置页面如图3,选择Drone菜单项,然后选Unassigned,此时可以看到待配置的选项,见图6。
图 6 待配置的Drone
选择蓝色的按钮进入模式选择页面,有蜜罐和客户端两种类型,先选择蜜罐类型:
图 7 Drone类型选择
选择之后进入蜜罐节点配置页面,需要选择开启哪些服务;
图 8 蜜罐开启的服务选择
配置完成后蜜罐主机开始监听端口;
图 9 蜜罐主机监听端口
在第三台虚拟机上重复上述步骤,但是在图7的模式选择中选Client;Client模式配置页面如图10;
图 10 client配置页面
三、使用方法
在Beeswarm蜜罐系统中,Beeswarm Drone Client需要部署在想要进行检测的网段,一旦完成部署,Beeswarm Drone Client将会不断访问系统(图11是Beeswarm Drone Client 与Honeypot 交互示例)。当BeeswarmDrone Client所在网段有攻击者进行窃听并使用窃听得到的登录凭证访问蜜罐系统时,Beeswarm就能发现攻击者的存在。
图 11 Beeswarm Drone Client 与Honeypot 交互示例
图 12 通过wireshark抓包获取的telnet会话内容
Beeswarm Drone Client 和BeeswarmDrone Honeypot会将信息报告给服务器,访问服务器可以看到Honeypot、Clients、产生的诱饵会话和攻击信息。
图 13 运行结果
四、Beeswarm相关资料
1.蜜罐系统简介 :http://www.beeswarm-ids.org/
2.源码下载地址:https://github.com/honeynet/beeswarm
*本文原创作者:ArkTeam/HP,转载请注明来自FreeBuf黑客与极客(FreeBuf.COM)
黑客技术官网地址:http://www.hackdig.com/
*本文原创作者:VillanCh,本文属FreeBuf原创奖励计划,未经许可禁止转载
回顾
我们在上一篇文章中讨论 TDD 开发渗透工具,并且完成了我们 HTTP 代理扫描器的一个功能模块。那么我们接下来就继续使用 TDD 完成接下来的部分,大家也深入理解一下这种开发方式的好处(可是代码量大啊~hhhh)。
如何继续上次的任务?
先运行测试用例!
如果上次的任务没有完成,你就需要继续调整你的代码,修改,通过测试用例,这就是你首先要做的,当然如果上次的任务你完成了,测试用例顺利通过,那么当然好啊,接下来你就可以直接再写一个测试用例,开始一个新的功能或者模块了。
继续
在之前的文章中我们完成了针对一个地址的代理验证,那么,我们接下来应该怎么做呢?如果我们有一堆地址需要检测,这样我们就要处理两大问题:
安全的多线程(多进程)并发
正确收取结果
短小精干的并发处理
这样的话,涉及到多线程编程,保证线程安全什么 BlaBla,对于一个渗透测试爱好者来说,可能会稍微有一点挑战:(我平时写的爆破甚至都是单线程的!),写多线程还是有点麻烦的!
当然我这里也有考虑到这一点,在写过很多类似的脚本之后,最近也算是首创了一种短小精悍的并发框架,具体的开发流程自然也是 TDD,这里我就不会和上次一样很细致的一步一步来讲到底是怎么写的。我们就一起来看一下吧!
def test_pool(self): '''测试并发''' #定义一个任务函数,参数 *args 不定长 def demo_task(*args): '''simulate the plugin.run''' print '[!] Computing!' time.sleep(args[0]) print 'Sleep : ', args[0] print '[!] Finished!' print returns = 'Runtime Length : %s' % str(args) return returns #基本使用方法 #1. 建立一个并发池对象, #2. 添加任务 #3. 启动 #4. 通过一个队列获取结果 pool = Pool() pool.add_task(demo_task, 7) pool.add_task(demo_task, 3) q = pool.run() self.assertIsInstance(q, Queue) r = q.get() print r self.assertIsInstance(r, str) pool.add_task(demo_task, 4) pool.add_task(demo_task, 2) pool.add_task(demo_task, 1) pool.add_task(demo_task, 6) pool.add_task(demo_task, 7) pool.add_task(demo_task, 1) pool.add_task(demo_task, 6) pool.run() r = q.get() print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~' ,r
大家会发现,在测试的过程中,已经写明了这个并发池的基本使用方法,当然这个使用方法不是根据 Pool 源代码写出的,而是我自己"瞎"写的:也就是说,我并不知道源代码,但是我设想我的小框架应该这么写,所以我就这么写了,至于怎么实现,不是我写测试用例需要考虑的问题。所以,现在觉得,这个框架是非常的用户友好啊:至少对我自己来说用起来是非常的舒服。
换句话来说,我在写上面的代码的时候,其实并不知道自己的控制并发的小玩意叫什么内部内部什么构造,需要什么模块,需要什么函数,或者有没有什么特殊属性之类的,就只是"我觉得他应该是这样的"而已。
至于写下了测试用例以后发生了什么我觉得就不用再多说了,大家都可以猜得到就是不停测试不停修改代码直到成功完成需要的功能。(当然这个测试程序的测试代码就不会面面俱到,满足我们需求的代码就OK)
期待的运行结果如下:
[!] Computing! [!] Computing! Sleep : 3 [!] Finished! Runtime Length : (3,) [!] Computing! [!] Computing! [!] Computing! [!] Computing! [!] Computing! [!] Computing! [!] Computing! Sleep : 1 Sleep : [!] Finished! 1 [!] Finished! ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Runtime Length : (1,) . ---------------------------------------------------------------------- Ran 1 test in 4.033s OK Sleep : 2 [!] Finished! Sleep : 7 [!] Finished! Sleep : 4 [!] Finished! Sleep : 6 [!] Finished! Sleep : 6 [!] Finished! Sleep : 7 [!] Finished!
获取 3s 和 1s 的两个结果,其他的分开操作不去获取结果,测试在 4.033s 结束,其余的进程后台运行,但是不会被获取结果:成功达到了初衷。
源码在下面:
class Pool(object): """Thread or Proccess Pool to support the concurrence of many tasks""" #---------------------------------------------------------------------- def __init__(self, thread_max=50, mode='Thread'): """Constructor""" modes = ['thread', 'process'] self.mode = mode.lower() if mode in modes else 'thread' self.task_list = [] self.result_queue = Queue() self.signal_name = self._uuid1_str() self.lock = threading.Lock() if self.mode == 'thread' else multiprocessing.Lock() self.thread_max = thread_max self.current_thread_count = 0 def _uuid1_str(self): '''Returns: random UUID tag ''' return str(uuid.uuid1()) def add_task(self, func, *args, **argv): '''Add task to Pool and wait to exec Params: func : A callable obj, the entity of the current task args : the args of [func] argv : the argv of [func] ''' assert callable(func), '[!] Function can 't be called' ret = {} ret['func'] = func ret['args'] = args ret['argv'] = argv ret['uuid'] = self.signal_name self.task_list.append(ret) def run(self): """""" self._init_signal() Thread(target=self._run).start() return self.result_queue #---------------------------------------------------------------------- def _run(self): """""" for i in self.task_list: #print self.current_thread_count while self.thread_max <= self.current_thread_count: time.sleep(0.3) self._start_task(i) def _start_task(self, task): """""" self.current_thread_count = self.current_thread_count + 1 try: if self.mode == 'thread': #print 'Start' Thread(target=self._worker, args=(task,)).start() elif self.mode == 'process': Process(target=self._worker, args=(task,)).start() except TypeError: self.current_thread_count = self.current_thread_count - 1 def _worker(self, dictobj): """""" func = dictobj['func'] args = dictobj['args'] argv = dictobj['argv'] result = func(*args, **argv) self.lock.acquire() self._add_result_to_queue(result=result) self.lock.release() def _add_result_to_queue(self, **kw): """""" assert kw.has_key('result'), '[!] Result Error!' self.result_queue.put(kw['result']) self.current_thread_count = self.current_thread_count - 1
嗯,总之,这一百行不到代码可以实现:
控制线程数量监控:current_threads
简单易用的一次执行多个任务接口
异步获取结果
随时添加任务
我们想要的是批量执行扫描 IP 代理,所以上面的东西完全满足我们需求了。
下面我就简单来介绍一下上面的可以控制并发的 Pool 的流程,方便需要的读者可以参考一下:
但是由于我们的工具是自己使用,或者说是实验用,就不考虑 Treading/eventlet/stackless
的选择问题了,也不纠结给这个小工具命名的问题。(虽然我觉得这样说是有一些不负责任的)
获取与解析目标
拿到目标地址,我们既然是扫描肯定是批量扫描,那么我们肯定是要批量获取 IP 地址。
我们说的获取 IP 地址当然不是 DNS 查询,我们说的当然是我们需要怎么处理很多个 IP 地址:用户指定一个范围;或者仅仅是随机扫描特定区域的网段。想想,大多也就这样了,不会有新的途径了吧。然后 IP 地址有了,端口呢?我们怎么样选择扫描的端口?首先,我们肯定不能扫描全部的端口,因为这样速度太慢了,而且比如 3306 5432 111 22 25 这些端口显然不可能是一个 http 代理端口,我们就没必要对这些端口进行检查,根据经验来说 80-90, 8080-8090 这些端口是一个 http 代理的可能行比较大,再缩小一点,80,8080,8123,3128,9000。 就先这么多吧!
那么我们获取和解析目标的思路就有了: 获取一堆 IP 地址,然后用这一堆 IP 地址与想要检查的端口进行组合,然后把组合结果分配给我们上面刚刚完成的并发框架,然后收取结果。
好的,思路我们清楚之后,就需要开始动手完成这个部分了,没错,首先就是编写测试:
class ParseTargetTest(unittest.case.TestCase): """Test Parse Target""" #-------------------------------------------------------------- def runTest(self): IPs_1 = '123.1.2.0/24' IPs_2 = '123.1.2.3-123.1.3.5' ip_gen = generate_target(IPs_1, PORTS) self.assertIsInstance(ip_gen, iter) for i in ip_gen: ip_port = i.split(':') try: IPy.IP(ip_port[0]) except ValueError: pass ip_gen = generate_target(IPs_2, PORTS) self.assertIsInstance(ip_gen, iter) for i in ip_gen: ip_port = i.split(':') try: IPy.IP(ip_port[0]) except ValueError: pass
这样的测试如果跑通的话,我们只需要通过一个 generate_target 这个函数,就可以解析 IP 地址了,可以解析 IP 地址段,也可以解析单个 IP 地址,返回值是通过 yield 产生 generator。
根据这样的测试用例我们使用 IPy 模块,进行基本的 IP 运算(IP 地址网络运算),我们很容易就可以解决上面的问题:
import unittest import re import IPy from IPy import IP PORTS = ['80', '8080', '8123', '3128', '82'] #---------------------------------------------------------------------- def generate_target(IPs, ports): """Generate the target for scanning HTTP proxy Returns: A Generator: """ gen = None if '-' in IPs: pairs = IPs.split('-') start = pairs[0] end = pairs[1] gen = int2ips(IP(start).int(), IP(end).int()) else: gen = IP(IPs) for i in gen: for port in ports:
没有评论:
发表评论