|
| 1 | +from dataclasses import dataclass |
| 2 | +from http import HTTPStatus |
1 | 3 | from time import sleep |
2 | | -from typing import Optional, List |
| 4 | +from typing import Optional |
| 5 | +from typing import List |
3 | 6 |
|
4 | 7 | from intezer_sdk import api |
5 | 8 | from intezer_sdk.api import IntezerApiClient |
6 | 9 | from intezer_sdk._api import IntezerApi |
7 | 10 |
|
8 | | - |
| 11 | +@dataclass |
9 | 12 | class Block: |
10 | | - def __init__(self, address: int, software_type: str, families: List[str]): |
11 | | - self.address = address |
12 | | - self.software_type = software_type |
13 | | - self.families = families |
14 | | - self.is_common = software_type == "common" |
| 13 | + address: int |
| 14 | + software_type: str |
| 15 | + families: List[str] |
| 16 | + |
| 17 | + @property |
| 18 | + def is_common(self): |
| 19 | + return self.software_type == 'common' |
15 | 20 |
|
16 | 21 |
|
17 | 22 | class CodeReuse: |
18 | | - """ |
| 23 | + ''' |
19 | 24 | Get code reuse for a file. |
20 | 25 |
|
21 | 26 | Parameters: |
22 | 27 | api_client (Optional[IntezerApiClient]): An optional Intezer API client instance. If not provided, the global |
23 | 28 | API client will be used. |
24 | | - """ |
| 29 | + ''' |
25 | 30 |
|
26 | 31 | def __init__(self, api_client: Optional[IntezerApiClient] = None): |
27 | 32 | self._api = IntezerApi(api_client or api.get_global_api()) |
28 | 33 |
|
29 | 34 | def _get_result_from_task(self, result_url: str): |
30 | 35 | response = self._api.api.request_with_refresh_expired_access_token( |
31 | | - "GET", result_url) |
32 | | - while response.status_code == 202: |
| 36 | + 'GET', result_url) |
| 37 | + while response.status_code == HTTPStatus.ACCEPTED: |
33 | 38 | sleep(2) |
34 | 39 | response = self._api.api.request_with_refresh_expired_access_token( |
35 | | - "GET", result_url) |
| 40 | + 'GET', result_url) |
36 | 41 | response.raise_for_status() |
37 | 42 | return response.json()['result'] |
38 | 43 |
|
39 | 44 | def get_code_blocks(self, sha256: str) -> List[Block]: |
40 | | - """ |
| 45 | + ''' |
41 | 46 | Retrieves a report containing information about reused code blocks for the given SHA-256 hash. |
42 | 47 |
|
43 | 48 | Parameters: |
44 | 49 | sha256_hash (str): The SHA-256 hash of the file to analyze. |
45 | 50 |
|
46 | 51 | Returns: |
47 | 52 | List[Block]: A sorted list of Block objects representing the code blocks found in the analysis. |
48 | | - """ |
| 53 | + ''' |
49 | 54 | result_url = self._api.get_code_reuse_by_code_block(sha256) |
50 | 55 | # This endpoint acts different. We don't get a status and instead have to use |
51 | 56 | # the HTTP status code to wait for the report. |
52 | 57 | result = self._get_result_from_task(result_url) |
53 | 58 | blocks: list[Block] = [] |
54 | | - for addr, val in result["blocks"].items(): |
| 59 | + for address, block in result['blocks'].items(): |
55 | 60 | blocks.append( |
56 | | - Block(int(addr), val["software_type"], val["code_reuse"])) |
| 61 | + Block(int(address), block['software_type'], block['code_reuse'])) |
57 | 62 | return sorted(blocks, key=lambda b: b.address) |
0 commit comments