This repository was archived by the owner on Dec 2, 2024. It is now read-only.
Merged
38 changes: 10 additions & 28 deletions marconi-mamba/app/Main.hs
@@ -4,11 +4,11 @@ module Main where

import Options.Applicative qualified as Opt

import Cardano.Api qualified as C
import Control.Applicative (optional)
import Control.Concurrent.Async (race_)
import Marconi.Api.Types (CliArgs (CliArgs), TargetAddresses)
import Marconi.Bootstrap (bootstrapHttp, bootstrapJsonRpc, bootstrapUtxoIndexers, targetAddressParser)
import Marconi.Api.Types (CliArgs (CliArgs))
import Marconi.Bootstrap (bootstrapHttp, bootstrapJsonRpc, bootstrapUtxoIndexers)
import Marconi.CLI (pAddressesParser, pNetworkId)


args :: Opt.Parser CliArgs
@@ -19,37 +19,19 @@ args = CliArgs
Opt.long "http-port" <> Opt.metavar "HTTP-PORT" <> Opt.help "JSON-RPC http port number, default is port 3000.")
<*> pNetworkId
<*> pAddressesParser
-- TODO: `pNetworkId` and `pTestnetMagic` are copied from
-- https://github.com/input-output-hk/cardano-node/blob/988c93085022ed3e2aea5d70132b778cd3e622b9/cardano-cli/src/Cardano/CLI/Shelley/Parsers.hs#L2009-L2027
-- Use them from there whenever they are exported.
pNetworkId :: Opt.Parser C.NetworkId
pNetworkId = pMainnet Opt.<|> fmap C.Testnet pTestnetMagic

pAddressesParser :: Opt.Parser TargetAddresses
pAddressesParser = targetAddressParser <$> Opt.strOption
(Opt.long "addresses-to-index"
<> Opt.metavar "Address"
<> Opt.help ("White space separated list of addresses to index."
<> " i.e \"address-1 address-2 address-3 ...\"" ) )

pMainnet :: Opt.Parser C.NetworkId
pMainnet = Opt.flag' C.Mainnet (Opt.long "mainnet" <> Opt.help "Use the mainnet magic id.")

pTestnetMagic :: Opt.Parser C.NetworkMagic
pTestnetMagic = C.NetworkMagic <$> Opt.option Opt.auto
(Opt.long "testnet-magic"
<> Opt.metavar "NATURAL"
<> Opt.help "Specify a testnet magic id.")

opts :: Opt.ParserInfo CliArgs
opts = Opt.info (args Opt.<**> Opt.helper)
( Opt.fullDesc <> Opt.header "marconi-mamba - Cardano blockchain indexer" )

-- | concurrently start:
-- JSON-RPC server
-- marconi utxo worker
-- Exceptions in either thread will end the program
--
main :: IO ()
main = do
cli@(CliArgs _ utxoDbPath maybePort _ tAddress) <- Opt.execParser opts
putStrLn $ "Processing addresses:\n " <> show tAddress <> "\n"
rpcEnv <- bootstrapJsonRpc utxoDbPath maybePort tAddress
cli@(CliArgs _ utxoDbPath maybePort nId tAddress) <- Opt.execParser opts
rpcEnv <- bootstrapJsonRpc utxoDbPath maybePort tAddress nId
race_
(bootstrapHttp rpcEnv) -- start http server
(bootstrapUtxoIndexers cli rpcEnv)
64 changes: 64 additions & 0 deletions marconi-mamba/db-utils/README.md
@@ -0,0 +1,64 @@
db-utils
===
A small [SQLite](https://www.sqlite.org/index.html) utility project, written in Haskell, for [marconi](../marconi).

## What does it do
In summary, db-utils is an internal tool used to acquire valid [Bech32](https://github.com/bitcoin/bips/blob/master/bip-0173.mediawiki#Bech32) [Shelley addresses](https://cips.cardano.org/cips/cip19/#shelleyaddresses) that have [Utxos](https://iohk.io/en/research/library/papers/the-extended-utxo-model/).

We store the valid addresses in the shelleyaddresses table, alongside the utxos table in the marconi SQLite instance.

``` sh
$ DB sqlite3 ./.marconidb/utxodb ".tables" ".exit"
frequtxos shelleyaddresses spent utxos
```

``` sh
$ DB sqlite3 ./.marconidb/utxodb ".schema shelleyaddresses" ".exit"
CREATE TABLE shelleyaddresses (address text not null, frequency int not null);
```

``` sh
$ sqlite3 ./.marconidb/utxodb "select * from shelleyaddresses limit 5;" ".exit"
addr1q837de0y7j3ncegph2a8mc0e86q9evwtekd3ejhlpr97wclrumj7fa9r83jsrw460hslj05qtjcuhnvmrn907zxtua3skv7yyl|2535
addr1qxyxudgzljnnaqghm8hlnpp36uvfr68a8k6uemumgjdcua4y7d04xcx9hnk05lnl6m9ptd9h3pj9vvg2xe4j354uh8vsarpydn|2321
addr1qytr6ma495fkqpfnd7gk5kwmtfdh084xvzn7rv83ha87qq6yfm3y8yv39lcrqc6ej3zdzvef4aj3dv3pq2snakkcwscsfyrn3g|2317
addr1qxt2ggq005kfm3uwe89emy3ka2zgdtrpxfarvz6033l3fqvk5ssq7lfvnhrcajwtnkfrd65ys6kxzvn6xc95lrrlzjqsjttk32|1353
addr1qyhat6v7w65799pkc8ff3mjcwk79kqs8gv8t4expd67f9seqksv3earfx6skxkdhe4hcekjkj0x333dd76u8re8cmg2qwrdzn2|1155

```

### Why do we need this
The need for this utility comes from the desire to test marconi on a live network with valid addresses that may correspond to a large number of Utxos. Here are the high-level requirements:

+ acquire valid Bech32 Shelley addresses from a given [Cardano network](https://docs.cardano.org/explore-cardano/cardano-network/about-the-cardano-network)
+ rank the addresses by the number of Utxos linked to them
+ store the addresses for future use
+ addresses need to be stored in Text format
+ the utility must not cause any downtime for the cardano node
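
The ranking requirement can be illustrated in plain Haskell. The sketch below is only an in-memory analogue of the idea, not the utility's implementation (db-utils does the counting in SQL, with `group by address` and a cut-off on the frequency). The function name `rankAddresses`, the `minFreq` parameter, and the sample addresses are hypothetical.

```haskell
import Data.List (sortOn)
import qualified Data.Map.Strict as Map
import Data.Ord (Down (Down))

-- | Count how often each address occurs, drop addresses seen fewer than
-- 'minFreq' times, and return the rest with the most frequent first.
-- (Hypothetical helper, for illustration only.)
rankAddresses :: Int -> [String] -> [(String, Int)]
rankAddresses minFreq addrs =
  sortOn (Down . snd)
    . filter ((>= minFreq) . snd)
    . Map.toList
    $ Map.fromListWith (+) [(a, 1) | a <- addrs]

main :: IO ()
main =
  -- One list entry per utxo; the strings stand in for Bech32 addresses.
  mapM_ print (rankAddresses 2 ["addr-a", "addr-b", "addr-a", "addr-c", "addr-b", "addr-a"])
```

With a threshold of 2, `addr-c` is dropped because it occurs only once, and `addr-a` is listed before `addr-b`.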

## Installation and execution

+ build the project
+ execute the project

### Assumptions
+ cardano node is running
+ marconi is running

### Build

``` sh
git clone git@github.com:input-output-hk/plutus-apps.git
cd plutus-apps
cabal build marconi
cabal build marconi-mamba
cabal build db-utils
```
### Execution

``` sh
$(cabal exec -- which db-utils-exe)
```

## Caution
This utility is under heavy development and is for internal use at this time!
41 changes: 41 additions & 0 deletions marconi-mamba/db-utils/exe/Main.hs
@@ -0,0 +1,41 @@
-- | Light exe wrapper for SqlUtils
--
module Main where
import Cardano.Api (NetworkId)
import Marconi.Api.Types (TargetAddresses)
import Marconi.Api.UtxoIndexersQuery qualified as Q.Utxo
import Marconi.CLI (pNetworkId, targetAddressParser)
import Marconi.DB.SqlUtils (freqShelleyTable, freqUtxoTable)
import Options.Applicative (Parser, execParser, help, helper, info, long, metavar, short, showDefault, strOption, value,
(<**>))

data CliOptions = CliOptions
{ utxoPath :: FilePath -- ^ path to utxo sqlite database
, networkId :: NetworkId -- ^ network id used for address conversions
}

-- |
bech32Addresses :: String -- ^ valid address to keep track of
bech32Addresses = "addr1w9645geguy679dvy73mgt6rvc4xyhjpxj4s0wxjtd6swvdc5dxgc3"

cliParser :: Parser CliOptions
cliParser = CliOptions
<$> strOption (long "utxo-db"
<> short 'd'
<> metavar "FILENAME"
<> showDefault
<> value "./.marconidb/2/utxodb"
<> help "Path to the utxo database.")
<*> pNetworkId

fakeAddresses :: TargetAddresses
fakeAddresses = targetAddressParser bech32Addresses

main :: IO ()
main = do
(CliOptions dbpath networkid) <- execParser $ info (cliParser <**> helper) mempty
dbEnv <- Q.Utxo.bootstrap dbpath fakeAddresses networkid
freqUtxoTable dbEnv
as <- freqShelleyTable dbEnv
print (length as)
print "end"
100 changes: 100 additions & 0 deletions marconi-mamba/db-utils/src/Marconi/DB/SqlUtils.hs
@@ -0,0 +1,100 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

-- | This module extracts Shelley addresses from a utxo SQLite database.
-- Addresses are:
-- stored in the shelleyaddresses table
-- stored as Bech32 `Text` (Shelley addresses only)
-- ranked by their corresponding number of utxos
--
-- to get a sample of the data :
-- sqlite3 ./.marconidb/2/utxo-db "select * from shelleyaddresses limit 10;" ".exit"
module Marconi.DB.SqlUtils where

import Cardano.Api qualified as C
import Control.Concurrent.Async (forConcurrently_)
import Control.Exception (bracket_)
import Control.Lens ((^.))
import Control.Monad (void)
import Data.Maybe (catMaybes)
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Database.SQLite.Simple (Connection, execute, execute_, query_)
import Database.SQLite.Simple.FromField (FromField)
import Database.SQLite.Simple.FromRow (FromRow (fromRow), field)
import Database.SQLite.Simple.ToField (ToField)
import Database.SQLite.Simple.ToRow (ToRow (toRow))
import GHC.Generics (Generic)
import Marconi.Api.Types (DBConfig, DBQueryEnv, HasDBQueryEnv (dbConf, network), utxoConn)

-- | Represents Shelley type addresses with most utxo transactions
--
data ShelleyFrequencyTable a = ShelleyFrequencyTable
{ _sAddress :: !a
, _sFrequency :: Int
} deriving Generic

instance (FromField a) => FromRow (ShelleyFrequencyTable a) where
fromRow = ShelleyFrequencyTable <$> field <*> field
instance (ToField a) => ToRow (ShelleyFrequencyTable a )where
toRow (ShelleyFrequencyTable ad f) = toRow(ad, f)

-- | Create a small SQL pipeline:
-- first create a table of addresses and their corresponding utxo counts.
-- Next, create the (empty) shelleyaddresses table
--
freqUtxoTable :: DBQueryEnv -> IO ()
freqUtxoTable env =
void $ withQueryAction (env ^. dbConf) ( \conn ->
execute_ conn "drop table if exists frequtxos"
>> execute_ conn "drop table if exists shelleyaddresses"
>> execute_ conn "create table frequtxos as select address, count (address) as frequency from utxos group by address order by frequency DESC"
>> execute_ conn "delete from frequtxos where frequency < 50" -- we only want `interesting` data
>> execute_ conn
"create TABLE shelleyaddresses (address text not null, frequency int not null)")

withQueryAction :: DBConfig -> (Connection -> IO a) -> IO a
withQueryAction conf action =
let
f = do
now <- getCurrentTime
putStrLn $ "queryAction started at: " <> show now
g = do
now <- getCurrentTime
putStrLn $ "queryAction completed at: " <> show now
in
bracket_ f g (action (utxoConn conf ))

-- | Populate the shelleyaddresses table:
-- first read the addresses and their utxo counts from frequtxos.
-- Next, keep only the Shelley addresses and insert them as Text
--
freqShelleyTable :: DBQueryEnv -> IO [Text]
freqShelleyTable env = do
addressFreq <- withQueryAction (env ^. dbConf)( \conn -> (query_ conn
"SELECT address, frequency FROM frequtxos") :: IO [ShelleyFrequencyTable C.AddressAny])
let addresses = catMaybes . fmap (toShelley ( env ^. network) ) $ addressFreq

withQueryAction (env ^. dbConf) ( \conn -> (
-- execute_ conn "BEGIN TRANSACTION"
forConcurrently_ addresses ( \(ShelleyFrequencyTable a f) ->
(execute conn
"insert into shelleyaddresses (address, frequency) values (?, ?)"
(a, f))))
-- >> execute_ conn "COMMIT"
)
pure . fmap _sAddress $ addresses

-- | We want to store addresses as Text.
-- Non-Shelley addresses are dropped; Shelley addresses are serialised to text
--
toShelley :: C.NetworkId -> ShelleyFrequencyTable C.AddressAny -> Maybe (ShelleyFrequencyTable Text)
toShelley _ (ShelleyFrequencyTable (C.AddressShelley a) f) =
let
addrTxt = C.serialiseAddress a
in
Just ( ShelleyFrequencyTable addrTxt f)
toShelley _ _ = Nothing
2 changes: 1 addition & 1 deletion marconi-mamba/examples/json-rpc-client/src/Main.hs
@@ -9,9 +9,9 @@ module Main where
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.Proxy (Proxy (Proxy))
import Ledger.Tx (TxOutRef)
import Marconi.Client.Types (JsonRpcResponse)
import Marconi.JsonRpc.Types (JsonRpc, JsonRpcNotification, RawJsonRpc)
import Marconi.Types (TxOutRef)
import Network.HTTP.Client (defaultManagerSettings, newManager)
import Servant.API (Get, NoContent, PlainText, Post, ReqBody, (:<|>) ((:<|>)), (:>))
import Servant.Client (ClientM, client, mkClientEnv, parseBaseUrl, runClientM)
101 changes: 85 additions & 16 deletions marconi-mamba/examples/json-rpc-server/src/Main.hs
@@ -1,29 +1,98 @@
{-
-- Sample JSON-RPC server program
-- uncomment TODO and provide adequate data
--
-}
module Main where

import Marconi.Api.Types (TargetAddresses)
import Cardano.Api (NetworkId (Mainnet))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race_)
import Control.Concurrent.STM (atomically, putTMVar, takeTMVar, tryReadTMVar)
import Control.Exception (bracket_)
import Control.Lens.Operators ((^.))
import Control.Monad (unless)
import Data.List (intercalate)
import Marconi.Api.Types (DBQueryEnv, HasDBQueryEnv (queryComm), HasJsonRpcEnv (queryEnv),
HasUtxoQueryComm (indexer, queryReq), TargetAddresses)
import Marconi.Bootstrap (bootstrapHttp, bootstrapJsonRpc, targetAddressParser)
import Marconi.Index.Utxo (Depth (Depth), open)
import Options.Applicative (Parser, execParser, help, helper, info, long, metavar, short, showDefault, strOption, value,
(<**>))

{-
-- white space separated list of addresses
-}
bech32DefaultAddresses :: String -- ^ valid address to keep track of
bech32DefaultAddresses = intercalate " "
["addr_test1vpfwv0ezc5g8a4mkku8hhy3y3vp92t7s3ul8g778g5yegsgalc6gc"
, "addr_test1vp8cprhse9pnnv7f4l3n6pj0afq2hjm6f7r2205dz0583egagfjah"
, "addr_test1wpzvcmq8yuqnnzerzv0u862hmc4tc8xlm74wtsqmh56tgpc3pvx0f"
, "addr_test1vrvf7yfr2h79mtzqrpcn0ql98xrhs63k85w64u8py7709zsm6tsr6"
-- , "addr_test1qz4ll7yrah8h5t3cv2qptn4mw22judsm9j9zychhmtuuzmszd3hm6w02uxx6h0s3qgbd4hxgpvd0qzklnmahcx7v0mcysptyj8l"
, "addr_test1qpxtftdcvh55twn2lum4uylxshzls6489q89pft3feesq2m8an0x234jtsqra93hcefmut6cyd6cdn535nkjukhph47ql6uvg7"
, "addr_test1wqgden0j2d7pkqy3hu6kcj32swazzy6wg93a8c46pndptncdmz6tq"
, "addr_test1qpe6s9amgfwtu9u6lqj998vke6uncswr4dg88qqft5d7f67kfjf77qy57hqhnefcqyy7hmhsygj9j38rj984hn9r57fswc4wg0"
, "addr_test1qp9juu544jrkel0hp6pvrjud70h9ev3ajw2fuvezyv8ug3eg3292mxuf3kq7nysjumlxjrlsfn9tp85r0l54l29x3qcsytvx58"
, "addr_test1qqddk5xnz08mxsqw6jdaenvhdah835lhvm62tt5lydk2as7kfjf77qy57hqhnefcqyy7hmhsygj9j38rj984hn9r57fs066hcl"
, "addr_test1qpp0xdvdexl4t3ur4svdkv3jjs2gy8fu6h9mrrsgvm7r20cmrhy4p5ukjknv23jy95nhsjsnud6fxkjqxp5ehvn8h0es2su3gt"
, "addr_test1wz3937ykmlcaqxkf4z7stxpsfwfn4re7ncy48yu8vutcpxgnj28k0"
, "addr_test1vqslp49gcrvah8c9vjpxm4x9j7s2m4zw0gkscqh0pqjg4wcjzvcfr"
, "addr_test1qzu6at6s7r7yzpvw3mm2tsas34mc9nzrhda89w59wd78lfaw8084xldqyrvxe38z4wxqqdr9h86t8ruut8rrfezdftpstsnrd8"
, "addr_test1vz8q8ymsty33sw7mh4s2wxj2pe4mcrlchxxa37z70l348cqk95hdw"
, "addr_test1qz8q8ymsty33sw7mh4s2wxj2pe4mcrlchxxa37z70l348cyjd3dlf08q9usapw5gt5t8cp8lju7wtwqzk5cj0gxaxyss6w8n66"
, "addr_test1qqwh05w69g95065zlr4fef4yfpjkv8r3dr9h3pu0egy5n7pwhxp4f95svdjr9dmtqumqcs6v49s6pe7ap4h2nv8rcaasgrkndk"
, "addr_test1qrr6urmwy2nle3xppnjg9xcukasrwfyfjv9eqh97km84ra53q4hau90tjeldx0mv9eka2z73t9727xl8jny3cy8zetqsdjctpd"
, "addr_test1wrn2wfykuhswv4km08w0zcl5apmnqha0j24fa287vueknasq6t4hc"
,"addr_test1qr30nkfx28r452r3006kytnpvn39zv7c2m5uqt4zrg35mly35pesdyk43wnxk3edkkw74ak56n4zh67reqjhcfp3mm7qtyekt4"
, "addr_test1wr9gquc23wc7h8k4chyaad268mjft7t0c08wqertwms70sc0fvx8w"
, "addr_test1vqeux7xwusdju9dvsj8h7mca9aup2k439kfmwy773xxc2hcu7zy99"
]
defaultDbpath :: FilePath
defaultDbpath = "./.marconidb/4/utxo-db"

-- TODO
bech32Addresses :: String -- ^ valid address to keep track of
bech32Addresses = "bc1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3qccfmv3"
data CliOptions = CliOptions
{ _utxoPath :: FilePath -- ^ path to utxo sqlite database
, _addresses :: TargetAddresses
}

-- TODO
dbpath :: FilePath -- ^ valid SQLite marconi UTxo database path
dbpath = "./utxoDB"
cliParser :: Parser CliOptions
cliParser = CliOptions
<$> strOption (long "utxo-db"
<> short 'd'
<> metavar "FILENAME"
<> showDefault
<> value defaultDbpath
<> help "Path to the utxo database.")
<*> pAddressesParser

addresses :: TargetAddresses
addresses = targetAddressParser bech32Addresses
pAddressesParser :: Parser TargetAddresses
pAddressesParser = targetAddressParser <$> strOption
(long "addresses-to-index"
<> metavar "Address"
<> showDefault
<> value bech32DefaultAddresses
<> help ("White space separated list of addresses to index."
<> " i.e \"address-1 address-2 address-3 ...\"" ) )

main :: IO ()
main = do
putStrLn "Starting the Example rpc http-server on port 3000 example"
env <- bootstrapJsonRpc dbpath Nothing addresses
bootstrapHttp env
(CliOptions dbpath addresses) <- execParser $ info (cliParser <**> helper) mempty
putStrLn $ "Starting the Example RPC http-server:"
<>"\nport =" <> show (3000 :: Int)
<> "\nutxo-db =" <> dbpath
<> "\nnumber of addresses to index = " <> show (length addresses)

env <- bootstrapJsonRpc dbpath Nothing addresses Mainnet
race_ (bootstrapHttp env) (mocUtxoIndexer (env ^. queryEnv) )

mocUtxoIndexer :: DBQueryEnv -> IO ()
mocUtxoIndexer env = open "" (Depth 4) >>= innerLoop
where
utxoIndexer = env ^. queryComm . indexer
qreq = env ^. queryComm . queryReq
innerLoop ix = do
isquery <- atomically $ tryReadTMVar qreq
unless (null isquery) $ bracket_
(atomically (takeTMVar qreq ) )
(atomically (takeTMVar qreq ) )
(atomically (putTMVar utxoIndexer ix) )
Comment on lines +63 to +65

@berewt (Contributor), Nov 8, 2022

The synchronisation here looks expensive: when a query arrives, we wait for it to release a semaphore, release an ix so it can run its action, and then wait for the end of the transaction.
The first step looks unnecessary to me, but as always concurrency is complex and I may be wrong. If I'm right, removing the initialisation in this bracket and the one in the query shouldn't change the behaviour and would save us a bit of time.


@kayvank (Collaborator, Author), Nov 8, 2022

Thank you for taking a detailed look at this part; I could certainly use another pair of eyes here. Although this section is from the toy example demo, its counterpart, indexer.hs, is a critical part of the flow, since it has the potential to lock SQLite.
The main things here are:

  • queries take precedence over inserts.
  • SQLite locks when inserts and queries run concurrently. Concurrent inserts or concurrent queries are OK; mixing them is not.

The section of the code you're referring to is in the toy json-rpc demo and mocks the Marconi insert operation, so the code resembles the flow in indexer.hs.

When a query arrives, we need to stop the inserts and resume them only after the queries have completed against both cold and hot storage. The initialization section of the bracket stops Marconi from inserting, and the closing part releases the lock only after the queries have completed.
Changing this section of the code will require changing the corresponding section of indexer.hs as well. I'll be happy to discuss that section with you further, as it would be great to get more performance out of it. However, since that is a more significant change, and not part of your comment, I suggest we proceed with the merge. If we find that the initialization is indeed not required, I'll do a quick patch to address it.
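
For readers following the thread, here is a minimal, self-contained sketch of the handshake described above: the writer pauses when a query is pending, hands over its state, and resumes only after the query side signals completion. It is an illustration under assumptions, not Marconi's code. The names `Comm`, `writerStep`, `writerLoop`, `runQuery` and `demo` are hypothetical, the writer's state is a plain `Int` standing in for the indexer, and only one query at a time is considered.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
  (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar, tryReadTMVar)
import Control.Exception (bracket_)
import Control.Monad (unless)

-- Hypothetical stand-in for the query/indexer communication channel.
data Comm = Comm
  { queryReq :: TMVar ()  -- filled by the query side: once to request, once to release
  , handOver :: TMVar Int -- the writer hands its current state to the query side
  }

-- One writer iteration: if a query is pending, pause until it has finished.
writerStep :: Comm -> Int -> IO Int
writerStep comm n = do
  pending <- atomically (tryReadTMVar (queryReq comm))
  unless (null pending) $
    bracket_
      (atomically (takeTMVar (queryReq comm)))  -- acknowledge the request; inserts are paused
      (atomically (takeTMVar (queryReq comm)))  -- block until the query side signals completion
      (atomically (putTMVar (handOver comm) n)) -- hand the current state to the query side
  pure (n + 1) -- simulated insert

writerLoop :: Comm -> Int -> IO ()
writerLoop comm n = do
  n' <- writerStep comm n
  threadDelay 1000
  writerLoop comm n'

-- Query side: request, read the handed-over state, then release the writer.
runQuery :: Comm -> IO Int
runQuery comm = do
  atomically (putTMVar (queryReq comm) ()) -- request: the writer stops before its next insert
  n <- atomically (takeTMVar (handOver comm))
  -- a real query would read from storage here, while inserts are paused
  atomically (putTMVar (queryReq comm) ()) -- release: the writer may resume
  pure n

demo :: IO Int
demo = do
  comm <- Comm <$> newEmptyTMVarIO <*> newEmptyTMVarIO
  _ <- forkIO (writerLoop comm 0)
  threadDelay 10000
  runQuery comm

main :: IO ()
main = demo >>= \n -> putStrLn ("query observed writer state " <> show n)
```

In this sketch the first `takeTMVar` in the bracket is what tells the query side that inserts have stopped. Whether that acknowledgement can be dropped, as suggested in the review, depends on how the real query side uses `queryReq`, which this sketch does not model.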


threadDelay 10 -- inserts to sqlite
(innerLoop ix)