3年前 (2021-09-01)  相关技术 |   抢沙发  945 
文章评分 0 次,平均分 0.0

Apache Ignite实践

“让我们假设我们开始为我们的物联网应用开发一个Web服务器,其中包含几个端点,如接收事件的POST、获取DeviceBySensorType、获取全部和更新设备元数据的PUT等。

一开始,公共数据的缓存似乎是次要问题,但如果我们开始考虑长期问题,并且如果我们想要提高性能和/或减少响应时间(例如,当服务从数据库检索数据时),我们就会意识到缓存是一项强制性要求。

最初的解决方案可能是使用始终可靠的HashMap类(或在第一次ConcurrentModificationException之后使用ConcurrentHashMap):这是第一次尝试,通过在内存中的不同映射中缓存对象,可能会给我们带来快速的好处。但这还不够。

Data Grid 数据网格

使用Ignite,这很简单,只需在scala/java项目上添加依赖项:

1> sbt
2
3       "org.apache.ignite" % "ignite-core" % "2.4.0"
4
5> maven
6
7    <dependency>
8          <groupId>org.apache.ignite</groupId>
9          <artifactId>ignite-core</artifactId>
10          <version>2.4.0</version>
11   </dependency>

Scala代码示例:

1 object IgniteSimpleDataGrid extends App {
2
3     val igniteConfig = new IgniteConfiguration()
4     val cacheConfig = new CacheConfiguration("ignite")
5     igniteConfig.setCacheConfiguration(cacheConfig)
6     val ignite = Ignition.start(igniteConfig)
7     val cache: IgniteCache[Int, String] = ignite.getOrCreateCache[Int, String]("ignite")
8     for (i <- 1 to 10) {
9       cache.put(i, s"value-$i")
10     }
11     println(s"From cache 7 -> ${cache.get(7)}")
12     println(s"Not exists 20 -> ${cache.get(20)}")
13}

很好,我们有第一个单一版本的Ignite Data Grid,但是如果我们需要HA并部署该应用程序的另一个实例,该怎么办?或者这听起来很熟悉,因为元数据端点与另一个服务相关,并且使用相同的信息,因此需要在应用程序之间共享数据,不幸的是,地图缓存模式解决方案不再有效。

如果我告诉您,使用内存中的数据,网格数据可以分布,集群中的每个节点(形成环形拓扑)除了计算远程函数或自定义谓词外,还可以访问和维护共享信息,会怎么样?你相信这是可能的吗?答案绝对是肯定的,这就是Ignite在其他解决方案中脱颖而出的地方。

让我们用两个实例(节点)测试IgniteSimpleDataGrid类:node1已经在运行,所以在我们可以再次启动应用程序(node2)之前,我们需要对for循环(代码段中的块)进行注释,因为node1已经用值填充了缓存,这意味着node2将只从缓存中读取。

1/* for (i <- 1 to 10) {
2    cache.put(i, s"value-$i")
3    } */
4   println(s"From cache 7 -> ${cache.get(7)}")
5   println(s"Not exists 20 -> ${cache.get(20)}")

注意node2中的输出:控制台打印的数据与第一个示例中的node1几乎相同(看看id,它是不同的)。

这意味着我们的Ignite in memory分布式键值存储集群已经启动并运行(不仅仅是一个简单的缓存,我们明白了原因)。

节点已经自我发现,并且所有节点都看到相同的数据。如果我们启动更多的节点,拓扑将继续增长,每个节点将分配一个唯一的id,服务器数量将增加(例如,服务器=3,服务器=4,等等)

如果我们停止节点1,拓扑将减少,其余正在运行的节点将打印此更改,因为它们正在侦听拓扑更改事件。

Ignite实现了高可用性,如果一个节点宕机,数据将在集群中重新平衡,而不涉及任何用户操作。

它是一个分布式内存缓存、查询和处理平台,用于实时处理大规模数据集(撇开数据流处理、Spark集成、机器学习网格、Ignite文件系统、持久性、事务…)

Ignite如何自动创建群集?它提供了TcpDiscoverySpi作为DiscoverySpi的默认实现,DiscoverySpi使用TCP/IP进行节点发现。Discovery SPI可以配置为基于多播和静态IP的节点发现(Spi=特别是Ip查找器)

这意味着节点使用多播查找彼此:

1 val igniteConfig = new IgniteConfiguration()
2 val spi = new TcpDiscoverySpi()
3 val ipFinder = new TcpDiscoveryMulticastIpFinder()
4 ipFinder.setMulticastGroup("228.10.10.157") // Default value = DFLT_MCAST_GROUP = "228.1.2.4";
5 spi.setIpFinder(ipFinder)
6 cfg.setDiscoverySpi(spi)
7 val ignite = Ignition.start(igniteConfig)
8 val cache: IgniteCache[Int, String] = ignite.getOrCreateCache[Int, String]("ignite")

