Sunbird UCI

Create a Transformer

1. Overview

A transformer converts the previous xMessage from the user into the one that needs to be sent next. It is essentially a microservice that, based on the previous user action, returns a new xMessage that is then shown to the user. This also enables conversion from one type of message to another.
All inbound messages pass through a transformer. If no transformer is assigned, a null transformer is assigned to the xMessage. Because MessageRosa is currently implemented only in Java, transformers can presently be built only in Java.
Simply put, a transformer takes in an xMessage and returns the next one.
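The fallback to a null transformer can be sketched roughly as follows. This is only an illustration: InboundMessage, TransformerRegistry, and NULL_TRANSFORMER are hypothetical names, not MessageRosa classes, and the sketch assumes that a null transformer passes the message through unchanged, which the text above does not specify.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for an xMessage; the real class carries many more fields.
final class InboundMessage {
    final String text;
    final String transformerName; // null when no transformer was assigned

    InboundMessage(String text, String transformerName) {
        this.text = text;
        this.transformerName = transformerName;
    }
}

// Hypothetical registry that maps transformer names to transformer functions.
final class TransformerRegistry {
    // Assumption: a "null transformer" hands the message back unchanged.
    static final UnaryOperator<InboundMessage> NULL_TRANSFORMER = msg -> msg;

    private final Map<String, UnaryOperator<InboundMessage>> byName = new HashMap<>();

    void register(String name, UnaryOperator<InboundMessage> transformer) {
        byName.put(name, transformer);
    }

    // Returns the assigned transformer, or the null transformer as a fallback.
    UnaryOperator<InboundMessage> resolve(InboundMessage msg) {
        if (msg.transformerName == null) {
            return NULL_TRANSFORMER;
        }
        return byName.getOrDefault(msg.transformerName, NULL_TRANSFORMER);
    }

    // Every inbound message goes through exactly one transformer.
    InboundMessage process(InboundMessage inbound) {
        return resolve(inbound).apply(inbound);
    }
}
```

With this shape, the calling code never needs a special case for messages without a transformer: process() always applies something, even if that something does nothing.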

1.1 Responsibilities

  • Acts as a state machine and converts messages from one state to another.
  • Transforms messages.
  • Applies conversational logic (constraints or input format).
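The responsibilities above can be illustrated with a tiny conversational state machine. This is a minimal sketch under stated assumptions: SurveyTransformer, its State values, and the Step type are hypothetical and exist only for this example. It shows a transformer moving the conversation from one state to the next and enforcing an input format (the age must be numeric) before advancing.

```java
import java.util.regex.Pattern;

// Hypothetical example: a two-question survey driven by a state machine.
final class SurveyTransformer {
    enum State { ASK_NAME, ASK_AGE, DONE }

    // Result of one transformation: the next state plus the reply to show the user.
    static final class Step {
        final State next;
        final String reply;

        Step(State next, String reply) {
            this.next = next;
            this.reply = reply;
        }
    }

    // Input-format constraint: an age is one to three digits.
    private static final Pattern AGE = Pattern.compile("\\d{1,3}");

    static Step transform(State current, String userText) {
        String text = userText == null ? "" : userText.trim();
        switch (current) {
            case ASK_NAME:
                if (text.isEmpty()) {
                    // Constraint violated: stay in the same state and re-prompt.
                    return new Step(State.ASK_NAME, "Please tell me your name.");
                }
                return new Step(State.ASK_AGE, "Thanks, " + text + ". How old are you?");
            case ASK_AGE:
                if (!AGE.matcher(text).matches()) {
                    return new Step(State.ASK_AGE, "Please enter your age as a number.");
                }
                return new Step(State.DONE, "All done, thank you.");
            default:
                return new Step(State.DONE, "The conversation is already complete.");
        }
    }
}
```

Invalid input keeps the conversation in its current state, so the user is asked again instead of the flow advancing with bad data.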

2. Creating your own Transformer

All transformers are named <Action>Transformer; for example, PDFTransformer generates a PDF from the previous XMessage and returns it as an XMessage. Transformers should extend TransformerProvider and therefore need to implement the following method:
  • public XMessage transform(XMessage nextMsg) // Takes in an XMessage and returns the next message.

3. Sample Code

Sample code that transforms text to lowercase before sending the message outbound:
// Imports for the MessageRosa types (XMessage, XMessageParser, Payload,
// TransformerProvider) are omitted because their packages are not shown here.
import java.io.ByteArrayInputStream;
import javax.xml.bind.JAXBException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;

public class DemoTransformer extends TransformerProvider {

    // `log` (a logger) and `kafkaProducer` (a Kafka producer wrapper) are assumed
    // to be declared or injected elsewhere; this snippet does not show them.
    private final Flux<ReceiverRecord<String, String>> reactiveKafkaReceiver;

    @Value("${processOutbound}")
    private String processOutboundTopic;

    public DemoTransformer(Flux<ReceiverRecord<String, String>> reactiveKafkaReceiver) {
        this.reactiveKafkaReceiver = reactiveKafkaReceiver;
    }

    /* Start consuming inbound messages once the application has started */
    @EventListener(ApplicationStartedEvent.class)
    public void onMessage() {
        reactiveKafkaReceiver
            .doOnNext(record -> {
                try {
                    // Parse the raw Kafka record into an XMessage
                    XMessage msg = XMessageParser.parse(
                            new ByteArrayInputStream(record.value().getBytes()));
                    // transform() returns the XMessage directly, so no subscribe() is needed
                    XMessage transformedMessage = transform(msg);
                    if (transformedMessage != null) {
                        kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
                    }
                } catch (JAXBException e) {
                    log.error("Could not parse or serialize the XMessage", e);
                } catch (Exception e) {
                    log.error("Unexpected error while transforming the message", e);
                }
            })
            .doOnError(e -> log.error("KafkaFlux exception", e))
            .subscribe();
    }

    /* Transform text to lower case */
    public XMessage transform(XMessage nextMsg) {
        Payload payload = nextMsg.getPayload();
        // Guard against messages without a payload or without text
        if (payload != null && payload.getText() != null) {
            payload.setText(payload.getText().toLowerCase());
            nextMsg.setPayload(payload);
        }
        return nextMsg;
    }
}
Any transformer implemented along the lines of the example above is valid. A detailed example transformer can be found here.
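To try the lowercase logic without Kafka or Spring, the transform step can be exercised on its own. The following is a minimal sketch, not the MessageRosa API: SketchPayload and SketchXMessage are hypothetical, stripped-down stand-ins that expose only the getters and setters used in the sample. It also passes Locale.ROOT to toLowerCase, a deliberate change from the sample so that the result does not depend on the server's default locale.

```java
import java.util.Locale;

// Hypothetical stand-in for the Payload class (text only).
final class SketchPayload {
    private String text;

    String getText() { return text; }
    void setText(String text) { this.text = text; }
}

// Hypothetical stand-in for the XMessage class (payload only).
final class SketchXMessage {
    private SketchPayload payload;

    SketchPayload getPayload() { return payload; }
    void setPayload(SketchPayload payload) { this.payload = payload; }
}

final class LowercaseSketch {
    // Same idea as the sample transform(), with guards for a missing payload or text.
    static SketchXMessage transform(SketchXMessage nextMsg) {
        SketchPayload payload = nextMsg.getPayload();
        if (payload != null && payload.getText() != null) {
            // Locale.ROOT keeps the result independent of the default locale.
            payload.setText(payload.getText().toLowerCase(Locale.ROOT));
        }
        return nextMsg;
    }

    public static void main(String[] args) {
        SketchPayload payload = new SketchPayload();
        payload.setText("Hello WORLD");
        SketchXMessage msg = new SketchXMessage();
        msg.setPayload(payload);
        System.out.println(transform(msg).getPayload().getText()); // prints "hello world"
    }
}
```

Keeping the transformation free of messaging concerns makes it easy to unit test before wiring it to a topic.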

4. Properties of a transformer

  • Transformers can call external services and wait for their responses in order to process a message. Such transformers are called call-only transformers. An example is the PDF service transformer, which calls the PDF service to check the queue status of the previous message and responds with a message containing the PDF for the user.
  • All transformers need to be registered; unregistered transformers are not acted upon. Registration is what adds a topic to the broker, onto which the messages are pushed. It also requires some basic configuration, such as the maximum time a transformer may take to process a message.
  • Transformers scale horizontally, but the broker needs to know the number of instances so that partitions can be reconfigured.
  • Since a transformer is part of a state machine, if it gets stuck it needs to communicate this to the Orchestrator so that the issue can be escalated.
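The maximum processing time and the escalation path described above could be combined along these lines. This is a sketch under stated assumptions, not how UCI implements it: TimeBoxedTransformer is a hypothetical wrapper, messages are plain strings for brevity, and the escalate callback merely stands in for whatever mechanism notifies the Orchestrator.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical wrapper: enforces a maximum processing time around a transformer
// and calls an escalation hook when the transformer is stuck or fails.
final class TimeBoxedTransformer {
    private final UnaryOperator<String> delegate; // the actual transformation
    private final long maxMillis;                 // limit configured at registration
    private final Consumer<String> escalate;      // stand-in for notifying the Orchestrator

    // Daemon threads so a stuck transformation never blocks JVM shutdown.
    private final ExecutorService pool = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    });

    TimeBoxedTransformer(UnaryOperator<String> delegate, long maxMillis,
                         Consumer<String> escalate) {
        this.delegate = delegate;
        this.maxMillis = maxMillis;
        this.escalate = escalate;
    }

    // Returns the transformed message, or null if the message was escalated instead.
    String transform(String message) {
        Callable<String> task = () -> delegate.apply(message);
        Future<String> result = pool.submit(task);
        try {
            return result.get(maxMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            result.cancel(true); // interrupt the stuck worker
        } catch (ExecutionException e) {
            // The transformer itself failed; fall through to escalation.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        escalate.accept(message);
        return null;
    }
}
```

A call-only transformer that waits on an external service is the natural candidate for such a wrapper, since its latency depends on a system it does not control.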