from trappy.stats.Topology import Topology
from bart.sched.SchedMultiAssert import SchedMultiAssert
from bart.sched.SchedAssert import SchedAssert
import trappy
import os
import operator
import json
# Define the CPU topology (for multi-cluster systems). Each list holds the
# CPU ids of one cluster; CLUSTERS groups them for the Topology object that
# the scheduler assertions below are constructed with.
BIG = [1, 2]
LITTLE = [0, 3, 4, 5]
CLUSTERS = [BIG, LITTLE]
topology = Topology(clusters=CLUSTERS)

# Directory that contains the raw traces analysed below. The original
# location stays the default; set SCHED_DEADLINE_TRACE_DIR to point the
# analysis at a different copy of the traces without editing this file.
BASE_PATH = os.environ.get(
    "SCHED_DEADLINE_TRACE_DIR",
    "/Users/kapileshwarsingh/AnalysisRawData/LPC/sched_deadline/"
)
# Maximum tolerated deviation from an expected value, in percent.
THRESHOLD = 10.0


def between_threshold(a, b):
    """Return whether *a* lies within THRESHOLD percent of *b*.

    The deviation is computed relative to *b*, i.e.
    ``abs((a - b) * 100 / b)``, and must be strictly smaller than
    THRESHOLD for the check to succeed.

    :param a: the value under test
    :param b: the reference value the deviation is measured against
    :return: True if the relative deviation is below THRESHOLD percent,
        False otherwise
    """
    if b == 0:
        # A relative deviation from zero is undefined (it used to raise
        # ZeroDivisionError); only an exact match can be within tolerance.
        return a == 0
    return abs(((a - b) * 100.0) / b) < THRESHOLD
The thread periodic_yield is woken up at 30ms intervals, at which point it calls sched_yield and relinquishes its time-slice. The expectation is that the task will have a duty cycle < 1% and a period of 30ms.
There are two threads, and rank=1 conveys that the condition is true for one of the threads with the name "periodic_yield".
# Load the "yield" trace and check the period and duty cycle of the
# "periodic_yield" tasks found in it.
TRACE_FILE = os.path.join(BASE_PATH, "yield")
# NOTE(review): the run is labelled "cpuhog" although this is the "yield"
# trace -- looks like a copy/paste leftover; confirm before changing it.
run = trappy.Run(TRACE_FILE, "cpuhog")
s = SchedMultiAssert(run, topology, execnames="periodic_yield")

# Assert Period: the measured period is compared against 30 using
# between_threshold. The verdict is always reported (a failure used to be
# silent), followed by the measured values so a failure can be diagnosed.
# Single-argument print(...) behaves identically on Python 2 and Python 3.
period_ok = s.assertPeriod(30, between_threshold, rank=1)
print("PASS: Period" if period_ok else "FAIL: Period")
print(json.dumps(s.getPeriod(), indent=3))
print("")

# Assert DutyCycle: the duty cycle within the window (0, 4) must be lower
# than 1 (operator.lt), checked with rank=2.
duty_cycle_ok = s.assertDutyCycle(1, operator.lt, window=(0, 4), rank=2)
print("PASS: DutyCycle" if duty_cycle_ok else "FAIL: DutyCycle")
print(json.dumps(s.getDutyCycle(window=(0, 4)), indent=3))
The reservation of a CPU-hogging task is set to 10ms for every 100ms. The assertion ensures a duty cycle of 10%.
# Load the "cpuhog" trace, plot the "cpuhog" tasks and check their duty
# cycle.
TRACE_FILE = os.path.join(BASE_PATH, "cpuhog")
run = trappy.Run(TRACE_FILE, "cpuhog")
s = SchedMultiAssert(run, topology, execnames="cpuhog")
s.plot().view()

# Assert DutyCycle: the duty cycle within the window (0, 5) is compared
# against 10 using between_threshold, checked with rank=1. The verdict is
# always reported (a failure used to be silent), followed by the measured
# values. Single-argument print(...) behaves identically on Python 2 and 3.
duty_cycle_ok = s.assertDutyCycle(10, between_threshold, window=(0, 5), rank=1)
print("PASS: DutyCycle" if duty_cycle_ok else "FAIL: DutyCycle")
print(json.dumps(s.getDutyCycle(window=(0, 5)), indent=3))
A CPU-hogging task is given reservations in increasing order, starting from 10%. Each reserved phase is followed by a 2s period of normal execution.
# Load the "cancel_dl_timer" trace, plot the "cpuhog" task and check its
# duty cycle phase by phase.
TRACE_FILE = os.path.join(BASE_PATH, "cancel_dl_timer")
run = trappy.Run(TRACE_FILE, "cpuhog")
s = SchedAssert(run, topology, execname="cpuhog")
s.plot().view()

NUM_PHASES = 10
PHASE_DURATION = 2
start = s.getStartTime()
DUTY_CYCLE_FACTOR = 10

for phase in range(NUM_PHASES + 1):
    # Consecutive windows of PHASE_DURATION each, counted from the start
    # time reported by the assert object.
    window = (start + (phase * PHASE_DURATION),
              start + ((phase + 1) * PHASE_DURATION))

    if phase % 2 == 0:
        # Even phases: expected duty cycle is 10, 20, 30, ... Floor
        # division keeps the value an integer on Python 3 as well (on
        # Python 2 "/" already floors for integer operands).
        DUTY_CYCLE = (phase + 2) * DUTY_CYCLE_FACTOR // 2
    else:
        # Odd phases: expected duty cycle is 100.
        DUTY_CYCLE = 100

    # Single-argument print(...) behaves identically on Python 2 and 3.
    print("WINDOW -> [{:.2f}, {:.2f}]".format(window[0], window[1]))

    # The assertion is evaluated first, then the measured duty cycle is
    # queried once for the report, whatever the verdict.
    if s.assertDutyCycle(DUTY_CYCLE, between_threshold, window=window):
        verdict = "PASS"
    else:
        verdict = "FAIL"
    print("{}: Expected={} Actual={:.2f} THRESHOLD={}".format(
        verdict,
        DUTY_CYCLE,
        s.getDutyCycle(window=window),
        THRESHOLD))

    print("")