Source code for honeypots.honeypot

# Checkpot - Honeypot Checker
# Copyright (C) 2018  Vlad Florea
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# DISCLAIMER: All prerequisites (containers, additional programs, etc.)
# and libraries that might be needed to run this program are property
# of their original authors and carry their own separate licenses that
# you should read to inform yourself about their terms.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# As this software is PROVIDED WITH ABSOLUTELY NO WARRANTY OF ANY KIND,
# YOU USE THIS SOFTWARE AT YOUR OWN RISK!
#
# By using this tool YOU TAKE FULL LEGAL RESPONSIBILITY FOR ANY
# POSSIBLE OUTCOME.
#
# We strongly recommend that you read all the information in the README.md
# file (found in the root folder of this project) and even the
# documentation (which you can find locally in the /docs/ folder or
# at http://checkpot.readthedocs.io/) to make sure you fully
# understand how this tool works and consult all laws that apply
# to your use case.
#
# We strongly suggest that you keep this notice intact for all files.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
# The local copy of the license should be located in the root folder of the app
# in the file gpl-3.0.txt.
#
# You can contact the author and team at any time via the Checkpot official
# GitHub page: https://github.com/honeynet/checkpot or via the Honeynet
# official Slack channel: https://gsoc-slack.honeynet.org/ or
#                         https://honeynetpublic.slack.com/
#
# You can contact us at any time and with any question, we are here to help!
#
# If you consider any information in this copyright notice might be incorrect,
# outdated, arises any questions, raises any problems, etc. please contact us
# and we will make it our top priority to fix it. We have the deepest respect
# for the work of all authors and for all of our users.

import nmap
import platform
import urllib.request
import urllib.error
import socket


