summaryrefslogtreecommitdiff
path: root/lldb/test/python_api/event/TestEvents.py
diff options
context:
space:
mode:
authorBill Wendling <isanbard@gmail.com>2012-04-18 21:39:23 +0000
committerBill Wendling <isanbard@gmail.com>2012-04-18 21:39:23 +0000
commit392e4fbdd9b152efff4c051286f6b2c21270c902 (patch)
tree4ac339be2c4c7c596f068b59d5e512b157c7b433 /lldb/test/python_api/event/TestEvents.py
parenteb1c2bdc1f55fbc5d1e7bb86e9f0e038b0f5adb7 (diff)
Creating release_31 branchllvmorg-3.1.0-rc1
llvm-svn: 155059 llvm-svn: 155053 llvm-svn: 155051
Diffstat (limited to 'lldb/test/python_api/event/TestEvents.py')
-rw-r--r--lldb/test/python_api/event/TestEvents.py306
1 files changed, 0 insertions, 306 deletions
diff --git a/lldb/test/python_api/event/TestEvents.py b/lldb/test/python_api/event/TestEvents.py
deleted file mode 100644
index a9a7edcbbfa3..000000000000
--- a/lldb/test/python_api/event/TestEvents.py
+++ /dev/null
@@ -1,306 +0,0 @@
-"""
-Test lldb Python event APIs.
-"""
-
-import os, time
-import re
-import unittest2
-import lldb, lldbutil
-from lldbtest import *
-
-class EventAPITestCase(TestBase):
-
- mydir = os.path.join("python_api", "event")
-
- @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
- @python_api_test
- @dsym_test
- def test_listen_for_and_print_event_with_dsym(self):
- """Exercise SBEvent API."""
- self.buildDsym()
- self.do_listen_for_and_print_event()
-
- @python_api_test
- @dwarf_test
- def test_listen_for_and_print_event_with_dwarf(self):
- """Exercise SBEvent API."""
- self.buildDwarf()
- self.do_listen_for_and_print_event()
-
- @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
- @python_api_test
- @dsym_test
- def test_wait_for_event_with_dsym(self):
- """Exercise SBListener.WaitForEvent() API."""
- self.buildDsym()
- self.do_wait_for_event()
-
- @python_api_test
- @dwarf_test
- def test_wait_for_event_with_dwarf(self):
- """Exercise SBListener.WaitForEvent() API."""
- self.buildDwarf()
- self.do_wait_for_event()
-
- @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
- @python_api_test
- @dsym_test
- def test_add_listener_to_broadcaster_with_dsym(self):
- """Exercise some SBBroadcaster APIs."""
- self.buildDsym()
- self.do_add_listener_to_broadcaster()
-
- @python_api_test
- @dwarf_test
- def test_add_listener_to_broadcaster_with_dwarf(self):
- """Exercise some SBBroadcaster APIs."""
- self.buildDwarf()
- self.do_add_listener_to_broadcaster()
-
- def setUp(self):
- # Call super's setUp().
- TestBase.setUp(self)
- # Find the line number to of function 'c'.
- self.line = line_number('main.c', '// Find the line number of function "c" here.')
-
- def do_listen_for_and_print_event(self):
- """Create a listener and use SBEvent API to print the events received."""
- exe = os.path.join(os.getcwd(), "a.out")
-
- # Create a target by the debugger.
- target = self.dbg.CreateTarget(exe)
- self.assertTrue(target, VALID_TARGET)
-
- # Now create a breakpoint on main.c by name 'c'.
- breakpoint = target.BreakpointCreateByName('c', 'a.out')
-
- # Now launch the process, and do not stop at the entry point.
- process = target.LaunchSimple(None, None, os.getcwd())
- self.assertTrue(process.GetState() == lldb.eStateStopped,
- PROCESS_STOPPED)
-
- # Get a handle on the process's broadcaster.
- broadcaster = process.GetBroadcaster()
-
- # Create an empty event object.
- event = lldb.SBEvent()
-
- # Create a listener object and register with the broadcaster.
- listener = lldb.SBListener("my listener")
- rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
- self.assertTrue(rc, "AddListener successfully retruns")
-
- traceOn = self.TraceOn()
- if traceOn:
- lldbutil.print_stacktraces(process)
-
- # Create MyListeningThread class to wait for any kind of event.
- import threading
- class MyListeningThread(threading.Thread):
- def run(self):
- count = 0
- # Let's only try at most 4 times to retrieve any kind of event.
- # After that, the thread exits.
- while not count > 3:
- if traceOn:
- print "Try wait for event..."
- if listener.WaitForEventForBroadcasterWithType(5,
- broadcaster,
- lldb.SBProcess.eBroadcastBitStateChanged,
- event):
- if traceOn:
- desc = lldbutil.get_description(event)
- print "Event description:", desc
- print "Event data flavor:", event.GetDataFlavor()
- print "Process state:", lldbutil.state_type_to_str(process.GetState())
- print
- else:
- if traceOn:
- print "timeout occurred waiting for event..."
- count = count + 1
- return
-
- # Let's start the listening thread to retrieve the events.
- my_thread = MyListeningThread()
- my_thread.start()
-
- # Use Python API to continue the process. The listening thread should be
- # able to receive the state changed events.
- process.Continue()
-
- # Use Python API to kill the process. The listening thread should be
- # able to receive the state changed event, too.
- process.Kill()
-
- # Wait until the 'MyListeningThread' terminates.
- my_thread.join()
-
- def do_wait_for_event(self):
- """Get the listener associated with the debugger and exercise WaitForEvent API."""
- exe = os.path.join(os.getcwd(), "a.out")
-
- # Create a target by the debugger.
- target = self.dbg.CreateTarget(exe)
- self.assertTrue(target, VALID_TARGET)
-
- # Now create a breakpoint on main.c by name 'c'.
- breakpoint = target.BreakpointCreateByName('c', 'a.out')
- #print "breakpoint:", breakpoint
- self.assertTrue(breakpoint and
- breakpoint.GetNumLocations() == 1,
- VALID_BREAKPOINT)
-
- # Get the debugger listener.
- listener = self.dbg.GetListener()
-
- # Now launch the process, and do not stop at entry point.
- error = lldb.SBError()
- process = target.Launch (listener, None, None, None, None, None, None, 0, False, error)
- self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
-
- # Get a handle on the process's broadcaster.
- broadcaster = process.GetBroadcaster()
- self.assertTrue(broadcaster, "Process with valid broadcaster")
-
- # Create an empty event object.
- event = lldb.SBEvent()
- self.assertFalse(event, "Event should not be valid initially")
-
- # Create MyListeningThread to wait for any kind of event.
- import threading
- class MyListeningThread(threading.Thread):
- def run(self):
- count = 0
- # Let's only try at most 3 times to retrieve any kind of event.
- while not count > 3:
- if listener.WaitForEvent(5, event):
- #print "Got a valid event:", event
- #print "Event data flavor:", event.GetDataFlavor()
- #print "Event type:", lldbutil.state_type_to_str(event.GetType())
- return
- count = count + 1
- print "Timeout: listener.WaitForEvent"
-
- return
-
- # Use Python API to kill the process. The listening thread should be
- # able to receive a state changed event.
- process.Kill()
-
- # Let's start the listening thread to retrieve the event.
- my_thread = MyListeningThread()
- my_thread.start()
-
- # Wait until the 'MyListeningThread' terminates.
- my_thread.join()
-
- self.assertTrue(event,
- "My listening thread successfully received an event")
-
- def do_add_listener_to_broadcaster(self):
- """Get the broadcaster associated with the process and wait for broadcaster events."""
- exe = os.path.join(os.getcwd(), "a.out")
-
- # Create a target by the debugger.
- target = self.dbg.CreateTarget(exe)
- self.assertTrue(target, VALID_TARGET)
-
- # Now create a breakpoint on main.c by name 'c'.
- breakpoint = target.BreakpointCreateByName('c', 'a.out')
- #print "breakpoint:", breakpoint
- self.assertTrue(breakpoint and
- breakpoint.GetNumLocations() == 1,
- VALID_BREAKPOINT)
-
- # Now launch the process, and do not stop at the entry point.
- process = target.LaunchSimple(None, None, os.getcwd())
- self.assertTrue(process.GetState() == lldb.eStateStopped,
- PROCESS_STOPPED)
-
- # Get a handle on the process's broadcaster.
- broadcaster = process.GetBroadcaster()
- self.assertTrue(broadcaster, "Process with valid broadcaster")
-
- # Create an empty event object.
- event = lldb.SBEvent()
- self.assertFalse(event, "Event should not be valid initially")
-
- # Create a listener object and register with the broadcaster.
- listener = lldb.SBListener("TestEvents.listener")
- rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
- self.assertTrue(rc, "AddListener successfully retruns")
-
- # The finite state machine for our custom listening thread, with an
- # initail state of 0, which means a "running" event is expected.
- # It changes to 1 after "running" is received.
- # It cahnges to 2 after "stopped" is received.
- # 2 will be our final state and the test is complete.
- self.state = 0
-
- # Create MyListeningThread to wait for state changed events.
- # By design, a "running" event is expected following by a "stopped" event.
- import threading
- class MyListeningThread(threading.Thread):
- def run(self):
- #print "Running MyListeningThread:", self
-
- # Regular expression pattern for the event description.
- pattern = re.compile("data = {.*, state = (.*)}$")
-
- # Let's only try at most 6 times to retrieve our events.
- count = 0
- while True:
- if listener.WaitForEventForBroadcasterWithType(5,
- broadcaster,
- lldb.SBProcess.eBroadcastBitStateChanged,
- event):
- desc = lldbutil.get_description(event)
- #print "Event description:", desc
- match = pattern.search(desc)
- if not match:
- break;
- if self.context.state == 0 and match.group(1) == 'running':
- self.context.state = 1
- continue
- elif self.context.state == 1 and match.group(1) == 'stopped':
- # Whoopee, both events have been received!
- self.context.state = 2
- break
- else:
- break
- print "Timeout: listener.WaitForEvent"
- count = count + 1
- if count > 6:
- break
-
- return
-
- # Use Python API to continue the process. The listening thread should be
- # able to receive the state changed events.
- process.Continue()
-
- # Start the listening thread to receive the "running" followed by the
- # "stopped" events.
- my_thread = MyListeningThread()
- # Supply the enclosing context so that our listening thread can access
- # the 'state' variable.
- my_thread.context = self
- my_thread.start()
-
- # Wait until the 'MyListeningThread' terminates.
- my_thread.join()
-
- # We are no longer interested in receiving state changed events.
- # Remove our custom listener before the inferior is killed.
- broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
-
- # The final judgement. :-)
- self.assertTrue(self.state == 2,
- "Both expected state changed events received")
-
-
-if __name__ == '__main__':
- import atexit
- lldb.SBDebugger.Initialize()
- atexit.register(lambda: lldb.SBDebugger.Terminate())
- unittest2.main()