Creating a Safe Browsing, Serverless API to scan iOS apps with Corellium & Frida on AWS Lambda
The Challenge
You're checking your messages in the morning, and your new friend James Willy has sent you a link to a cool new iOS app he's developed. While you'd like to give feedback, the last Visual Studio project he sent you didn't load properly, so to be safe, it might be a good idea to scan the IPA.
You could send the IPA's URL to Google's Safe Browsing platform and cross your fingers that their static analysis works. If only there was a Safe Browsing API that you knew worked well for iOS apps.
How difficult could it be to create a Safe Browsing API to scan for malicious apps? Must be pretty difficult, given that even Apple outsources it to Google.
Let's challenge that assumption, and create a prototype to scan iOS applications found on websites. We'll do it with zero physical devices, nothing running on our local machines once deployed, and for a few dollars a day (even with thousands of requests).
We'll solve this using:
- Corellium's virtual hardware and Python API
- Frida to scan memory for an "evil" pattern (e.g. as a starting place for you to add your own malware analysis)
- FastAPI as our RESTful endpoint that implements part of the Safe Browsing Lookup API (v4)
- AWS Lambda for deploying our solution as a serverless application
By the end of this article, you'll even have a lovely Swagger UI that looks like this!
Part 1 - Setting up Corellium
If you haven't already, go generate yourself an API token in the web UI. That's it.
Next, go ahead and create a jailbroken device on Corellium. Wait a bit for it to be booted, and that's it for setup.
For our application, you'll need to get:
- The instance ID, which can be found in the URL (circled in red)
- The project ID, which can be found in the SSH section (circled in green)
(Both of these values can be obtained from the Corellium API as well.)
Part 2 - Creating SSH tunnels in pure Python
Because we'd like to not care what OS our serverless application runs on, we're going to use pure Python (instead of calling out to an SSH binary).
First, let's create an API connection to Corellium.
configuration = corellium_api.Configuration(host="https://app.corellium.com/api")
configuration.access_token = CORELLIUM_API_TOKEN
api_client = corellium_api.ApiClient(configuration)
api_instance = corellium_api.CorelliumApi(api_client)
Let's generate a new temporary SSH key.
rsa_key = paramiko.RSAKey.generate(bits=2048)
And add it to our project.
project_key = {
"type": "ssh",
"label": "paramiko",
"key": "{} {} {}".format(rsa_key.get_name(), rsa_key.get_base64(), "paramiko"),
}
await api_instance.v1_add_project_key(PROJECT_ID, project_key)
At this point, we can create a tunnel to Corellium's gateway!
transport = paramiko.Transport(
("proxy.corellium.com", 22),
disabled_algorithms=dict(pubkeys=["rsa-sha2-512", "rsa-sha2-256"]),
)
transport.connect(
hostkey=None, # Might want to verify in production!
username=PROJECT_ID,
pkey=rsa_key,
)
Readers paying attention might wonder why the "disabled_algorithms" option is there. I'll save you the trial and error: it's because Paramiko defaults to a sig version that is unsupported by the SSH gateway on Corellium's proxy. https://stackoverflow.com/a/70567773
Using the transport tunnel to Corellium's gateway, we can now open up an SSH connection to the iOS device itself.
ssh_channel = transport.open_channel("direct-tcpip", (service_ip, 22), ("", 0))
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(
hostname=service_ip,
username="root",
password="alpine",
sock=ssh_channel,
allow_agent=False,
)
Part 3 - Port Forwarding Frida
The tunnel forwarding is a bit convoluted and not particularly interesting, so I've omitted it from this section. (I took an example from paramiko and modified it to work asynchronously.)
def forward_tunnel(local_port, remote_host, remote_port):
# Trust me, this works.
# ...
forward_tunnel(27042, service_ip, 27042
Lastly, we need to forward Frida's port. We do this instead of modifying Frida to listen on 0.0.0.0, as this allows us to disconnect WiFi if needed without Frida disconnecting.
api_instance.v1_patch_instance(
instance_id=INSTANCE_ID,
patch_instance_options={
"proxy": [
{"devicePort": 22, "routerPort": 22, "firstAvailable": True},
{
"devicePort": 32,
"routerPort": 32,
"firstAvailable": True,
"expose": True,
},
{"devicePort": 27042, "routerPort": 27042},
]
},
)
We're now able to use Frida's Python API as if we had the device plugged right into our machine!
device = frida.get_device_manager().add_remote_device("localhost:27042")
Part 4 - Frida Scripting
For the purpose of this demonstration, we're going to make a very simple Frida script that just looks for an "evil" byte pattern.
const pattern = '54 65 73 74 64 72 6f 69 64';
Process.enumerateRanges('---', {
onMatch: function (current_range) {
Memory.scan(current_range.base, current_range.size, pattern, {
onMatch(current_address, current_size) {
send(JSON.stringify({evil: true, module: Process.getModuleByAddress(current_address) }, null, 2))
return 'stop';
},
onComplete() {
}
});
},
onComplete: function () {
}
});
Part 5 - Installing apps and Frida messaging
As this service we're building takes in a URL to check, we're going to need to write a function that:
1. Downloads app at that URL.
2. Copies it to our Corellium device.
3. Installs the app on the Corellium device.
4. Runs the app with our Frida script hooks.
5. Uninstalls the app (alternatively, a safer option would be to restore the VM's state at the cost of speed).
async def install_ipa_and_run_script(ipa_url):
with urlopen(ipa_url) as zipresp:
plist_file = None
app_name = None
ipa_bytes = BytesIO(zipresp.read())
with ZipFile(ipa_bytes, mode="r") as zfile:
for zelem in zfile.infolist():
if not zelem.is_dir():
tmp_path = Path(zelem.filename)
tmp_parts = tmp_path.parts
if (
tmp_parts[0] == "Payload"
and tmp_parts[1].endswith(".app")
and tmp_parts[2] == "Info.plist"
):
plist_file = zelem.filename
break
if plist_file is not None:
with zfile.open(plist_file) as zbytes:
info_plist = plistlib.loads(zbytes.read(), fmt=plistlib.FMT_BINARY)
app_name = info_plist["CFBundleIdentifier"]
if app_name is not None:
ipa_bytes.seek(0)
ipa_hash = hashlib.sha256(ipa_bytes.read())
ipa_dest_filename = f"/tmp/{ipa_hash.hexdigest()}.ipa"
logging.info(f"Copying {app_name} to device as {ipa_dest_filename}...")
ipa_bytes.seek(0)
sftp_client.putfo(ipa_bytes, ipa_dest_filename)
logging.info(f"Installing {app_name}...")
agent_install_body = {
"path": ipa_dest_filename
}
api_response_instance_install = await api_instance.v1_agent_install_app(
instance_id=INSTANCE_ID,
agent_install_body=agent_install_body,
)
logging.info("Spawning app with Frida...")
pid = device.spawn([app_name])
device.resume(pid)
session = device.attach(pid)
script = session.create_script(
"""
... <removed>
"""
)
detected_evil = False
def on_message(message, data):
nonlocal detected_evil
parsed_message = json.loads(message["payload"])
if parsed_message["evil"]:
detected_evil = True
device.kill(pid)
script.unload()
script.on("message", on_message)
script.load()
for _ in range(10):
if detected_evil:
break
time.sleep(0.1)
logging.info(f"Uninstalling {app_name}...")
await api_instance.v1_agent_uninstall_app(
instance_id=INSTANCE_ID, bundle_id=app_name
)
session.detach()
return (detected_evil, ipa_hash)
This script buffers the entire zip into memory (meaning up to ~4GB of usage), and I would encourage the reader to spend a bit of time on optimizing it if they intend on doing something similar at scale.
Part 6 - FastAPI
For those who have not used it before, FastAPI is a handy web framework for building RESTful APIs. It's incredibly easy to use.
Here's an example of how we're going to implement the "/v4/threatMatches:find" endpoint.
app = FastAPI(
title="safe-browsing-v4-corellium", description=description, version="0.0.1"
)
@app.post(
"/v4/threatMatches:find",
responses={
200: {"model": FindThreatMatchesResponse, "description": "Successful response"},
},
tags=["threatMatches"],
response_model_by_alias=True,
)
async def safebrowsing_threat_matches_find(
key: str = Query(None, description="API key.", example="1337"),
find_threat_matches_request: FindThreatMatchesRequest = Body(None, description=""),
) -> FindThreatMatchesResponse:
"""Locates threat entries for Safe Browsing."""
# ... <scripting from part 5 here>
For brevity's sake, I'm only including one class model example. The rest can be found in the full script.
class ThreatMatch(BaseModel):
"""
ThreatMatch - A positive result for a ThreatEntry request.
cache_duration: The cache_duration of this ThreatMatch [Optional].
platform_type: The platform_type of this ThreatMatch [Optional].
threat: The threat of this ThreatMatch [Optional].
threat_entry_metadata: The threat_entry_metadata of this ThreatMatch [Optional].
threat_entry_type: The threat_entry_type of this ThreatMatch [Optional].
threat_type: The threat_type of this ThreatMatch [Optional].
"""
cache_duration: Optional[str] = Field(
alias="cacheDuration", default=None, example="30s"
)
platform_type: Optional[str] = Field(
alias="platformType", default=None, example="IOS"
)
threat: Optional[ThreatEntry] = Field(alias="threat", default=None)
threat_entry_metadata: Optional[ThreatEntryMetadata] = Field(
alias="threatEntryMetadata", default=None
)
threat_entry_type: Optional[str] = Field(
alias="threatEntryType", default=None, example="URL"
)
threat_type: Optional[str] = Field(
alias="threatType", default=None, example=["UNWANTED_SOFTWARE"]
)
Part 7 - Packaging for AWS Lambda
The only modification we need to do is adding this to our existing script.
handler = Mangum(app, lifespan="on")
Next, we'll create a CloudFormation template. This assumes this "template.yaml" file is in the root directory of our project, and we have both "app.py" and "requirements.txt" in the "python/" folder.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
safe-browsing-v4-corellium
Safe Browsing APIs (v4) SAM template for Corellium
Resources:
CorelliumLambdaFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: python/
Handler: app.handler
Runtime: python3.9
AutoPublishAlias: live
Architectures:
- arm64
MemorySize: 1024
Timeout: 60
ReservedConcurrentExecutions: 1
Environment:
Variables:
CORELLIUM_API_TOKEN: 'REMOVED'
PROJECT_ID: 'REMOVED'
INSTANCE_ID: 'REMOVED'
CorelliumLambdaURL:
Type: AWS::Lambda::Url
DependsOn: CorelliumLambdaFunctionAliaslive
Properties:
AuthType: NONE # NONE OR AWS_IAM
Qualifier: live
TargetFunctionArn: !GetAtt CorelliumLambdaFunction.Arn
Outputs:
CorelliumURL:
Description: "CorelliumLambdaFunction URL Endpoint"
Value:
Fn::GetAtt: CorelliumLambdaURL.FunctionUrl
After saving all three files, run:
sam build
Part 8 - Deploying to AWS Lambda & Costs
Simply run:
sam deploy --region us-east-2
If you run into this unhelpful error message, it means you need to request a quota of >100 concurrent executions in your AWS account.
Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [10].
In terms of costs, data and duration should be minimal, as we're running our Lambda in the same region as Corellium. Here's my current AWS bill:
Part 9 - Demo
Does it work?! Yep!
Thanks to Corellium's virtual iOS devices, creating these types of services is manageable whether you're a large tech giant or a bored person with free time on your weekend.
Unlock Superior Mobile Security Testing with Corellium
Equip your security teams with unprecedented tools for both manual and automated testing, freeing up valuable engineering time and saving money. Discover the power of Corellium’s high-fidelity virtual devices and spin-up near limitless combinations of device and OS with one-click jailbreak/root access. Book a meeting today to see how we can streamline your processes and reduce costs.