go-nntp-plusplus/examples/couchserver/couchserver.go

345 lines
7.6 KiB
Go

package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"log/syslog"
"net"
"net/textproto"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"git.maride.cc/maride/go-nntp-plusplus"
"git.maride.cc/maride/go-nntp-plusplus/server"
"github.com/dustin/go-couch"
)
var groupCacheTimeout = flag.Int("groupTimeout", 300,
"Time (in seconds), group cache is valid")
var optimisticPost = flag.Bool("optimistic", false,
"Optimistically return success on store before storing")
var useSyslog = flag.Bool("syslog", false,
"Log to syslog")
type groupRow struct {
Group string `json:"key"`
Value []interface{} `json:"value"`
}
type groupResults struct {
Rows []groupRow
}
type attachment struct {
Type string `json:"content-type"`
Data []byte `json:"data"`
}
func removeSpace(r rune) rune {
if r == ' ' || r == '\n' || r == '\r' {
return -1
}
return r
}
func (a *attachment) MarshalJSON() ([]byte, error) {
m := map[string]string{
"content_type": a.Type,
"data": strings.Map(removeSpace, base64.StdEncoding.EncodeToString(a.Data)),
}
return json.Marshal(m)
}
type article struct {
MsgID string `json:"_id"`
DocType string `json:"type"`
Headers map[string][]string `json:"headers"`
Bytes int `json:"bytes"`
Lines int `json:"lines"`
Nums map[string]int64 `json:"nums"`
Attachments map[string]*attachment `json:"_attachments"`
Added time.Time `json:"added"`
}
type articleResults struct {
Rows []struct {
Key []interface{} `json:"key"`
Article article `json:"doc"`
}
}
type couchBackend struct {
db *couch.Database
groups map[string]*nntp.Group
grouplock sync.Mutex
}
func (cb *couchBackend) clearGroups() {
cb.grouplock.Lock()
defer cb.grouplock.Unlock()
log.Printf("Dumping group cache")
cb.groups = nil
}
func (cb *couchBackend) fetchGroups() error {
cb.grouplock.Lock()
defer cb.grouplock.Unlock()
if cb.groups != nil {
return nil
}
log.Printf("Filling group cache")
results := groupResults{}
err := cb.db.Query("_design/groups/_view/active", map[string]interface{}{
"group": true,
}, &results)
if err != nil {
return err
}
cb.groups = make(map[string]*nntp.Group)
for _, gr := range results.Rows {
if gr.Value[0].(string) != "" {
group := nntp.Group{
Name: gr.Group,
Description: gr.Value[0].(string),
Count: int64(gr.Value[1].(float64)),
Low: int64(gr.Value[2].(float64)),
High: int64(gr.Value[3].(float64)),
Posting: nntp.PostingPermitted,
}
cb.groups[group.Name] = &group
}
}
go func() {
time.Sleep(time.Duration(*groupCacheTimeout) * time.Second)
cb.clearGroups()
}()
return nil
}
func (cb *couchBackend) ListGroups(max int) ([]*nntp.Group, error) {
if cb.groups == nil {
if err := cb.fetchGroups(); err != nil {
return nil, err
}
}
rv := make([]*nntp.Group, 0, len(cb.groups))
for _, g := range cb.groups {
rv = append(rv, g)
}
return rv, nil
}
func (cb *couchBackend) GetGroup(name string) (*nntp.Group, error) {
if cb.groups == nil {
if err := cb.fetchGroups(); err != nil {
return nil, err
}
}
g, exists := cb.groups[name]
if !exists {
return nil, nntpserver.ErrNoSuchGroup
}
return g, nil
}
func (cb *couchBackend) mkArticle(ar article) *nntp.Article {
url := fmt.Sprintf("%s/%s/article", cb.db.DBURL(), cleanupID(ar.MsgID, true))
return &nntp.Article{
Header: textproto.MIMEHeader(ar.Headers),
Body: &lazyOpener{url, nil, nil},
Bytes: ar.Bytes,
Lines: ar.Lines,
}
}
func (cb *couchBackend) GetArticle(group *nntp.Group, id string) (*nntp.Article, error) {
var ar article
if intid, err := strconv.ParseInt(id, 10, 64); err == nil {
results := articleResults{}
cb.db.Query("_design/articles/_view/list", map[string]interface{}{
"include_docs": true,
"reduce": false,
"key": []interface{}{group.Name, intid},
}, &results)
if len(results.Rows) != 1 {
return nil, nntpserver.ErrInvalidArticleNumber
}
ar = results.Rows[0].Article
} else {
err := cb.db.Retrieve(cleanupID(id, false), &ar)
if err != nil {
return nil, nntpserver.ErrInvalidMessageID
}
}
return cb.mkArticle(ar), nil
}
func (cb *couchBackend) GetArticles(group *nntp.Group,
from, to int64) ([]nntpserver.NumberedArticle, error) {
rv := make([]nntpserver.NumberedArticle, 0, 100)
results := articleResults{}
cb.db.Query("_design/articles/_view/list", map[string]interface{}{
"include_docs": true,
"reduce": false,
"start_key": []interface{}{group.Name, from},
"end_key": []interface{}{group.Name, to},
}, &results)
for _, r := range results.Rows {
rv = append(rv, nntpserver.NumberedArticle{
Num: int64(r.Key[1].(float64)),
Article: cb.mkArticle(r.Article),
})
}
return rv, nil
}
func (cb *couchBackend) AllowPost() bool {
return true
}
func cleanupID(msgid string, escapedAt bool) string {
s := strings.TrimFunc(msgid, func(r rune) bool {
return r == ' ' || r == '<' || r == '>'
})
qe := url.QueryEscape(s)
if escapedAt {
return qe
}
return strings.Replace(qe, "%40", "@", -1)
}
func (cb *couchBackend) Post(art *nntp.Article) error {
a := article{
DocType: "article",
Headers: map[string][]string(art.Header),
Nums: make(map[string]int64),
MsgID: cleanupID(art.Header.Get("Message-Id"), false),
Attachments: make(map[string]*attachment),
Added: time.Now(),
}
b := []byte{}
buf := bytes.NewBuffer(b)
n, err := io.Copy(buf, art.Body)
if err != nil {
return err
}
log.Printf("Read %d bytes of body", n)
b = buf.Bytes()
a.Bytes = len(b)
a.Lines = bytes.Count(b, []byte{'\n'})
a.Attachments["article"] = &attachment{"text/plain", b}
for _, g := range strings.Split(art.Header.Get("Newsgroups"), ",") {
g = strings.TrimSpace(g)
group, err := cb.GetGroup(g)
if err == nil {
a.Nums[g] = atomic.AddInt64(&group.High, 1)
atomic.AddInt64(&group.Count, 1)
} else {
log.Printf("Error getting group %q: %v", g, err)
}
}
if len(a.Nums) == 0 {
log.Printf("Found no matching groups in %v",
art.Header["Newsgroups"])
return nntpserver.ErrPostingFailed
}
if *optimisticPost {
go func() {
_, _, err = cb.db.Insert(&a)
if err != nil {
log.Printf("error optimistically posting article: %v", err)
}
}()
} else {
_, _, err = cb.db.Insert(&a)
if err != nil {
log.Printf("error posting article: %v", err)
return nntpserver.ErrPostingFailed
}
}
return nil
}
func (cb *couchBackend) Authorized() bool {
return true
}
func (cb *couchBackend) Authenticate(user, pass string) (nntpserver.Backend, error) {
return nil, nntpserver.ErrAuthRejected
}
func maybefatal(err error, f string, a ...interface{}) {
if err != nil {
log.Fatalf(f, a...)
}
}
func main() {
couchURL := flag.String("couch", "http://localhost:5984/news",
"Couch DB.")
flag.Parse()
if *useSyslog {
sl, err := syslog.New(syslog.LOG_INFO, "nntpd")
if err != nil {
log.Fatalf("Error initializing syslog: %v", err)
}
log.SetOutput(sl)
log.SetFlags(0)
}
a, err := net.ResolveTCPAddr("tcp", ":1119")
maybefatal(err, "Error resolving listener: %v", err)
l, err := net.ListenTCP("tcp", a)
maybefatal(err, "Error setting up listener: %v", err)
defer l.Close()
db, err := couch.Connect(*couchURL)
maybefatal(err, "Can't connect to the couch: %v", err)
err = ensureViews(&db)
maybefatal(err, "Error setting up views: %v", err)
backend := couchBackend{
db: &db,
}
s := nntpserver.NewServer(&backend)
for {
c, err := l.AcceptTCP()
maybefatal(err, "Error accepting connection: %v", err)
go s.Process(c)
}
}