Apache Camel

https://camel.apache.org/

  • Set the following in the application.properties file:

    camel.springboot.main-run-controller = true

File Operations in Apache Camel

/**
 * Entry point for route definition: brackets the selected example with two log lines.
 * Exactly one example is active at a time; the others stay commented out as
 * alternatives to swap in.
 *
 * @throws Exception declared by the overridden method
 */
@Override
    public void configure() throws Exception {
        // These two messages are written while this method runs,
        // not each time a file passes through the route.
        log.info("---- starting ----");

        // Active example.
        moveAllFile();

        // Alternative examples:
        // moveSpecificFile("file");
        // moveSpecificFileWithContent("hello");
        // processFile();
        // multiFileProcessor();

        log.info("---- end ----");
    }

    /**
     * Defines a route that picks up files from {@code /home/uday/a}, logs a marker
     * line for each one and hands it to {@code /home/uday/b}.
     */
    public void moveAllFile() {
        from("file:/home/uday/a")
                .log("--- moved ---")
                .to("file:/home/uday/b");
    }

    /**
     * Defines a route from {@code /home/uday/a} to {@code /home/uday/b} that only
     * forwards files whose name starts with the given prefix.
     *
     * @param fileName prefix the {@code Exchange.FILE_NAME} header must start with
     */
    public void moveSpecificFile(String fileName) {
        // Build the name check once, then plug it into the route.
        var nameHasPrefix = header(Exchange.FILE_NAME).startsWith(fileName);

        from("file:/home/uday/a")
                .filter(nameHasPrefix)
                .to("file:/home/uday/b");
    }

    /**
     * Defines a route from {@code /home/uday/a} to {@code /home/uday/b} that only
     * forwards files whose content starts with the given text.
     *
     * @param content text the message body must start with
     */
    public void moveSpecificFileWithContent(String content) {
        // Build the content check once, then plug it into the route.
        var bodyHasPrefix = body().startsWith(content);

        from("file:/home/uday/a")
                .filter(bodyHasPrefix)
                .to("file:/home/uday/b");
    }

    /**
     * Defines a route that reads each file from {@code /home/uday/a}, replaces the
     * single-space separators in its content with commas and writes the result to
     * {@code /home/uday/b}.
     *
     * <p>The words are joined with {@code ","} between them, so the output carries no
     * trailing comma, and the new body is set as a plain {@code String}.
     */
    public void processFile() {
        from("file:/home/uday/a")
                .process(exchange -> {
                    String text = exchange.getIn().getBody(String.class);
                    if (text == null) {
                        // Nothing to rewrite; leave the message untouched instead of failing on split().
                        return;
                    }
                    // Same split as before (one space), but joined without a dangling separator.
                    exchange.getIn().setBody(String.join(",", text.split(" ")));
                })
                .to("file:/home/uday/b");
    }

    /**
     * Defines a route that reads files from {@code /home/uday/a/}, unmarshals them as
     * CSV, splits the result on {@code ","} and routes each fragment by keyword into
     * {@code closed.csv}, {@code pending.csv} or {@code waiting.csv} under
     * {@code /home/uday/b}. Fragments matching none of the keywords are not written.
     *
     * <p>Each target uses {@code fileExist=Append}: all fragments of one status share a
     * fixed file name, so with the file component's default of overriding an existing
     * file only the most recent fragment would be kept.
     */
    public void multiFileProcessor() {
        // NOTE(review): the body is tokenized after unmarshal().csv() has already run;
        // confirm that splitting the unmarshalled value on "," yields the intended records.
        // NOTE(review): appended fragments are written back to back; consider the file
        // component's appendChars option if a separator between them is wanted.
        from("file:/home/uday/a/")
                .unmarshal().csv()
                .split(body().tokenize(","))
                .choice()
                    .when(body().contains("closed"))
                        .to("file:/home/uday/b?fileName=closed.csv&fileExist=Append")
                    .when(body().contains("pending"))
                        .to("file:/home/uday/b?fileName=pending.csv&fileExist=Append")
                    .when(body().contains("waiting"))
                        .to("file:/home/uday/b?fileName=waiting.csv&fileExist=Append");
    }

More File Operations

// Reusable sub-route: anything sent to direct://log-file-values gets two log lines.
from("direct://log-file-values")
        // 1) message history and the CamelFileAbsolute header
        .log("${messageHistory} ${headers.CamelFileAbsolute}")
        // 2) values from the file: expressions (name, name.ext, size)
        .log("${file:name} ${file:name.ext} ${file:size}");