[docs]class Honeypot: """ Holds all data known about one Honeypot. Used for decoupling the acquisition of the data from its usages. """ __debug = False # enables debug prints scan_id = 0 websites = [] # cached web page data for current honeypot css = [] # cached css data for current honeypot
[docs] def __init__(self, address, scan_os=False, verbose_scan=True): """ :param address: ip address of the target :param scan_os: scan for Operating System information (requires elevated privileges) :param verbose_scan: print progress bars and stats when running a scan """ self.address = address self.scan_os = scan_os self.host = None if verbose_scan: try: self._nm = nmap.PrintProgressPortScanner() except AttributeError: # not running the modded version of python-nmap print("\nWARNING: Cannot display progress bars as you have an unsupported version of python-nmap. " "Please install from requirements.txt.") print("Example: pip install -r requirements.txt\n") self._nm = nmap.PortScanner() else: self._nm = nmap.PortScanner()
[docs] def scan(self, port_range=None, fast=False): """ Runs a scan on this Honeypot for data acquisition. """ args = '-sV -n --stats-every 1s' if fast: args += ' -Pn -T5' if port_range: args += ' -p '+port_range if self.scan_os: args += ' -O' if platform.system() == 'Windows': # No sudo on Windows systems, let UAC handle this # FIXME workaround for the subnet python-nmap-bug.log also? # FIXME somehow this also makes the command history of the terminal vanish? self._nm.scan(hosts=self.address, arguments=args, sudo=False) else: try: # FIXME this is just a workaround for the bug shown in python-nmap-bug.log self._nm.scan(hosts=self.address, arguments=args, sudo=True) except Exception as e: if self.__debug: print(e.__class__, "occured trying again with get_last_output") self._nm.get_nmap_last_output() self._nm.scan(hosts=self.address, arguments=args, sudo=True) else: try: # FIXME this is just a workaround for the bug shown in python-nmap-bug.log self._nm.scan(hosts=self.address, arguments=args, sudo=False) except Exception as e: if self.__debug: print(e.__class__, "occured trying again with get_last_output") self._nm.get_nmap_last_output() self._nm.scan(hosts=self.address, arguments=args, sudo=False) hosts = self._nm.all_hosts() if hosts: self.host = hosts[0] else: self.host = None raise ScanFailure("Requested host not available")
# TODO error on connection refused, check if self._nm[self.host]['status']['reason'] = conn_refused # TODO also add -Pn option? @property def os(self): if self.scan_os and self.host and 'osmatch' in self._nm[self.host]: if self._nm[self.host]['osmatch'] and self._nm[self.host]['osmatch'][0]['osclass']: return self._nm[self.host]['osmatch'][0]['osclass'][0]['osfamily'] @property def ip(self): return self._nm[self.host]['addresses']['ipv4']
[docs] def has_tcp(self, port_number): """ Checks if the Honeypot has a certain port open. :param port_number: port number :return: port status boolean """ return self._nm[self.host].has_tcp(port_number)
[docs] def get_service_ports(self, service_name, protocol): """ Checks if the Honeypot has a certain service available. :param service_name: name of the service to search for :param protocol: 'tcp' or 'udp' :return: list of port numbers (a certain service can run on multiple ports) """ results = [] if protocol not in self._nm[self.host]: return results for port, attributes in self._nm[self.host][protocol].items(): if attributes['name'] == service_name: results.append(port) return results
[docs] def get_service_name(self, port, protocol): """ Get name of service running on requested port :param port: target port :param protocol: 'tcp' or 'udp' :return: service name """ if protocol not in self._nm[self.host]: return None return self._nm[self.host][protocol][port]["name"]
[docs] def get_all_ports(self, protocol): """ Returns all open ports on the honeypot :param protocol: 'tcp' / 'udp' :return: list of ports """ if protocol not in self._nm[self.host]: return [] else: return list((self._nm[self.host][protocol]).keys())
[docs] def get_service_product(self, protocol, port): """ Get the product description for a certain port :param protocol: 'tcp' / 'udp' :param port: port number :return: description string """ # TODO cache requests for all parsers if protocol not in self._nm[self.host]: return None else: return self._nm[self.host][protocol][port]['product']
[docs] def run_nmap_script(self, script, port, protocol='tcp'): """ Runs a .nse script on the specified port range :param script: <script_name>.nse :param port: port / port range :param protocol: 'tcp'/'udp' :return: script output as string :raises: ScanFailure """ tmp = nmap.PortScanner() tmp.scan(hosts=self.address, arguments="--script " + script + " -p " + str(port)) port_info = tmp[self.address][protocol][int(port)] if 'script' in port_info: return port_info['script'][script.split('.')[0]] else: raise ScanFailure("Script execution failed")
[docs] def get_banner(self, port, protocol='tcp'): """ Grab banner on specified port :param port: port number :param protocol: 'tcp' / 'udp' :return: banner as string :raises: ScanFailure """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(5) try: s.connect((self.address, port)) recv = s.recv(1024) except socket.error as e: raise ScanFailure("Banner grab failed for port", port, e) return recv
[docs] def get_websites(self): """ Gets websites for all active web servers found on the target :return: list of website content strings """ if self.websites and self.scan_id == id(self.host): # if cache is not empty and we are still on the most recent scan return self.websites # refresh cache self.websites = [] target_ports = self.get_service_ports('http', 'tcp') # TODO target_ports += self.get_service_ports('https', 'tcp') for port in target_ports: try: request = urllib.request.urlopen('http://' + self.ip + ':' + str(port) + '/', timeout=5) if request.headers.get_content_charset() is None: content = request.read() else: content = request.read().decode(request.headers.get_content_charset()) self.websites.append(content) except Exception as e: if self.__debug: print('Failed to fetch homepage for site', self.ip, str(port), e) return self.websites
[docs] def get_websites_css(self): """ Gets website stylesheet for all active web servers found on the target :return: list of stylesheet strings """ # TODO create a Website class containing stylesheet and others? if self.css and self.scan_id == id(self.host): # if cache is not empty and we are still on the most recent scan return self.css # refresh cache self.css = [] target_ports = self.get_service_ports('http', 'tcp') # target_ports += self.get_service_ports('https', 'tcp') for port in target_ports: try: request = urllib.request.urlopen('http://' + self.ip + ':' + str(port) + '/style.css', timeout=5) if request.headers.get_content_charset() is None: content = request.read() else: content = request.read().decode(request.headers.get_content_charset()) self.css.append(content) except Exception as e: if self.__debug: print('Failed to fetch stylesheet for site', self.ip, str(port), e) return self.css
[docs]class ScanFailure(Exception): """Raised when one of the data gathering methods fails"""
[docs] def __init__(self, *report): """ :param report: description of the error """ self.value = " ".join(str(r) for r in report)
def __str__(self): return repr(self.value) def __repr__(self): return 'ScanFailure exception ' + self.value