Backend MP4 conversion (#23)
* ADD conversion queue * ADD RemuxService for MP4 conversion * REMOVE unused conversion queue * REORGANISE Job-related classes * ADD Job stages * REVERT to old commit, using Spring Async instead * ADD asynchronous processing for video tasks * PATCH and streamline progress tracking * ADD asynchronous video processing and job restructuring * REFACTOR job service method * ADD job remux functionality * ADD remuxing endpoint * PATCH complete flag not updating in API response * ADD progress type in frontend * ADD reset functionality for job status * PATCH missing progress bar for subsequent exports * REDESIGN settings box * ADD tracking video file conversion in frontend * PATCH extension bug * REMOVE autowired decorator
This commit is contained in:
@@ -1,22 +1,17 @@
|
||||
package com.ddf.vodsystem.services;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import com.ddf.vodsystem.dto.Job;
|
||||
import com.ddf.vodsystem.services.media.RemuxService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.security.core.context.SecurityContext;
|
||||
import org.springframework.security.core.context.SecurityContextHolder;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.ddf.vodsystem.entities.Job;
|
||||
import com.ddf.vodsystem.entities.JobStatus;
|
||||
import com.ddf.vodsystem.exceptions.JobNotFound;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* Service for managing and processing jobs in a background thread.
|
||||
* Uses a blocking queue to avoid busy waiting and ensures jobs are processed sequentially.
|
||||
@@ -25,15 +20,19 @@ import jakarta.annotation.PostConstruct;
|
||||
public class JobService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(JobService.class);
|
||||
private final ConcurrentHashMap<String, Job> jobs = new ConcurrentHashMap<>();
|
||||
private final BlockingQueue<Job> jobQueue = new LinkedBlockingQueue<>();
|
||||
private final ClipService clipService;
|
||||
private final RemuxService remuxService;
|
||||
private final DirectoryService directoryService;
|
||||
|
||||
/**
|
||||
* Constructs a JobService with the given CompressionService.
|
||||
* @param clipService the compression service to use for processing jobs
|
||||
*/
|
||||
public JobService(ClipService clipService) {
|
||||
public JobService(ClipService clipService,
|
||||
RemuxService remuxService, DirectoryService directoryService) {
|
||||
this.clipService = clipService;
|
||||
this.remuxService = remuxService;
|
||||
this.directoryService = directoryService;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -61,76 +60,49 @@ public class JobService {
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a job as ready and adds it to the processing queue.
|
||||
* @param uuid the UUID of the job to mark as ready
|
||||
*/
|
||||
public void jobReady(String uuid) {
|
||||
Job job = getJob(uuid);
|
||||
public void convertJob(Job job) {
|
||||
logger.info("Converting job: {}", job.getUuid());
|
||||
File tempFile = new File(job.getInputFile().getAbsolutePath() + ".temp");
|
||||
directoryService.copyFile(job.getInputFile(), tempFile);
|
||||
|
||||
SecurityContext context = SecurityContextHolder.getContext();
|
||||
job.setSecurityContext(context);
|
||||
job.getStatus().getConversion().reset();
|
||||
|
||||
try {
|
||||
remuxService.remux(
|
||||
tempFile,
|
||||
job.getInputFile(),
|
||||
job.getStatus().getConversion(),
|
||||
job.getInputVideoMetadata().getEndPoint())
|
||||
.thenRun(() -> {
|
||||
job.getStatus().getConversion().markComplete();
|
||||
directoryService.deleteFile(tempFile);
|
||||
});
|
||||
} catch (IOException | InterruptedException e) {
|
||||
logger.error("Error converting job {}: {}", job.getUuid(), e.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
logger.info("Job ready: {}", job.getUuid());
|
||||
job.setStatus(JobStatus.PENDING);
|
||||
jobQueue.add(job);
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a job by running the compression service.
|
||||
* @param job the job to process
|
||||
* Marks a job as ready and adds it to the processing queue.
|
||||
* @param job The job to process
|
||||
*/
|
||||
private void processJob(Job job) {
|
||||
SecurityContext previousContext = SecurityContextHolder.getContext(); // optional, for restoring later
|
||||
try {
|
||||
if (job.getSecurityContext() != null) {
|
||||
SecurityContextHolder.setContext(job.getSecurityContext());
|
||||
}
|
||||
public void processJob(Job job) {
|
||||
logger.info("Job ready: {}", job.getUuid());
|
||||
job.getStatus().getProcess().reset();
|
||||
|
||||
try {
|
||||
clipService.create(
|
||||
job.getInputVideoMetadata(),
|
||||
job.getOutputVideoMetadata(),
|
||||
job.getInputFile(),
|
||||
job.getOutputFile(),
|
||||
job.getProgress()
|
||||
job.getStatus().getProcess()
|
||||
);
|
||||
|
||||
job.setStatus(JobStatus.FINISHED);
|
||||
|
||||
} catch (IOException | InterruptedException e) {
|
||||
logger.error("Error processing job {}: {}", job.getUuid(), e.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("Error while running job {}", job.getUuid(), e);
|
||||
|
||||
} finally {
|
||||
// 🔄 Restore previous context to avoid leaking across jobs
|
||||
SecurityContextHolder.setContext(previousContext);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the background processing loop in a daemon thread.
|
||||
* The loop blocks until a job is available and then processes it.
|
||||
*/
|
||||
@PostConstruct
|
||||
private void startProcessingLoop() {
|
||||
Thread thread = new Thread(() -> {
|
||||
logger.info("Starting processing loop");
|
||||
while (true) {
|
||||
try {
|
||||
Job job = jobQueue.take(); // Blocks until a job is available
|
||||
|
||||
logger.info("Starting job {}", job.getUuid());
|
||||
job.setStatus(JobStatus.RUNNING);
|
||||
processJob(job);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("Processing loop interrupted", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
thread.setDaemon(true);
|
||||
thread.start();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user