Hi, I’m Vilian Iaumbaev! I recently built a system that automatically handles new crash reports for both iOS and Android — it makes tracking and fixing issues a whole lot easier.

Why we did this

Manually assigning crashes to the right developers quickly became a tedious and unreliable task. It was easy to forget something, overlook edge cases, or skip over complex issues. We wanted to make the process more predictable and systematic — so that no one on the team had to waste time triaging crashes, and no critical issue would fall through the cracks.

Summary

To get started, you’ll need a few tools: Crashlytics, Google Cloud Platform (GCP), and Jira.

Once your projects are set up in Google services, configure data transfer from Crashlytics to GCP using the integration page. After that, all crash reports will appear in BigQuery tables.

The crash data structures for iOS and Android are nearly identical, with just a few small differences — which means we can use a single script to process both.

Now that your crashes are in BigQuery, you can work with the data directly: query the full crash dataset and analyze it however you like on your side.

I chose Python and will walk through the example in that language. First we need to fetch all the crash data for analysis, but if you have a large dataset, say over a million users, it is better to preprocess and aggregate the data on the Google side.

Plan

  1. Learn some basic SQL to get crash data from BigQuery
  2. Query crash data using Python
  3. Get all committers from the repository and merge duplicates
  4. Map each issue to a repo file and its owner
  5. Create a Jira task for the file owner, if a task doesn’t already exist

Learn some SQL basics to get data from BigQuery

BigQuery uses its own SQL dialect, which is similar to standard SQL but offers additional convenience for data analysis. For our integration, we needed to work with the complete crash dataset, but in an aggregated form. Specifically, we grouped individual crash reports into unique crash signatures and then aggregated relevant data within each group — such as the number of occurrences, affected user count, version breakdown, and more. You can find the SQL script below and test it in your own environment via the following link: https://console.cloud.google.com/bigquery

WITH pre as(
SELECT
    issue_id,
    ARRAY_AGG(DISTINCT issue_title IGNORE NULLS) as issue_titles,
    ARRAY_AGG(DISTINCT blame_frame.file IGNORE NULLS) as blame_files,
    ARRAY_AGG(DISTINCT blame_frame.library IGNORE NULLS) as blame_libraries,
    ARRAY_AGG(DISTINCT blame_frame.symbol IGNORE NULLS) as blame_symbols,
    COUNT(DISTINCT event_id) as total_events,
    COUNT(DISTINCT installation_uuid) as total_users,
    '{"version":"' || application.display_version || '","events":' || COUNT(DISTINCT event_id) 
        || ',"users":' || COUNT(DISTINCT installation_uuid) || '}' AS events_info
    FROM `YOUR TABLE NAME`
    WHERE 1=1
        AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
        AND event_timestamp < CURRENT_TIMESTAMP()
        AND error_type = "FATAL"
    GROUP BY issue_id, application.display_version
)

SELECT
    issue_id,
    ARRAY_CONCAT_AGG(issue_titles) as issue_titles,
    ARRAY_CONCAT_AGG(blame_files) as blame_files,
    ARRAY_CONCAT_AGG(blame_libraries) as blame_libraries,
    ARRAY_CONCAT_AGG(blame_symbols) as blame_symbols,
    SUM(total_events) as total_events,
    SUM(total_users) as total_users,
    '[' || STRING_AGG(events_info, ",") || ']' as events_info
FROM pre
WHERE 1=1
    AND issue_id IS NOT NULL
    AND events_info IS NOT NULL
GROUP BY issue_id
ORDER BY total_users DESC

As a result, you will get one row per unique issue_id with the aggregated fields (issue titles, blame files, libraries and symbols, total events and users), plus an events_info JSON string that looks like this:

[
  { "version": "1.0.1", "events": 131, "users": 110 },
  { "version": "1.2.1", "events": 489, "users": 426 }
]

Request crash data from BigQuery using Python

To get started, install the BigQuery Python client library from PyPI. After the installation, create a BigQueryExecutor.py file — this module will handle all communication with Google Cloud BigQuery.
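
The package on PyPI is called google-cloud-bigquery, so the installation is a one-liner:

pip install google-cloud-bigquery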

import os
import json
import tempfile
from google.oauth2 import service_account
from google.cloud import bigquery
from collections import Counter

class BigQueryExecutor:
    def __init__(self, credentialsJson: str, bqProjectId: str = ''):
        temp_file_path=''
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
            json.dump(json.loads(credentialsJson), temp_file, indent=4)
            temp_file_path = temp_file.name
        credentials = service_account.Credentials.from_service_account_file(temp_file_path)
        os.remove(temp_file_path)
        self.client = bigquery.Client(project=bqProjectId, credentials=credentials)
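
As a side note, newer versions of google-auth can build the credentials directly from the parsed dictionary, which avoids the temporary file entirely. A minimal sketch of the same constructor, assuming from_service_account_info is available in your google-auth version:

class BigQueryExecutor:
    def __init__(self, credentialsJson: str, bqProjectId: str = ''):
        # Parse the JSON string and hand the dict straight to google-auth,
        # no temporary file on disk.
        credentials = service_account.Credentials.from_service_account_info(json.loads(credentialsJson))
        self.client = bigquery.Client(project=bqProjectId, credentials=credentials)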

To start using the script, you’ll need just two things:

Once you have these, you can authenticate and start executing queries through the script.

Google service account JSON credential

To create a service account, go to Google Cloud Console and assign it the BigQuery Data Editor role.

Once the account is created, open it, navigate to the “Keys” tab, click “Add key”, and choose “JSON”. This will generate and download a JSON credentials file for the service account.

A service account JSON typically looks like this:

{
  "type": "service_account",
  "project_id": YOUR_PROJECT,
  "private_key_id": private_key_id,
  "private_key": GCP_PRIVATE_KEY,
  "client_email": "email",
  "client_id": "id",
  "auth_uri": "auth_uri",
  "token_uri": "token_uri",
  "auth_provider_x509_cert_url": "auth_provider_x509_cert_url",
  "client_x509_cert_url": "url",
  "universe_domain": "universe_domain"
}

For testing purposes, you can convert the JSON credentials into a single-line string and embed it directly into your script. However, this approach is not recommended for production — use a Secrets Manager to securely store and manage your credentials instead.

You can also extract your bqProjectId from the project_id field inside the credentials JSON.
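
For example, a minimal setup could read the credentials from an environment variable and derive the project ID from the same JSON. The variable name below is just an assumption; wire it to whatever secrets storage you use:

import os
import json

# Assumption: the service account JSON is exported as an environment variable,
# for example by your CI pipeline or secrets manager.
_jsonToken = os.environ['BQ_CREDENTIALS_JSON']
_bqProjectId = json.loads(_jsonToken)['project_id']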

Models

To work with BigQuery data in a type-safe manner, it’s useful to define data models that reflect the structure of the query results. This allows you to write cleaner, safer, and more maintainable code.

Below is an example of such model classes:

class BQCrashlyticsVersionsModel:
    def __init__(self, 
                 version: str,
                 events: int, 
                 users: int
                 ):
        self.version = version
        self.events = events
        self.users = users

class BQCrashlyticsIssueModel:
    def __init__(self, 
                 issue_id: str, 
                 issue_title: str,
                 blame_file: str, 
                 blame_library: str, 
                 blame_symbol: str, 
                 total_events: int, 
                 total_users: int, 
                 versions: list[BQCrashlyticsVersionsModel]
                 ):
        self.issue_id = issue_id
        self.issue_title = issue_title
        self.blame_file = blame_file
        self.blame_library = blame_library
        self.blame_symbol = blame_symbol
        self.total_events = total_events
        self.total_users = total_users
        self.versions = versions
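
If you prefer less boilerplate, the same models can be expressed as dataclasses. This is only an equivalent sketch; the rest of the article keeps the explicit __init__ style:

from dataclasses import dataclass

@dataclass
class BQCrashlyticsVersionsModel:
    version: str
    events: int
    users: int

@dataclass
class BQCrashlyticsIssueModel:
    issue_id: str
    issue_title: str
    blame_file: str
    blame_library: str
    blame_symbol: str
    total_events: int
    total_users: int
    versions: list[BQCrashlyticsVersionsModel]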

getCrashlyticsIssues Function

And finally, we can fetch data from BigQuery.

Add the following method to your existing BigQueryExecutor class — it will execute the SQL query described earlier in the BigQuery SQL section and return the results parsed into model instances.

    def getCrashlyticsIssues(self, lastHoursCount: int, tableName: str) -> list[BQCrashlyticsIssueModel]:
        firstEventsInfo = """'[' || STRING_AGG(events_info, ",") || ']' as events_info"""
        asVersions = """
        '{"version":"' || application.display_version || '","events":' || COUNT(DISTINCT event_id) 
        || ',"users":' || COUNT(DISTINCT installation_uuid) || '}' AS events_info
        """
        query = f"""
        WITH pre as(
        SELECT
            issue_id,
            ARRAY_AGG(DISTINCT issue_title IGNORE NULLS) as issue_titles,
            ARRAY_AGG(DISTINCT blame_frame.file IGNORE NULLS) as blame_files,
            ARRAY_AGG(DISTINCT blame_frame.library IGNORE NULLS) as blame_libraries,
            ARRAY_AGG(DISTINCT blame_frame.symbol IGNORE NULLS) as blame_symbols,
            COUNT(DISTINCT event_id) as total_events,
            COUNT(DISTINCT installation_uuid) as total_users,
            {asVersions}
            FROM `{tableName}`
            WHERE 1=1
                AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {lastHoursCount} HOUR)
                AND event_timestamp < CURRENT_TIMESTAMP()
                AND error_type = "FATAL"
            GROUP BY issue_id, application.display_version
        )

        SELECT
            issue_id,
            ARRAY_CONCAT_AGG(issue_titles) as issue_titles,
            ARRAY_CONCAT_AGG(blame_files) as blame_files,
            ARRAY_CONCAT_AGG(blame_libraries) as blame_libraries,
            ARRAY_CONCAT_AGG(blame_symbols) as blame_symbols,
            SUM(total_events) as total_events,
            SUM(total_users) as total_users,
            {firstEventsInfo}
        FROM pre
        WHERE 1=1
            AND issue_id IS NOT NULL
            AND events_info IS NOT NULL
        GROUP BY issue_id
        ORDER BY total_users DESC
        """
        bqRows = self.client.query(query).result()
        rows: list[BQCrashlyticsIssueModel] = []
        def mergeArray(array: list[str]) -> str:
            if not array:
                return ''
            counter = Counter(array)
            most_common = counter.most_common(1)
            return most_common[0][0] if most_common else ''
        for row in bqRows:
            issueModel = BQCrashlyticsIssueModel(
                issue_id=row.issue_id,
                issue_title=mergeArray(row.issue_titles),
                blame_file=mergeArray(row.blame_files), 
                blame_library=mergeArray(row.blame_libraries), 
                blame_symbol=mergeArray(row.blame_symbols),
                total_events=row.total_events,
                total_users=row.total_users,
                versions=[BQCrashlyticsVersionsModel(version=jj['version'], events=jj['events'], users=jj['users']) for jj in json.loads(row.events_info)]
            )
            rows.append(issueModel)
        return rows

Now we can execute our SQL query against BigQuery directly from Python.

Here’s a full example of how to run the query and work with the results:

executor = BigQueryExecutor(credentialsJson=_jsonToken, bqProjectId=_bqProjectId)
allCrashes = executor.getCrashlyticsIssues(lastHoursCount=24, tableName="tableNAME_IOS_REALTIME")
for i in allCrashes:
    print(i.issue_title)

Example output:

[libswift_Concurrency.dylib] swift::swift_Concurrency_fatalError(unsigned int, char const*, ...)
[SwiftUI] static DisplayList.ViewUpdater.Platform.updateClipShapesAsync(layer:oldState:newState:)
Foundation
[SwiftUI] __swift_memcpy112_8
[libswift_Concurrency.dylib] swift::AsyncTask::waitFuture(swift::AsyncTask*, swift::AsyncContext*, void (swift::AsyncContext* swift_async_context) swiftasynccall*, swift::AsyncContext*, swift::OpaqueValue*)
[libobjc.A.dylib] objc_opt_respondsToSelector
[VectorKit] void md::COverlayRenderLayer::layoutRibbon<md::Ribbons::PolylineOverlayRibbonDescriptor>(std::__1::unique_ptr<md::PolylineOverlayLayer<md::Ribbons::PolylineOverlayRibbonDescriptor>, std::__1::default_delete<md::PolylineOverlayLayer<md::Ribbons::PolylineOverlayRibbonDescriptor> > > const&, ggl::CommandBuffer*, md::PolylineOverlayLayoutContext&, unsigned int, unsigned long long, bool, bool, float)
[libswiftCore.dylib] _swift_release_dealloc
[libobjc.A.dylib] objc_msgSend

Hooray! 🎉 Now that we’re able to fetch crash data from BigQuery, we can move on to the next step — taking the top 5 most frequent crashes and automatically creating Jira tasks for them.

Get all committers from the repository and merge duplicates

Before assigning crash issues to developers, we first need to identify potential owners for each crash. To do that, we’ll start by gathering all commit authors from the repository.

Since we’re using GitHub, we should be aware of a few specific details:

  1. The same person often commits under several identities: a work e-mail, a personal e-mail, or a GitHub noreply address. These need to be merged into a single author.
  2. GitHub noreply addresses look like 12345+username@users.noreply.github.com (or username@users.noreply.github.com), and the GitHub login can be extracted from them.

The main goal at this step is to extract and normalize the list of Git authors with their names and emails, using the following command:

git log | grep '^Author' | sort | uniq -c
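
Its output looks roughly like this (the counts and identities below are made up for illustration):

     42 Author: Jane Doe <jane.doe@example.com>
      3 Author: Jane Doe <12345+janedoe@users.noreply.github.com>
      7 Author: John Smith <john.smith@example.com>

The Python code below runs this command, extracts e-mails and GitHub logins from each author line, and merges identities that belong to the same person: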

import os
import re

class GitUserModel:
    def __init__(self,
                 nicknames: set[str], 
                 emails: set[str], 
                 gitLogins: set[str]
                 ):
        self.nicknames = nicknames
        self.emails = emails
        self.gitLogins = gitLogins
    
def returnPossibleNicknames(text: str) -> set[str]:
    res = [findEmail(text), loginFromEmail(text), findGitNoreplyLogin(text)]
    return set(list(filter(None, res)))

def findEmail(text: str) -> str:
    e = re.match(r"(([A-Za-z0-9+\.\_\-]*@[A-Za-z0-9+]*\.[A-Za-z0-9+]*))", text)
    if e:
        return e.group(1)

def loginFromEmail(text: str) -> str:
    e = re.match(r"(([A-Za-z0-9+\.\_\-]*))@[A-Za-z0-9+]*\.[A-Za-z0-9+]*", text)
    if e:
        return e.group(1)

def findGitNoreplyLogin(text: str) -> str:
    gu = re.match(r"\d+\+(([A-Za-z0-9+\.\_\-]*))@users\.noreply\.github\.com", text)
    if gu:
        return gu.group(1)
    else:
        gu = re.match(r"(([A-Za-z0-9+\.\_\-]*))@users\.noreply\.github\.com", text)
        if gu:
            return gu.group(1)

class GitBlamer:
    def getAllRepoUsersMap(self, projectRootPath: str) -> list[GitUserModel]:
        users: list[GitUserModel] = []
        allGitLog = os.popen("cd {}; git log | grep '^Author' | sort | uniq -c".format(projectRootPath)).read()
        for line in allGitLog.split('\n'):
            user = self._createUserFromBlameLine(line)
            if user:
                users.append(user)
        self._enrichUsersNicknames(users=users)
        users = self._mergeSameUsers(users)
        users = sorted(users, key=lambda x: list(x.emails)[0] if x.emails else list(x.gitLogins)[0] if x.gitLogins else "")
        return users
    
    def _createUserFromBlameLine(self, line):
        m = re.match(r".* Author: (.*) <(.*)>", line)
        user = GitUserModel(nicknames=set(), emails=set(), gitLogins=set())
        if m:
            val=set()
            if m.group(1): val.add(m.group(1))
            if m.group(2): val.add(m.group(2))
            user.nicknames = val
        else:
            return
        return user
    
    def _enrichUsersNicknames(self, users: list[GitUserModel]):
        for user in users:
            possibleNicknames = set()
            for nick in user.nicknames:
                possibleNicknames = possibleNicknames.union(returnPossibleNicknames(text=nick))
                e = findEmail(text=nick)
                if e:
                    user.emails.add(e)
                gu = findGitNoreplyLogin(text=nick)
                if gu:
                    user.gitLogins.add(gu)
            user.nicknames = user.nicknames.union(possibleNicknames)

    def _mergeSameUsers(self, users: list[GitUserModel]):
        for i in range(0, len(users)):
            if i >= len(users): break
            for j in range(i+1, len(users)):
                if j >= len(users): break
                setLoweredJNicknames=set([u.lower() for u in users[j].nicknames])
                for k in range(0, j):
                    if k >= j: break
                    setLoweredKNicknames=set([u.lower() for u in users[k].nicknames])
                    isSameNickname=len(setLoweredKNicknames.intersection(setLoweredJNicknames)) > 0
                    if isSameNickname:
                        users[j].gitLogins = users[j].gitLogins.union(users[k].gitLogins)
                        users[j].emails = users[j].emails.union(users[k].emails)
                        users.pop(k)
                        break
        return users

In the code above, we attempt to match different commit identities that likely belong to the same person, for example a corporate e-mail address and the matching GitHub noreply address. We also extract and group their names and GitHub usernames (where available) for convenience.

With the script below, you can launch this process and get a cleaned, deduplicated list of all committers in the repository:

projectRootPath="/IOS_project_path"
blamer = GitBlamer()
allUsers = blamer.getAllRepoUsersMap(projectRootPath=projectRootPath)
for user in allUsers:
    print(", ".join(user.nicknames))

Map each issue to a repository file and its owner

At this point, we have detailed information about our crashes (including affected user counts) and a merged list of repository committers. This allows us to associate each crash with a specific developer and automatically create a corresponding Jira task.

Before implementing the crash-to-user mapping logic, we separated the workflows for iOS and Android. These platforms use different symbol formats, and the criteria for linking crash files to issues also differ. To handle this cleanly, we introduced an abstract class with platform-specific implementations, enabling us to encapsulate the differences and solve the problem in a structured way.

class AbstractFileToIssueMapper:
    def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool:
        raise Exception('Not implemented method AbstractFileToIssueMapper')
    
class AndroidFileToIssueMapper(AbstractFileToIssueMapper):
    def __init__(self):
        self.libraryName = 'inDrive'

    def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool:
        if file != issue.blame_file or not issue.blame_symbol.startswith(self.libraryName):
            return False
        fileNameNoExtension = file.split('.')[0]
        fileNameIndexInSymbol = issue.blame_symbol.find(fileNameNoExtension)
        if fileNameIndexInSymbol < 0:
            return False
        relativeFilePathFromSymbol = issue.blame_symbol[0:fileNameIndexInSymbol].replace('.', '/')
        relativeFilePathFromSymbol = relativeFilePathFromSymbol + file
        return filePath.endswith(relativeFilePathFromSymbol)
    
class IosFileToIssueMapper(AbstractFileToIssueMapper):
    def __init__(self):
        self.indriveLibraryName = 'inDrive'
        self.indriveFolderName = 'inDrive'
        self.modulesFolderName = 'Modules'
        
    def isPathBelongsToIssue(self, file: str, filePath: str, issue: BQCrashlyticsIssueModel) -> bool:
        if file != issue.blame_file:
            return False
        isMatchFolder = False
        if issue.blame_library == self.indriveLibraryName: 
            isMatchFolder = filePath.startswith('{}/'.format(self.indriveFolderName))
        else:
            isMatchFolder = filePath.startswith('{}/{}/'.format(self.modulesFolderName, issue.blame_library))
        return isMatchFolder

The specific implementation may vary depending on your project, but the main responsibility of this class is to determine whether a given crash occurred in a particular file.

Once this logic is in place, we can proceed to map files to issues and assign them to the corresponding file owners.

import subprocess

class MappedIssueFileModel:
    def __init__(self,
                 fileGitLink: str,
                 filePath: str,
                 issue: BQCrashlyticsIssueModel,
                 fileOwner: GitUserModel
                 ):
        self.fileGitLink = fileGitLink
        self.filePath = filePath
        self.issue = issue
        self.fileOwner = fileOwner

class BigQueryCrashesFilesMapper:
    def getBlameOut(self, filePath: str, projectRootPath: str) -> list[str]:
        dangerousChars = re.compile(r'[;|&\r\n]|\.\.')
        if dangerousChars.search(filePath) or dangerousChars.search(projectRootPath):
            return None
        if not subprocess.check_output(['git', 'ls-files', filePath], cwd=projectRootPath, text=True):
            return None
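        # Equivalent shell pipeline: git blame <file> -cwe | grep -o '<author email>' | sort | uniq -c | sort -bgr
        # i.e. count how many lines each e-mail is blamed for, most frequent committer first.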
        blameProc = subprocess.Popen(['git', 'blame', filePath, '-cwe'], cwd=projectRootPath, stdout=subprocess.PIPE, text=True)
        blameRegex=r'<[a-zA-Z0-9\+\.\_\-]*@[a-zA-Z0-9\+\.\_\-]*>'
        grepProc = subprocess.Popen(['grep', '-o', blameRegex], stdin=blameProc.stdout, stdout=subprocess.PIPE, text=True)
        blameProc.stdout.close()
        sortProc = subprocess.Popen(['sort'], stdin=grepProc.stdout, stdout=subprocess.PIPE, text=True)
        grepProc.stdout.close()
        uniqProc = subprocess.Popen(['uniq', '-c'], stdin=sortProc.stdout, stdout=subprocess.PIPE, text=True)
        sortProc.stdout.close()
        finalProc = subprocess.Popen(['sort', '-bgr'], stdin=uniqProc.stdout, stdout=subprocess.PIPE, text=True)
        uniqProc.stdout.close()
        blameOut, _ = finalProc.communicate()
        blameArray=list(filter(len, blameOut.split('\n')))
        return blameArray

    def findFileOwner(self, filePath: str, gitFileOwners: list[GitUserModel], projectRootPath: str) -> GitUserModel:
        blameArray = self.getBlameOut(filePath=filePath, projectRootPath=projectRootPath)
        if not blameArray:
            return
        foundAuthor = None
        for blameI in range(0, len(blameArray)):
            author = re.match(r".*\d+ <(.*)>", blameArray[blameI])
            if author:
                possibleNicknames = returnPossibleNicknames(text=author.group(1))
                for gitFileOwner in gitFileOwners:
                    if len(gitFileOwner.nicknames.intersection(possibleNicknames)) > 0:
                        foundAuthor = gitFileOwner
                        break
                if foundAuthor:
                    break
        return foundAuthor

    def mapBQResultsWithFiles(self,
                              fileToIssueMapper: AbstractFileToIssueMapper,
                              issues: list[BQCrashlyticsIssueModel], 
                              gitFileOwners: list[GitUserModel],
                              projectRootPath: str
                              ) -> list[MappedIssueFileModel]:
        mappedArray: list[MappedIssueFileModel] = []
        githubMainBranch = "https://github.com/inDriver/UDF/blob/master"
        for root, dirs, files in os.walk(projectRootPath):
            for file in files:
                filePath = os.path.join(root, file).removeprefix(projectRootPath).strip('/')
                gitFileOwner = None
                for issue in issues:
                    if fileToIssueMapper.isPathBelongsToIssue(file, filePath, issue):
                        if not gitFileOwner:
                            gitFileOwner = self.findFileOwner(filePath=filePath, gitFileOwners=gitFileOwners, projectRootPath=projectRootPath)
                        mappedIssue = MappedIssueFileModel(
                            fileGitLink='{}/{}'.format(githubMainBranch, filePath.strip('/')),
                            filePath=filePath,
                            issue=issue,
                            fileOwner=gitFileOwner
                            )
                        mappedArray.append(mappedIssue)
        mappedArray.sort(key=lambda x: x.issue.total_users, reverse=True)
        return mappedArray

All you need to do at this point is update the githubMainBranch property with the link to your own repository.

Next, we gather issues and file owners, map files accordingly using the code below, and get a final result — a list of issues sorted by total_users in descending order.

mapper = BigQueryCrashesFilesMapper()
mappedIssues = mapper.mapBQResultsWithFiles(
    fileToIssueMapper=IosFileToIssueMapper(),
    issues=allCrashes,
    gitFileOwners=allUsers,
    projectRootPath=projectRootPath
    )
for issue in mappedIssues:
    if issue.fileOwner:
        print(list(issue.fileOwner.nicknames)[0], issue.issue.total_users, issue.issue.issue_title)
    else:
        print('no owner', issue.issue.total_users, issue.issue.issue_title)

Create a Jira task for the file owner of the crash

At this point, we have everything we need to start creating Jira tasks for crash owners. However, keep in mind that Jira configurations often vary between companies — custom fields, workflows, and permissions may differ. I recommend referring to the official Jira API documentation and using their official Python client to ensure compatibility with your setup.
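
Still, as a starting point, here is a minimal sketch using the community jira package (pip install jira). The server URL, credentials, project key and issue fields below are placeholders and will differ in your setup:

from jira import JIRA

def createCrashTask(mapped: MappedIssueFileModel,
                    jiraUrl: str, jiraUser: str, jiraApiToken: str, projectKey: str):
    # Placeholders: jiraUrl / jiraUser / jiraApiToken / projectKey depend on your instance.
    jira = JIRA(server=jiraUrl, basic_auth=(jiraUser, jiraApiToken))
    summary = 'Crash: {}'.format(mapped.issue.issue_title)
    description = 'Affected users: {}\nTotal events: {}\nFile: {}\nSuggested owner: {}'.format(
        mapped.issue.total_users,
        mapped.issue.total_events,
        mapped.fileGitLink,
        ', '.join(mapped.fileOwner.nicknames) if mapped.fileOwner else 'unknown')
    # Assigning the task automatically requires mapping a Git identity to a Jira account,
    # which is specific to your company, so it is left out of this sketch.
    return jira.create_issue(project=projectKey, summary=summary,
                             description=description, issuetype={'name': 'Bug'})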

Here are some practical tips based on our experience:

  1. Don’t create tasks for every issue. Focus on the top 5–10 issues based on the number of affected users or a certain impact threshold.
  2. Persist task metadata. Store information about created tasks in persistent storage. I use BigQuery, saving the data in a separate table and updating it on each script run (see the sketch after this list).
  3. Recreate closed tasks if the issue reappears in newer versions of the app — this ensures that regressions aren’t ignored.
  4. Link tasks for the same issue to simplify future investigation and avoid duplication.
  5. Include as much detail as possible in the task description. Add crash aggregations, affected user counts, versions, etc.
  6. Link related crashes if they originate from the same file — this provides additional context.
  7. Notify your team in Slack (or another messaging system) when new tasks are created or existing ones need attention. Include helpful links to the crash report, task, relevant GitHub files, etc.
  8. Add error handling to your script. Use try/except blocks and send Slack alerts when something fails.
  9. Cache slow operations during development. For example, cache BigQuery crash retrievals locally to speed up iteration.
  10. Some crashes may involve shared or core libraries. In those cases, you’ll likely need to manually assign the task, but it’s still helpful to create the Jira issue automatically with full crash context.
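
For tip 2, here is a minimal sketch of writing created-task metadata back to BigQuery with the same client we already have. The table name and row fields are assumptions; create a matching table in your own dataset first:

def saveCreatedTasks(executor: BigQueryExecutor, tableName: str, tasks: list[dict]):
    # tasks example: [{'issue_id': 'abc123', 'jira_key': 'CRASH-42', 'created_at': '2024-01-01T00:00:00Z'}]
    errors = executor.client.insert_rows_json(tableName, tasks)
    if errors:
        raise RuntimeError('BigQuery insert failed: {}'.format(errors))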

Conclusion

This system allows us to process thousands of crash reports daily and route them to the right developer in just a few minutes — without any manual work.

If your team is drowning in uncategorized crash issues — automate it. 🙌

Written by Vilian Iaumbaev.