Akka Streams Camel integration

Akka Streams and Alpakka provide a good alternative to Apache Camel when integrating different services and data. Alpakka has a rich set of connectors, including file, JMS, Cassandra and more. We can still use Camel endpoints when Alpakka doesn’t provide the connector we need.

This post shows a complete example of integrating Camel endpoints with Akka Streams. The example reads file content from one directory, transforms the content, then writes it to another directory. It uses the Camel file endpoint for both reading and writing.

Basic setup

The example is a Gradle project written in Kotlin. Below is the build.gradle file of this project. We need to add two external Maven repositories to resolve the streamz and Akka artifacts. krasserm/streamz is the library for Camel integration with Akka Streams.

buildscript {
    ext.kotlin_version = '1.2.21'

    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
    }
}

group 'io.vividcode.akkastreams'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'kotlin'

sourceCompatibility = 1.8

repositories {
    maven {
        url "http://dl.bintray.com/krasserm/maven"
    }
    maven {
        url "https://dl.bintray.com/akka/maven"
    }
    mavenCentral()
}

dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
    compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.9'
    compile group: 'com.github.krasserm', name: 'streamz-camel-akka_2.12', version: '0.9-M1'
    runtime group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

Integration

Below is the Main.kt file for the actual integration. CamelEndpoint is the abstract base class for all Camel endpoints. It implements JavaDsl, so we can use the streamz DSL to create Akka Streams Sources, Sinks, and Flows. Implementing JavaDsl requires providing a StreamContext. CamelFileEndpoint is the implementation for the Camel file endpoint; it takes the file path and builds the endpoint URI from it. receive creates a new Source using receiveBody from JavaDsl. send creates a Graph with a FlowShape using sendBody.

In the main method, we create a new ActorSystem and, from it, the ActorMaterializer that runs the graphs. We also create a new CamelContext and StreamContext. toUpperCase is the flow that transforms the file content. Then we create a new Graph using GraphDSL and run this graph. Because the graph returned by send has a FlowShape, we need to connect its outlet to Sink.ignore() to close the whole graph.

When we add a new file to /tmp/akka-input, a new file containing the transformed content is created in /tmp/akka-output.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.FlowShape
import akka.stream.Graph
import akka.stream.javadsl.GraphDSL
import akka.stream.javadsl.RunnableGraph
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import org.apache.camel.impl.DefaultCamelContext
import streamz.camel.StreamContext
import streamz.camel.akka.javadsl.JavaDsl

abstract class CamelEndpoint(private val streamContext: StreamContext) : JavaDsl {
    abstract fun toUri(): String

    override fun streamContext(): StreamContext {
        return streamContext
    }
}

class CamelFileEndpoint(streamContext: StreamContext, private val path: String) : CamelEndpoint(streamContext) {
    override fun toUri(): String {
        return "file://$path"
    }

    fun receive(): Source<ByteArray, NotUsed> {
        return receiveBody(toUri(), ByteArray::class.java)
    }

    fun send(): Graph<FlowShape<ByteArray, ByteArray>, NotUsed> {
        return sendBody<ByteArray>(toUri())
    }
}

fun main(args: Array<String>) {
    val system = ActorSystem.create("CamelTest")
    val materializer = ActorMaterializer.create(system)
    val camelContext = DefaultCamelContext()
    val streamContext = StreamContext.create(camelContext)

    val fileInput = CamelFileEndpoint(streamContext, "/tmp/akka-input")
    val fileOutput = CamelFileEndpoint(streamContext, "/tmp/akka-output")
    val toUpperCase = Flow.fromFunction<ByteArray, ByteArray> { String(it).toUpperCase().toByteArray() }
    val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create { builder ->
        val toUpperCaseShape = builder.add(toUpperCase)
        val input = builder.add(fileInput.receive())
        val output = builder.add(fileOutput.send())
        val fileSink = builder.add(Sink.ignore())
        builder.from(input.out()).toInlet(toUpperCaseShape.`in`())
        builder.from(toUpperCaseShape.out()).toInlet(output.`in`())
        builder.from(output.out()).toInlet(fileSink.`in`())
        ClosedShape.getInstance()
    })
    runnableGraph.run(materializer)
}
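
To check the behaviour described above by hand, something like the following sketch may help. It uses only the JDK and the Kotlin standard library, so it lives outside the stream itself. The names upperCased, regularFiles, awaitNewFile, and checkPipeline, as well as the sample file name, the timeout, and the polling interval, are hypothetical helpers introduced here and are not part of the original project. The sketch makes no assumption about the name of the output file; it simply waits for any new regular file to show up in the output directory and compares its content with the expected upper-cased bytes.

```kotlin
import java.io.File

// Same transformation as the toUpperCase flow in Main.kt, as a plain function.
// toUpperCase() matches the Kotlin 1.2 used in this post; newer Kotlin versions prefer uppercase().
fun upperCased(content: ByteArray): ByteArray =
    String(content).toUpperCase().toByteArray()

// Regular files directly inside dir; subdirectories are skipped.
fun regularFiles(dir: File): Set<File> =
    (dir.listFiles() ?: emptyArray<File>()).filter { it.isFile }.toSet()

// Polls dir until a regular file that is not in `before` appears, or the timeout elapses.
// Returns the new file, or null on timeout.
fun awaitNewFile(dir: File, before: Set<File>, timeoutMillis: Long = 5000): File? {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (System.currentTimeMillis() < deadline) {
        val created = regularFiles(dir) - before
        if (created.isNotEmpty()) return created.first()
        Thread.sleep(100)
    }
    return null
}

// Hypothetical manual check: drop a sample file into inputDir and compare
// whatever new file appears in outputDir with the expected upper-cased content.
// Returns false if no file appears in time or the content differs.
fun checkPipeline(inputDir: File, outputDir: File): Boolean {
    inputDir.mkdirs()
    outputDir.mkdirs()
    val before = regularFiles(outputDir)
    val content = "hello camel".toByteArray()
    File(inputDir, "sample.txt").writeBytes(content)
    val result = awaitNewFile(outputDir, before) ?: return false
    return result.readBytes().contentEquals(upperCased(content))
}
```

With the stream from Main.kt running, calling checkPipeline(File("/tmp/akka-input"), File("/tmp/akka-output")) from a separate process should return true if the pipeline behaves as described. This is only a rough check: a file could in principle be observed before it is fully written, so a failed comparison is worth retrying before drawing conclusions.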

Source code

Full source code is available on GitHub.
