#
import os
import sys
+import psutil
+
+from socket import AF_INET, SOCK_STREAM
try:
import cPickle as pickle #@UnusedImport
#
def __isNetworkConnectionActiveOnPort(port):
- # :NOTE: Under windows:
- # netstat options -l and -t are unavailable
- # grep command is unavailable
- if sys.platform == "win32":
- cmd = ['netstat','-a','-n','-p','tcp']
+ # psutil realization
+ ports =[]
+ template = (AF_INET, SOCK_STREAM, "LISTEN")
+ for c in psutil.net_connections(kind='inet'):
+ if (c.family, c.type, c.status) == template:
+ ports.append(c.laddr.port)
+ if port in ports:
+ return True
else:
- cmd = ['netstat','-ant']
- pass
-
- err = None
- try:
- from subprocess import Popen, PIPE, STDOUT
- p = Popen(cmd, stdout=PIPE, stderr=STDOUT)
- out, err = p.communicate()
- except:
- print("Error when trying to access active network connections.")
- if err: print(err)
- print("... Presumably this package is not installed...Please install netstat if available for your distribution.")
- print("... Trying socket based approach")
- try:
- import socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- result = sock.connect_ex(("127.0.0.1", port))
- if result == 0:
- print("Port %r: Closed" % (port))
- sock.close()
- return True
- else:
- sock.close()
- return False
- except:
- import traceback
- traceback.print_exc()
- return False
-
- from io import StringIO
- buf = StringIO(out.decode('utf-8', 'ignore'))
- ports = buf.readlines()
- # search for TCP - LISTEN connections
- import re
- regObj = re.compile( ".*tcp.*:([0-9]+).*:.*listen", re.IGNORECASE );
- for item in ports:
- try:
- p = int(regObj.match(item).group(1))
- if p == port: return True
- except:
- pass
- return False
-#
+ return False
+ #
def getPort(preferredPort=None):
logger.debug("GET PORT")