Fix queue issues
This commit is contained in:
parent
a3f3856632
commit
2409c818ee
10 changed files with 95 additions and 18 deletions
|
|
@ -15,4 +15,4 @@ public function __construct(public Article $article)
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
32
app/Jobs/RefreshArticlesJob.php
Normal file
32
app/Jobs/RefreshArticlesJob.php
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\Jobs;
|
||||||
|
|
||||||
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||||
|
use Illuminate\Foundation\Queue\Queueable;
|
||||||
|
|
||||||
|
class RefreshArticlesJob implements ShouldQueue
|
||||||
|
{
|
||||||
|
use Queueable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new job instance.
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this->onQueue('lemmy-posts');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the job.
|
||||||
|
*/
|
||||||
|
public function handle(): void
|
||||||
|
{
|
||||||
|
echo "Starting article refresh job...\n";
|
||||||
|
|
||||||
|
// Call the article refresh command
|
||||||
|
\Artisan::call('article:refresh');
|
||||||
|
|
||||||
|
echo "Article refresh job completed!\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -27,7 +27,9 @@ public static function dispatchForLemmy(): void
|
||||||
$communityId = config('lemmy.community_id');
|
$communityId = config('lemmy.community_id');
|
||||||
$communityName = config('lemmy.community');
|
$communityName = config('lemmy.community');
|
||||||
|
|
||||||
if ($communityId && $communityName) {
|
if ($communityName) {
|
||||||
|
// Use a placeholder ID if community_id is not set - we'll resolve it in the job
|
||||||
|
$communityId = $communityId ?: 'resolve_from_name';
|
||||||
self::dispatch(PlatformEnum::LEMMY, (string) $communityId, $communityName);
|
self::dispatch(PlatformEnum::LEMMY, (string) $communityId, $communityName);
|
||||||
} else {
|
} else {
|
||||||
logger()->warning('Cannot dispatch Lemmy sync job: missing community configuration');
|
logger()->warning('Cannot dispatch Lemmy sync job: missing community configuration');
|
||||||
|
|
@ -36,9 +38,13 @@ public static function dispatchForLemmy(): void
|
||||||
|
|
||||||
public function handle(): void
|
public function handle(): void
|
||||||
{
|
{
|
||||||
|
echo "Starting channel posts sync job...\n";
|
||||||
|
|
||||||
if ($this->platform === PlatformEnum::LEMMY) {
|
if ($this->platform === PlatformEnum::LEMMY) {
|
||||||
$this->syncLemmyChannelPosts();
|
$this->syncLemmyChannelPosts();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
echo "Channel posts sync job completed!\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
private function syncLemmyChannelPosts(): void
|
private function syncLemmyChannelPosts(): void
|
||||||
|
|
@ -47,7 +53,12 @@ private function syncLemmyChannelPosts(): void
|
||||||
$api = new LemmyApiService(config('lemmy.instance'));
|
$api = new LemmyApiService(config('lemmy.instance'));
|
||||||
$token = $this->getAuthToken($api);
|
$token = $this->getAuthToken($api);
|
||||||
|
|
||||||
$api->syncChannelPosts($token, (int) $this->channelId, $this->channelName);
|
// Resolve community ID if it's a placeholder
|
||||||
|
$communityId = $this->channelId === 'resolve_from_name'
|
||||||
|
? $api->getCommunityId($this->channelName)
|
||||||
|
: (int) $this->channelId;
|
||||||
|
|
||||||
|
$api->syncChannelPosts($token, $communityId, $this->channelName);
|
||||||
|
|
||||||
logger()->info('Channel posts synced successfully', [
|
logger()->info('Channel posts synced successfully', [
|
||||||
'platform' => $this->platform->value,
|
'platform' => $this->platform->value,
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,15 @@
|
||||||
|
|
||||||
use App\Events\ArticleReadyToPublish;
|
use App\Events\ArticleReadyToPublish;
|
||||||
use App\Jobs\PublishToLemmyJob;
|
use App\Jobs\PublishToLemmyJob;
|
||||||
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||||
|
|
||||||
class PublishArticle
|
class PublishArticle implements ShouldQueue
|
||||||
{
|
{
|
||||||
|
public string|null $queue = 'default';
|
||||||
|
public int $delay = 300;
|
||||||
|
public int $tries = 3;
|
||||||
|
public int $backoff = 300;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
@ -15,11 +21,20 @@ public function handle(ArticleReadyToPublish $event): void
|
||||||
{
|
{
|
||||||
$article = $event->article;
|
$article = $event->article;
|
||||||
|
|
||||||
|
// Check if already published to avoid duplicate jobs
|
||||||
|
if ($article->articlePublication()->exists()) {
|
||||||
|
logger()->info('Article already published, skipping job dispatch', [
|
||||||
|
'article_id' => $article->id,
|
||||||
|
'url' => $article->url
|
||||||
|
]);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
logger()->info('Article queued for publishing to Lemmy', [
|
logger()->info('Article queued for publishing to Lemmy', [
|
||||||
'article_id' => $article->id,
|
'article_id' => $article->id,
|
||||||
'url' => $article->url
|
'url' => $article->url
|
||||||
]);
|
]);
|
||||||
|
|
||||||
PublishToLemmyJob::dispatch($article);
|
PublishToLemmyJob::dispatch($article);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,12 @@
|
||||||
use App\Events\ArticleFetched;
|
use App\Events\ArticleFetched;
|
||||||
use App\Events\ArticleReadyToPublish;
|
use App\Events\ArticleReadyToPublish;
|
||||||
use App\Services\Article\ValidationService;
|
use App\Services\Article\ValidationService;
|
||||||
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||||
|
|
||||||
class ValidateArticle
|
class ValidateArticle implements ShouldQueue
|
||||||
{
|
{
|
||||||
|
public string|null $queue = 'default';
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
|
@ -15,13 +18,28 @@ public function handle(ArticleFetched $event): void
|
||||||
{
|
{
|
||||||
$article = $event->article;
|
$article = $event->article;
|
||||||
|
|
||||||
|
// Skip if already validated
|
||||||
if (! is_null($article->validated_at)) {
|
if (! is_null($article->validated_at)) {
|
||||||
|
// Even if validated, don't fire ready-to-publish if already has publication
|
||||||
|
if ($article->isValid() && !$article->articlePublication()->exists()) {
|
||||||
|
event(new ArticleReadyToPublish($article));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if already has publication (prevents duplicate processing)
|
||||||
|
if ($article->articlePublication()->exists()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$article = ValidationService::validate($article);
|
$article = ValidationService::validate($article);
|
||||||
|
|
||||||
if ($article->isValid()) {
|
if ($article->isValid()) {
|
||||||
|
// Double-check publication doesn't exist (race condition protection)
|
||||||
|
if ($article->articlePublication()->exists()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
event(new ArticleReadyToPublish($article));
|
event(new ArticleReadyToPublish($article));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -73,6 +73,7 @@ public function articlePublications(): HasMany
|
||||||
protected static function booted(): void
|
protected static function booted(): void
|
||||||
{
|
{
|
||||||
static::created(function ($article) {
|
static::created(function ($article) {
|
||||||
|
echo "Article::created event fired for article ID {$article->id}\n";
|
||||||
event(new ArticleFetched($article));
|
event(new ArticleFetched($article));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,15 +23,6 @@ public function register(): void
|
||||||
|
|
||||||
public function boot(): void
|
public function boot(): void
|
||||||
{
|
{
|
||||||
Event::listen(
|
|
||||||
ArticleFetched::class,
|
|
||||||
ValidateArticle::class,
|
|
||||||
);
|
|
||||||
|
|
||||||
Event::listen(
|
|
||||||
ArticleReadyToPublish::class,
|
|
||||||
PublishArticle::class,
|
|
||||||
);
|
|
||||||
|
|
||||||
Event::listen(
|
Event::listen(
|
||||||
ExceptionOccurred::class,
|
ExceptionOccurred::class,
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,14 @@ public static function fetchArticleData(Article $article): array
|
||||||
|
|
||||||
private static function saveArticle(string $url): Article
|
private static function saveArticle(string $url): Article
|
||||||
{
|
{
|
||||||
return Article::firstOrCreate(['url' => $url]);
|
$existingArticle = Article::where('url', $url)->first();
|
||||||
|
|
||||||
|
if ($existingArticle) {
|
||||||
|
echo "ArticleFetcher: Found existing article ID {$existingArticle->id} for URL: {$url}\n";
|
||||||
|
return $existingArticle;
|
||||||
|
}
|
||||||
|
|
||||||
|
echo "ArticleFetcher: Creating new article for URL: {$url}\n";
|
||||||
|
return Article::create(['url' => $url]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,4 +5,5 @@
|
||||||
'password' => env('LEMMY_PASSWORD'),
|
'password' => env('LEMMY_PASSWORD'),
|
||||||
'instance' => env('LEMMY_INSTANCE'),
|
'instance' => env('LEMMY_INSTANCE'),
|
||||||
'community' => env('LEMMY_COMMUNITY'),
|
'community' => env('LEMMY_COMMUNITY'),
|
||||||
|
'community_id' => env('LEMMY_COMMUNITY_ID'),
|
||||||
];
|
];
|
||||||
|
|
|
||||||
|
|
@ -48,8 +48,8 @@ elif [ "$1" = "queue" ]; then
|
||||||
echo "Dispatching initial sync job..."
|
echo "Dispatching initial sync job..."
|
||||||
php artisan tinker --execute="App\\Jobs\\SyncChannelPostsJob::dispatchForLemmy();"
|
php artisan tinker --execute="App\\Jobs\\SyncChannelPostsJob::dispatchForLemmy();"
|
||||||
|
|
||||||
echo "Fetching initial articles..."
|
echo "Dispatching article refresh job..."
|
||||||
php artisan article:refresh
|
php artisan tinker --execute="App\\Jobs\\RefreshArticlesJob::dispatch();"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Execute the command based on the argument
|
# Execute the command based on the argument
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue