1 change: 1 addition & 0 deletions Dockerfile
@@ -80,6 +80,7 @@ ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot
ENV DB_DATA_DIR=$DATA_CACHE_DIR/db
ENV DB_NAME=sourcebot
ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot"
ENV REDIS_URL="redis://localhost:6379"
ENV SRC_TENANT_ENFORCEMENT_MODE=strict

ARG SOURCEBOT_VERSION=unknown
133 changes: 119 additions & 14 deletions packages/backend/src/connectionManager.ts
@@ -1,12 +1,11 @@
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, Repo } from "@sourcebot/db";
import { Job, Queue, Worker } from 'bullmq';
import { Settings, WithRequired } from "./types.js";
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
import { createLogger } from "./logger.js";
import os from 'os';
import { Redis } from 'ioredis';
import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js";
import { CONFIG_REPO_UPSERT_TIMEOUT_MS } from "./environment.js";

interface IConnectionManager {
scheduleConnectionSync: (connection: Connection) => Promise<void>;
@@ -23,15 +22,18 @@ type JobPayload = {
};

export class ConnectionManager implements IConnectionManager {
private queue = new Queue<JobPayload>(QUEUE_NAME);
private worker: Worker;
private queue: Queue<JobPayload>;
private logger = createLogger('ConnectionManager');

constructor(
private db: PrismaClient,
private settings: Settings,
redis: Redis,
) {
this.queue = new Queue<JobPayload>(QUEUE_NAME, {
connection: redis,
});
const numCores = os.cpus().length;
this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
connection: redis,
@@ -113,6 +115,7 @@ export class ConnectionManager implements IConnectionManager {
// appear in the repoData array above, and so the RepoToConnection record won't be re-created.
// Repos that have no RepoToConnection records are considered orphaned and can be deleted.
await this.db.$transaction(async (tx) => {
const deleteStart = performance.now();
await tx.connection.update({
where: {
id: job.data.connectionId,
@@ -123,21 +126,123 @@
}
}
});
const deleteDuration = performance.now() - deleteStart;
this.logger.info(`Deleted all RepoToConnection records for connection ${job.data.connectionId} in ${deleteDuration}ms`);

await Promise.all(repoData.map((repo) => {
return tx.repo.upsert({
where: {
external_id_external_codeHostUrl: {
external_id: repo.external_id,
external_codeHostUrl: repo.external_codeHostUrl,
},
const existingRepos: Repo[] = await tx.repo.findMany({
where: {
external_id: {
in: repoData.map(repo => repo.external_id),
},
create: repo,
update: repo as Prisma.RepoUpdateInput,
external_codeHostUrl: {
in: repoData.map(repo => repo.external_codeHostUrl),
},
},
});
const existingRepoKeys = existingRepos.map(repo => `${repo.external_id}-${repo.external_codeHostUrl}`);

const existingRepoData = repoData.filter(repo => existingRepoKeys.includes(`${repo.external_id}-${repo.external_codeHostUrl}`));
const [toCreate, toUpdate] = repoData.reduce<[Prisma.RepoCreateManyInput[], Prisma.RepoUpdateManyMutationInput[]]>(([toCreate, toUpdate], repo) => {
const existingRepo = existingRepoData.find((r: RepoData) => r.external_id === repo.external_id && r.external_codeHostUrl === repo.external_codeHostUrl);
if (existingRepo) {
const updateRepo: Prisma.RepoUpdateManyMutationInput = {
name: repo.name,
cloneUrl: repo.cloneUrl,
imageUrl: repo.imageUrl,
isFork: repo.isFork,
isArchived: repo.isArchived,
metadata: repo.metadata,
external_id: repo.external_id,
external_codeHostType: repo.external_codeHostType,
external_codeHostUrl: repo.external_codeHostUrl,
}
toUpdate.push(updateRepo);
} else {
const createRepo: Prisma.RepoCreateManyInput = {
name: repo.name,
cloneUrl: repo.cloneUrl,
imageUrl: repo.imageUrl,
isFork: repo.isFork,
isArchived: repo.isArchived,
metadata: repo.metadata,
orgId: job.data.orgId,
external_id: repo.external_id,
external_codeHostType: repo.external_codeHostType,
external_codeHostUrl: repo.external_codeHostUrl,
}
toCreate.push(createRepo);
}
return [toCreate, toUpdate];
}, [[], []]);

if (toCreate.length > 0) {
const createStart = performance.now();
const createdRepos = await tx.repo.createManyAndReturn({
data: toCreate,
});
}));

}, { timeout: parseInt(CONFIG_REPO_UPSERT_TIMEOUT_MS) });
await tx.repoToConnection.createMany({
data: createdRepos.map(repo => ({
repoId: repo.id,
connectionId: job.data.connectionId,
})),
});

const createDuration = performance.now() - createStart;
this.logger.info(`Created ${toCreate.length} repos in ${createDuration}ms`);
}

if (toUpdate.length > 0) {
const updateStart = performance.now();

// Build values string for update query
const updateValues = toUpdate.map(repo => `(
'${repo.name}',
'${repo.cloneUrl}',
${repo.imageUrl ? `'${repo.imageUrl}'` : 'NULL'},
${repo.isFork},
${repo.isArchived},
'${JSON.stringify(repo.metadata)}'::jsonb,
'${repo.external_id}',
'${repo.external_codeHostType}',
'${repo.external_codeHostUrl}'
)`).join(',');

// Update repos and get their IDs in one query
const updateSql = `
WITH updated AS (
UPDATE "Repo" r
SET
name = v.name,
"cloneUrl" = v.clone_url,
"imageUrl" = v.image_url,
"isFork" = v.is_fork,
"isArchived" = v.is_archived,
metadata = v.metadata,
"updatedAt" = NOW()
FROM (
VALUES ${updateValues}
) AS v(name, clone_url, image_url, is_fork, is_archived, metadata, external_id, external_code_host_type, external_code_host_url)
WHERE r.external_id = v.external_id
AND r."external_codeHostUrl" = v.external_code_host_url
RETURNING r.id
)
SELECT id FROM updated
`;
const updatedRepoIds = await tx.$queryRawUnsafe<{id: number}[]>(updateSql);

// Insert repo-connection mappings
const createConnectionSql = `
INSERT INTO "RepoToConnection" ("repoId", "connectionId", "addedAt")
SELECT id, ${job.data.connectionId}, NOW()
FROM unnest(ARRAY[${updatedRepoIds.map(r => r.id).join(',')}]) AS id
`;
await tx.$executeRawUnsafe(createConnectionSql);

const updateDuration = performance.now() - updateStart;
this.logger.info(`Updated ${toUpdate.length} repos in ${updateDuration}ms`);
}
});
}


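The refactor above replaces per-repo `tx.repo.upsert` calls (one round trip per repository) with a single `findMany` that partitions repos into `toCreate` and `toUpdate`, a `createManyAndReturn` for the inserts, and one raw bulk `UPDATE ... FROM (VALUES ...)` for the updates. The `VALUES` string is assembled by string interpolation, so every field is quoted by hand. For reference, a hypothetical parameterized sketch of the same bulk-update pattern (not part of this diff) would use Prisma's `$queryRawUnsafe(sql, ...params)` positional-parameter form and let the driver handle quoting:

```typescript
import { Prisma } from "@sourcebot/db";

// Hypothetical sketch (not part of this diff): the same bulk
// UPDATE ... FROM (VALUES ...) pattern, but with positional parameters
// ($1, $2, ...) so the driver handles quoting instead of string interpolation.
async function bulkUpdateRepos(
    tx: Prisma.TransactionClient,
    toUpdate: Prisma.RepoUpdateManyMutationInput[],
): Promise<{ id: number }[]> {
    const params: unknown[] = [];
    const valueRows = toUpdate.map((repo) => {
        const base = params.length;
        params.push(
            repo.name, repo.cloneUrl, repo.imageUrl ?? null, repo.isFork,
            repo.isArchived, JSON.stringify(repo.metadata),
            repo.external_id, repo.external_codeHostUrl,
        );
        // Explicit casts are required: Postgres cannot infer the type of a
        // bare parameter inside a VALUES list.
        return `($${base + 1}::text, $${base + 2}::text, $${base + 3}::text, ` +
            `$${base + 4}::boolean, $${base + 5}::boolean, $${base + 6}::jsonb, ` +
            `$${base + 7}::text, $${base + 8}::text)`;
    }).join(',');

    return tx.$queryRawUnsafe<{ id: number }[]>(`
        WITH updated AS (
            UPDATE "Repo" r
            SET name = v.name,
                "cloneUrl" = v.clone_url,
                "imageUrl" = v.image_url,
                "isFork" = v.is_fork,
                "isArchived" = v.is_archived,
                metadata = v.metadata,
                "updatedAt" = NOW()
            FROM (VALUES ${valueRows})
                AS v(name, clone_url, image_url, is_fork, is_archived, metadata, external_id, external_code_host_url)
            WHERE r.external_id = v.external_id
              AND r."external_codeHostUrl" = v.external_code_host_url
            RETURNING r.id
        )
        SELECT id FROM updated
    `, ...params);
}
```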
5 changes: 2 additions & 3 deletions packages/backend/src/environment.ts
@@ -35,6 +35,5 @@ export const FALLBACK_GITHUB_TOKEN = getEnv(process.env.FALLBACK_GITHUB_TOKEN);
export const FALLBACK_GITLAB_TOKEN = getEnv(process.env.FALLBACK_GITLAB_TOKEN);
export const FALLBACK_GITEA_TOKEN = getEnv(process.env.FALLBACK_GITEA_TOKEN);

export const CONFIG_REPO_UPSERT_TIMEOUT_MS = getEnv(process.env.CONFIG_REPO_UPSERT_TIMEOUT_MS, '15000')!;

export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
export const REDIS_URL = getEnv(process.env.REDIS_URL, 'redis://localhost:6379')!;
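`getEnv` itself is not shown in this diff; judging from these call sites, it presumably reads an environment variable's value with an optional fallback, roughly:

```typescript
// Inferred sketch of getEnv (an assumption, not shown in the diff): return
// the value when the variable is set, otherwise the optional default.
const getEnv = (value: string | undefined, defaultValue?: string): string | undefined =>
    value ?? defaultValue;
```

The non-null assertion on `REDIS_URL` is sound only because a default is always supplied.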
6 changes: 2 additions & 4 deletions packages/backend/src/main.ts
@@ -5,14 +5,12 @@ import { DEFAULT_SETTINGS } from './constants.js';
import { Redis } from 'ioredis';
import { ConnectionManager } from './connectionManager.js';
import { RepoManager } from './repoManager.js';
import { INDEX_CONCURRENCY_MULTIPLE } from './environment.js';
import { INDEX_CONCURRENCY_MULTIPLE, REDIS_URL } from './environment.js';

const logger = createLogger('main');

export const main = async (db: PrismaClient, context: AppContext) => {
const redis = new Redis({
host: 'localhost',
port: 6379,
const redis = new Redis(REDIS_URL, {
maxRetriesPerRequest: null
});
redis.ping().then(() => {
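Because ioredis accepts a connection string directly, auth, database index, and TLS can now all be carried by `REDIS_URL` instead of an options object. An illustrative example (the URL is made up, not from this diff):

```typescript
import { Redis } from 'ioredis';

// 'rediss://' enables TLS, ':s3cret@' supplies the password, '/0' selects db 0.
// maxRetriesPerRequest must stay null for BullMQ's blocking connections.
const redis = new Redis('rediss://:s3cret@redis.internal:6380/0', {
    maxRetriesPerRequest: null,
});
```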
57 changes: 41 additions & 16 deletions packages/backend/src/repoManager.ts
@@ -13,7 +13,7 @@ import os from 'os';

interface IRepoManager {
blockingPollLoop: () => void;
scheduleRepoIndexing: (repo: RepoWithConnections) => Promise<void>;
scheduleRepoIndexingBulk: (repos: RepoWithConnections[]) => Promise<void>;
dispose: () => void;
}

@@ -25,8 +25,8 @@ type JobPayload = {
}

export class RepoManager implements IRepoManager {
private queue = new Queue<JobPayload>(QUEUE_NAME);
private worker: Worker;
private queue: Queue<JobPayload>;
private logger = createLogger('RepoManager');

constructor(
@@ -35,6 +35,9 @@ export class RepoManager implements IRepoManager {
redis: Redis,
private ctx: AppContext,
) {
this.queue = new Queue<JobPayload>(QUEUE_NAME, {
connection: redis,
});
const numCores = os.cpus().length;
this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), {
connection: redis,
@@ -46,26 +49,48 @@

public async blockingPollLoop() {
while(true) {
this.fetchAndScheduleRepoIndexing();
this.garbageCollectRepo();
await this.fetchAndScheduleRepoIndexing();
await this.garbageCollectRepo();

await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
}
}

public async scheduleRepoIndexing(repo: RepoWithConnections) {
public async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
await this.db.$transaction(async (tx) => {
await tx.repo.update({
where: { id: repo.id },
data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
await tx.repo.updateMany({
where: { id: { in: repos.map(repo => repo.id) } },
data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
});

const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
if (!acc[repo.orgId]) {
acc[repo.orgId] = [];
}
acc[repo.orgId].push(repo);
return acc;
}, {});

for (const orgId in reposByOrg) {
const orgRepos = reposByOrg[orgId];
// Set priority based on number of repos (more repos = lower priority)
// This helps prevent large orgs from overwhelming the queue
const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);

await this.queue.addBulk(orgRepos.map(repo => ({
name: 'repoIndexJob',
data: { repo },
opts: {
priority: priority
}
})));

this.logger.info(`Added ${orgRepos.length} jobs to queue for org ${orgId} with priority ${priority}`);
}


await this.queue.add('repoIndexJob', {
repo
});
this.logger.info(`Added job to queue for repo ${repo.id}`);
}).catch((err: unknown) => {
this.logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
this.logger.error(`Failed to add jobs to queue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
});
}

@@ -91,9 +116,9 @@
}
});

for (const repo of repos) {
await this.scheduleRepoIndexing(repo);
}
if (repos.length > 0) {
await this.scheduleRepoIndexingBulk(repos);
}
}

private async garbageCollectRepo() {
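A note on the priority math in `scheduleRepoIndexingBulk`: BullMQ treats lower numbers as higher priority, and 2,097,152 (2^21) is its maximum allowed priority value, which is why the formula caps there. A worked example of the formula (illustrative values):

```typescript
// Priority grows with org size, so small orgs drain from the queue first.
const priorityFor = (repoCount: number): number =>
    Math.min(Math.ceil(repoCount / 10), 2 ** 21);

priorityFor(5);      // 1    -> highest priority
priorityFor(100);    // 10
priorityFor(50_000); // 5000 -> a huge org yields to smaller ones
```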