diff options
Diffstat (limited to 'lldb/test/python_api/event/TestEvents.py')
| -rw-r--r-- | lldb/test/python_api/event/TestEvents.py | 306 |
1 files changed, 0 insertions, 306 deletions
diff --git a/lldb/test/python_api/event/TestEvents.py b/lldb/test/python_api/event/TestEvents.py deleted file mode 100644 index a9a7edcbbfa3..000000000000 --- a/lldb/test/python_api/event/TestEvents.py +++ /dev/null @@ -1,306 +0,0 @@ -""" -Test lldb Python event APIs. -""" - -import os, time -import re -import unittest2 -import lldb, lldbutil -from lldbtest import * - -class EventAPITestCase(TestBase): - - mydir = os.path.join("python_api", "event") - - @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") - @python_api_test - @dsym_test - def test_listen_for_and_print_event_with_dsym(self): - """Exercise SBEvent API.""" - self.buildDsym() - self.do_listen_for_and_print_event() - - @python_api_test - @dwarf_test - def test_listen_for_and_print_event_with_dwarf(self): - """Exercise SBEvent API.""" - self.buildDwarf() - self.do_listen_for_and_print_event() - - @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") - @python_api_test - @dsym_test - def test_wait_for_event_with_dsym(self): - """Exercise SBListener.WaitForEvent() API.""" - self.buildDsym() - self.do_wait_for_event() - - @python_api_test - @dwarf_test - def test_wait_for_event_with_dwarf(self): - """Exercise SBListener.WaitForEvent() API.""" - self.buildDwarf() - self.do_wait_for_event() - - @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") - @python_api_test - @dsym_test - def test_add_listener_to_broadcaster_with_dsym(self): - """Exercise some SBBroadcaster APIs.""" - self.buildDsym() - self.do_add_listener_to_broadcaster() - - @python_api_test - @dwarf_test - def test_add_listener_to_broadcaster_with_dwarf(self): - """Exercise some SBBroadcaster APIs.""" - self.buildDwarf() - self.do_add_listener_to_broadcaster() - - def setUp(self): - # Call super's setUp(). - TestBase.setUp(self) - # Find the line number to of function 'c'. - self.line = line_number('main.c', '// Find the line number of function "c" here.') - - def do_listen_for_and_print_event(self): - """Create a listener and use SBEvent API to print the events received.""" - exe = os.path.join(os.getcwd(), "a.out") - - # Create a target by the debugger. - target = self.dbg.CreateTarget(exe) - self.assertTrue(target, VALID_TARGET) - - # Now create a breakpoint on main.c by name 'c'. - breakpoint = target.BreakpointCreateByName('c', 'a.out') - - # Now launch the process, and do not stop at the entry point. - process = target.LaunchSimple(None, None, os.getcwd()) - self.assertTrue(process.GetState() == lldb.eStateStopped, - PROCESS_STOPPED) - - # Get a handle on the process's broadcaster. - broadcaster = process.GetBroadcaster() - - # Create an empty event object. - event = lldb.SBEvent() - - # Create a listener object and register with the broadcaster. - listener = lldb.SBListener("my listener") - rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) - self.assertTrue(rc, "AddListener successfully retruns") - - traceOn = self.TraceOn() - if traceOn: - lldbutil.print_stacktraces(process) - - # Create MyListeningThread class to wait for any kind of event. - import threading - class MyListeningThread(threading.Thread): - def run(self): - count = 0 - # Let's only try at most 4 times to retrieve any kind of event. - # After that, the thread exits. - while not count > 3: - if traceOn: - print "Try wait for event..." - if listener.WaitForEventForBroadcasterWithType(5, - broadcaster, - lldb.SBProcess.eBroadcastBitStateChanged, - event): - if traceOn: - desc = lldbutil.get_description(event) - print "Event description:", desc - print "Event data flavor:", event.GetDataFlavor() - print "Process state:", lldbutil.state_type_to_str(process.GetState()) - print - else: - if traceOn: - print "timeout occurred waiting for event..." - count = count + 1 - return - - # Let's start the listening thread to retrieve the events. - my_thread = MyListeningThread() - my_thread.start() - - # Use Python API to continue the process. The listening thread should be - # able to receive the state changed events. - process.Continue() - - # Use Python API to kill the process. The listening thread should be - # able to receive the state changed event, too. - process.Kill() - - # Wait until the 'MyListeningThread' terminates. - my_thread.join() - - def do_wait_for_event(self): - """Get the listener associated with the debugger and exercise WaitForEvent API.""" - exe = os.path.join(os.getcwd(), "a.out") - - # Create a target by the debugger. - target = self.dbg.CreateTarget(exe) - self.assertTrue(target, VALID_TARGET) - - # Now create a breakpoint on main.c by name 'c'. - breakpoint = target.BreakpointCreateByName('c', 'a.out') - #print "breakpoint:", breakpoint - self.assertTrue(breakpoint and - breakpoint.GetNumLocations() == 1, - VALID_BREAKPOINT) - - # Get the debugger listener. - listener = self.dbg.GetListener() - - # Now launch the process, and do not stop at entry point. - error = lldb.SBError() - process = target.Launch (listener, None, None, None, None, None, None, 0, False, error) - self.assertTrue(error.Success() and process, PROCESS_IS_VALID) - - # Get a handle on the process's broadcaster. - broadcaster = process.GetBroadcaster() - self.assertTrue(broadcaster, "Process with valid broadcaster") - - # Create an empty event object. - event = lldb.SBEvent() - self.assertFalse(event, "Event should not be valid initially") - - # Create MyListeningThread to wait for any kind of event. - import threading - class MyListeningThread(threading.Thread): - def run(self): - count = 0 - # Let's only try at most 3 times to retrieve any kind of event. - while not count > 3: - if listener.WaitForEvent(5, event): - #print "Got a valid event:", event - #print "Event data flavor:", event.GetDataFlavor() - #print "Event type:", lldbutil.state_type_to_str(event.GetType()) - return - count = count + 1 - print "Timeout: listener.WaitForEvent" - - return - - # Use Python API to kill the process. The listening thread should be - # able to receive a state changed event. - process.Kill() - - # Let's start the listening thread to retrieve the event. - my_thread = MyListeningThread() - my_thread.start() - - # Wait until the 'MyListeningThread' terminates. - my_thread.join() - - self.assertTrue(event, - "My listening thread successfully received an event") - - def do_add_listener_to_broadcaster(self): - """Get the broadcaster associated with the process and wait for broadcaster events.""" - exe = os.path.join(os.getcwd(), "a.out") - - # Create a target by the debugger. - target = self.dbg.CreateTarget(exe) - self.assertTrue(target, VALID_TARGET) - - # Now create a breakpoint on main.c by name 'c'. - breakpoint = target.BreakpointCreateByName('c', 'a.out') - #print "breakpoint:", breakpoint - self.assertTrue(breakpoint and - breakpoint.GetNumLocations() == 1, - VALID_BREAKPOINT) - - # Now launch the process, and do not stop at the entry point. - process = target.LaunchSimple(None, None, os.getcwd()) - self.assertTrue(process.GetState() == lldb.eStateStopped, - PROCESS_STOPPED) - - # Get a handle on the process's broadcaster. - broadcaster = process.GetBroadcaster() - self.assertTrue(broadcaster, "Process with valid broadcaster") - - # Create an empty event object. - event = lldb.SBEvent() - self.assertFalse(event, "Event should not be valid initially") - - # Create a listener object and register with the broadcaster. - listener = lldb.SBListener("TestEvents.listener") - rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) - self.assertTrue(rc, "AddListener successfully retruns") - - # The finite state machine for our custom listening thread, with an - # initail state of 0, which means a "running" event is expected. - # It changes to 1 after "running" is received. - # It cahnges to 2 after "stopped" is received. - # 2 will be our final state and the test is complete. - self.state = 0 - - # Create MyListeningThread to wait for state changed events. - # By design, a "running" event is expected following by a "stopped" event. - import threading - class MyListeningThread(threading.Thread): - def run(self): - #print "Running MyListeningThread:", self - - # Regular expression pattern for the event description. - pattern = re.compile("data = {.*, state = (.*)}$") - - # Let's only try at most 6 times to retrieve our events. - count = 0 - while True: - if listener.WaitForEventForBroadcasterWithType(5, - broadcaster, - lldb.SBProcess.eBroadcastBitStateChanged, - event): - desc = lldbutil.get_description(event) - #print "Event description:", desc - match = pattern.search(desc) - if not match: - break; - if self.context.state == 0 and match.group(1) == 'running': - self.context.state = 1 - continue - elif self.context.state == 1 and match.group(1) == 'stopped': - # Whoopee, both events have been received! - self.context.state = 2 - break - else: - break - print "Timeout: listener.WaitForEvent" - count = count + 1 - if count > 6: - break - - return - - # Use Python API to continue the process. The listening thread should be - # able to receive the state changed events. - process.Continue() - - # Start the listening thread to receive the "running" followed by the - # "stopped" events. - my_thread = MyListeningThread() - # Supply the enclosing context so that our listening thread can access - # the 'state' variable. - my_thread.context = self - my_thread.start() - - # Wait until the 'MyListeningThread' terminates. - my_thread.join() - - # We are no longer interested in receiving state changed events. - # Remove our custom listener before the inferior is killed. - broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) - - # The final judgement. :-) - self.assertTrue(self.state == 2, - "Both expected state changed events received") - - -if __name__ == '__main__': - import atexit - lldb.SBDebugger.Initialize() - atexit.register(lambda: lldb.SBDebugger.Terminate()) - unittest2.main() |
