summaryrefslogtreecommitdiff
path: root/cross-project-tests/debuginfo-tests/dexter/dex/debugger/DebuggerControllers/ConditionalController.py
blob: f84c4fd153c785bd21fd93d7d6f17f5bb46e58c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# DExTer : Debugging Experience Tester
# ~~~~~~   ~         ~~         ~   ~~
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Conditional Controller Class for DExTer."""


import os
import time
from collections import defaultdict
from itertools import chain

from dex.debugger.DebuggerControllers.ControllerHelpers import (
    in_source_file,
    update_step_watches,
)
from dex.debugger.DebuggerControllers.DebuggerControllerBase import (
    DebuggerControllerBase,
)
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout
from dex.dextIR import LocIR

class BreakpointRange:
    """A range of breakpoints and a set of conditions.

    The leading breakpoint (on line `range_from`, or on `function`) is always
    active.

    When the leading breakpoint is hit the trailing range should be activated
    when `expression` evaluates to any value in `values`. If there are no
    conditions (`expression` is None) then the trailing breakpoint range should
    always be activated upon hitting the leading breakpoint.

    Args:
       expression: None for no conditions, or a str expression to compare
                   against `values`.

       path: Source file path the breakpoints are set in.

       range_from: Line of the leading breakpoint (None for function ranges
                   built by `step_function`).

       range_to: Last line of the trailing range (None for function ranges
                 built by `step_function`).

       values: Values `expression` is compared against; only read when
               `expression` is not None.

       hit_count: None for no limit, or int to set the number of times the
                  leading breakpoint is triggered before it is removed.

       finish_on_remove: Stored as-is; ConditionalController ends the debug
                         session when a range with this flag set is removed.

       is_continue: Stored as-is; ConditionalController resumes execution
                    instead of stepping for ranges with this flag set.

       function: None, or the name of a function to break on instead of a
                 source line.

       addr: Stored as-is; not read by any code in this class.
    """

    def __init__(
        self,
        expression: str,
        path: str,
        range_from: int,
        range_to: int,
        values: list,
        hit_count: int,
        finish_on_remove: bool,
        is_continue: bool = False,
        function: str = None,
        addr: str = None,
    ):
        self.expression = expression
        self.path = path
        self.range_from = range_from
        self.range_to = range_to
        self.conditional_values = values
        self.max_hit_count = hit_count
        # Number of times the leading breakpoint has been hit so far.
        self.current_hit_count = 0
        self.finish_on_remove = finish_on_remove
        self.is_continue = is_continue
        self.function = function
        self.addr = addr

    # The factories below take no `self`; marking them as static methods keeps
    # `BreakpointRange.limit_steps(...)` working and additionally makes them
    # safe to call through an instance.

    @staticmethod
    def limit_steps(
        expression: str,
        path: str,
        range_from: int,
        range_to: int,
        values: list,
        hit_count: int,
    ) -> "BreakpointRange":
        """Build a stepping range over lines `range_from`..`range_to`."""
        return BreakpointRange(
            expression,
            path,
            range_from,
            range_to,
            values,
            hit_count,
            False,
        )

    @staticmethod
    def finish_test(
        expression: str, path: str, on_line: int, values: list, hit_count: int
    ) -> "BreakpointRange":
        """Build a single-line range with `finish_on_remove` set."""
        return BreakpointRange(
            expression,
            path,
            on_line,
            on_line,
            values,
            hit_count,
            True,
        )

    @staticmethod
    def continue_from_to(
        expression: str,
        path: str,
        from_line: int,
        to_line: int,
        values: list,
        hit_count: int,
    ) -> "BreakpointRange":
        """Build a range from `from_line` to `to_line` with `is_continue` set."""
        return BreakpointRange(
            expression,
            path,
            from_line,
            to_line,
            values,
            hit_count,
            finish_on_remove=False,
            is_continue=True,
        )

    @staticmethod
    def step_function(function: str, path: str, hit_count: int) -> "BreakpointRange":
        """Build an unconditional range keyed on `function` rather than lines."""
        return BreakpointRange(
            None,
            path,
            None,
            None,
            None,
            hit_count,
            finish_on_remove=False,
            is_continue=False,
            function=function,
        )

    def has_conditions(self) -> bool:
        """Return True when an `expression` was supplied."""
        return self.expression is not None

    def get_conditional_expression_list(self) -> list:
        """Return one "(<expression>) == (<value>)" string per conditional value."""
        return [
            f"({self.expression}) == ({value})" for value in self.conditional_values
        ]

    def add_hit(self) -> None:
        """Record one more hit of the leading breakpoint."""
        self.current_hit_count += 1

    def should_be_removed(self) -> bool:
        """Return True once the hit limit is reached; never when there is no limit."""
        if self.max_hit_count is None:
            return False
        return self.current_hit_count >= self.max_hit_count


class ConditionalController(DebuggerControllerBase):
    """Debugger controller driven by the conditional stepping commands.

    Builds a BreakpointRange for every DexLimitSteps, DexFinishTest,
    DexContinue and DexStepFunction command in the step collection, sets a
    leading breakpoint for each, and then runs the debuggee, recording steps
    only while inside the requested ranges / functions.
    """

    def __init__(self, context, step_collection):
        """Store stepping options from `context` and build the breakpoint ranges.

        Raises:
            DebuggerException: if the step collection contains none of the
                conditional commands (see `_build_bp_ranges`).
        """
        self._bp_ranges = None
        self._watches = set()
        self._step_index = 0
        self._pause_between_steps = context.options.pause_between_steps
        self._max_steps = context.options.max_steps
        # Map {id: BreakpointRange}
        self._leading_bp_handles = {}
        super().__init__(context, step_collection)
        # Reads self.step_collection, so this has to follow the base
        # initialiser (presumably where that attribute is assigned).
        self._build_bp_ranges()

    def _build_bp_ranges(self):
        """Populate `self._bp_ranges` from the commands in the step collection.

        Raises:
            DebuggerException: if none of DexLimitSteps, DexStepFunction or
                DexContinue is present. DexFinishTest on its own is not enough.
        """
        commands = self.step_collection.commands
        self._bp_ranges = []

        cond_controller_cmds = ["DexLimitSteps", "DexStepFunction", "DexContinue"]
        if not any(c in commands for c in cond_controller_cmds):
            raise DebuggerException(
                f"No conditional commands {cond_controller_cmds}, cannot conditionally step."
            )

        if "DexLimitSteps" in commands:
            for c in commands["DexLimitSteps"]:
                bpr = BreakpointRange.limit_steps(
                    c.expression,
                    c.path,
                    c.from_line,
                    c.to_line,
                    c.values,
                    c.hit_count,
                )
                self._bp_ranges.append(bpr)
        if "DexFinishTest" in commands:
            for c in commands["DexFinishTest"]:
                # The finish-test range is given one more hit than the command
                # specifies before it is considered expired.
                bpr = BreakpointRange.finish_test(
                    c.expression, c.path, c.on_line, c.values, c.hit_count + 1
                )
                self._bp_ranges.append(bpr)
        if "DexContinue" in commands:
            for c in commands["DexContinue"]:
                bpr = BreakpointRange.continue_from_to(
                    c.expression, c.path, c.from_line, c.to_line, c.values, c.hit_count
                )
                self._bp_ranges.append(bpr)
        if "DexStepFunction" in commands:
            for c in commands["DexStepFunction"]:
                bpr = BreakpointRange.step_function(
                    c.get_function(), c.path, c.hit_count
                )
                self._bp_ranges.append(bpr)

    def _set_leading_bps(self):
        """Set the leading breakpoint(s) of every range in the debugger.

        Fills `self._leading_bp_handles` with {leading bp id: BreakpointRange}.
        A conditional range contributes one breakpoint per condition, all of
        which map to the same BreakpointRange.
        """
        for bpr in self._bp_ranges:
            if bpr.has_conditions():
                # Add a conditional breakpoint for each condition.
                for cond_expr in bpr.get_conditional_expression_list():
                    bp_id = self.debugger.add_conditional_breakpoint(
                        bpr.path, bpr.range_from, cond_expr
                    )
                    self._leading_bp_handles[bp_id] = bpr
            elif bpr.function is not None:
                bp_id = self.debugger.add_function_breakpoint(bpr.function)
                self._leading_bp_handles[bp_id] = bpr
            else:
                # Add an unconditional breakpoint.
                bp_id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
                self._leading_bp_handles[bp_id] = bpr

    def _run_debugger_custom(self, cmdline):
        """Launch the debuggee and drive it until it finishes or times out.

        Each time the debuggee stops, the triggered breakpoints are classified
        as leading or trailing, trailing/expired breakpoints are deleted, the
        step is recorded when appropriate, and the debuggee is resumed with a
        step-over, a step-out or a plain continue.

        Args:
            cmdline: Passed unchanged to `self.debugger.launch`.

        Raises:
            DebuggerException: if the debugger is dbgeng.
        """
        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
        if self.debugger.get_name() == "dbgeng":
            raise DebuggerException(
                "DexLimitSteps commands are not supported by dbgeng"
            )

        self.step_collection.clear_steps()
        self._set_leading_bps()

        # Collect the watches of every command so they can be evaluated at
        # each recorded step.
        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
            self._watches.update(command_obj.get_watches())

        self.debugger.launch(cmdline)
        time.sleep(self._pause_between_steps)

        exit_desired = False
        timed_out = False
        total_timeout = Timeout(self.context.options.timeout_total)

        # Stack of backtraces of the functions currently being stepped through
        # on behalf of DexStepFunction.
        step_function_backtraces: list[list[str]] = []
        # Ids of the instruction breakpoints added below at the caller of a
        # stepped function; hits on these alone are not recorded as steps.
        self.instr_bp_ids = set()

        while not self.debugger.is_finished:
            # The breakpoint timeout restarts every time the debuggee is
            # resumed; the total timeout spans the whole session.
            breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
            while self.debugger.is_running and not timed_out:
                # Check to see whether we've timed out while we're waiting.
                if total_timeout.timed_out():
                    self.context.logger.error(
                        "Debugger session has been "
                        f"running for {total_timeout.elapsed}s, timeout reached!"
                    )
                    timed_out = True
                if breakpoint_timeout.timed_out():
                    self.context.logger.error(
                        f"Debugger session has not "
                        f"hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout "
                        "reached!"
                    )
                    timed_out = True

            if timed_out or self.debugger.is_finished:
                break

            step_info = self.debugger.get_step_info(self._watches, self._step_index)
            # Function names of the current call stack; stays None when there
            # is no current frame.
            backtrace = None
            if step_info.current_frame:
                backtrace = [f.function for f in step_info.frames]

            record_step = False
            debugger_continue = False
            bp_to_delete = []
            for bp_id in self.debugger.get_triggered_breakpoint_ids():
                try:
                    # See if this is one of our leading breakpoints.
                    bpr = self._leading_bp_handles[bp_id]
                    record_step = True
                except KeyError:
                    # This is a trailing bp. Mark it for removal.
                    bp_to_delete.append(bp_id)
                    if bp_id in self.instr_bp_ids:
                        # Our own instruction breakpoint: forget it and do not
                        # request a recorded step on its account.
                        self.instr_bp_ids.remove(bp_id)
                    else:
                        record_step = True
                    continue

                bpr.add_hit()
                if bpr.should_be_removed():
                    if bpr.finish_on_remove:
                        exit_desired = True
                    bp_to_delete.append(bp_id)
                    del self._leading_bp_handles[bp_id]

                if bpr.function is not None:
                    if step_info.frames:
                        # Add this backtrace to the stack. While the current
                        # backtrace matches the top of the stack we'll step,
                        # and while there's a backtrace in the stack that
                        # is a subset of the current backtrace we'll step-out.
                        if (
                            len(step_function_backtraces) == 0
                            or backtrace != step_function_backtraces[-1]
                        ):
                            step_function_backtraces.append(backtrace)

                            # Add an address breakpoint so we don't fall out
                            # the end of nested DexStepFunctions with a DexContinue.
                            addr = self.debugger.get_pc(frame_idx=1)
                            instr_id = self.debugger.add_instruction_breakpoint(addr)
                            # Note the breakpoint so we don't log its source
                            # location in the trace later.
                            self.instr_bp_ids.add(instr_id)

                elif bpr.is_continue:
                    debugger_continue = True
                    if bpr.range_to is not None:
                        self.debugger.add_breakpoint(bpr.path, bpr.range_to)

                else:
                    # Add a range of trailing breakpoints covering the lines
                    # requested in the DexLimitSteps command. Ignore first line as
                    # that's covered by the leading bp we just hit and include the
                    # final line.
                    for line in range(bpr.range_from + 1, bpr.range_to + 1):
                        self.debugger.add_breakpoint(bpr.path, line)

            # Remove any trailing or expired leading breakpoints we just hit.
            self.debugger.delete_breakpoints(bp_to_delete)

            debugger_next = False
            debugger_out = False
            if not debugger_continue and step_info.current_frame and step_info.frames:
                while len(step_function_backtraces) > 0:
                    match_subtrace = False  # Backtrace contains a target trace.
                    match_trace = False  # Backtrace matches top of target stack.

                    # The top of the step_function_backtraces stack contains a
                    # backtrace that we want to step through. Check if the
                    # current backtrace ("backtrace") either matches that trace
                    # or otherwise contains it.
                    target_backtrace = step_function_backtraces[-1]
                    if len(backtrace) >= len(target_backtrace):
                        match_trace = len(backtrace) == len(target_backtrace)
                        # Check if backtrace contains target_backtrace, matching
                        # from the end (bottom of call stack) backwards.
                        match_subtrace = (
                            backtrace[-len(target_backtrace) :] == target_backtrace
                        )

                    if match_trace:
                        # We want to step through this function; do so and
                        # log the steps in the step trace.
                        debugger_next = True
                        record_step = True
                        break
                    elif match_subtrace:
                        # There's a function we care about buried in the
                        # current backtrace. Step-out until we get to it.
                        debugger_out = True
                        break
                    else:
                        # Drop backtraces that are not match_subtraces of the current
                        # backtrace; the functions we wanted to step through
                        # there are no longer reachable.
                        step_function_backtraces.pop()

            if record_step and step_info.current_frame:
                self._step_index += 1
                # Record the step.
                update_step_watches(
                    step_info, self._watches, self.step_collection.commands
                )
                self.step_collection.new_step(self.context, step_info)

            # Resume the debuggee: stop altogether, step over, step out, or
            # run freely to the next breakpoint.
            if exit_desired:
                break
            elif debugger_next:
                self.debugger.step_next()
            elif debugger_out:
                self.debugger.step_out()
            else:
                self.debugger.go()
            time.sleep(self._pause_between_steps)