Browse Source

Merge pull request #191 from mbaxter/0.8.2

Release 0.8.2
master 0.8.2
mbaxter 2 months ago
committed by GitHub
parent
commit
a0ece471cf
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      README.md
  2. 2
      build.gradle.kts
  3. 5
      src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
  4. 15
      src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
  5. 53
      src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt

4
README.md

@ -70,7 +70,7 @@ Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.c
maven { url "https://dl.cloudsmith.io/public/libp2p/jvm-libp2p/maven/" }
}
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.1-RELEASE'
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.2-RELEASE'
```
### Using Maven
Add the repository to the `dependencyManagement` section of the pom file:
@ -96,7 +96,7 @@ And then add jvm-libp2p as a dependency:
<dependency>
<groupId>io.libp2p</groupId>
<artifactId>jvm-libp2p-minimal</artifactId>
<version>0.8.1-RELEASE</version>
<version>0.8.2-RELEASE</version>
<type>pom</type>
</dependency>
```

2
build.gradle.kts

@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>
group = "io.libp2p"
version = "0.8.1-RELEASE"
version = "0.8.2-RELEASE"
description = "a minimal implementation of libp2p for the jvm"
plugins {

5
src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt

@ -3,6 +3,7 @@ package io.libp2p.pubsub.gossip
import io.libp2p.core.Connection
import io.libp2p.core.ConnectionHandler
import io.libp2p.core.P2PChannel
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.core.multistream.ProtocolBinding
import io.libp2p.core.multistream.ProtocolDescriptor
@ -23,6 +24,10 @@ class Gossip @JvmOverloads constructor(
router.score.updateTopicParams(scoreParams)
}
fun getGossipScore(peerId: PeerId): Double {
return router.score.getCachedScore(peerId)
}
override val protocolDescriptor =
if (router.protocol == PubsubProtocol.Gossip_V_1_1)
ProtocolDescriptor(

15
src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt

@ -10,6 +10,7 @@ import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
@ -114,6 +115,10 @@ class GossipScore(
}
inner class PeerScores {
// Cached values are accessed across threads
@Volatile
var cachedScore: Double = 0.0
val ips = mutableSetOf<String>()
var connectedTimeMillis: Long = 0
var disconnectedTimeMillis: Long = 0
@ -132,7 +137,7 @@ class GossipScore(
val topicParams = params.topicsScoreParams
private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)
val peerScores = mutableMapOf<PeerId, PeerScores>()
val peerScores = ConcurrentHashMap<PeerId, PeerScores>()
private val peerIpCache = mutableMapOf<PeerId, String>()
val refreshTask: ScheduledFuture<*>
@ -182,7 +187,9 @@ class GossipScore(
if (behaviorExcess < 0) 0.0
else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight
return topicsScore + appScore + ipColocationPenalty + routerPenalty
val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty
peerScore.cachedScore = computedScore
return computedScore
}
fun refreshScores() {
@ -193,6 +200,10 @@ class GossipScore(
}
}
fun getCachedScore(peerId: PeerId): Double {
return peerScores[peerId]?.cachedScore ?: 0.0
}
fun notifyDisconnected(peer: P2PService.PeerHandler) {
getPeerScores(peer).topicScores.filter { it.value.inMesh() }.forEach { t, _ ->
notifyPruned(peer, t)

53
src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt

@ -186,6 +186,59 @@ class GossipScoreTest {
assertThat(score.score(peer)).isEqualTo(1.0)
}
@Test
fun `test getCachedScore returns expected values`() {
val peer = mockPeer()
// Setup score params with topic config
val peerScoreParams = GossipPeerScoreParams.builder().build()
val topic: Topic = "testTopic"
val topicScoreParams = GossipTopicScoreParams.builder()
.topicWeight(1.0)
.firstMessageDeliveriesCap(4.0)
.firstMessageDeliveriesWeight(2.0)
.firstMessageDeliveriesDecay(0.5)
.build()
val defaultScoreParams = GossipTopicScoreParams.builder().build()
val topicsScoreParams = GossipTopicsScoreParams(defaultScoreParams, mutableMapOf(Pair(topic, topicScoreParams)))
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
topicsScoreParams = topicsScoreParams
)
// Setup time provider - apply non-zero time so that we don't get 0-valued timestamps that may be interpreted
// as empty
val timeController = TimeControllerImpl()
timeController.addTime(1.hours)
val executor = ControlledExecutorServiceImpl(timeController)
// Check initial value
val score = GossipScore(scoreParams, executor, { timeController.time })
// Check value before interacting with peer
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)
// Check value after accessing score
assertEquals(0.0, score.score(peer))
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)
// Add peer to mesh
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)
// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
assertThat(score.score(peer)).isEqualTo(2.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(2.0)
// Refresh to decay score
score.refreshScores()
assertThat(score.score(peer)).isEqualTo(1.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(1.0)
}
@Test
fun `test mesh message delivery decay`() {
val peer = mockPeer()

Loading…
Cancel
Save