3
3
from shutil import copy , copyfileobj , rmtree
4
4
import urllib
5
5
from jsonschema import validate
6
- from .rdf import rdf_receive , jsonld2nt , TripleStore
6
+ from .rdf import jsonld2nt , TripleStore , triple_iterator
7
7
from .log import Log
8
8
from .errors import NotFound , ClientError , ValidationError
9
9
from .utils import read_json , write_json , access_location
@@ -105,7 +105,7 @@ def delete(self, id):
105
105
rmtree (self .stage / str (id ), ignore_errors = True )
106
106
107
107
def purge(self):
    """Delete every item currently returned by ``self.list()``.

    The ids are collected into a list before the first deletion, so the
    listing is fully read before anything is removed.
    """
    item_ids = [entry["id"] for entry in self.list()]
    for item_id in item_ids:
        self.delete(item_id)
110
110
111
111
def load (self , id ):
@@ -139,40 +139,13 @@ def forbidden_namespaces(self, id):
139
139
return {}
140
140
141
141
def receive(self, id, file=None):
    """Run the receive pipeline for item ``id`` and return the closed log.

    Steps, in order: ``identify_source`` resolves the source and its
    format, ``fetch_source`` stores the original and opens the log,
    ``preprocess_source`` may transform the stored file, and
    ``receive_rdf`` processes the result. The return value is whatever
    ``log.done()`` returns.
    """
    source, fmt = self.identify_source(id, file)
    original, log = self.fetch_source(id, source, fmt)
    prepared = self.preprocess_source(id, original, fmt, log)
    self.receive_rdf(id, prepared, log)
    return log.done()
150
147
151
- def receive_source (self , id , source , fmt ):
152
- stage = self .stage / str (id )
153
- stage .mkdir (exist_ok = True )
154
-
155
- original = stage / f"original.{ fmt } "
156
- log = Log (stage / "receive.json" , f"Receiving { id } from { source } " )
157
-
158
- try :
159
- if "/" not in source :
160
- source = self .data / source
161
- log .append (f"Retrieving source { source } from data directory" )
162
- copy (source , original )
163
- else :
164
- # TODO: source may be a DOI or similar identifier
165
- # ./extract-rdf.py $download_dir $stage/triples.nt
166
- log .append (f"Retrieving source from { source } " )
167
- with urllib .request .urlopen (source ) as fsrc , open (original , 'wb' ) as fdst :
168
- copyfileobj (fsrc , fdst )
169
- except Exception as e :
170
- log .done (f"Retrieving failed: { e } " )
171
- raise NotFound (f"{ source } not found" )
172
-
173
- return (original , log )
174
-
175
- def get_source (self , id , source = None ):
148
+ def identify_source (self , id , source = None ):
176
149
item = self .get (id )
177
150
fmt = None
178
151
@@ -188,7 +161,7 @@ def get_source(self, id, source=None):
188
161
elif fmt == "rdf/xml" :
189
162
fmt = "xml"
190
163
191
- # TODO: configure and extend this
164
+ # TODO: configure and extend this. Add support of .zip files
192
165
if not fmt :
193
166
if Path (source ).suffix in [".nt" , ".ttl" ]:
194
167
fmt = "ttl"
@@ -202,5 +175,53 @@ def get_source(self, id, source=None):
202
175
203
176
return source , fmt
204
177
205
- def process_received (self , id , file , fmt , log ):
178
def fetch_source(self, id, source, fmt):
    """Store the source of item ``id`` in its stage directory.

    A ``source`` without "/" is taken as a file name inside ``self.data``
    and copied; any other value is opened with ``urllib.request.urlopen``
    and streamed to disk.

    Returns a tuple ``(original, log)``: the path ``original.<fmt>`` inside
    the stage directory and the ``Log`` writing to ``receive.json``.

    Raises ``NotFound`` when copying or downloading fails for any reason.
    The failure is written to the log first and the underlying exception
    is chained as the cause.
    """
    # ``import urllib`` alone does not load the ``request`` submodule, so
    # import it explicitly instead of relying on another module doing so.
    import urllib.request

    stage = self.stage / str(id)
    stage.mkdir(exist_ok=True)

    original = stage / f"original.{fmt}"
    log = Log(stage / "receive.json", f"Receiving {id} from {source}")

    try:
        if "/" not in source:
            source = self.data / source
            log.append(f"Retrieving source {source} from data directory")
            copy(source, original)
        else:
            # TODO: source may be a DOI or similar identifier
            # ./extract-rdf.py $download_dir $stage/triples.nt
            # NOTE(review): urlopen also accepts non-HTTP schemes such as
            # file: -- confirm that sources come from trusted configuration.
            log.append(f"Retrieving source from {source}")
            with urllib.request.urlopen(source) as fsrc, \
                    open(original, "wb") as fdst:
                copyfileobj(fsrc, fdst)
    except Exception as e:
        # Any failure is reported to callers as "not found"; keep the
        # original exception attached for debugging.
        log.done(f"Retrieving failed: {e}")
        raise NotFound(f"{source} not found") from e

    return (original, log)
201
+
202
def preprocess_source(self, id, file, fmt, log):
    """Return ``file`` unchanged.

    Called by ``receive`` between fetching the source and processing its
    RDF; this implementation ignores ``id``, ``fmt`` and ``log``.
    """
    return file
204
+
205
def receive_rdf(self, id, source, log):
    """Split the triples of ``source`` into kept and removed files.

    Each triple yielded by ``triple_iterator(source, log)`` is written as
    one line to ``checked.nt`` in the stage directory of ``id``. Triples
    whose subject starts with one of the values of
    ``self.forbidden_namespaces(id)`` go to ``removed.nt`` instead. Both
    counts are appended to ``log``.

    Both files are written as UTF-8 and are closed before the summary is
    logged, also when iteration raises.
    """
    # str.startswith accepts a tuple of prefixes; an empty tuple never
    # matches, so no triples are removed when nothing is forbidden.
    namespaces = tuple(self.forbidden_namespaces(id).values())

    stage = self.stage / str(id)
    kept_count, removed_count = 0, 0

    with open(stage / "checked.nt", "w", encoding="utf-8") as checked, \
            open(stage / "removed.nt", "w", encoding="utf-8") as removed:
        for s, p, o in triple_iterator(source, log):
            line = f"{s} {p} {o} .\n"
            # TODO: implement more filtering and rewrite
            # The first character of the subject is skipped before the
            # prefix test -- presumably the leading "<" of a serialized
            # IRI; confirm against triple_iterator.
            if str(s)[1:].startswith(namespaces):
                removed_count += 1
                removed.write(line)
            else:
                kept_count += 1
                # TODO: filter out namespaces
                # if predicate.startswith(rdflib.RDFS)
                checked.write(line)

    log.append(
        f"Removed {removed_count} triples, remaining {kept_count} unique triples.")

    # TODO: if kept_count is zero, raise an error
0 commit comments