1
- '''
2
- Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
1
+ # Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
+ #
3
+ # Licensed under the Amazon Software License (the "License").
4
+ # You may not use this file except in compliance with the License.
5
+ # A copy of the License is located at
6
+ #
7
+ # http://aws.amazon.com/asl/
8
+ #
9
+ # or in the "license" file accompanying this file. This file is distributed
10
+ # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11
+ # express or implied. See the License for the specific language governing
12
+ # permissions and limitations under the License.
13
+ import abc
14
+ import json
15
+ import sys
16
+ import traceback
17
+
18
+ from amazon_kclpy import dispatch
19
+ from amazon_kclpy .v2 import processor
20
+ from amazon_kclpy import messages
3
21
4
- Licensed under the Amazon Software License (the "License").
5
- You may not use this file except in compliance with the License.
6
- A copy of the License is located at
7
-
8
- http://aws.amazon.com/asl/
9
-
10
- or in the "license" file accompanying this file. This file is distributed
11
- on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12
- express or implied. See the License for the specific language governing
13
- permissions and limitations under the License.
14
- '''
15
- import abc , base64 , io , json , os , random , sys , time , traceback
16
22
17
23
class _IOHandler (object ):
18
24
'''
@@ -72,10 +78,10 @@ def load_action(self, line):
72
78
:param line: A message line that was received from the MultiLangDaemon (e.g.
73
79
'{"action" : "initialize", "shardId" : "shardId-000001"}')
74
80
75
- :rtype: dict
76
- :return: A dictionary representing the contents of the line (e.g. {" action" : "initialize", "shardId" : "shardId-000001"})
81
+ :rtype: amazon_kclpy.messages.MessageDispatcher
82
+ :return: A callable action class that contains the action presented in the line
77
83
'''
78
- return json .loads (line )
84
+ return json .loads (line , object_hook = dispatch . message_decode )
79
85
80
86
def write_action (self , response ):
81
87
'''
@@ -103,6 +109,7 @@ def __init__(self, value):
103
109
def __str__ (self ):
104
110
return repr (self .value )
105
111
112
+
106
113
class Checkpointer (object ):
107
114
'''
108
115
A checkpointer class which allows you to make checkpoint requests. A checkpoint marks a point in a shard
@@ -122,39 +129,40 @@ def _get_action(self):
122
129
'''
123
130
Gets the next json message from STDIN
124
131
125
- :rtype: dict
126
- :return: A dictionary object that indicates what action this processor should take next. For example
127
- {"action" : "initialize", "shardId" : "shardId-000001"} would indicate that this processor should
128
- invoke the initialize method of the inclosed RecordProcessor object.
132
+ :rtype: object
133
+ :return: Either a child of MessageDispatcher, or a housekeeping object type
129
134
'''
130
135
line = self .io_handler .read_line ()
131
136
action = self .io_handler .load_action (line )
132
137
return action
133
138
134
- def checkpoint (self , sequenceNumber = None ):
135
- '''
139
+ def checkpoint (self , sequence_number = None , sub_sequence_number = None ):
140
+ """
136
141
Checkpoints at a particular sequence number you provide or if no sequence number is given, the checkpoint will
137
142
be at the end of the most recently delivered list of records
138
143
139
- :type sequenceNumber: str
140
- :param sequenceNumber: The sequence number to checkpoint at or None if you want to checkpoint at the farthest record
141
- '''
142
- response = {"action" : "checkpoint" , "checkpoint" : sequenceNumber }
144
+ :param str or None sequence_number: The sequence number to checkpoint at or None if you want to checkpoint at the
145
+ farthest record
146
+ :param int or None sub_sequence_number: The sub sequence number to checkpoint at; if None, the checkpoint will be
147
+ at the farthest sub_sequence_number
148
+ """
149
+ response = {"action" : "checkpoint" , "sequenceNumber" : sequence_number , "subSequenceNumber" : sub_sequence_number }
143
150
self .io_handler .write_action (response )
144
151
action = self ._get_action ()
145
- if action . get ( ' action' ) == 'checkpoint' :
146
- if action .get ( ' error' ) != None :
147
- raise CheckpointError (action .get ( ' error' ) )
152
+ if isinstance ( action , messages . CheckpointInput ) :
153
+ if action .error is not None :
154
+ raise CheckpointError (action .error )
148
155
else :
149
- '''
150
- We are in an invalid state. We will raise a checkpoint exception
151
- to the RecordProcessor indicating that the KCL (or KCLpy) is in
152
- an invalid state. See KCL documentation for description of this
153
- exception. Note that the documented guidance is that this exception
154
- is NOT retryable so the client code should exit.
155
- '''
156
+ #
157
+ # We are in an invalid state. We will raise a checkpoint exception
158
+ # to the RecordProcessor indicating that the KCL (or KCLpy) is in
159
+ # an invalid state. See KCL documentation for description of this
160
+ # exception. Note that the documented guidance is that this exception
161
+ # is NOT retryable so the client code should exit.
162
+ #
156
163
raise CheckpointError ('InvalidStateException' )
157
164
165
+
158
166
# RecordProcessor base class
159
167
class RecordProcessorBase (object ):
160
168
'''
@@ -175,7 +183,7 @@ def initialize(self, shard_id):
175
183
:type shard_id: str
176
184
:param shard_id: The shard id that this processor is going to be working on.
177
185
'''
178
- return
186
+ raise NotImplementedError
179
187
180
188
@abc .abstractmethod
181
189
def process_records (self , records , checkpointer ):
@@ -192,7 +200,7 @@ def process_records(self, records, checkpointer):
192
200
:type checkpointer: amazon_kclpy.kcl.Checkpointer
193
201
:param checkpointer: A checkpointer which accepts a sequence number or no parameters.
194
202
'''
195
- return
203
+ raise NotImplementedError
196
204
197
205
@abc .abstractmethod
198
206
def shutdown (self , checkpointer , reason ):
@@ -210,63 +218,43 @@ def shutdown(self, checkpointer, reason):
210
218
shard so that this processor will be shut down and new processor(s) will be created for the child(ren) of
211
219
this shard.
212
220
'''
213
- return
221
+ raise NotImplementedError
222
+
223
+ version = 1
214
224
215
- class MalformedAction (Exception ):
216
- '''
217
- Raised when an action given by the MultiLangDaemon doesn't have all the appropriate attributes.
218
- '''
219
- pass
220
225
221
226
class KCLProcess (object ):
222
227
223
- def __init__ (self , record_processor , inputfile = sys .stdin , outputfile = sys .stdout , errorfile = sys .stderr ):
224
- '''
225
- :type record_processor: amazon_kclpy.kcl .RecordProcessorBase
228
+ def __init__ (self , record_processor , input_file = sys .stdin , output_file = sys .stdout , error_file = sys .stderr ):
229
+ """
230
+ :type record_processor: RecordProcessorBase or amazon_kclpy.v2.processor .RecordProcessorBase
226
231
:param record_processor: A record processor to use for processing a shard.
227
232
228
- :type inputfile: file
229
- :param inputfile: A file to read action messages from. Typically STDIN.
233
+ :param file input_file: A file to read action messages from. Typically STDIN.
230
234
231
- :type outputfile: file
232
- :param outputfile: A file to write action messages to. Typically STDOUT.
235
+ :param file output_file: A file to write action messages to. Typically STDOUT.
233
236
234
- :type errorfile: file
235
- :param errorfile: A file to write error messages to. Typically STDERR.
236
- '''
237
- self .io_handler = _IOHandler (inputfile , outputfile , errorfile )
237
+ :param file error_file: A file to write error messages to. Typically STDERR.
238
+ """
239
+ self .io_handler = _IOHandler (input_file , output_file , error_file )
238
240
self .checkpointer = Checkpointer (self .io_handler )
239
- self .processor = record_processor
241
+ if record_processor .version == 1 :
242
+ self .processor = processor .V1toV2Processor (record_processor )
243
+ else :
244
+ self .processor = record_processor
240
245
241
246
def _perform_action (self , action ):
242
- '''
247
+ """
243
248
Maps input action to the appropriate method of the record processor.
244
249
245
- :type action: dict
246
- :param action: A dictionary that represents an action to take with appropriate attributes e.g.
247
- {"action":"initialize","shardId":"shardId-123"}
248
- {"action":"processRecords","records":[{"data":"bWVvdw==","partitionKey":"cat","sequenceNumber":"456"}]}
249
- {"action":"shutdown","reason":"TERMINATE"}
250
+ :type action:
251
+ :param MessageDispatcher action: A derivative of MessageDispatcher that will handle the provided input
250
252
251
253
:raises MalformedAction: Raised if the action is missing attributes.
252
- '''
253
- try :
254
- action_type = action ['action' ]
255
- if action_type == 'initialize' :
256
- args = (action ['shardId' ],)
257
- f = self .processor .initialize
258
- elif action_type == 'processRecords' :
259
- args = (action ['records' ], self .checkpointer )
260
- f = self .processor .process_records
261
- elif action_type == 'shutdown' :
262
- args = (self .checkpointer , action ['reason' ])
263
- f = self .processor .shutdown
264
- else :
265
- raise MalformedAction ("Received an action which couldn't be understood. Action was '{action}'" .format (action = action ))
266
- except KeyError as key_error :
267
- raise MalformedAction ("Action {action} was expected to have key {key}" .format (action = action , key = str (key_error )))
254
+ """
255
+
268
256
try :
269
- f ( * args )
257
+ action . dispatch ( self . checkpointer , self . processor )
270
258
except :
271
259
'''
272
260
We don't know what the client's code could raise and we have no way to recover if we let it propagate
@@ -282,7 +270,7 @@ def _report_done(self, response_for=None):
282
270
283
271
:param response_for: Required parameter; the action that this status message is confirming completed.
284
272
'''
285
- self .io_handler .write_action ({"action" : "status" , "responseFor" : response_for })
273
+ self .io_handler .write_action ({"action" : "status" , "responseFor" : response_for })
286
274
287
275
def _handle_a_line (self , line ):
288
276
'''
@@ -296,8 +284,7 @@ def _handle_a_line(self, line):
296
284
'''
297
285
action = self .io_handler .load_action (line )
298
286
self ._perform_action (action )
299
- self ._report_done (action .get ('action' ))
300
-
287
+ self ._report_done (action .action )
301
288
302
289
def run (self ):
303
290
'''
0 commit comments