Merge pull request #24 from yukimochi/reduce_redis_access

Add redis pub/sub support.
This commit is contained in:
Naoki Kosaka 2019-07-28 19:49:07 +09:00 committed by GitHub
commit 60a9fe0d41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 60 additions and 16 deletions

View File

@ -24,11 +24,13 @@ jobs:
name: build docker image
command: |
docker build -t ${DOCKER_USER}/activity-relay:edge .
docker tag ${DOCKER_USER}/activity-relay:edge ${DOCKER_USER}/activity-relay:$(echo ${CIRCLE_SHA1}|head -c7)
- run:
name: upload image to docker hub.
command: |
docker login --username=${DOCKER_USER} --password=${DOCKER_PASS}
docker push ${DOCKER_USER}/activity-relay:edge
docker push ${DOCKER_USER}/activity-relay:$(echo ${CIRCLE_SHA1}|head -c7)
workflows:
version: 2

View File

@ -1,6 +1,7 @@
package state
import (
"fmt"
"strings"
"github.com/go-redis/redis"
@ -19,9 +20,10 @@ const (
)
// NewState : Create new RelayState instance with redis client
func NewState(redisClient *redis.Client) RelayState {
func NewState(redisClient *redis.Client, notify bool) RelayState {
var config RelayState
config.RedisClient = redisClient
config.notify = notify
config.Load()
return config
@ -30,6 +32,7 @@ func NewState(redisClient *redis.Client) RelayState {
// RelayState : Store subscriptions and relay configrations
type RelayState struct {
RedisClient *redis.Client
notify bool
RelayConfig relayConfig `json:"relayConfig,omitempty"`
LimitedDomains []string `json:"limitedDomains,omitempty"`
@ -37,6 +40,21 @@ type RelayState struct {
Subscriptions []Subscription `json:"subscriptions,omitempty"`
}
func (config *RelayState) ListenNotify() {
go func() {
_, err := config.RedisClient.Subscribe("relay_refresh").Receive()
if err != nil {
panic(err)
}
ch := config.RedisClient.Subscribe("relay_refresh").Channel()
for range ch {
fmt.Println("Config refreshed from state changed notify.")
config.Load()
}
}()
}
// Load : Refrash content from redis
func (config *RelayState) Load() {
config.RelayConfig.load(config.RedisClient)
@ -84,8 +102,12 @@ func (config *RelayState) SetConfig(key Config, value bool) {
case CreateAsAnnounce:
config.RedisClient.HSet("relay:config", "create_as_announce", strValue).Result()
}
if config.notify {
config.RedisClient.Publish("relay_refresh", "Config refreshing request.")
} else {
config.Load()
}
}
// AddSubscription : Add new instance for subscription list
func (config *RelayState) AddSubscription(domain Subscription) {
@ -95,16 +117,24 @@ func (config *RelayState) AddSubscription(domain Subscription) {
"actor_id": domain.ActorID,
})
if config.notify {
config.RedisClient.Publish("relay_refresh", "Config refreshing request.")
} else {
config.Load()
}
}
// DelSubscription : Delete instance from subscription list
func (config *RelayState) DelSubscription(domain string) {
config.RedisClient.Del("relay:subscription:" + domain).Result()
config.RedisClient.Del("relay:pending:" + domain).Result()
if config.notify {
config.RedisClient.Publish("relay_refresh", "Config refreshing request.")
} else {
config.Load()
}
}
// SelectSubscription : Select instance from string
func (config *RelayState) SelectSubscription(domain string) *Subscription {
@ -124,8 +154,12 @@ func (config *RelayState) SetBlockedDomain(domain string, value bool) {
config.RedisClient.HDel("relay:config:blockedDomain", domain).Result()
}
if config.notify {
config.RedisClient.Publish("relay_refresh", "Config refreshing request.")
} else {
config.Load()
}
}
// SetLimitedDomain : Set/Unset instance for limited domain
func (config *RelayState) SetLimitedDomain(domain string, value bool) {
@ -135,8 +169,12 @@ func (config *RelayState) SetLimitedDomain(domain string, value bool) {
config.RedisClient.HDel("relay:config:limitedDomain", domain).Result()
}
if config.notify {
config.RedisClient.Publish("relay_refresh", "Config refreshing request.")
} else {
config.Load()
}
}
// Subscription : Instance subscription information
type Subscription struct {

View File

@ -32,7 +32,7 @@ func TestMain(m *testing.M) {
func TestInitialLoad(t *testing.T) {
redisClient.FlushAll().Result()
testState := NewState(redisClient)
testState := NewState(redisClient, false)
if testState.RelayConfig.BlockService != false {
t.Fatalf("Failed read config.")
@ -49,7 +49,7 @@ func TestInitialLoad(t *testing.T) {
func TestAddLimited(t *testing.T) {
redisClient.FlushAll().Result()
testState := NewState(redisClient)
testState := NewState(redisClient, false)
testState.SetLimitedDomain("example.com", true)
@ -79,7 +79,7 @@ func TestAddLimited(t *testing.T) {
func TestAddBlocked(t *testing.T) {
redisClient.FlushAll().Result()
testState := NewState(redisClient)
testState := NewState(redisClient, false)
testState.SetBlockedDomain("example.com", true)
@ -109,7 +109,7 @@ func TestAddBlocked(t *testing.T) {
func TestAddSubscription(t *testing.T) {
redisClient.FlushAll().Result()
testState := NewState(redisClient)
testState := NewState(redisClient, false)
testState.AddSubscription(Subscription{
Domain: "example.com",
@ -142,7 +142,7 @@ func TestAddSubscription(t *testing.T) {
func TestLoadCompatiSubscription(t *testing.T) {
redisClient.FlushAll().Result()
testState := NewState(redisClient)
testState := NewState(redisClient, false)
testState.AddSubscription(Subscription{
Domain: "example.com",
@ -167,7 +167,7 @@ func TestLoadCompatiSubscription(t *testing.T) {
func TestSetConfig(t *testing.T) {
redisClient.FlushAll().Result()
testState := NewState(redisClient)
testState := NewState(redisClient, false)
testState.SetConfig(BlockService, true)
if testState.RelayConfig.BlockService != true {

View File

@ -58,7 +58,7 @@ func initConfig() {
panic(err)
}
redisClient := redis.NewClient(redisOption)
relayState = state.NewState(redisClient)
relayState = state.NewState(redisClient, true)
var machineryConfig = &config.Config{
Broker: viper.GetString("redis_url"),
DefaultQueue: "relay",

View File

@ -5,12 +5,14 @@ import (
"testing"
"github.com/spf13/viper"
state "github.com/yukimochi/Activity-Relay/State"
)
func TestMain(m *testing.M) {
viper.Set("actor_pem", "../misc/testKey.pem")
viper.Set("relay_domain", "relay.yukimochi.example.org")
initConfig()
relayState = state.NewState(relayState.RedisClient, false)
relayState.RedisClient.FlushAll().Result()
code := m.Run()

View File

@ -176,7 +176,6 @@ func handleInbox(writer http.ResponseWriter, request *http.Request, activityDeco
writer.WriteHeader(400)
writer.Write(nil)
} else {
relayState.Load()
domain, _ := url.Parse(activity.Actor)
switch activity.Type {
case "Follow":

View File

@ -58,7 +58,8 @@ func initConfig() {
panic(err)
}
redisClient := redis.NewClient(redisOption)
relayState = state.NewState(redisClient)
relayState = state.NewState(redisClient, true)
relayState.ListenNotify()
machineryConfig := &config.Config{
Broker: viper.GetString("redis_url"),
DefaultQueue: "relay",

View File

@ -5,12 +5,14 @@ import (
"testing"
"github.com/spf13/viper"
state "github.com/yukimochi/Activity-Relay/State"
)
func TestMain(m *testing.M) {
viper.Set("actor_pem", "misc/testKey.pem")
viper.Set("relay_domain", "relay.yukimochi.example.org")
initConfig()
relayState = state.NewState(relayState.RedisClient, false)
// Load Config
code := m.Run()