在映像中,节点形成一个服务器节点环,但在Ignite集群内,节点可以具有角色并属于集群组,也可以是客户机节点(这意味着环的“外部”,并且不可能维护缓存数据)。这可能对外部应用程序有用。

节点可以向特定组中的其他节点广播消息:

1 val igniteCluster = ignite.cluster()
2 send%20to%20cluster%20remotes%20nodes%20%28ignoring%20this%29%20this%20code
3 ignite.compute(igniteCluster.forRemotes()).broadcast(
4      new IgniteRunnable {
5          override def run(): Unit = println(s"Hello node ${igniteCluster.localNode()},this message had been send by igniteCompute broadcast")
6       }
7)

如本例所示,当一个节点启动时,它会向集群组发送一条消息,以进行远程操作,并向已在mode=server模式下配置的所有其他节点(来自节点本身的节点除外)发送一条消息。刚创建集群时,您可能会在开始时的第一个节点上看到异常,但不要担心,因为这是正常的。

此外,还可以使用自定义属性定义节点:

1 igniteConfig.setUserAttributes(Map[String,Any]("ROLE" -> "MASTER").asJava)
2 ignite.compute(igniteCluster.forAttribute("ROLE","WORKER")).broadcast(new IgniteRunnable {
3        override def run(): Unit = println(s"Hello worker node ${igniteCluster.localNode()}," +s"this message had been send by ignite master node")
4})

在这里,节点的属性ROLE=MASTER,并且仅向ROLE=WORKER的节点广播

节点具有:

1 igniteConfig.setUserAttributes(Map[String,Any]("ROLE" -> "WORKER").asJava)

将收到以下信息:

只要您的群集处于活动状态,Ignite将保证不同群集节点之间的数据始终保持一致,无论崩溃或拓扑更改如何。

到目前为止还不错:我们已经看到了节点、拓扑、集群组、广播,现在让我们期待Ignite的最佳功能之一:SQL查询!!

听起来疯狂吗?是的,是的!超过你的数据!

Ignite完全符合ANSI-99标准,支持文本查询和基于谓词的扫描查询。

缓存查询

在开始使用代码之前,需要考虑以下几点:

添加依赖项

1 sbt = "org.apache.ignite" % "ignite-indexing" % "2.4.0"
2
3 maven = <dependency>
4            <groupId>org.apache.ignite</groupId>
5            <artifactId>ignite-indexing</artifactId>
6            <version>2.4.0</version>
7         </dependency>

告诉Ignite,允许在查询中使用哪些实体,这很简单,只需向类添加注释:

1 case class IotDevice(
2@(QuerySqlField@field)(index = true) name: String,
3@(QuerySqlField@field)(index = true) gpio: String,
4@(QueryTextField@field) sensorType: String,
5@(QueryTextField@field) model: String) 

这里我们说过,所有字段都可以在查询中使用,并在name和gpio属性上添加索引(就像在任何sql数据库中一样)

定义索引字段和可查询字段后,必须在SQL引擎中注册这些字段及其所属的对象类型。

1 val igniteConfig = new IgniteConfiguration()
2 val cacheConfig = new CacheConfiguration("ignite")
3 cacheConfig.setIndexedTypes(Seq(classOf[String], classOf[IotDevice]): _*)
4 igniteConfig.setCacheConfiguration(cacheConfig)
5 val ignite = Ignition.start(igniteConfig)
6 val cacheIot: IgniteCache[String, IotDevice] = ignite.getOrCreateCache[Int, String]("ignite")

例如,遵循物联网web服务器的思想,我们开发web服务器并将数据放入上面定义的缓存物联网。

