|
1 |
| -import 'dart:convert'; |
2 | 1 | import 'dart:io';
|
3 |
| -import 'package:dotenv/dotenv.dart' as dotenv; |
4 |
| -import '../lib/db.dart'; |
5 |
| -import '../lib/connectors/scraper.dart'; |
| 2 | +import 'dart:convert'; |
| 3 | +import 'package:postgres/postgres.dart'; |
6 | 4 |
|
7 |
| -Future<void> processJob(DB db, Map row) async { |
8 |
| - final id = row['id'] as int; |
9 |
| - final payload = jsonDecode(row['payload'] as String) as Map<String, dynamic>; |
10 |
| - final type = row['type'] as String; |
11 |
| - print('Processing job \$id type=\$type payload=\$payload'); |
12 |
| - try { |
13 |
| - if (type == 'scrape_product') { |
14 |
| - final url = payload['url'] as String; |
15 |
| - final scraped = await scrapeProductFromUrl(url); |
16 |
| - if (scraped == null) throw Exception('scrape returned null'); |
17 |
| - final conn = db.conn; |
18 |
| - await conn.transaction((ctx) async { |
19 |
| - await ctx.query(''' |
20 |
| - INSERT INTO products (external_id, title, price, image, source_url, updated_at) |
21 |
| - VALUES (@external_id, @title, @price, @image, @source_url, now()) |
22 |
| - ON CONFLICT (source_url) DO UPDATE SET |
23 |
| - title = EXCLUDED.title, |
24 |
| - price = EXCLUDED.price, |
25 |
| - image = EXCLUDED.image, |
26 |
| - updated_at = now(); |
27 |
| - ''', substitutionValues: { |
28 |
| - 'external_id': scraped['external_id'], |
29 |
| - 'title': scraped['title'], |
30 |
| - 'price': scraped['price'], |
31 |
| - 'image': scraped['image'], |
32 |
| - 'source_url': scraped['source_url'] |
33 |
| - }); |
34 |
| - await ctx.query('UPDATE jobs SET status = @status, updated_at = now() WHERE id = @id', substitutionValues: { |
35 |
| - 'status': 'done', |
36 |
| - 'id': id |
37 |
| - }); |
38 |
| - }); |
39 |
| - } else { |
40 |
| - throw Exception('unknown job type'); |
41 |
| - } |
42 |
| - } catch (e) { |
43 |
| - print('Job \$id failed: \$e'); |
44 |
| - final attempts = (row['attempts'] as int) + 1; |
45 |
| - final conn = db.conn; |
46 |
| - final nextRun = DateTime.now().add(Duration(seconds: attempts * 10)); |
47 |
| - await conn.query('UPDATE jobs SET attempts = @attempts, last_error = @err, status = @status, run_at = @run_at, updated_at = now() WHERE id = @id', substitutionValues: { |
48 |
| - 'attempts': attempts, |
49 |
| - 'err': e.toString(), |
50 |
| - 'status': 'pending', |
51 |
| - 'run_at': nextRun.toUtc().toIso8601String(), |
52 |
| - 'id': id |
53 |
| - }); |
54 |
| - } |
| 5 | +Future<PostgreSQLConnection> connectDb() async { |
| 6 | + final dbUrl = Platform.environment['DATABASE_URL'] ?? 'postgres://mymodus:mymodus_pass@localhost:5432/mymodus_db'; |
| 7 | + final uri = Uri.parse(dbUrl); |
| 8 | + final conn = PostgreSQLConnection(uri.host, uri.port, uri.path.replaceFirst('/', ''), |
| 9 | + username: uri.userInfo.split(':').first, |
| 10 | + password: uri.userInfo.split(':').length > 1 ? uri.userInfo.split(':')[1] : null); |
| 11 | + await conn.open(); |
| 12 | + return conn; |
55 | 13 | }
|
56 | 14 |
|
57 |
| -Future<void> pollLoop(DB db) async { |
58 |
| - while (true) { |
| 15 | +void main(List<String> args) async { |
| 16 | + print('Worker started'); |
| 17 | + final conn = await connectDb(); |
| 18 | + |
| 19 | + // Simple example job: ensure marketplaces exist |
| 20 | + final marketplaces = [ |
| 21 | + {'code': 'ozon', 'name': 'Ozon'}, |
| 22 | + {'code': 'wb', 'name': 'Wildberries'}, |
| 23 | + {'code': 'lamoda', 'name': 'Lamoda'} |
| 24 | + ]; |
| 25 | + for (final m in marketplaces) { |
59 | 26 | try {
|
60 |
| - final rows = await db.conn.mappedResultsQuery(''' |
61 |
| - SELECT id, type, payload, attempts FROM jobs |
62 |
| - WHERE status = 'pending' AND run_at <= now() |
63 |
| - ORDER BY created_at ASC |
64 |
| - LIMIT 1 |
65 |
| - '''); |
66 |
| - if (rows.isEmpty) { |
67 |
| - await Future.delayed(Duration(seconds: 2)); |
68 |
| - continue; |
69 |
| - } |
70 |
| - final row = rows.first.values.first; |
71 |
| - await db.conn.query('UPDATE jobs SET status = @status, updated_at = now() WHERE id = @id', substitutionValues: { |
72 |
| - 'status': 'processing', |
73 |
| - 'id': row['id'] |
74 |
| - }); |
75 |
| - await processJob(db, row); |
| 27 | + await conn.query('INSERT INTO marketplaces (code, name) VALUES (@code, @name) ON CONFLICT (code) DO NOTHING', substitutionValues: m); |
76 | 28 | } catch (e) {
|
77 |
| - print('Worker loop error: \$e'); |
78 |
| - await Future.delayed(Duration(seconds: 5)); |
| 29 | + print('Marketplace insert error: \$e'); |
79 | 30 | }
|
80 | 31 | }
|
81 |
| -} |
82 | 32 |
|
83 |
| -void main(List<String> args) async { |
84 |
| - dotenv.load(); |
85 |
| - final db = await DB.connect(); |
86 |
| - print('Worker connected to DB, starting poll loop...'); |
87 |
| - await pollLoop(db); |
| 33 | + // Placeholder: here would be scheduler loop fetching from queue and running scrapers |
| 34 | + while (true) { |
| 35 | + print('Worker heartbeat: ' + DateTime.now().toIso8601String()); |
| 36 | + await Future.delayed(Duration(seconds: 30)); |
| 37 | + } |
88 | 38 | }
|
0 commit comments