# -*- coding: utf-8 -*-
"""
Python wrapper for the iperf3 libiperf.so.0 library. The module consists of two
classes, :class:`Client` and :class:`Server`, that inherit from the base class
:class:`IPerf3`. They provide a nice (if i say so myself) and pythonic way to
interact with the iperf3 utility.
At the moment the module redirects stdout and stderr to a pipe and returns the
received data back after each ``client.run()`` or ``server.run()`` call. In
later releases there will be an option to toggle this on or off.
A user should never have to utilise the :class:`IPerf3` class directly, this
class provides common settings for the :class:`Client` and :class:`Server`
classes.
To get started quickly see the :ref:`examples` page.
.. moduleauthor:: Mathijs Mortimer <mathijs@mortimer.nl>
"""
from ctypes import util, cdll, c_char_p, c_int, c_char, c_void_p, c_uint64
import os
import select
import json
import threading
from socket import SOCK_DGRAM, SOCK_STREAM
try:
from queue import Queue
except ImportError:
from Queue import Queue # Python2 compatibility
__version__ = '0.1.10'
MAX_UDP_BULKSIZE = (65535 - 8 - 20)
def more_data(pipe_out):
"""Check if there is more data left on the pipe
:param pipe_out: The os pipe_out
:rtype: bool
"""
r, _, _ = select.select([pipe_out], [], [], 0)
return bool(r)
def read_pipe(pipe_out):
"""Read data on a pipe
Used to capture stdout data produced by libiperf
:param pipe_out: The os pipe_out
:rtype: unicode string
"""
out = b''
while more_data(pipe_out):
out += os.read(pipe_out, 1024)
return out.decode('utf-8')
def output_to_pipe(pipe_in):
"""Redirects stdout and stderr to a pipe
:param pipe_out: The pipe to redirect stdout and stderr to
"""
os.dup2(pipe_in, 1) # stdout
# os.dup2(pipe_in, 2) # stderr
def output_to_screen(stdout_fd, stderr_fd):
"""Redirects stdout and stderr to a pipe
:param stdout_fd: The stdout file descriptor
:param stderr_fd: The stderr file descriptor
"""
os.dup2(stdout_fd, 1)
# os.dup2(stderr_fd, 2)
[docs]class IPerf3(object):
"""The base class used by both the iperf3 :class:`Server` and :class:`Client`
.. note:: You should not use this class directly
"""
def __init__(self,
role,
verbose=True,
lib_name=None):
"""Initialise the iperf shared library
:param role: 'c' = client; 's' = server
:param verbose: enable verbose output
:param lib_name: optional name and path for libiperf.so.0 library
"""
if lib_name is None:
lib_name = util.find_library('libiperf')
if lib_name is None:
# If we still couldn't find it lets try the manual approach
lib_name = 'libiperf.so.0'
try:
self.lib = cdll.LoadLibrary(lib_name)
except OSError:
raise OSError(
"Couldn't find shared library {}, is iperf3 installed?".format(
lib_name
)
)
# Set the appropriate C types.
self.lib.iperf_client_end.restype = c_int
self.lib.iperf_client_end.argtypes = (c_void_p,)
self.lib.iperf_free_test.restxpe = None
self.lib.iperf_free_test.argtypes = (c_void_p,)
self.lib.iperf_new_test.restype = c_void_p
self.lib.iperf_new_test.argtypes = None
self.lib.iperf_defaults.restype = c_int
self.lib.iperf_defaults.argtypes = (c_void_p,)
self.lib.iperf_get_test_role.restype = c_char
self.lib.iperf_get_test_role.argtypes = (c_void_p,)
self.lib.iperf_set_test_role.restype = None
self.lib.iperf_set_test_role.argtypes = (c_void_p, c_char,)
self.lib.iperf_get_test_bind_address.restype = c_char_p
self.lib.iperf_get_test_bind_address.argtypes = (c_void_p,)
self.lib.iperf_set_test_bind_address.restype = None
self.lib.iperf_set_test_bind_address.argtypes = (c_void_p, c_char_p,)
self.lib.iperf_get_test_server_port.restype = c_int
self.lib.iperf_get_test_server_port.argtypes = (c_void_p,)
self.lib.iperf_set_test_server_port.restype = None
self.lib.iperf_set_test_server_port.argtypes = (c_void_p, c_int,)
self.lib.iperf_get_test_json_output.restype = c_int
self.lib.iperf_get_test_json_output.argtypes = (c_void_p,)
self.lib.iperf_set_test_json_output.restype = None
self.lib.iperf_set_test_json_output.argtypes = (c_void_p, c_int,)
self.lib.iperf_get_verbose.restype = c_int
self.lib.iperf_get_verbose.argtypes = (c_void_p,)
self.lib.iperf_set_verbose.restype = None
self.lib.iperf_set_verbose.argtypes = (c_void_p, c_int)
self.lib.iperf_strerror.restype = c_char_p
self.lib.iperf_strerror.argtypes = (c_int,)
self.lib.iperf_get_test_server_hostname.restype = c_char_p
self.lib.iperf_get_test_server_hostname.argtypes = (c_void_p,)
self.lib.iperf_set_test_server_hostname.restype = None
self.lib.iperf_set_test_server_hostname.argtypes = (
c_void_p, c_char_p,
)
self.lib.iperf_get_test_protocol_id.restype = c_int
self.lib.iperf_get_test_protocol_id.argtypes = (c_void_p,)
self.lib.set_protocol.restype = c_int
self.lib.set_protocol.argtypes = (c_void_p, c_int,)
self.lib.iperf_get_test_duration.restype = c_int
self.lib.iperf_get_test_duration.argtypes = (c_void_p,)
self.lib.iperf_set_test_duration.restype = None
self.lib.iperf_set_test_duration.argtypes = (c_void_p, c_int,)
self.lib.iperf_get_test_rate.restype = c_uint64
self.lib.iperf_get_test_rate.argtypes = (c_void_p,)
self.lib.iperf_set_test_rate.restype = None
self.lib.iperf_set_test_rate.argtypes = (c_void_p, c_uint64,)
self.lib.iperf_get_test_blksize.restype = c_int
self.lib.iperf_get_test_blksize.argtypes = (c_void_p,)
self.lib.iperf_set_test_blksize.restype = None
self.lib.iperf_set_test_blksize.argtypes = (c_void_p, c_int,)
self.lib.iperf_get_test_num_streams.restype = c_int
self.lib.iperf_get_test_num_streams.argtypes = (c_void_p,)
self.lib.iperf_set_test_num_streams.restype = None
self.lib.iperf_set_test_num_streams.argtypes = (c_void_p, c_int,)
self.lib.iperf_has_zerocopy.restype = c_int
self.lib.iperf_has_zerocopy.argtypes = None
self.lib.iperf_set_test_zerocopy.restype = None
self.lib.iperf_set_test_zerocopy.argtypes = (c_void_p, c_int,)
self.lib.iperf_get_test_reverse.restype = c_int
self.lib.iperf_get_test_reverse.argtypes = (c_void_p,)
self.lib.iperf_set_test_reverse.restype = None
self.lib.iperf_set_test_reverse.argtypes = (c_void_p, c_int,)
self.lib.iperf_run_client.restype = c_int
self.lib.iperf_run_client.argtypes = (c_void_p,)
self.lib.iperf_run_server.restype = c_int
self.lib.iperf_run_server.argtypes = (c_void_p,)
self.lib.iperf_reset_test.restype = None
self.lib.iperf_reset_test.argtypes = (c_void_p,)
try:
# Only available from iperf v3.1 and onwards
self.lib.iperf_get_test_json_output_string.restype = c_char_p
self.lib.iperf_get_test_json_output_string.argtypes = (c_void_p,)
except AttributeError:
pass
# The test C struct iperf_test
self._test = self._new()
self.defaults()
# stdout/strerr redirection variables
self._stdout_fd = os.dup(1)
self._stderr_fd = os.dup(2)
self._pipe_out, self._pipe_in = os.pipe() # no need for pipe write
# Generic test settings
self.role = role
self.json_output = True
self.verbose = verbose
def __del__(self):
"""Cleanup the test after the :class:`IPerf3` class is terminated"""
os.close(self._stdout_fd)
os.close(self._stderr_fd)
os.close(self._pipe_out)
os.close(self._pipe_in)
try:
# In the current version of libiperf, the control socket isn't
# closed on iperf_client_end(), see proposed pull request:
# https://github.com/esnet/iperf/pull/597
# Workaround for testing, don't ever do this..:
#
# sck=self.lib.iperf_get_control_socket(self._test)
# os.close(sck)
self.lib.iperf_client_end(self._test)
self.lib.iperf_free_test(self._test)
except AttributeError:
# self.lib doesn't exist, likely because iperf3 wasn't installed or
# the shared library libiperf.so.0 could not be found
pass
def _new(self):
"""Initialise a new iperf test
struct iperf_test *iperf_new_test()
"""
return self.lib.iperf_new_test()
[docs] def defaults(self):
"""Set/reset iperf test defaults."""
self.lib.iperf_defaults(self._test)
@property
def role(self):
"""The iperf3 instance role
valid roles are 'c'=client and 's'=server
:rtype: 'c' or 's'
"""
try:
self._role = c_char(
self.lib.iperf_get_test_role(self._test)
).value.decode('utf-8')
except TypeError:
self._role = c_char(
chr(self.lib.iperf_get_test_role(self._test))
).value.decode('utf-8')
return self._role
@role.setter
def role(self, role):
if role.lower() in ['c', 's']:
self.lib.iperf_set_test_role(
self._test,
c_char(role.lower().encode('utf-8'))
)
self._role = role
else:
raise ValueError("Unknown role, accepted values are 'c' and 's'")
@property
def bind_address(self):
"""The bind address the iperf3 instance will listen on
use * to listen on all available IPs
:rtype: string
"""
result = c_char_p(
self.lib.iperf_get_test_bind_address(self._test)
).value
if result:
self._bind_address = result.decode('utf-8')
else:
self._bind_address = '*'
return self._bind_address
@bind_address.setter
def bind_address(self, address):
self.lib.iperf_set_test_bind_address(
self._test,
c_char_p(address.encode('utf-8'))
)
self._bind_address = address
@property
def port(self):
"""The port the iperf3 server is listening on"""
self._port = self.lib.iperf_get_test_server_port(self._test)
return self._port
@port.setter
def port(self, port):
self.lib.iperf_set_test_server_port(self._test, int(port))
self._port = port
@property
def json_output(self):
"""Toggles json output of libiperf
Turning this off will output the iperf3 instance results to
stdout/stderr
:rtype: bool
"""
enabled = self.lib.iperf_get_test_json_output(self._test)
if enabled:
self._json_output = True
else:
self._json_output = False
return self._json_output
@json_output.setter
def json_output(self, enabled):
if enabled:
self.lib.iperf_set_test_json_output(self._test, 1)
else:
self.lib.iperf_set_test_json_output(self._test, 0)
self._json_output = enabled
@property
def verbose(self):
"""Toggles verbose output for the iperf3 instance
:rtype: bool
"""
enabled = self.lib.iperf_get_verbose(self._test)
if enabled:
self._verbose = True
else:
self._verbose = False
return self._verbose
@verbose.setter
def verbose(self, enabled):
if enabled:
self.lib.iperf_set_verbose(self._test, 1)
else:
self.lib.iperf_set_verbose(self._test, 0)
self._verbose = enabled
@property
def _errno(self):
"""Returns the last error ID
:rtype: int
"""
return c_int.in_dll(self.lib, "i_errno").value
@property
def iperf_version(self):
"""Returns the version of the libiperf library
:rtype: string
"""
# TODO: Is there a better way to get the const char than allocating 30?
VersionType = c_char * 30
return VersionType.in_dll(self.lib, "version").value.decode('utf-8')
def _error_to_string(self, error_id):
"""Returns an error string from libiperf
:param error_id: The error_id produced by libiperf
:rtype: string
"""
strerror = self.lib.iperf_strerror
strerror.restype = c_char_p
return strerror(error_id).decode('utf-8')
[docs] def run(self):
"""Runs the iperf3 instance.
This function has to be instantiated by the Client and Server
instances
:rtype: NotImplementedError
"""
raise NotImplementedError
[docs]class Client(IPerf3):
"""An iperf3 client connection.
This opens up a connection to a running iperf3 server
Basic Usage::
>>> import iperf3
>>> client = iperf3.Client()
>>> client.duration = 1
>>> client.server_hostname = '127.0.0.1'
>>> client.port = 5201
>>> client.run()
{'intervals': [{'sum': {...
"""
def __init__(self, *args, **kwargs):
"""Initialise the iperf shared library"""
super(Client, self).__init__(role='c', *args, **kwargs)
# Internal variables
self._blksize = None
self._server_hostname = None
self._port = None
self._num_streams = None
self._zerocopy = False
self._bandwidth = None
self._protocol = None
@property
def server_hostname(self):
"""The server hostname to connect to.
Accepts DNS entries or IP addresses.
:rtype: string
"""
result = c_char_p(
self.lib.iperf_get_test_server_hostname(self._test)
).value
if result:
self._server_hostname = result.decode('utf-8')
else:
self._server_hostname = None
return self._server_hostname
@server_hostname.setter
def server_hostname(self, hostname):
self.lib.iperf_set_test_server_hostname(
self._test,
c_char_p(hostname.encode('utf-8'))
)
self._server_hostname = hostname
@property
def protocol(self):
"""The iperf3 instance protocol
valid protocols are 'tcp' and 'udp'
:rtype: str
"""
proto_id = self.lib.iperf_get_test_protocol_id(self._test)
if proto_id == SOCK_STREAM:
self._protocol = 'tcp'
elif proto_id == SOCK_DGRAM:
self._protocol = 'udp'
return self._protocol
@protocol.setter
def protocol(self, protocol):
if protocol == 'tcp':
self.lib.set_protocol(self._test, int(SOCK_STREAM))
elif protocol == 'udp':
self.lib.set_protocol(self._test, int(SOCK_DGRAM))
if self.blksize > MAX_UDP_BULKSIZE:
self.blksize = MAX_UDP_BULKSIZE
self._protocol = protocol
@property
def duration(self):
"""The test duration in seconds."""
self._duration = self.lib.iperf_get_test_duration(self._test)
return self._duration
@duration.setter
def duration(self, duration):
self.lib.iperf_set_test_duration(self._test, duration)
self._duration = duration
@property
def bandwidth(self):
"""Target bandwidth in bits/sec"""
self._bandwidth = self.lib.iperf_get_test_rate(self._test)
return self._bandwidth
@bandwidth.setter
def bandwidth(self, bandwidth):
self.lib.iperf_set_test_rate(self._test, bandwidth)
self._bandwidth = bandwidth
@property
def blksize(self):
"""The test blksize."""
self._blksize = self.lib.iperf_get_test_blksize(self._test)
return self._blksize
@blksize.setter
def blksize(self, bulksize):
# iperf version < 3.1.3 has some weird bugs when bulksize is
# larger than MAX_UDP_BULKSIZE
if self.protocol == 'udp' and bulksize > MAX_UDP_BULKSIZE:
bulksize = MAX_UDP_BULKSIZE
self.lib.iperf_set_test_blksize(self._test, bulksize)
self._blksize = bulksize
@property
def bulksize(self):
"""The test bulksize.
Deprecated argument, use blksize instead to ensure consistency
with iperf3 C libary
"""
# Keeping bulksize argument for backwards compatibility with
# iperf3-python < 0.1.7
return self.blksize
@bulksize.setter
def bulksize(self, bulksize):
# Keeping bulksize argument for backwards compatibility with
# iperf3-python < 0.1.7
self.blksize = bulksize
@property
def num_streams(self):
"""The number of streams to use."""
self._num_streams = self.lib.iperf_get_test_num_streams(self._test)
return self._num_streams
@num_streams.setter
def num_streams(self, number):
self.lib.iperf_set_test_num_streams(self._test, number)
self._num_streams = number
@property
def zerocopy(self):
"""Toggle zerocopy.
Use the sendfile() system call for "Zero Copy" mode. This uses much
less CPU. This is not supported on all systems.
**Note** there isn't a hook in the libiperf library for getting the
current configured value. Relying on zerocopy.setter function
:rtype: bool
"""
return self._zerocopy
@zerocopy.setter
def zerocopy(self, enabled):
if enabled and self.lib.iperf_has_zerocopy():
self.lib.iperf_set_test_zerocopy(self._test, 1)
self._zerocopy = True
else:
self.lib.iperf_set_test_zerocopy(self._test, 0)
self._zerocopy = False
@property
def reverse(self):
"""Toggles direction of test
:rtype: bool
"""
enabled = self.lib.iperf_get_test_reverse(self._test)
if enabled:
self._reverse = True
else:
self._reverse = False
return self._reverse
@reverse.setter
def reverse(self, enabled):
if enabled:
self.lib.iperf_set_test_reverse(self._test, 1)
else:
self.lib.iperf_set_test_reverse(self._test, 0)
self._reverse = enabled
[docs] def run(self):
"""Run the current test client.
:rtype: instance of :class:`TestResult`
"""
if self.json_output:
output_to_pipe(self._pipe_in) # Disable stdout
error = self.lib.iperf_run_client(self._test)
if not self.iperf_version.startswith('iperf 3.1'):
data = read_pipe(self._pipe_out)
if data.startswith('Control connection'):
data = '{' + data.split('{', 1)[1]
else:
data = c_char_p(
self.lib.iperf_get_test_json_output_string(self._test)
).value
if data:
data = data.decode('utf-8')
output_to_screen(self._stdout_fd, self._stderr_fd) # enable stdout
if not data or error:
data = '{"error": "%s"}' % self._error_to_string(self._errno)
return TestResult(data)
[docs]class Server(IPerf3):
"""An iperf3 server connection.
This starts an iperf3 server session. The server terminates after each
succesful client connection so it might be useful to run Server.run()
in a loop.
The C function iperf_run_server is called in a seperate thread to make
sure KeyboardInterrupt(aka ctrl+c) can still be captured
Basic Usage::
>>> import iperf3
>>> server = iperf3.Server()
>>> server.run()
{'start': {...
"""
def __init__(self, *args, **kwargs):
"""Initialise the iperf3 server instance"""
super(Server, self).__init__(role='s', *args, **kwargs)
[docs] def run(self):
"""Run the iperf3 server instance.
:rtype: instance of :class:`TestResult`
"""
def _run_in_thread(self, data_queue):
"""Runs the iperf_run_server
:param data_queue: thread-safe queue
"""
output_to_pipe(self._pipe_in) # disable stdout
error = self.lib.iperf_run_server(self._test)
output_to_screen(self._stdout_fd, self._stderr_fd) # enable stdout
# TODO json_output_string not available on earlier iperf3 builds
# have to build in a version check using self.iperf_version
# The following line should work on later versions:
# data = c_char_p(
# self.lib.iperf_get_test_json_output_string(self._test)
# ).value
data = read_pipe(self._pipe_out)
if not data or error:
data = '{"error": "%s"}' % self._error_to_string(self._errno)
self.lib.iperf_reset_test(self._test)
data_queue.put(data)
if self.json_output:
data_queue = Queue()
t = threading.Thread(
target=_run_in_thread, args=[self, data_queue]
)
t.daemon = True
t.start()
while t.is_alive():
t.join(.1)
return TestResult(data_queue.get())
else:
# setting json_output to False will output test to screen only
self.lib.iperf_run_server(self._test)
self.lib.iperf_reset_test(self._test)
return None
[docs]class TestResult(object):
"""Class containing iperf3 test results.
:param text: The raw result from libiperf as text
:param json: The raw result from libiperf asjson/dict
:param error: Error captured during test, None if all ok
:param time: Start time
:param timesecs: Start time in seconds
:param system_info: System info
:param version: Iperf Version
:param local_host: Local host ip
:param local_port: Local port number
:param remote_host: Remote host ip
:param remote_port: Remote port number
:param reverse: Test ran in reverse direction
:param protocol: 'TCP' or 'UDP'
:param num_streams: Number of test streams
:param blksize:
:param omit:
:param duration: Test duration in seconds
:param local_cpu_total: The local total CPU load
:param local_cpu_user: The local user CPU load
:param local_cpu_system: The local system CPU load
:param remote_cpu_total: The remote total CPU load
:param remote_cpu_user: The remote user CPU load
:param remote_cpu_system: The remote system CPU load
TCP test specific
:param tcp_mss_default:
:param retransmits: amount of retransmits (Only returned from client)
:param sent_bytes: Sent bytes
:param sent_bps: Sent bits per second
:param sent_kbps: sent kilobits per second
:param sent_Mbps: Sent Megabits per second
:param sent_kB_s: Sent kiloBytes per second
:param sent_MB_s: Sent MegaBytes per second
:param received_bytes: Received bytes
:param received_bps: Received bits per second
:param received_kbps: Received kilobits per second
:param received_Mbps: Received Megabits per second
:param received_kB_s: Received kiloBytes per second
:param received_MB_s: Received MegaBytes per second
UDP test specific
:param bytes:
:param bps:
:param jitter_ms:
:param kbps:
:param Mbps:
:param kB_s:
:param MB_s:
:param packets:
:param lost_packets:
:param lost_percent:
:param seconds:
"""
def __init__(self, result):
"""Initialise TestResult
:param result: raw json output from :class:`Client` and :class:`Server`
"""
# The full result data
self.text = result
self.json = json.loads(result)
if 'error' in self.json:
self.error = self.json['error']
else:
self.error = None
# start time
self.time = self.json['start']['timestamp']['time']
self.timesecs = self.json['start']['timestamp']['timesecs']
# generic info
self.system_info = self.json['start']['system_info']
self.version = self.json['start']['version']
# connection details
connection_details = self.json['start']['connected'][0]
self.local_host = connection_details['local_host']
self.local_port = connection_details['local_port']
self.remote_host = connection_details['remote_host']
self.remote_port = connection_details['remote_port']
# test setup
self.tcp_mss_default = self.json['start'].get('tcp_mss_default')
self.protocol = self.json['start']['test_start']['protocol']
self.num_streams = self.json['start']['test_start']['num_streams']
self.blksize = self.json['start']['test_start']['blksize']
self.omit = self.json['start']['test_start']['omit']
self.duration = self.json['start']['test_start']['duration']
# system performance
cpu_utilization_perc = self.json['end']['cpu_utilization_percent']
self.local_cpu_total = cpu_utilization_perc['host_total']
self.local_cpu_user = cpu_utilization_perc['host_user']
self.local_cpu_system = cpu_utilization_perc['host_system']
self.remote_cpu_total = cpu_utilization_perc['remote_total']
self.remote_cpu_user = cpu_utilization_perc['remote_user']
self.remote_cpu_system = cpu_utilization_perc['remote_system']
# TCP specific test results
if self.protocol == 'TCP':
sent_json = self.json['end']['sum_sent']
self.sent_bytes = sent_json['bytes']
self.sent_bps = sent_json['bits_per_second']
recv_json = self.json['end']['sum_received']
self.received_bytes = recv_json['bytes']
self.received_bps = recv_json['bits_per_second']
# Bits are measured in 10**3 terms
# Bytes are measured in 2**10 terms
# kbps = Kilobits per second
# Mbps = Megabits per second
# kB_s = kiloBytes per second
# MB_s = MegaBytes per second
self.sent_kbps = self.sent_bps / 1000
self.sent_Mbps = self.sent_kbps / 1000
self.sent_kB_s = self.sent_bps / (8 * 1024)
self.sent_MB_s = self.sent_kB_s / 1024
self.received_kbps = self.received_bps / 1000
self.received_Mbps = self.received_kbps / 1000
self.received_kB_s = self.received_bps / (8 * 1024)
self.received_MB_s = self.received_kB_s / 1024
# retransmits only returned from client
self.retransmits = sent_json.get('retransmits')
# UDP specific test results
elif self.protocol == 'UDP':
self.bytes = self.json['end']['sum']['bytes']
self.bps = self.json['end']['sum']['bits_per_second']
self.jitter_ms = self.json['end']['sum']['jitter_ms']
self.kbps = self.bps / 1000
self.Mbps = self.kbps / 1000
self.kB_s = self.kbps / (8 * 1024)
self.MB_s = self.Mbps / 1024
self.packets = self.json['end']['sum']['packets']
self.lost_packets = self.json['end']['sum']['lost_packets']
self.lost_percent = self.json['end']['sum']['lost_percent']
self.seconds = self.json['end']['sum']['seconds']
@property
def reverse(self):
if self.json['start']['test_start']['reverse']:
return True
else:
return False
@property
def type(self):
if 'connecting_to' in self.json['start']:
return 'client'
else:
return 'server'
def __repr__(self):
"""Print the result as received from iperf3"""
return self.text