// Reads files from files/input, logs which condition matched, then forwards the
// message to the logging sub-route and finally to files/output.
from("file:files/input")
        .routeId("Files-Input-Route")
        // Work with the content as a String from here on.
        .transform().body(String.class)
        .choice()
            // Alternative condition based on the file extension:
            // .when(simple("${file:ext} ends with 'xml'"))
            .when(method(deciderBean))
                .log("XML FILE")
            .when(simple("${body} contains 'USD'"))
                .log("Contains USD")
            .otherwise()
                .log("No Condition met")
        .end()
        // IntelliJ may show a warning on the following calls; it can be ignored.
        .to("direct://log-file-values")
        .to("file:files/output");

Multicast

// Multicast: a timer firing with period=10000 sends each message to both log endpoints.
from("timer:multicast?period=10000")
        .multicast()
        .to(
                "log:something-1",
                "log:something-2"
        );

Split


// Variant 1: unmarshal the file as CSV, then split on the resulting body.
from("file:files/csv")
        .unmarshal()
        .csv()
        .split(body())
        .to("log:split-files");

// Variant 2: treat the file as one String and split it on ",".
from("file:files/csv")
        .convertBodyTo(String.class)
        .split(body(), ",")
        .to("log:split-files");

// Variant 3: treat the file as one String and let the spliterComponent bean do the splitting.
from("file:files/csv")
        .convertBodyTo(String.class)
        .split(method(spliterComponent))
        .to("log:split-files");
    
---
                
// Bean injected via @Autowired; the split route passes it to method(spliterComponent).
@Autowired
    private SpliterComponent spliterComponent;
    
@Component
public class SpliterComponent {

    /**
     * Cuts the given text at every single space character.
     *
     * <p>Consecutive spaces produce empty entries between them, and an empty input
     * produces a list holding one empty string.
     *
     * @param body text to cut; must not be {@code null}
     * @return the pieces in their original order
     */
    public List<String> spilt(String body) {
        String[] pieces = body.split(" ");
        return Arrays.stream(pieces).toList();
    }
}

Aggregation

// Reads JSON files from files/agg-json, unmarshals each into a CurrencyExchange and
// collects the bodies that share the same "to" value into one list per group.
// A group is released to log:agg-json once completionSize(3) is reached.
from("file:files/agg-json")
    .unmarshal().json(JsonLibrary.Jackson, CurrencyExchange.class)
    .aggregate(simple("${body.to}"), (oldExchange, newExchange) -> {

        Object newBody = newExchange.getIn().getBody();

        // No aggregate exists yet for this group: start the list on the incoming exchange.
        if (oldExchange == null) {
            ArrayList<Object> started = new ArrayList<>();
            started.add(newBody);
            newExchange.getIn().setBody(started);
            return newExchange;
        }

        // The body was stored above as an ArrayList<Object>; looking it up by class
        // literal can only return the raw type, hence the narrowly scoped suppression.
        @SuppressWarnings("unchecked")
        ArrayList<Object> collected = oldExchange.getIn().getBody(ArrayList.class);
        collected.add(newBody);
        return oldExchange;

    }).completionSize(3)
    .to("log:agg-json");

Routing Slip

// Comma-separated list of endpoints used as the routing slip below.
String routingSlip = "direct:endpoint1,direct:endpoint2";

// Timer-driven producer: sets a constant body and hands it to the routing slip.
// The period comes from the {{time-period}} property placeholder.
from("timer:routingSlip?period={{time-period}}")
        .transform()
        .constant("my message")
        .routingSlip(simple(routingSlip));

// Targets; endpoint1 forwards to the endpoint configured as {{endpoint-for-logging}}.
from("direct:endpoint1")
        .to("{{endpoint-for-logging}}");

from("direct:endpoint2")
        .to("log:endpoint2");

from("direct:endpoint3")
        .to("log:endpoint3");

// Dynamic router: a timer with period=10000 sets a constant body and lets
// dynamicRouterBean choose where the message goes.
from("timer:routingSlip?period=10000")
        .transform()
        .constant("my message")
        .dynamicRouter(method(dynamicRouterBean));
        
----

/**
 * Bean used by the dynamic router to choose the next endpoint for a message.
 *
 * <p>A dynamic router keeps calling its bean until the bean returns {@code null}.
 * This bean therefore sends each exchange to {@code direct:endpoint1} once and then
 * signals the end, so routing does not repeat endlessly.
 */
@Component
@Slf4j
public class DynamicRouterBean  {

    /** Exchange-property key recording that this exchange was already routed. */
    private static final String ROUTED_MARKER = "DynamicRouterBean.routed";

