Source code for scylla.tcpping

import socket
import time
from timeit import default_timer as timer

from six.moves import zip_longest


[docs]def avg(x): return sum(x) / float(len(x))
[docs]class Socket(object): def __init__(self, family, type_, timeout): s = socket.socket(family, type_) s.settimeout(timeout) self._s = s
[docs] def connect(self, host, port=80): self._s.connect((host, int(port)))
[docs] def shutdown(self): self._s.shutdown(socket.SHUT_RD)
[docs] def close(self): self._s.close()
[docs]class Timer(object): def __init__(self): self._start = 0 self._stop = 0
[docs] def start(self): self._start = timer()
[docs] def stop(self): self._stop = timer()
[docs] def cost(self, funcs, args): # TODO: handle ConnectionRefusedError self.start() for func, arg in zip_longest(funcs, args): if arg: func(*arg) else: func() self.stop() return self._stop - self._start
[docs]class Ping(object): def __init__(self, host: str, port: int, timeout=1): self.timer = Timer() self._successes = 0 self._failed = 0 self._conn_times = [] self._host = host self._port = port self._timeout = timeout def _create_socket(self, family, type_): return Socket(family, type_, self._timeout) def _success_rate(self): count = self._successes + self._failed try: rate = float(self._successes) / count rate = '{0:.2f}'.format(rate) except ZeroDivisionError: rate = '0.00' return float(rate) def _get_conn_times(self) -> [float]: return self._conn_times if self._conn_times != [] else [0]
[docs] def get_maximum(self) -> float: return max(self._get_conn_times())
[docs] def get_minimum(self) -> float: return min(self._get_conn_times())
[docs] def get_average(self) -> float: return avg(self._get_conn_times())
[docs] def get_success_rate(self): return self._success_rate()
[docs] def ping(self, count=10, sleep=0.3): for n in range(1, count + 1): s = self._create_socket(socket.AF_INET, socket.SOCK_STREAM) try: time.sleep(sleep) cost_time = self.timer.cost( (s.connect, s.shutdown), ((self._host, self._port), None)) s_runtime = 1000 * cost_time self._conn_times.append(s_runtime) except socket.timeout: self._failed += 1 except ConnectionResetError: self._failed += 1 else: self._successes += 1 finally: s.close()
[docs]def ping(host: str, port: int, count: int = 10, sleep: float = 0.2) -> (int, float): """ Ping a server and port with tcp socket :param host: the hostname :param port: the port number :param count: number of connection tries, by default it is 10 :param sleep: length of sleep time in between sequent pings, by default it is 0.3 :return: a tuple for (average_latency, success_rate) """ p = Ping(host=host, port=port) p.ping(count=count, sleep=sleep) return p.get_average(), p.get_success_rate()