1 val temp1 = IotDevice(name = "temp1", gpio = “123ASD", sensorType = "temperature", model = "test")
2 cacheIot.put(temp1.gpio,temp1)
3 val temp2 = IotDevice(name = "temp2", gpio = “456ASD", sensorType = "temperature", model = "test")
4 cacheIot.put(temp2.gpio,temp2)

现在用户调用方法:GET/devicesBySensorType?sensor=temperature

在我们的IoDevice案例中,sensorType类对查询有效,因此,我们可以在Ignite中以三种方式执行此查询:

简单sql:

1 val sqlText = s"sensorType = 'temperature'"
2 val sql = new SqlQuery[String, IotDevice](classOf[IotDevice], sqlText)
3 val temperatureQueryResult = cacheIot.query(sql).getAll.asScala.map(_.getValue)
4 println(s"SqlQuery = $temperatureQueryResult")

扫描查询:

1 val cursor = cacheIot.query(new ScanQuery(new IgniteBiPredicate[String, IotDevice] {
2        override def apply(key: String, entryValue: IotDevice) : Boolean = entryValue.sensorType == "temperature"
3 }))
4 val temperatureScanResult = cursor.getAll.asScala
5 println(s"ScanQuery = $temperatureScanResult")

基于Lucene索引的基于文本的查询,在这里查找sensorType==“temperature”的所有IOT设备(模型属性QueryTextField上的注释允许此查询,如果需要,Ignite支持多个QueryTextField)

1 val textQuery = new TextQuery[IotDevice, String](classOf[IotDevice], "temperature")
2 val temperatureTextResult = cacheIot.query(textQuery).getAll.asScala
3 println(s"TextQuery = $temperatureTextResult") //all devices with sensorType = temperature

这个是完整代码示例:

1import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
2import org.apache.ignite.cache.query.{ScanQuery, SqlQuery, TextQuery}
3import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
4import org.apache.ignite.lang.IgniteBiPredicate
5import org.apache.ignite.{IgniteCache, Ignition}
6import scala.annotation.meta.field
7import scala.collection.JavaConverters._
8
9object IgniteSql extends App {
10
11     val igniteConfig = new IgniteConfiguration()
12      val cacheConfig = new CacheConfiguration("ignite")
13     cacheConfig.setIndexedTypes(Seq(classOf[String], classOf\[IotDevice]): _*)
14     igniteConfig.setCacheConfiguration(cacheConfig)
15
16     val ignite = Ignition.start(igniteConfig)
17     val cacheIot: IgniteCache[String, IotDevice] = ignite.getOrCreateCache[String, IotDevice]("ignite")
18
19     val temp1 = IotDevice(name = "temp1", gpio = "123ASD", sensorType = "temperature", model = "testTemp")
20     cacheIot.put(temp1.gpio, temp1)
21     val temp2 = IotDevice(name = "temp2", gpio = "456ASD", sensorType = "temperature", model = "testTemp")
22     cacheIot.put(temp2.gpio, temp2)
23
24     val sqlText = s"sensorType = 'temperature'"
25     val sql = new SqlQuery[String, IotDevice](classOf[IotDevice], sqlText)
26     val temperatureQueryResult = cacheIot.query(sql).getAll.asScala.map(_.getValue)
27     println(s"SqlQuery = $temperatureQueryResult")
28
29     val cursor = cacheIot.query(new ScanQuery(new IgniteBiPredicate[String, IotDevice] {
30          override def apply(key: String, entryValue: IotDevice): Boolean = entryValue.sensorType == "temperature"}))
31
32     val temperatureScanResult = cursor.getAll.asScala
33     println(s"ScanQuery = $temperatureScanResult")
34
35     val textQuery = new TextQuery[IotDevice, String](classOf[IotDevice], "temperature")
36     val temperatureTextResult = cacheIot.query(textQuery).getAll.asScala
37     println(s"TextQuery = $temperatureTextResult") //all devices with sensorType = temperature
38
39}
40
41case class IotDevice(@(QuerySqlField@field)(index = true) name: String,
42@(QuerySqlField@field)(index = true) gpio: String,
43@(QueryTextField@field) sensorType: String,
44@(QueryTextField@field) model: String)

分区和复制

除了拥有集群和执行查询之外,我们可能会问自己:我们的数据当前在哪里?Ignite提供三种不同的缓存操作模式:分区、复制和本地:

分区:此模式是分布式缓存模式中最具可伸缩性的。在这种模式下,整个数据集被平均划分为多个分区,所有分区在参与节点之间被平均划分。这在处理大型数据集和频繁更新时非常理想。在此模式下,您还可以选择为缓存数据配置任意数量的备份节点。

1cacheCfg.setCacheMode(CacheMode.PARTITIONED)
2cacheCfg.setBackups(1);

复制:此模式成本较高,因为所有数据都复制到群集中的每个节点,并且每个数据更新都必须传播到可能影响性能和可伸缩性的所有其他节点。当我们处理小型数据集且更新不频繁时,此模式非常理想。

cacheCfg.setCacheMode(CacheMode.REPLICATED)

本地:此模式是缓存操作的最轻量级模式,因为没有数据分发到其他缓存节点。

内存特性

由于Ignite体系结构是“内存友好型”,因此RAM始终被视为第一内存层,所有处理都在该层进行。这样做的一些好处:

  • –基于堆外:数据和索引存储在Java堆外,因此只有应用程序代码可能触发垃圾收集操作。
  • –可预测的内存使用率:可以设置内存使用率。
  • –自动内存碎片整理:Apache Ignite尽可能高效地使用内存,并在后台执行碎片整理例程,从而避免碎片。
  • –提高了性能和内存利用率:所有数据和索引都以分页格式存储,在内存和磁盘上具有类似的表示形式,从而消除了对数据进行序列化或反序列化的任何需要。

结论

Ignite可以帮助您的分布式体系结构吗?将Ignite集成到已经投入生产的应用程序中是否成本高昂?

原文地址:https://www.signifytechnology.com/blog/2018/05/apache-ignite-more-than-a-simple-cache-by-gaston-lucero

  
 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2294.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册