    /**
     * Decides where the current message goes next.
     *
     * @param properties exchange properties; also used to remember, per exchange,
     *                   that the message has already been routed
     * @param headers    message headers (logged only)
     * @param body       message body (logged only)
     * @return {@code "direct:endpoint1"} on the first call for an exchange,
     *         {@code null} afterwards to stop the dynamic router
     */
    public String decideTheNextEndpoint(
            @ExchangeProperties Map<String, String> properties,
            @Headers Map<String, String> headers,
            @Body String body
    ) {
        log.info("{} {} {}", properties, headers, body);

        // Second visit for this exchange: null tells the dynamic router to finish.
        if (properties.containsKey(ROUTED_MARKER)) {
            return null;
        }

        // State is kept on the exchange, not in a field, so exchanges do not affect each other.
        properties.put(ROUTED_MARKER, Boolean.TRUE.toString());
        return "direct:endpoint1";
    }

}

Error Handling

// Enable tracing on the Camel context.
getContext().setTracing(true);

// Make sure no message is lost: the error handler is a dead letter channel
// pointing at the activemq:dead-letter-queue endpoint.
errorHandler(deadLetterChannel("activemq:dead-letter-queue"));

// Wire tap: taps a message while it is inside the Camel context.
// NOTE(review): both endpoint URIs are empty placeholders here — fill in real
// source and tap endpoints before using this line.
from("").wireTap("");

Active MQ

// Producer 1: a timer with period=2000 puts a constant text message on the queue.
from("timer:active-mq-timer?period=2000")
        .transform()
        .constant("my message for active-mq")
        .log("--- message sent to queue ---")
        .to("activemq:my-activemq-queue");

// Producer 2: files from files/json go to the same queue.
from("file:files/json").to("activemq:my-activemq-queue");

// Producer 3: files from files/xml go to a separate XML queue.
from("file:files/xml").to("activemq:my-activemq-xml-queue");
        
// Consumer 1: read JSON from the queue, unmarshal it to CurrencyExchange,
// pass it through the processor and transformer beans, then log it.
from("activemq:my-activemq-queue")
        .unmarshal()
        .json(JsonLibrary.Jackson, CurrencyExchange.class)
        .bean(myCurrencyExchangeProcessor)
        .bean(myCurrencyExchangeTransformer)
        .to("log:received-from-active-mq");

// Consumer 2: read XML from the XML queue, unmarshal it to CurrencyExchange and log it.
from("activemq:my-activemq-xml-queue")
        .unmarshal().jacksonXml(CurrencyExchange.class)
        .to("log:received-from-activemq-xml");

Kafka

// Log each file from files/json, then publish it to the mykafkatopic Kafka topic.
from("file:files/json")
        .log("${body}")
        .to("kafka:mykafkatopic");

Using Timer

// Stand-ins used in this example:
//   queue     -> timer
//   database  -> log
//   in between: transformation
//
// from : where a route starts
// to   : the final destination
//
// processing : doesn't change the message
// transform  : does change the message

from("timer:first-timer")

        // Example 1: set a constant body
        // .transform()
        // .constant("My message at : "+ LocalDateTime.now())

        // Example 2: let a bean produce the body
        // .bean(getCurrentTimeBean)

        // Example 3: log the body after every step to see which steps change it
        .log("${body}")
        .transform().constant("My message")
        .log("${body}")
        .bean(getCurrentTimeBean)
        .log("${body}")
        .bean(simpleLoggingProcessingComponent)
        .log("${body}")

        .to("log:first-timer");
        
----

/**
 * Processor that logs the body of the current message and leaves the exchange unchanged.
 */
public class SimpleLoggingProcessingComponent implements Processor {

    // NOTE(review): no `log` field or logging annotation is visible in this snippet —
    // confirm where `log` is supplied from.

    /**
     * Logs the message body of the given exchange.
     *
     * @param exchange the exchange whose message body is logged
     * @throws Exception declared by the {@code Processor} contract
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        Object payload = exchange.getMessage().getBody();
        log.info("SimpleLoggingProcessingComponent : {}", payload);
    }
}

-----

Rest API

// Target host for the REST calls made below.
restConfiguration()
        .host("localhost:8081");

// A timer with period=3000 sets the "from" and "to" headers, issues a GET with the
// {from} and {to} placeholders in the path, and logs the response body.
from("timer:rest-api-consumer?period=3000")
        .setHeader("from", () -> "usd")
        .setHeader("to", () -> "inr")
        .to("rest:get:/currency-exchange/from/{from}/to/{to}")
        .log("${body}");

Last updated